This shows an example of setting up a Kafka broker on a debian system (preferably running DANOS) which can handle messages send by the CGNAT kafka protobuf logging. The consumer of the messages is a python script which prints the topic, key, and decodes the protobuf data.
First you need to install Kafka on the debian system which is acting as the Kafka broker. The following is information on doing that based on https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-debian-9 - with versions updated.
Code Block |
---|
# install java
sudo apt install default-jre
# check the version
java -version
# create a user for kafka
sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo
# switch to the user
su -l kafka
# download kafka tgz
mkdir ~/Downloads
curl "https://www.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz" -o ~/Downloads/kafka.tgz
# create directory to extract to
mkdir ~/kafka && cd ~/kafka
# extract the files
tar -xvzf ~/Downloads/kafka.tgz --strip 1
# configure to allow topic deletion
echo "" >> ~/kafka/config/server.properties
echo "delete.topic.enable = true" >> ~/kafka/config/server.properties
# create zookeeper service
sudo bash
cat <<EOF >/etc/systemd/system/zookeeper.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
EOF
# create kafka service file
cat <<EOF >/etc/systemd/system/kafka.service
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
EOF
systemctl start kafka
# check service started
journalctl -u kafka
# enable on boot
systemctl enable kafka
# exit from sudo bash |
The following does some stand-alone testing of the Kafka installation, to use which should be run to confirm Kafka is working before trying to use with the dataplaneread log packets sent by DANOS CGNAT Kafka messages:
Code Block |
---|
# start kafka monitoring a topic ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic # publish to the topic echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null # read the info from kafka ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning |
Now to do something similar to the above tests, but for data logs from CGNAT. First, you will need to:
...
Your /etc/hosts file must have an entry for the Kafka bootstrap server, as Kafka seems to check the name and ip IP address match.
For the consumer, we could use "kafka-console-consumer.sh", as follows, but that will not decode the data which is in protobuf format:
...
So instead we will do it using a python Kafka consumer, which will include make use of the protobuf file used by CGNAT. This comes from the dataplane, and so need a version of the dataplane built with the same version of Debian that the vRouter has so that the files that the python files depend on are appropriate. If your workstation already has the correct ones (e.g. has the same version of debian as the vRouter), you can skip the lines below, and jump to the steps below on installing the protobuf package.
You need libvyatta-dataplane-proto-support, which comes from the vyatta-dataplane package, compiled for the Debian where you are running kafka-python.
Now do the following steps, to install the libvyatta-dataplane-proto-support package and packages it needs to be able to run the python script.
...
. If your Kafka broker is also a DANOS system, then there are protobuf libraries installed which can be used. These are the files installed by the package “libvyatta-dataplane-proto-support”. If you are using a different system, when you should copy file /usr/share/vyatta-dataplane/protobuf/CgnatLogging.proto, and use the “protoc” package to create python libraries that can decode the CGNAT log messages. You must also install generic python protobuf package “python3-protobuf” and the package with a python interface to Kafka “kafka-python”. For example:
Code Block |
---|
sudo apt-get install python3-protobuf
sudo pip3 install kafka-python |
Now run the script consumer.py (shown below) with the appropriate topics passed in as parameters. Note that it contacts the Kafka broker on the local system, so that will need changed if running on a different system:
Code Block |
---|
!/usr/bin/env python3 # consumer.py import sys from kafka import KafkaConsumer from vyatta.proto import CgnatLogging_pb2 as pb if len(sys.argv) <= 1: print('Error: at least one topic is needed', file=sys.stderr) sys.exit(2) params = sys.argv[1:] cgnat_log = pb.CgnatLog() consumer = KafkaConsumer(*params, bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', enable_auto_commit=True, group_id='my-group') for message in consumer: print("Topic: {}".format(message.topic)) print("Key: {}".format(message.key.decode('UTF-8'))) print("") cgnat_log.ParseFromString(message.value) print(cgnat_log) print("---------") |
...
Cause CGNAT log messages to be sent, and then you should see something like the following, which shows decoding of a "session startcreate" protobuf messages:and “subscriber start” protobuf message.
Code Block |
---|
Topic: cgnat-session Key: vm-cgn-1 sessionLog { cgnInstance: "vm-cgn-1" eventType: EVENT_SESSION_CREATE sessionId: 2 subSessionId: 1 ifName: "dp0p1s2" protocol: 6 direction: DIRECTION_OUT subscriberAddress: 168427779 subscriberPort: 500 natAllocatedAddress: 168428531 natAllocatedPort: 1024 destinationAddress: 167772160 destinationPort: 80 startTimestamp { seconds: 1569246812343 nanos: 924000000 } state: SESSION_OPENING stateHistory: 0 } --------- Topic: cgnat-subscriber Key: vm-cgn-1 subscriberLog { cgnInstance: "vm-cgn-1" eventType: EVENT_SUBSCRIBER_START subscriberAddress: 168427779 startTimestamp { seconds: 1567679116 nanos: 645000000 } } |