The following does some stand-alone testing of the Kafka, to use before trying to use with the dataplane:
# start kafka monitoring a topic ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic # publish to the topic echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null # read the info from kafka ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
Now to do something similar to the above tests, but for data from CGNAT. First, you will need to:
a) configure CGNAT so it produces the Kafka Protobuf Logs and
b) do an action (e.g. create CGNAT sessions) in order to cause the logs to be produced.
Start handling topics that you have configured CGNAT to send on - note these commands are all executed under login "kafka":
~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cgnat-session ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cgnat-subscriber ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cgnat-port-block-allocation ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cgnat-res-constraint
Your /etc/hosts file must have an entry for the Kafka bootstrap server, as Kafka seems to check the name and ip address match.
For the consumer, we could use "kafka-console-consumer.sh", as follows, but that will not decode the data which is in protobuf format:
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cgnat-session --property print.key=true --from-beginning
So instead we will do it using a python Kafka consumer, which will include the protobuf file used by CGNAT. This comes from the dataplane, and so need a version of the dataplane built with the same version of Debian that the vRouter has so that the files that the python files depend on are appropriate. If your workstation already has the correct ones (e.g. has the same version of debian as the vRouter), you can skip the lines below, and jump to the steps below on installing the protobuf package.
You need libvyatta-dataplane-proto-support, which comes from the vyatta-dataplane package, compiled for the Debian where you are running kafka-python.
Now do the following steps, to install the libvyatta-dataplane-proto-support package and packages it needs to be able to run the python script.
Remove any old dataplane protobuf package sudo dpkg --remove libvyatta-dataplane-proto-support rm ~/Downloads/libvyatta-dataplane-proto-support_*_all.deb Copy/download the libvyatta-dataplane-proto-support deb file (e.g. libvyatta-dataplane-proto-support_3.8.23+2.1_all.deb) from your build environment. Install the new one: sudo dpkg -i ~/Downloads/libvyatta-dataplane-proto-support_*_all.deb Install required packages: sudo apt-get install python3-protobuf sudo pip3 install kafka-python
Now run the script consumer.py (shown below) with the appropriate topics passed in as parameters. Note that it contacts the Kafka broker on the local system:
!/usr/bin/env python3 # consumer.py import sys from kafka import KafkaConsumer from vyatta.proto import CgnatLogging_pb2 as pb if len(sys.argv) <= 1: print('Error: at least one topic is needed', file=sys.stderr) sys.exit(2) params = sys.argv[1:] cgnat_log = pb.CgnatLog() consumer = KafkaConsumer(*params, bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', enable_auto_commit=True, group_id='my-group') for message in consumer: print("Topic: {}".format(message.topic)) print("Key: {}".format(message.key.decode('UTF-8'))) print("") cgnat_log.ParseFromString(message.value) print(cgnat_log) print("---------")
./consumer.py cgnat-session cgnat-subscriber cgnat-port-block-allocation cgnat-res-constraint
Cause CGNAT log messages to be sent, and see something like the following, which shows decoding of "session start" protobuf messages:
Topic: cgnat-session Key: vm-cgn-1 sessionLog { cgnInstance: "vm-cgn-1" eventType: EVENT_SESSION_CREATE sessionId: 2 subSessionId: 1 ifName: "dp0p1s2" protocol: 6 direction: DIRECTION_OUT subscriberAddress: 168427779 subscriberPort: 500 natAllocatedAddress: 168428531 natAllocatedPort: 1024 destinationAddress: 167772160 destinationPort: 80 startTimestamp { seconds: 1569246812343 nanos: 924000000 } state: SESSION_OPENING stateHistory: 0 } --------- Topic: cgnat-subscriber Key: vm-cgn-1 subscriberLog { cgnInstance: "vm-cgn-1" eventType: EVENT_SUBSCRIBER_START subscriberAddress: 168427779 startTimestamp { seconds: 1567679116 nanos: 645000000 } }