The following performs some stand-alone testing of Kafka, to run before trying to use it with the dataplane:

# create a topic
~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

# publish to the topic
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

# consume the messages from the topic
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning

Now to do something similar to the above tests, but for data from CGNAT. First, you will need to:

a) configure CGNAT so that it produces the Kafka protobuf logs, and

b) perform an action (e.g. create CGNAT sessions) to cause the logs to be produced.

Create the topics that you have configured CGNAT to send on. Note that these commands are all executed under the "kafka" login:

~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cgnat-session
~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cgnat-subscriber
~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cgnat-port-block-allocation
~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cgnat-res-constraint 

Your /etc/hosts file must have an entry for the Kafka bootstrap server, as Kafka appears to check that the name and IP address match.
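For example, if the broker's advertised hostname were "kafka-host" (a hypothetical name and address here; use whatever your broker actually advertises), the /etc/hosts entry might look like:

```
192.0.2.10   kafka-host
```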

For the consumer, we could use "kafka-console-consumer.sh" as follows, but it will not decode the data, which is in protobuf format:

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cgnat-session --property print.key=true --from-beginning

So instead we will use a Python Kafka consumer, which imports the protobuf bindings used by CGNAT. These bindings come from the dataplane, so you need a version of the dataplane built for the same Debian release as the vRouter, so that the files the Python script depends on are appropriate. If your workstation already has the correct ones (e.g. runs the same Debian release as the vRouter), you can skip the lines below and jump to the steps on installing the protobuf package.

You need libvyatta-dataplane-proto-support, which comes from the vyatta-dataplane package and must be compiled for the Debian release where you will run kafka-python.

Now perform the following steps to install the libvyatta-dataplane-proto-support package and the packages needed to run the Python script.

Remove any old dataplane protobuf package:
    sudo dpkg --remove libvyatta-dataplane-proto-support
    rm ~/Downloads/libvyatta-dataplane-proto-support_*_all.deb

Copy/download the libvyatta-dataplane-proto-support deb file (e.g. libvyatta-dataplane-proto-support_3.8.23+2.1_all.deb) from your build environment.

Install the new one:
    sudo dpkg -i ~/Downloads/libvyatta-dataplane-proto-support_*_all.deb

Install required packages:
    sudo apt-get install python3-protobuf
    sudo pip3 install kafka-python

Now run the script consumer.py (shown below) with the appropriate topics passed in as parameters. Note that it contacts the Kafka broker on the local system:

#!/usr/bin/env python3
# consumer.py

import sys
from kafka import KafkaConsumer
from vyatta.proto import CgnatLogging_pb2 as pb

if len(sys.argv) <= 1:
    print('Error: at least one topic is needed', file=sys.stderr)
    sys.exit(2)

params = sys.argv[1:]

cgnat_log = pb.CgnatLog()

consumer = KafkaConsumer(*params,
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         enable_auto_commit=True,
                         group_id='my-group')

for message in consumer:
    print("Topic: {}".format(message.topic))
    print("Key: {}".format(message.key.decode('UTF-8')))
    print("")
    cgnat_log.ParseFromString(message.value)
    print(cgnat_log)
    print("---------")

Run it, passing the topics as parameters:

./consumer.py cgnat-session cgnat-subscriber cgnat-port-block-allocation cgnat-res-constraint

Cause CGNAT log messages to be sent, and you should see something like the following, which shows the decoding of "session start" protobuf messages:

Topic: cgnat-session
Key: vm-cgn-1

sessionLog {
  cgnInstance: "vm-cgn-1"
  eventType: EVENT_SESSION_CREATE
  sessionId: 2
  subSessionId: 1
  ifName: "dp0p1s2"
  protocol: 6
  direction: DIRECTION_OUT
  subscriberAddress: 168427779
  subscriberPort: 500
  natAllocatedAddress: 168428531
  natAllocatedPort: 1024
  destinationAddress: 167772160
  destinationPort: 80
  startTimestamp {
    seconds: 1569246812343
    nanos: 924000000
  }
  state: SESSION_OPENING
  stateHistory: 0
}
 
---------
Topic: cgnat-subscriber
Key: vm-cgn-1
 
subscriberLog {
  cgnInstance: "vm-cgn-1"
  eventType: EVENT_SUBSCRIBER_START
  subscriberAddress: 168427779
  startTimestamp {
    seconds: 1567679116
    nanos: 645000000
  }
}
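The address and timestamp fields in the decoded output are raw integers. Assuming the addresses are IPv4 addresses encoded as unsigned 32-bit integers and the timestamps are Unix epoch seconds plus nanoseconds (which matches the subscriber log above; verify against your own data), a small helper like the following can render them readably. The function names are illustrative, not part of any CGNAT tooling:

```python
import ipaddress
from datetime import datetime, timezone

def fmt_addr(addr_int):
    """Render an integer-encoded IPv4 address as a dotted quad."""
    return str(ipaddress.IPv4Address(addr_int))

def fmt_ts(seconds, nanos):
    """Render an epoch-seconds/nanos timestamp as an ISO-8601 UTC string."""
    ts = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return ts.replace(microsecond=nanos // 1000).isoformat()

# values taken from the subscriber log above
print(fmt_addr(168427779))            # subscriberAddress -> 10.10.1.3
print(fmt_ts(1567679116, 645000000))  # startTimestamp    -> 2019-09-05T10:25:16.645000+00:00
```

You could call these from the loop in consumer.py on the relevant message fields instead of printing the raw protobuf.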