This post outlines how to send JSON data from a client (browser or curl) through a NodeJS server and ultimately into a Kafka topic. This architecture is useful for ingesting data streams into a robust and scalable messaging system for further processing.

What are we trying to achieve?

  1. Send JSON data from a browser or using curl to a NodeJS server.
  2. The NodeJS server will receive the JSON data and forward it to a Kafka topic.
  3. Further processing can then be performed on the data within the Kafka ecosystem (e.g., using Kafka Streams, Spark Streaming).
  4. Finally, verify the JSON data arrives correctly in Kafka using the kafka-console-consumer.sh script.

Step 1: Create json_nodejs_kafka.js

Create a file named json_nodejs_kafka.js with the following NodeJS code. This script sets up a simple HTTP server that listens for incoming JSON data and forwards it to Kafka.

/*
    Getting some 'http' power.
*/
var http = require('http');

/*
    Using kafka-node - really nice library.
    Create a producer and connect to Zookeeper to send the payloads.
    The client and producer are created once, at startup, so we do not
    open a new Kafka connection for every chunk of data that arrives.
*/
var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    client = new kafka.Client('kafka:2181'),
    producer = new Producer(client);

/*
    The producer fires 'ready' once it has connected to Kafka.
    We only send messages after that.
*/
var producerReady = false;
producer.on('ready', function () {
    producerReady = true;
});

/*
    If we have some error, log it instead of swallowing it.
*/
producer.on('error', function (err) {
    console.error('Kafka producer error:', err);
});

/*
    Let's create a server to wait for requests at
    http://localhost:8125/upload
*/
http.createServer(function (request, response)
{
    /*
        Tell the client we are speaking JSON.
    */
    response.writeHead(200, {"Content-Type": "application/json"});

    /*
        request.on('data') fires as the request body arrives.
    */
    request.on('data', function (chunk)
    {
        /*
            CHUNK which we receive from the client.
            For our request we are assuming it is going to be JSON data.
            We print it here on the console.
        */
        console.log(chunk.toString('utf8'));

        /*
            Creating a payload, which takes the below information:
            'topic'     --> the topic we have created in Kafka.
            'messages'  --> data which needs to be sent to Kafka (JSON in our case).
            'partition' --> which partition we should send the message to.
                            If there are multiple partitions, this is where we
                            would spread messages across them.
        */
        var payloads = [
            { topic: 'test', messages: chunk.toString('utf8'), partition: 0 }
        ];

        /*
            Send the payload to Kafka and log the broker's acknowledgement.
        */
        if (producerReady) {
            producer.send(payloads, function (err, data) {
                if (err) {
                    console.error(err);
                } else {
                    console.log(data);
                }
            });
        } else {
            console.error('Producer not ready yet, dropping message.');
        }
    });

    /*
        End the response once the request body has been fully read.
    */
    request.on('end', function () {
        response.end();
    });

/*
    Listen on port 8125.
*/
}).listen(8125);

Explanation:

  • require('http'): Imports the built-in NodeJS HTTP module.
  • http.createServer(...): Creates an HTTP server that listens for incoming requests.
  • response.writeHead(200, {"Content-Type": "application/json"}): Sets the response status and headers, telling the client that the response body will be JSON. (writeHeader is a deprecated alias for writeHead.)
  • request.on('data', function (chunk) { ... }): This event listener is triggered when data is received in the request. The data is received in chunks.
  • chunk.toString('utf8'): Converts the received data chunk (which is typically a buffer) to a UTF-8 string. This is essential for handling the JSON data.
  • kafka = require('kafka-node'): Imports the kafka-node library. Make sure you have installed it using npm install kafka-node.
  • client = new kafka.Client('kafka:2181'): Creates a Kafka client, connecting to the Zookeeper instance at kafka:2181. Important: Replace kafka:2181 with the actual address of your Zookeeper instance. Using kafka assumes you have configured your environment (e.g., using Docker Compose) so that kafka resolves to the correct IP address.
  • producer = new Producer(client): Creates a Kafka producer, which will send messages to Kafka.
  • payloads = [{ topic: 'test', messages: chunk.toString('utf8'), partition: 0 }]: Defines the payload to be sent to Kafka.
    • topic: The Kafka topic to which the message will be sent (in this case, test). You’ll need to create this topic in Kafka.
    • messages: The actual message content, which is the JSON data received from the client.
    • partition: The partition within the topic to which the message should be sent. Setting it to 0 sends all messages to the first partition. For production systems, you’ll want to implement a partitioning strategy for better throughput.
  • producer.on('ready', function() { ... }): This event fires once the producer has connected to Kafka; messages should only be sent after it. The producer.send(payloads, ...) call actually sends the data to Kafka.
  • producer.on('error', function(err) { ... }): Handles any errors that occur during message production. It’s good practice to log these errors.
  • .listen(8125): Starts the HTTP server and listens for incoming requests on port 8125.
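One subtlety in the handler above: for large payloads, the request body may arrive in several chunks, and producing each chunk to Kafka separately would split the JSON mid-document. A sketch of collecting the full body before producing (assembleBody and parseJsonBody are my own helper names, not part of the original script, written as pure functions so the idea can be tested without a running server):

```javascript
/*
    Collect all request chunks into one buffer and decode it as UTF-8.
*/
function assembleBody(chunks) {
    return Buffer.concat(chunks).toString('utf8');
}

/*
    Parse the assembled body, reporting failure instead of throwing,
    so a malformed request cannot crash the server.
*/
function parseJsonBody(chunks) {
    var body = assembleBody(chunks);
    try {
        return { ok: true, value: JSON.parse(body) };
    } catch (e) {
        return { ok: false, error: 'invalid JSON: ' + e.message };
    }
}

// Inside the server, you would push each chunk in the 'data' handler
// and call parseJsonBody(chunks) in the 'end' handler before producing.
var chunks = [Buffer.from('{"username":"x'), Buffer.from('yz","password":"xyz"}')];
console.log(parseJsonBody(chunks).value.username); // → xyz
```

Only the fully assembled, validated body would then go into the Kafka payload.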

Step 2: Start the NodeJS Script

Open a terminal and navigate to the directory where you saved json_nodejs_kafka.js. Then, start the script using:

[nodejs-admin@nodejs nodejs]$ node json_nodejs_kafka.js

This will start the NodeJS server, and it will begin listening for incoming requests on port 8125.

Step 3: Send JSON Data using curl

Open another terminal and use the curl command to send a JSON payload to the NodeJS server:

[nodejs-admin@nodejs nodejs]$ curl -H "Content-Type: application/json" \
                               -d '{"username":"xyz","password":"xyz"}' http://localhost:8125/upload

Explanation:

  • -H "Content-Type: application/json": Sets the Content-Type header to application/json, indicating that the request body contains JSON data. This is crucial so the server knows how to interpret the data.
  • -d '{"username":"xyz","password":"xyz"}': Specifies the JSON data to be sent in the request body.
  • http://localhost:8125/upload: The URL of the NodeJS server endpoint to which the request is sent.

Step 4: Verify Output on the NodeJS Console

In the terminal where you started the NodeJS script, you should see output similar to this:

[nodejs-admin@nodejs nodejs]$ node json_nodejs_kafka.js
{"username":"xyz","password":"xyz"}
{ test: { '0': 29 } }

  • {"username":"xyz","password":"xyz"}: This is the JSON data received from the curl command. The console.log(chunk.toString('utf8')) line in the NodeJS script prints this.
  • { test: { '0': 29 } }: This is the acknowledgement from the Kafka cluster, confirming that it received the message. The '0' is the partition number and 29 is the offset Kafka assigned to the message.
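Because the acknowledgement groups offsets by topic and then by partition, it can be unpacked programmatically. A small sketch (the ack value below is copied from the output above; offsetFor is my own helper name, not part of kafka-node's API):

```javascript
/*
    kafka-node's send callback returns an object shaped like
    { topicName: { partitionNumber: offset } }.
    offsetFor pulls out the offset for a given topic and partition,
    returning undefined if the topic or partition is absent.
*/
function offsetFor(ack, topic, partition) {
    return ack[topic] && ack[topic][String(partition)];
}

var ack = { test: { '0': 29 } };
console.log(offsetFor(ack, 'test', 0)); // → 29
```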

Step 5: Verify Output on the Kafka Consumer Side

Open a new terminal on the Kafka server and use the kafka-console-consumer.sh script to consume messages from the test topic:

[kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-console-consumer.sh \
                                            --zookeeper localhost:2181 --topic test --from-beginning
{"username":"xyz","password":"xyz"}

Explanation:

  • bin/kafka-console-consumer.sh: The path to the Kafka console consumer script.
  • --zookeeper localhost:2181: Specifies the Zookeeper instance to connect to. Important: Replace localhost:2181 with the actual address of your Zookeeper instance.
  • --topic test: Specifies the Kafka topic to consume messages from.
  • --from-beginning: Starts consuming messages from the beginning of the topic.

You should see the JSON data you sent from the curl command:

{"username":"xyz","password":"xyz"}

This confirms that the JSON data was successfully sent from the browser/curl to the NodeJS server and then to the Kafka topic.

Important Considerations:

  • Error Handling: The provided NodeJS script has minimal error handling. In a production environment, you should add robust error handling to catch exceptions and log errors. This is particularly important for Kafka producer errors.
  • Kafka Configuration: Ensure your Kafka broker and Zookeeper instances are properly configured and accessible. The addresses used in the NodeJS script (kafka:2181 and localhost:2181) must be correct.
  • Dependencies: Make sure you have the kafka-node package installed. Run npm install kafka-node in your NodeJS project directory.
  • Asynchronous Nature: The Kafka producer operations are asynchronous. You should handle the asynchronous nature of these operations correctly to avoid potential issues with message delivery.
  • Partitioning: For production systems, carefully consider your partitioning strategy to distribute messages across multiple partitions for increased throughput and scalability. The current example uses a single partition.
  • Kafka Version: Ensure the kafka-node library is compatible with your Kafka version.
  • Security: For production systems, secure your Kafka cluster and NodeJS application.
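As a starting point for the partitioning consideration above, messages that share a key (for example, a username) can be routed to the same partition by hashing the key, which preserves per-key ordering in Kafka. A minimal sketch (partitionForKey is a hypothetical helper, not part of kafka-node's API):

```javascript
/*
    Deterministically map a message key to one of numPartitions partitions.
    The same key always yields the same partition number.
*/
function partitionForKey(key, numPartitions) {
    var hash = 0;
    for (var i = 0; i < key.length; i++) {
        // Classic 31-based string hash, kept as a 32-bit unsigned int.
        hash = (hash * 31 + key.charCodeAt(i)) >>> 0;
    }
    return hash % numPartitions;
}

// The same username always maps to the same partition.
var p = partitionForKey('xyz', 4);
console.log(p === partitionForKey('xyz', 4)); // → true
```

The computed value would replace the hard-coded partition: 0 in the payload.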

This detailed guide provides a solid foundation for sending JSON data from a client to Kafka via NodeJS. Remember to adapt the code and configurations to your specific environment and requirements.