This shows an example of setting up a Kafka broker on a Debian system (preferably running DANOS) which can handle messages sent by the CGNAT Kafka protobuf logging. The consumer of the messages is a Python script which prints the topic and key, and decodes the protobuf data.

First you need to install Kafka on the Debian system which is acting as the Kafka broker. The following is based on https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-debian-9, with versions updated.

Code Block
# install java
sudo apt install default-jre
 
# check the version
java -version
 
# create a user for kafka
sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo
 
# switch to the user
su -l kafka
 
# download kafka tgz
mkdir ~/Downloads
curl "https://www.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz" -o ~/Downloads/kafka.tgz
 
# create directory to extract to
mkdir ~/kafka && cd ~/kafka
 
# extract the files
tar -xvzf ~/Downloads/kafka.tgz --strip 1
 
# configure to allow topic deletion
echo "" >> ~/kafka/config/server.properties
echo "delete.topic.enable = true" >> ~/kafka/config/server.properties
 
# create zookeeper service
sudo bash
 
cat <<EOF >/etc/systemd/system/zookeeper.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
 
[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
 
[Install]
WantedBy=multi-user.target
 
EOF
 
# create kafka service file
 
cat <<EOF >/etc/systemd/system/kafka.service
[Unit]
Requires=zookeeper.service
After=zookeeper.service
 
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
 
[Install]
WantedBy=multi-user.target
EOF
 
# start the kafka service (zookeeper is started too, via the Requires= dependency)
systemctl start kafka
 
# check service started
journalctl -u kafka
 
# enable on boot
systemctl enable kafka
 
# exit from sudo bash
exit

The following does some stand-alone testing of the Kafka installation, which should be run to confirm Kafka is working before trying to use it with the log messages sent by DANOS CGNAT over Kafka:

Code Block
# create a test topic
~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

# publish to the topic
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

# read the info from kafka
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
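
Since delete.topic.enable was set in server.properties earlier, the test topic can be cleaned up once you have confirmed Kafka is working; a minimal sketch:

Code Block
# delete the test topic (relies on delete.topic.enable = true)
~/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic TutorialTopic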

Now we do something similar to the above tests, but for data logs from CGNAT. First, you will need to:

...

Your /etc/hosts file must have an entry for the Kafka bootstrap server, as Kafka seems to check that the name and IP address match.
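
For example, assuming the broker's hostname is kafka-broker and its address is 192.0.2.10 (both hypothetical names for illustration), the entry would look like:

Code Block
192.0.2.10    kafka-broker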

For the consumer, we could use "kafka-console-consumer.sh", as follows, but that will not decode the data, which is in protobuf format:

...

So instead we will do it using a Python Kafka consumer, which makes use of the protobuf file used by CGNAT. This file comes from the dataplane, so you need a version of the dataplane built for the same version of Debian as the vRouter, so that the files the Python script depends on are appropriate. If your workstation already has the correct ones (e.g. it runs the same version of Debian as the vRouter), you can skip the lines below and jump to the steps on installing the protobuf package.

You need libvyatta-dataplane-proto-support, which comes from the vyatta-dataplane package, compiled for the Debian release where you are running kafka-python.

Now do the following steps to install the libvyatta-dataplane-proto-support package and the packages it needs, so that the Python script can run.

...

If your Kafka broker is also a DANOS system, then there are protobuf libraries already installed which can be used; these are the files installed by the package “libvyatta-dataplane-proto-support”. If you are using a different system, then you should copy the file /usr/share/vyatta-dataplane/protobuf/CgnatLogging.proto and use the “protoc” compiler to create Python libraries that can decode the CGNAT log messages. You must also install the generic Python protobuf package “python3-protobuf” and the package providing a Python interface to Kafka, “kafka-python”. For example:

Code Block
sudo apt-get install python3-protobuf
sudo pip3 install kafka-python
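
If you copied CgnatLogging.proto to a non-DANOS system, a minimal sketch of generating the Python bindings yourself (this assumes the .proto file is in the current directory; on Debian, protoc is provided by the protobuf-compiler package):

Code Block
# install the protobuf compiler
sudo apt-get install protobuf-compiler
 
# generate CgnatLogging_pb2.py in the current directory
protoc --python_out=. CgnatLogging.proto

Note that the consumer script below imports the generated module from the vyatta.proto namespace (as installed by libvyatta-dataplane-proto-support), so if you generate the file locally you will need to adjust the import accordingly.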

Now run the script consumer.py (shown below) with the appropriate topics passed in as parameters. Note that it contacts the Kafka broker on the local system, so that will need to be changed if running on a different system:

Code Block
#!/usr/bin/env python3
# consumer.py

import sys
from kafka import KafkaConsumer
from vyatta.proto import CgnatLogging_pb2 as pb

if len(sys.argv) <= 1:
    print('Error: at least one topic is needed', file=sys.stderr)
    sys.exit(2)

params = sys.argv[1:]

# protobuf message object reused for decoding each payload
cgnat_log = pb.CgnatLog()

# subscribe to the requested topics on the local broker
consumer = KafkaConsumer(*params,
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         enable_auto_commit=True,
                         group_id='my-group')

for message in consumer:
    print("Topic: {}".format(message.topic))
    print("Key: {}".format(message.key.decode('UTF-8')))
    print("")
    # decode the protobuf payload and print it in text format
    cgnat_log.ParseFromString(message.value)
    print(cgnat_log)
    print("---------")

...

Cause CGNAT log messages to be sent, and then you should see something like the following, which shows decoding of a "session create" protobuf message and a "subscriber start" protobuf message:

Code Block
Topic: cgnat-session
Key: vm-cgn-1

sessionLog {
  cgnInstance: "vm-cgn-1"
  eventType: EVENT_SESSION_CREATE
  sessionId: 2
  subSessionId: 1
  ifName: "dp0p1s2"
  protocol: 6
  direction: DIRECTION_OUT
  subscriberAddress: 168427779
  subscriberPort: 500
  natAllocatedAddress: 168428531
  natAllocatedPort: 1024
  destinationAddress: 167772160
  destinationPort: 80
  startTimestamp {
    seconds: 1569246812343
    nanos: 924000000
  }
  state: SESSION_OPENING
  stateHistory: 0
}
 
---------
Topic: cgnat-subscriber
Key: vm-cgn-1
 
subscriberLog {
  cgnInstance: "vm-cgn-1"
  eventType: EVENT_SUBSCRIBER_START
  subscriberAddress: 168427779
  startTimestamp {
    seconds: 1567679116
    nanos: 645000000
  }
}