CGNAT and Kafka logging

This page shows an example of setting up a Kafka broker on a Debian system (preferably running DANOS) that can handle messages sent by the CGNAT Kafka protobuf logging. The consumer of the messages is a Python script that prints the topic and key and decodes the protobuf data.

First, install Kafka on the Debian system that acts as the Kafka broker. The following steps are based on https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-debian-9, with versions updated.

    # install java
    sudo apt install default-jre

    # check the version
    java -version

    # create a user for kafka
    sudo useradd kafka -m
    sudo passwd kafka
    sudo adduser kafka sudo

    # switch to the user
    su -l kafka

    # download kafka tgz
    mkdir ~/Downloads
    curl "https://www.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz" -o ~/Downloads/kafka.tgz

    # create directory to extract to
    mkdir ~/kafka && cd ~/kafka

    # extract the files
    tar -xvzf ~/Downloads/kafka.tgz --strip 1

    # configure to allow topic deletion
    echo "" >> ~/kafka/config/server.properties
    echo "delete.topic.enable = true" >> ~/kafka/config/server.properties

    # create zookeeper service
    sudo bash
    cat <<EOF >/etc/systemd/system/zookeeper.service
    [Unit]
    Requires=network.target remote-fs.target
    After=network.target remote-fs.target

    [Service]
    Type=simple
    User=kafka
    ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
    ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
    Restart=on-abnormal

    [Install]
    WantedBy=multi-user.target
    EOF

    # create kafka service file
    cat <<EOF >/etc/systemd/system/kafka.service
    [Unit]
    Requires=zookeeper.service
    After=zookeeper.service

    [Service]
    Type=simple
    User=kafka
    ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
    ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
    Restart=on-abnormal

    [Install]
    WantedBy=multi-user.target
    EOF

    systemctl start kafka

    # check service started
    journalctl -u kafka

    # enable on boot
    systemctl enable kafka

    # exit from sudo bash
    exit

The following stand-alone test confirms that the Kafka installation is working. Run it before trying to read the log messages sent by DANOS CGNAT:

    # start kafka monitoring a topic
    ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

    # publish to the topic
    echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

    # read the info from kafka
    ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning

Now do something similar to the above tests, but for logs from CGNAT. First, you will need to:

a) configure CGNAT so that it produces the Kafka protobuf logs, and

b) perform an action (e.g. create CGNAT sessions) that causes the logs to be produced.

Create the topics that you have configured CGNAT to send on. Note that these commands are all executed under the login "kafka":

    ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cgnat-session
    ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cgnat-subscriber
    ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cgnat-port-block-allocation
    ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cgnat-res-constraint
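
The four commands above differ only in the topic name, so they can be generated from a list. The following is a minimal Python sketch of that idea; the helper name `create_topic_command` and its parameters are hypothetical, and it only builds the argument lists rather than running anything.

```python
import os

# Topic names taken from the commands above.
TOPICS = [
    "cgnat-session",
    "cgnat-subscriber",
    "cgnat-port-block-allocation",
    "cgnat-res-constraint",
]


def create_topic_command(topic, kafka_home="~/kafka", zookeeper="localhost:2181"):
    """Build the kafka-topics.sh argument list for one topic (hypothetical helper)."""
    script = os.path.join(os.path.expanduser(kafka_home), "bin", "kafka-topics.sh")
    return [
        script, "--create",
        "--zookeeper", zookeeper,
        "--replication-factor", "1",
        "--partitions", "1",
        "--topic", topic,
    ]


# Print the commands so they can be reviewed before being run.
for topic in TOPICS:
    print(" ".join(create_topic_command(topic)))
```

To actually create the topics, each list could be passed to `subprocess.run(cmd, check=True)` while logged in as "kafka".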

Your /etc/hosts file must have an entry for the Kafka bootstrap server, as Kafka seems to check that the name and IP address match.
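
One way to check the hosts entry is to parse the file and look up the broker name. The following is a rough Python sketch; the hostname "kafka-broker" and the address 192.0.2.10 are placeholders for illustration, not values from a real setup, and `parse_hosts` is a hypothetical helper.

```python
def parse_hosts(text):
    """Map each hostname or alias in hosts-file text to its IP address."""
    table = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()  # drop comments and whitespace
        if not line:
            continue
        ip, *names = line.split()
        for name in names:
            table.setdefault(name, ip)  # keep the first entry seen for a name
    return table


# Placeholder content: "kafka-broker" and 192.0.2.10 are assumptions.
SAMPLE = """
127.0.0.1   localhost
192.0.2.10  kafka-broker   # Kafka bootstrap server
"""

hosts = parse_hosts(SAMPLE)
print(hosts.get("kafka-broker"))
```

On a real system the text would come from `open("/etc/hosts").read()`, and the name looked up would be whatever name the Kafka bootstrap server is known by.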

For the consumer, we could use "kafka-console-consumer.sh" as follows, but that will not decode the data, which is in protobuf format:

    ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cgnat-session --property print.key=true --from-beginning

So instead we will use a Python Kafka consumer, which makes use of the protobuf file used by CGNAT. If your Kafka broker is also a DANOS system, then protobuf libraries that can be used are already installed. These are the files installed by the package "libvyatta-dataplane-proto-support". If you are using a different system, then you should copy the file /usr/share/vyatta-dataplane/protobuf/CgnatLogging.proto and use the "protoc" compiler to create Python libraries that can decode the CGNAT log messages. You must also install the generic Python protobuf package "python3-protobuf" and "kafka-python", the package that provides a Python interface to Kafka. For example:

    sudo apt-get install python3-protobuf
    sudo pip3 install kafka-python
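
On a non-DANOS system, the Python module has to be generated from the copied CgnatLogging.proto. The following is a minimal sketch of that step, assuming "protoc" is on the PATH and the .proto file is in the current directory; the helper names `protoc_command` and `generate` are hypothetical. With `--python_out`, protoc writes a module named `CgnatLogging_pb2.py`.

```python
import subprocess


def protoc_command(proto_file, out_dir="."):
    """Build the protoc argument list that generates a Python module."""
    return ["protoc", "--python_out=" + out_dir, proto_file]


def generate(proto_file, out_dir="."):
    """Run protoc; raises CalledProcessError if compilation fails."""
    subprocess.run(protoc_command(proto_file, out_dir), check=True)


# Show the command without running it.
print(" ".join(protoc_command("CgnatLogging.proto")))
```

Calling `generate("CgnatLogging.proto")` would then produce the module. A module generated this way would be imported as `import CgnatLogging_pb2 as pb` rather than from the `vyatta.proto` package that DANOS provides.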

Now run the script consumer.py (shown below) with the appropriate topics passed in as parameters. Note that it contacts the Kafka broker on the local system, so that will need to be changed if the script runs on a different system:

    #!/usr/bin/env python3
    # consumer.py

    import sys
    from kafka import KafkaConsumer
    from vyatta.proto import CgnatLogging_pb2 as pb

    if len(sys.argv) <= 1:
        print('Error: at least one topic is needed', file=sys.stderr)
        sys.exit(2)

    params = sys.argv[1:]

    cgnat_log = pb.CgnatLog()

    consumer = KafkaConsumer(*params,
                             bootstrap_servers=['localhost:9092'],
                             auto_offset_reset='earliest',
                             enable_auto_commit=True,
                             group_id='my-group')

    for message in consumer:
        print("Topic: {}".format(message.topic))
        print("Key: {}".format(message.key.decode('UTF-8')))
        print("")
        cgnat_log.ParseFromString(message.value)
        print(cgnat_log)
        print("---------")

For example:

    ./consumer.py cgnat-session cgnat-subscriber cgnat-port-block-allocation cgnat-res-constraint
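
The script calls `message.key.decode('UTF-8')` directly, which fails if a message arrives without a key, since kafka-python then delivers the key as `None`. The following is a small defensive sketch; `format_key` is a hypothetical helper that is not part of the original script, and the "(no key)" placeholder text is an arbitrary choice.

```python
def format_key(key, encoding="UTF-8"):
    """Return a printable form of a Kafka message key (bytes or None)."""
    if key is None:
        return "(no key)"
    # Replace undecodable bytes rather than raising inside the consumer loop.
    return key.decode(encoding, errors="replace")


print(format_key(b"vm-cgn-1"))
print(format_key(None))
```

In the consumer loop, the key line would then become `print("Key: {}".format(format_key(message.key)))`.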

Trigger some CGNAT log messages, and you should then see output like the following, which shows the decoding of a "session create" and a "subscriber start" protobuf message:

    Topic: cgnat-session
    Key: vm-cgn-1

    sessionLog {
      cgnInstance: "vm-cgn-1"
      eventType: EVENT_SESSION_CREATE
      sessionId: 2
      subSessionId: 1
      ifName: "dp0p1s2"
      protocol: 6
      direction: DIRECTION_OUT
      subscriberAddress: 168427779
      subscriberPort: 500
      natAllocatedAddress: 168428531
      natAllocatedPort: 1024
      destinationAddress: 167772160
      destinationPort: 80
      startTimestamp {
        seconds: 1569246812343
        nanos: 924000000
      }
      state: SESSION_OPENING
      stateHistory: 0
    }

    ---------
    Topic: cgnat-subscriber
    Key: vm-cgn-1

    subscriberLog {
      cgnInstance: "vm-cgn-1"
      eventType: EVENT_SUBSCRIBER_START
      subscriberAddress: 168427779
      startTimestamp {
        seconds: 1567679116
        nanos: 645000000
      }
    }
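
The address fields in this output are printed as plain integers. Assuming they are IPv4 addresses stored as 32-bit integers with the most significant octet first, and that `protocol` is the IANA IP protocol number (both are assumptions, not confirmed by the .proto file here), they can be made readable with a short Python sketch. The helper names `ipv4` and `protocol_name` are hypothetical.

```python
import ipaddress

# A few common IANA protocol numbers; anything else is shown as the number.
PROTOCOL_NAMES = {1: "ICMP", 6: "TCP", 17: "UDP"}


def ipv4(value):
    """Convert a 32-bit integer to dotted-quad notation."""
    return str(ipaddress.IPv4Address(value))


def protocol_name(number):
    """Return a readable name for an IP protocol number."""
    return PROTOCOL_NAMES.get(number, str(number))


# Values taken from the sample session log above.
print("subscriber:", ipv4(168427779))
print("NAT:", ipv4(168428531))
print("destination:", ipv4(167772160))
print("protocol:", protocol_name(6))
```

Under those assumptions, the sample session maps subscriber 10.10.1.3 port 500 to NAT address 10.10.3.243 port 1024 for a TCP flow.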