This blog post provides a comprehensive, step-by-step guide to integrating NodeJS with Kafka. Whether you’re building real-time data pipelines, implementing microservices architecture, or simply exploring the power of distributed messaging, this guide will equip you with the knowledge and practical instructions to get started. We’ll cover everything from setting up a single-node Kafka cluster to sending and receiving JSON data between your NodeJS application and Kafka topics.
Installing Kafka Single Node - Quick Start
Before diving into the NodeJS integration, we need a running Kafka instance. This section will guide you through setting up a single-node Kafka cluster for development and testing.
Download and Extract
First, download the Kafka distribution and extract it to your desired location. This example assumes you’re working on a Linux-based system.
[kafka-admin@kafka Downloads]$ ls
jdk-7u75-linux-x64.rpm kafka_2.9.2-0.8.2.0.tgz
[kafka-admin@kafka Downloads]$ sudo rpm -ivh jdk-7u75-linux-x64.rpm
...
[kafka-admin@kafka Downloads]$ sudo tar -xzf kafka_2.9.2-0.8.2.0.tgz -C /opt
[kafka-admin@kafka opt]$ cd /opt
[kafka-admin@kafka opt]$ sudo ln -s kafka_2.9.2-0.8.2.0 kafka
[kafka-admin@kafka opt]$ ls
kafka kafka_2.9.2-0.8.2.0
[kafka-admin@kafka opt]$ sudo chown kafka-admin:kafka-admin -R kafka
Explanation:
- We first list the contents of the Downloads directory, showing the JDK and Kafka TGZ files.
- Then, we install the JDK using rpm. Kafka requires Java to run. You may need to adapt this step depending on your system's Java setup.
- Next, we extract the Kafka TGZ archive to the /opt directory.
- A symbolic link kafka is created for easier access to the Kafka directory.
- Finally, we change the ownership of the Kafka directory to the kafka-admin user. Adjust this based on your user setup.
Now we are ready to start all the required services. Kafka depends on Zookeeper, so we have to start that first.
[kafka-admin@kafka opt]$ cd kafka
[kafka-admin@kafka kafka]$ ls
bin config libs LICENSE logs NOTICE
[kafka-admin@kafka kafka]$ bin/zookeeper-server-start.sh config/zookeeper.properties
This command starts Zookeeper, Kafka's dependency. By default, it will run on localhost:2181. You can modify this in the config/zookeeper.properties file.
Important: If you plan to run Zookeeper on a separate machine, update the zookeeper.connect property in the config/server.properties file of your Kafka brokers to point to the correct Zookeeper address.
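For example, each broker's config/server.properties might then contain a line like the following (zk-host is just a placeholder for your actual Zookeeper hostname):
# Point the broker at a remote Zookeeper instance (zk-host is a placeholder)
zookeeper.connect=zk-host:2181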
Next, we start the Kafka server.
[kafka-admin@kafka kafka]$ bin/kafka-server-start.sh config/server.properties
This starts the Kafka server using the default configuration.
Running Multiple Brokers:
To simulate a distributed Kafka cluster, you can run multiple Kafka brokers on a single machine. To do so, create multiple copies of the server.properties file (e.g., server-1.properties, server-2.properties), and modify the following properties in each file:
- broker.id: A unique integer identifier for each broker.
- port: The port on which the broker will listen for connections.
- log.dir: The directory where the broker will store its logs.
Example configurations:
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
Then, start each broker with its respective configuration file:
bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties
Creating Topics
Kafka organizes messages into topics. Before you can send or receive messages, you need to create a topic.
[kafka-admin@kafka kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic test
This command creates a topic named “test” with a single partition and a replication factor of 1.
- --create: Specifies that we want to create a topic.
- --zookeeper localhost:2181: Specifies the Zookeeper connection string.
- --replication-factor 1: Sets the replication factor to 1, meaning each message will be stored on one broker. For production environments, use a higher replication factor for fault tolerance.
- --partitions 1: Sets the number of partitions to 1. Partitions allow for parallel processing of messages.
- --topic test: Specifies the name of the topic to create.
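If you started the two extra brokers from the previous section, you can also create a topic that is replicated across all three brokers; a sketch, using replicated-test as an example topic name:
[kafka-admin@kafka kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 3 --partitions 1 --topic replicated-test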
To list existing topics, use the following command:
[kafka-admin@kafka kafka]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test
[kafka-admin@kafka kafka]$
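To inspect a topic's partitions, replicas, and current leader, you can also use the --describe flag:
[kafka-admin@kafka kafka]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test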
Send Some Messages
Now that we have a Kafka cluster and a topic, let’s send some messages. Open a new terminal and use the Kafka console producer:
[kafka-admin@kafka kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is a message2
Each line you type will be sent as a separate message to the "test" topic. Press Enter after each message.
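You can also pipe data into the console producer instead of typing interactively, which is handy for scripting:
[kafka-admin@kafka kafka]$ echo "This is a piped message" | \
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test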
Start a Consumer
To receive the messages, open another new terminal and start the Kafka console consumer:
[kafka-admin@kafka kafka]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
--topic test --from-beginning
This is a message
This is a message2
The --from-beginning option ensures that the consumer starts reading messages from the beginning of the topic, even if it has already consumed messages before. You should see the messages you sent using the producer.
Installing NodeJS on CentOS 6.6
Now, let’s install NodeJS and npm, which are required for running our NodeJS application that will interact with Kafka.
Installing nodejs and npm on CentOS is very simple.
[nginx-admin@nginx ~]$ sudo su
[nginx-admin@nginx ~]# curl -sL https://rpm.nodesource.com/setup | bash -
[nginx-admin@nginx ~]# yum install -y nodejs
These commands install NodeJS and npm using the NodeSource repository. This is a common and reliable way to install NodeJS on CentOS.
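To confirm the installation succeeded, check the installed versions:
[nginx-admin@nginx ~]$ node -v
[nginx-admin@nginx ~]$ npm -v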
Installing gcc-c++ and make.
These tools are needed to compile native modules, which some NodeJS packages may require.
[nginx-admin@nginx ~]$ sudo yum install gcc-c++ make
[sudo] password for nginx-admin:
Loaded plugins: fastestmirror, refresh-packagekit, security
Setting up Install Process
Loading mirror speeds from cached hostfile
* base: mirrors.123host.vn
* epel: ftp.cuhk.edu.hk
* extras: centos-hn.viettelidc.com.vn
* updates: mirrors.vonline.vn
Package 1:make-3.81-20.el6.x86_64 already installed and latest version
Resolving Dependencies
...
Complete!
Later on we will need kafka-node, so let's install that as well. The kafka-node package is a popular NodeJS client for interacting with Kafka.
[nginx-admin@nginx ~]$ sudo npm install kafka-node
[sudo] password for nginx-admin:
> snappy@3.0.6 install /home/nginx-admin/node_modules/kafka-node/node_modules/snappy
> node-gyp rebuild
gyp WARN EACCES user "root" does not have permission to access the dev dir "/root/.node-gyp/0.10.36"
gyp WARN EACCES attempting to reinstall using temporary dev dir
"/home/nginx-admin/node_modules/kafka-node/node_modules/snappy/.node-gyp"
make: Entering directory `/home/nginx-admin/node_modules/kafka-node/node_modules/snappy/build'
CXX(target) Release/obj.target/snappy/deps/snappy/snappy-1.1.2/snappy-sinksource.o
CXX(target) Release/obj.target/snappy/deps/snappy/snappy-1.1.2/snappy-stubs-internal.o
CXX(target) Release/obj.target/snappy/deps/snappy/snappy-1.1.2/snappy.o
AR(target) Release/obj.target/deps/snappy/snappy.a
COPY Release/snappy.a
CXX(target) Release/obj.target/binding/src/binding.o
SOLINK_MODULE(target) Release/obj.target/binding.node
SOLINK_MODULE(target) Release/obj.target/binding.node: Finished
COPY Release/binding.node
make: Leaving directory `/home/nginx-admin/node_modules/kafka-node/node_modules/snappy/build'
kafka-node@0.2.18 node_modules/kafka-node
├── buffer-crc32@0.2.5
├── retry@0.6.1
├── node-uuid@1.4.1
├── async@0.7.0
├── lodash@2.2.1
├── debug@2.1.1 (ms@0.6.2)
├── binary@0.3.0 (buffers@0.1.1, chainsaw@0.1.0)
├── node-zookeeper-client@0.2.0 (async@0.2.10, underscore@1.4.4)
├── buffermaker@1.2.0 (long@1.1.2)
└── snappy@3.0.6 (bindings@1.1.1, nan@1.5.3)
Let's do a test.
To verify that NodeJS is installed correctly, let’s create a simple HTTP server:
Create a script called example.js with the following code:
var http = require('http');
http.createServer(function (req, res) {
res.writeHead(200, {'Content-Type': 'text/plain'});
res.end('Hello World\n');
}).listen(1337, '127.0.0.1');
console.log('Server running at http://127.0.0.1:1337/');
Start the server on a terminal:
[nginx-admin@nginx nodejs]$ node example.js
Server running at http://127.0.0.1:1337/
Open a web browser and navigate to http://127.0.0.1:1337/. You should see "Hello World" displayed.
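Alternatively, you can test from a terminal with curl:
[nginx-admin@nginx nodejs]$ curl http://127.0.0.1:1337/
Hello World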
Let's make some simple changes to the existing script so our NodeJS server can handle JSON data.
// Getting some 'http' power
var http = require('http');
// Let's create a server to wait for requests at:
// http://localhost:8125/upload
http.createServer(function (request, response)
{
    // Reply with a JSON content type.
    response.writeHead(200, {"Content-Type": "application/json"});
    // request.on fires as data arrives from the client.
    request.on('data', function (chunk)
    {
        // CHUNK is the data we receive from the client.
        // For our request we are assuming it is JSON data,
        // so we print it here on the console.
        console.log(chunk.toString('utf8'));
    });
    // End of the response.
    response.end();
// Listen on port 8125.
}).listen(8125);
Save this code as node_recv_json.js and start the server:
[nginx-admin@nginx nodejs]$ node node_recv_json.js
In a new terminal, send a JSON payload to the server using curl:
[nginx-admin@nginx nodejs]$ curl -H "Content-Type: application/json" \
-d '{"username":"xyz","password":"xyz"}' http://localhost:8125/upload
You should see the JSON data printed on the server’s console:
[nginx-admin@nginx nodejs]$ node node_recv_json.js
{"username":"xyz","password":"xyz"}
NodeJS Kafka Producer - Using kafka-node
Now that we have Kafka and NodeJS set up, let’s build a NodeJS application that sends data to Kafka.
The following code demonstrates a basic Kafka producer using the kafka-node library.
Step 1 - Copy the script below into a file called producer_nodejs.js.
/*
Basic producer to send data to Kafka from NodeJS.
More information here: https://www.npmjs.com/package/kafka-node
*/
// Using kafka-node - really nice library.
// Create a producer and connect to Zookeeper to send the payloads.
var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    client = new kafka.Client('kafka:2181'),
    producer = new Producer(client);
/*
Creating a payload, which takes the below information:
'topic' --> the topic we have created in Kafka. (test)
'messages' --> data which needs to be sent to Kafka. (JSON in our case)
'partition' --> which partition we should send the request to. (default)
Example command to create a topic in Kafka:
[kafka@kafka kafka]$ bin/kafka-topics.sh \
 --create --zookeeper localhost:2181 \
 --replication-factor 1 \
 --partitions 1 \
 --topic test
If there are multiple partitions, we could optimize the code here
to spread requests across the different partitions.
*/
var payloads = [
    { topic: 'test', messages: 'This is the First Message I am sending', partition: 0 },
];
// The producer fires 'ready' when it is able to send payloads to Kafka.
producer.on('ready', function () {
    producer.send(payloads, function (err, data) {
        console.log(data);
    });
});
// Log any errors during the sending process.
producer.on('error', function (err) {
    console.log(err);
});
Explanation:
- The code imports the kafka-node library.
- It creates a Kafka client and a producer, connecting to the Zookeeper instance running on kafka:2181. Replace kafka with the actual hostname or IP address of your Zookeeper server.
- A payloads array defines the messages to be sent. Each object in the array represents a message and includes the topic name, the message content, and the partition number.
- The producer.on('ready', ...) event handler is called when the producer is ready to send messages. It sends the payloads to Kafka and logs the response.
- The producer.on('error', ...) event handler is called if there is an error during the sending process.
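For completeness, you can also consume messages from NodeJS instead of the console consumer. Here is a minimal sketch using kafka-node's Consumer, assuming the same test topic and Zookeeper address as above:
var kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    client = new kafka.Client('kafka:2181'),
    // Subscribe to partition 0 of the test topic.
    consumer = new Consumer(client, [{ topic: 'test', partition: 0 }], { autoCommit: true });

// Each message fetched from Kafka is printed to the console.
consumer.on('message', function (message) {
    console.log(message);
});
consumer.on('error', function (err) {
    console.log(err);
});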
Step 2 - Start the kafka cluster as we already did in Installation of Kafka, assuming the topic is test. Ensure your Kafka cluster is running and that the "test" topic has been created.
Step 3 - Start the consumer service as shown in the command below.
Start the Kafka console consumer to listen for messages on the “test” topic:
[kafka-admin@kafka kafka]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
--topic test --from-beginning
Step 4 - Run the producer_nodejs.js script with the command below. This will send the message "This is the First Message I am sending" to the Kafka consumer:
[nodejs-admin@nodejs nodejs]$ node producer_nodejs.js
Step 5 - Check the consumer; you will see the message sent from nodejs.
You should see the message “This is the First Message I am sending” appear in the Kafka console consumer:
[kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-console-consumer.sh \
--zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message here
This is the First Message I am sending
Sending JSON via NodeJS to Kafka.
Let’s create an example that:
- Receives JSON data from a client (e.g., a web browser or curl).
- Sends the JSON data to a Kafka topic using NodeJS.
- Verifies that the JSON data is received by the Kafka consumer.
Step 1 - Create a script called json_nodejs_kafka.js with the script below.
/*
Getting some 'http' power
*/
var http = require('http');
/*
Let's create a server to wait for requests at:
http://localhost:8125/upload
*/
http.createServer(function (request, response)
{
    /*
    Reply with a JSON content type.
    */
    response.writeHead(200, {"Content-Type": "application/json"});
    /*
    request.on fires as data arrives from the client.
    */
    request.on('data', function (chunk)
    {
        /*
        CHUNK is the data we receive from the client.
        For our request we are assuming it is JSON data,
        so we print it here on the console.
        */
        console.log(chunk.toString('utf8'));
        /*
        Using kafka-node - really nice library.
        Create a producer and connect to Zookeeper to send the payloads.
        (For simplicity we create a new client and producer per request;
        a production version would create them once at startup.)
        */
        var kafka = require('kafka-node'),
            Producer = kafka.Producer,
            client = new kafka.Client('kafka:2181'),
            producer = new Producer(client);
        /*
        Creating a payload, which takes the below information:
        'topic' --> the topic we have created in Kafka.
        'messages' --> data which needs to be sent to Kafka. (JSON in our case)
        'partition' --> which partition we should send the request to.
        If there are multiple partitions, we could optimize the code here
        to spread requests across the different partitions.
        */
        var payloads = [
            { topic: 'test', messages: chunk.toString('utf8'), partition: 0 },
        ];
        /*
        The producer fires 'ready' when it is able to send payloads to Kafka.
        */
        producer.on('ready', function () {
            producer.send(payloads, function (err, data) {
                console.log(data);
            });
        });
        /*
        Log any errors during the sending process.
        */
        producer.on('error', function (err) {
            console.log(err);
        });
    });
    /*
    End of the response.
    */
    response.end();
/*
Listen on port 8125.
*/
}).listen(8125);
Explanation:
- This code combines the HTTP server and Kafka producer functionality into a single script.
- When the server receives a request with JSON data, it extracts the data from the request body.
- It then creates a Kafka producer and sends the JSON data to the "test" topic.
- The console.log(chunk.toString('utf8')) line prints the received JSON data to the NodeJS console.
- The console.log(data) line (inside producer.on('ready')) prints the Kafka response to the NodeJS console.
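As noted in the comments, the script above creates a new Kafka client and producer for every incoming request, which keeps the example simple but is wasteful. A variation that creates the producer once at startup might look like this (a sketch, assuming the same test topic and Zookeeper address):
var http = require('http');
var kafka = require('kafka-node');

// Create the client and producer once, at startup.
var client = new kafka.Client('kafka:2181');
var producer = new kafka.Producer(client);
var producerReady = false;

producer.on('ready', function () { producerReady = true; });
producer.on('error', function (err) { console.log(err); });

http.createServer(function (request, response) {
    response.writeHead(200, {"Content-Type": "application/json"});
    request.on('data', function (chunk) {
        console.log(chunk.toString('utf8'));
        // Only send once the producer has connected to Kafka.
        if (producerReady) {
            producer.send([
                { topic: 'test', messages: chunk.toString('utf8'), partition: 0 }
            ], function (err, data) {
                console.log(data);
            });
        }
    });
    response.end();
}).listen(8125);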
Step 2 - Start the above script on the nodejs server.
[nodejs-admin@nodejs nodejs]$ vim json_nodejs_kafka.js
[nodejs-admin@nodejs nodejs]$ node json_nodejs_kafka.js
Step 3 - Execute the curl command to send the JSON to nodejs.
[nodejs-admin@nodejs nodejs]$ curl -H "Content-Type: application/json" \
-d '{"username":"xyz","password":"xyz"}' http://localhost:8125/upload
Step 4 - Output on the nodejs console:
[nodejs-admin@nodejs nodejs]$ node json_nodejs_kafka.js
{"username":"xyz","password":"xyz"}
{ test: { '0': 29 } }
{"username":"xyz","password":"xyz"}
is the JSON request received from thecurl
command.{ test: { '0': 29 } }
is the response from the Kafka cluster indicating that it has successfully received the JSON data. The ‘0’ indicates partition number and 29 is the offset.
Step 5 - Output on the kafka consumer side:
[kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-console-consumer.sh \
--zookeeper localhost:2181 --topic test --from-beginning
{"username":"xyz","password":"xyz"}
You should see the JSON data that was sent from NodeJS displayed in the Kafka console consumer.