This blog post provides a comprehensive, step-by-step guide to integrating NodeJS with Kafka. Whether you’re building real-time data pipelines, implementing microservices architecture, or simply exploring the power of distributed messaging, this guide will equip you with the knowledge and practical instructions to get started. We’ll cover everything from setting up a single-node Kafka cluster to sending and receiving JSON data between your NodeJS application and Kafka topics.

Installing Kafka Single Node - Quick Start

Before diving into the NodeJS integration, we need a running Kafka instance. This section will guide you through setting up a single-node Kafka cluster for development and testing.

Download and Extract

First, download the Kafka distribution and extract it to your desired location. This example assumes you’re working on a Linux-based system.

[kafka-admin@kafka Downloads]$ ls
jdk-7u75-linux-x64.rpm  kafka_2.9.2-0.8.2.0.tgz
[kafka-admin@kafka Downloads]$ sudo rpm -ivh jdk-7u75-linux-x64.rpm
...
[kafka-admin@kafka Downloads]$ sudo tar -xzf kafka_2.9.2-0.8.2.0.tgz -C /opt
[kafka-admin@kafka opt]$ cd /opt
[kafka-admin@kafka opt]$ sudo ln -s kafka_2.9.2-0.8.2.0 kafka
[kafka-admin@kafka opt]$ ls
kafka  kafka_2.9.2-0.8.2.0
[kafka-admin@kafka opt]$ sudo chown -R kafka-admin:kafka-admin kafka_2.9.2-0.8.2.0

Explanation:

  • We first list the contents of the Downloads directory, showing the JDK and Kafka TGZ files.
  • Then, we install the JDK using rpm. Kafka requires Java to run. You may need to adapt this step depending on your system’s Java setup.
  • Next, we extract the Kafka TGZ archive to the /opt directory.
  • A symbolic link kafka is created for easier access to the Kafka directory.
  • Finally, we change the ownership of the Kafka directory to the kafka-admin user. Adjust this based on your user setup.

Now we are ready to start all the required services.

Kafka depends on Zookeeper, so we have to start that first.

[kafka-admin@kafka opt]$ cd kafka
[kafka-admin@kafka kafka]$ ls
bin  config  libs  LICENSE  logs  NOTICE
[kafka-admin@kafka kafka]$ bin/zookeeper-server-start.sh config/zookeeper.properties

This command starts Zookeeper, Kafka’s dependency. By default, it will run on localhost:2181. You can modify this in the config/zookeeper.properties file.
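For reference, the stock config/zookeeper.properties shipped with this release is only a few lines; the values you are most likely to touch are the data directory and the client port (shown here with their defaults):

    dataDir=/tmp/zookeeper
    clientPort=2181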

Important: If you plan to run Zookeeper on a separate machine, update the zookeeper.connect property in the config/server.properties file of your Kafka brokers to point to the correct Zookeeper address.
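For example, if Zookeeper runs on a host named zk1 (a placeholder; substitute your own hostname), each broker's config/server.properties would point at it like this:

    zookeeper.connect=zk1:2181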

Next, we start the Kafka server.

[kafka-admin@kafka kafka]$ bin/kafka-server-start.sh config/server.properties

This starts the Kafka server using the default configuration.

Running Multiple Brokers:

To simulate a distributed Kafka cluster, you can run multiple Kafka brokers on a single machine. To do so, create multiple copies of the server.properties file (e.g., server-1.properties, server-2.properties), and modify the following properties in each file:

  1. broker.id: A unique integer identifier for each broker.
  2. port: The port on which the broker will listen for connections.
  3. log.dir: The directory where the broker will store its logs.

Example configurations:

config/server-1.properties:
    broker.id=1
    port=9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    port=9094
    log.dir=/tmp/kafka-logs-2

Then, start each broker with its respective configuration file:

bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties

Creating Topics

Kafka organizes messages into topics. Before you can send or receive messages, you need to create a topic.

[kafka-admin@kafka kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 \
                                                --replication-factor 1 --partitions 1 --topic test

This command creates a topic named “test” with a single partition and a replication factor of 1.

  • --create: Specifies that we want to create a topic.
  • --zookeeper localhost:2181: Specifies the Zookeeper connection string.
  • --replication-factor 1: Sets the replication factor to 1, meaning each message will be stored on one broker. For production environments, use a higher replication factor for fault tolerance.
  • --partitions 1: Sets the number of partitions to 1. Partitions allow for parallel processing of messages.
  • --topic test: Specifies the name of the topic to create.
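You can verify these settings at any time with the --describe flag, which prints the partition count, replication factor, and partition leaders for the topic:

[kafka-admin@kafka kafka]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test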

To list existing topics, use the following command:

[kafka-admin@kafka kafka]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test
[kafka-admin@kafka kafka]$

Send Some Messages

Now that we have a Kafka cluster and a topic, let’s send some messages. Open a new terminal and use the Kafka console producer:

[kafka-admin@kafka kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is a message2

Each line you type will be sent as a separate message to the “test” topic. Press Enter after each message.

Start a Consumer

To receive the messages, open another new terminal and start the Kafka console consumer:

[kafka-admin@kafka kafka]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
                                                                        --topic test --from-beginning
This is a message
This is a message2

The --from-beginning option ensures that the consumer starts reading messages from the beginning of the topic, even if it has already consumed messages before. You should see the messages you sent using the producer.

Installing NodeJS on CentOS 6.6

Now, let’s install NodeJS and npm, which are required for running our NodeJS application that will interact with Kafka.

Installing NodeJS and npm on CentOS is very simple.

[nginx-admin@nginx ~]$ sudo su
[nginx-admin@nginx ~]# curl -sL https://rpm.nodesource.com/setup | bash -
[nginx-admin@nginx ~]# yum install -y nodejs

These commands install NodeJS and npm using the NodeSource repository. This is a common and reliable way to install NodeJS on CentOS.
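To confirm the installation, check the installed versions:

[nginx-admin@nginx ~]$ node --version
[nginx-admin@nginx ~]$ npm --version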

Installing gcc-c++ and make.

These tools are needed to compile native modules, which some NodeJS packages may require.

[nginx-admin@nginx ~]$ sudo yum install gcc-c++ make
[sudo] password for nginx-admin:
Loaded plugins: fastestmirror, refresh-packagekit, security
Setting up Install Process
Loading mirror speeds from cached hostfile
 * base: mirrors.123host.vn
 * epel: ftp.cuhk.edu.hk
 * extras: centos-hn.viettelidc.com.vn
 * updates: mirrors.vonline.vn
Package 1:make-3.81-20.el6.x86_64 already installed and latest version
Resolving Dependencies
...

Complete!

Later on we will need kafka-node, so let's install that as well.

The kafka-node package is a popular NodeJS client for interacting with Kafka.

[nginx-admin@nginx ~]$ sudo npm install kafka-node
[sudo] password for nginx-admin:

> snappy@3.0.6 install /home/nginx-admin/node_modules/kafka-node/node_modules/snappy
> node-gyp rebuild

gyp WARN EACCES user "root" does not have permission to access the dev dir "/root/.node-gyp/0.10.36"
gyp WARN EACCES attempting to reinstall using temporary dev dir
                            "/home/nginx-admin/node_modules/kafka-node/node_modules/snappy/.node-gyp"
make: Entering directory `/home/nginx-admin/node_modules/kafka-node/node_modules/snappy/build'
  CXX(target) Release/obj.target/snappy/deps/snappy/snappy-1.1.2/snappy-sinksource.o
  CXX(target) Release/obj.target/snappy/deps/snappy/snappy-1.1.2/snappy-stubs-internal.o
  CXX(target) Release/obj.target/snappy/deps/snappy/snappy-1.1.2/snappy.o
  AR(target) Release/obj.target/deps/snappy/snappy.a
  COPY Release/snappy.a
  CXX(target) Release/obj.target/binding/src/binding.o
  SOLINK_MODULE(target) Release/obj.target/binding.node
  SOLINK_MODULE(target) Release/obj.target/binding.node: Finished
  COPY Release/binding.node
make: Leaving directory `/home/nginx-admin/node_modules/kafka-node/node_modules/snappy/build'
kafka-node@0.2.18 node_modules/kafka-node
├── buffer-crc32@0.2.5
├── retry@0.6.1
├── node-uuid@1.4.1
├── async@0.7.0
├── lodash@2.2.1
├── debug@2.1.1 (ms@0.6.2)
├── binary@0.3.0 (buffers@0.1.1, chainsaw@0.1.0)
├── node-zookeeper-client@0.2.0 (async@0.2.10, underscore@1.4.4)
├── buffermaker@1.2.0 (long@1.1.2)
└── snappy@3.0.6 (bindings@1.1.1, nan@1.5.3)

Let's do a test.

To verify that NodeJS is installed correctly, let’s create a simple HTTP server:

Create a script called example.js with the following code:

var http = require('http');
http.createServer(function (req, res) {
  res.writeHead(200, {'Content-Type': 'text/plain'});
  res.end('Hello World\n');
}).listen(1337, '127.0.0.1');
console.log('Server running at http://127.0.0.1:1337/');

Start the server in a terminal:

[nginx-admin@nginx nodejs]$ node example.js
Server running at http://127.0.0.1:1337/

Open a web browser and navigate to http://127.0.0.1:1337/. You should see “Hello World” displayed.
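If the machine is headless, the same check works with curl from a second terminal:

[nginx-admin@nginx nodejs]$ curl http://127.0.0.1:1337/
Hello World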

Let's make some simple changes to the existing script to handle JSON.

Let’s modify our NodeJS server to handle JSON data.

//  Getting some 'http' power
var http = require('http');

//  Lets create a server to wait for requests at
//  http://localhost:8125/upload
http.createServer(function (request, response)
{
    //  Telling the client the response will be JSON.
    response.writeHead(200, {"Content-Type": "application/json"});

    //  request.on waiting for data to arrive.
    request.on('data', function (chunk)
    {
        //  Chunk which we receive from the client.
        //  For our request we are assuming it's going to be JSON data.
        //  We print it here on the console.
        console.log(chunk.toString('utf8'));
    });
    //  end of response
    response.end();
//  Listen on port 8125
}).listen(8125);

Save this code as node_recv_json.js and start the server:

[nginx-admin@nginx nodejs]$ node node_recv_json.js

In a new terminal, send a JSON payload to the server using curl:

[nginx-admin@nginx nodejs]$ curl -H "Content-Type: application/json" \
                               -d '{"username":"xyz","password":"xyz"}' http://localhost:8125/upload

You should see the JSON data printed on the server’s console:

[nginx-admin@nginx nodejs]$ node node_recv_json.js
{"username":"xyz","password":"xyz"}

NodeJS Kafka Producer - Using kafka-node

Now that we have Kafka and NodeJS set up, let’s build a NodeJS application that sends data to Kafka.

The following code demonstrates a basic Kafka producer using the kafka-node library.

Step 1 - Copy the below script into a file called producer_nodejs.js.

/*
    Basic producer to send data to kafka from nodejs.
    More Information Here : https://www.npmjs.com/package/kafka-node
*/

//  Using kafka-node - really nice library
//  create a producer and connect to a Zookeeper to send the payloads.
var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    client = new kafka.Client('kafka:2181'),
    producer = new Producer(client);

/*
    Creating a payload, which takes below information
    'topic'     -->    the topic we have created in kafka. (test)
    'messages'  -->    data which needs to be sent to kafka. (JSON in our case)
    'partition' -->    which partition we should send the request to. (default)

                        example command to create a topic in kafka:
                        [kafka@kafka kafka]$ bin/kafka-topics.sh \
                                    --create --zookeeper localhost:2181 \
                                    --replication-factor 1 \
                                    --partitions 1 \
                                    --topic test

                        If there are multiple partitions, we can optimize the code
                        here to spread requests across different partitions.
*/
var payloads = [
    { topic: 'test', messages: 'This is the First Message I am sending', partition: 0 },
];


//  producer 'on' ready to send payload to kafka.
producer.on('ready', function(){
    producer.send(payloads, function(err, data){
        console.log(data)
    });
});

producer.on('error', function (err) {
    console.error(err);
});

Explanation:

  • The code imports the kafka-node library.
  • It creates a Kafka client and a producer, connecting to the Zookeeper instance running on kafka:2181. Replace kafka with the actual hostname or IP address of your Zookeeper server.
  • A payloads array defines the messages to be sent. Each object in the array represents a message and includes the topic name, the message content, and the partition number.
  • The producer.on('ready', ...) event handler is called when the producer is ready to send messages. It sends the payloads to Kafka and logs the response.
  • The producer.on('error', ...) event handler is called if there is an error during the sending process.
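As a side note, kafka-node also accepts an array of strings for messages, so a single payload can carry a batch. A minimal sketch reusing the connected producer from the script above (the message strings are just examples):

producer.on('ready', function () {
    var payloads = [
        { topic: 'test', messages: ['first batched message', 'second batched message'], partition: 0 },
    ];
    producer.send(payloads, function (err, data) {
        //  'data' maps topic name -> partition -> offset acknowledged by the broker
        console.log(data);
    });
});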

Step 2 - Start the Kafka cluster as we already did in the installation section, assuming the topic test exists.

Ensure your Kafka cluster is running and that the “test” topic has been created.

Step 3 - Start the consumer service with the below command.

Start the Kafka console consumer to listen for messages on the “test” topic:

[kafka-admin@kafka kafka]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
                                                                    --topic test --from-beginning

Step 4 - Execute the below command. This will send the message This is the First Message I am sending to the Kafka consumer.

Run the producer_nodejs.js script:

[nodejs-admin@nodejs nodejs]$ node producer_nodejs.js

Step 5 - Check the consumer; you will see the message sent from NodeJS.

You should see the message “This is the First Message I am sending” appear in the Kafka console consumer:

[kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-console-consumer.sh \
                                            --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message here
This is the First Message I am sending

Sending JSON through NodeJS to Kafka

Let’s create an example that:

  1. Receives JSON data from a client (e.g., a web browser or curl).
  2. Sends the JSON data to a Kafka topic using NodeJS.
  3. Verifies that the JSON data is received by the Kafka consumer.

Step 1 - Create a script called json_nodejs_kafka.js with the below code.

/*
    Getting some 'http' power
*/
var http = require('http');

/*
    Using kafka-node - really nice library
    Create ONE client and producer up front and reuse them for every
    request, instead of opening a new Zookeeper connection per chunk.
    Replace 'kafka' with the hostname or IP of your Zookeeper server.
*/
var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    client = new kafka.Client('kafka:2181'),
    producer = new Producer(client),
    producerReady = false;

/*
    producer 'on' ready to send payloads to kafka.
*/
producer.on('ready', function () {
    producerReady = true;
});

/*
    if we have some error.
*/
producer.on('error', function (err) {
    console.error(err);
});

/*
    Lets create a server to wait for requests at
    http://localhost:8125/upload
*/
http.createServer(function (request, response)
{
    /*
        Telling the client the response will be JSON.
    */
    response.writeHead(200, {"Content-Type": "application/json"});

    /*
        request.on waiting for data to arrive.
    */
    request.on('data', function (chunk)
    {
        /*
            Chunk which we receive from the client.
            For our request we are assuming it's going to be JSON data.
            We print it here on the console.
        */
        console.log(chunk.toString('utf8'));

        /*
            Creating a payload, which takes below information
            'topic'     -->    the topic we have created in kafka. (test)
            'messages'  -->    data which needs to be sent to kafka. (JSON in our case)
            'partition' -->    which partition we should send the request to. (default)
        */
        var payloads = [
            { topic: 'test', messages: chunk.toString('utf8'), partition: 0 },
        ];

        /*
            Send the payload once the producer is connected.
        */
        if (producerReady) {
            producer.send(payloads, function (err, data) {
                console.log(data);
            });
        } else {
            console.error('Producer not ready yet, dropping chunk');
        }
    });
    /*
        end of response
    */
    response.end();

/*
    Listen on port 8125
*/
}).listen(8125);

Explanation:

  • This code combines the HTTP server and Kafka producer functionality into a single script.
  • A single Kafka client and producer are created once at startup and reused for every request, instead of opening a new Zookeeper connection per chunk.
  • When the server receives a request with JSON data, it extracts the data from the request body and sends it to the “test” topic.
  • The console.log(chunk.toString('utf8')) line prints the received JSON data to the NodeJS console.
  • The console.log(data) line (inside the send callback) prints the Kafka acknowledgement to the NodeJS console.
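One hardening step the script above skips: since we only assume each chunk is JSON, you can validate it before producing and drop garbage input. A minimal helper sketch (the isValidJson name is mine, not part of the original script):

/*
    Returns true only if 'raw' parses as JSON, so the caller
    can skip producer.send() for invalid payloads.
*/
function isValidJson(raw) {
    try {
        JSON.parse(raw);
        return true;
    } catch (e) {
        return false;
    }
}

Inside the 'data' handler you would then wrap the send in if (isValidJson(chunk.toString('utf8'))) { ... }.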

Step 2 - Start the above script on the NodeJS server.

[nodejs-admin@nodejs nodejs]$ vim json_nodejs_kafka.js
[nodejs-admin@nodejs nodejs]$ node json_nodejs_kafka.js

Step 3 - Execute the curl command to send the JSON to NodeJS.

[nodejs-admin@nodejs nodejs]$ curl -H "Content-Type: application/json" \
                               -d '{"username":"xyz","password":"xyz"}' http://localhost:8125/upload

Step 4 - Output on the NodeJS console

[nodejs-admin@nodejs nodejs]$ node json_nodejs_kafka.js
{"username":"xyz","password":"xyz"}
{ test: { '0': 29 } }
  • {"username":"xyz","password":"xyz"} is the JSON request received from the curl command.
  • { test: { '0': 29 } } is the response from the Kafka cluster indicating that it has successfully received the JSON data. The ‘0’ indicates partition number and 29 is the offset.
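A small, self-contained sketch of how to pick that acknowledgement apart (the object literal here just mirrors the example output above):

//  'data' is the acknowledgement object passed to the producer.send() callback,
//  shaped as topic -> partition -> offset.
var data = { test: { '0': 29 } };
var offset = data.test['0'];
console.log('Stored in partition 0 at offset ' + offset);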

Step 5 - Output on the Kafka consumer side.

[kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-console-consumer.sh \
                                            --zookeeper localhost:2181 --topic test --from-beginning
{"username":"xyz","password":"xyz"}

You should see the JSON data that was sent from NodeJS displayed in the Kafka console consumer.
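For completeness: the console consumer can also be replaced with a NodeJS consumer, putting both ends of the pipeline in NodeJS. A minimal sketch using the same kafka-node API as the producer (again, replace kafka with your Zookeeper host):

/*
    Basic consumer to read the 'test' topic from nodejs.
*/
var kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    client = new kafka.Client('kafka:2181'),
    consumer = new Consumer(client, [
        { topic: 'test', partition: 0 }
    ], { autoCommit: true });

//  Fires once per message; message.value holds the JSON string we produced.
consumer.on('message', function (message) {
    console.log(message.value);
});

consumer.on('error', function (err) {
    console.error(err);
});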