This post outlines how to send JSON data from a client (browser or curl) through a NodeJS server and ultimately into a Kafka topic. This architecture is useful for ingesting data streams into a robust, scalable messaging system for further processing.
What are we trying to achieve?
- Send JSON data from a browser or curl to a NodeJS server.
- The NodeJS server receives the JSON data and forwards it to a Kafka topic.
- Further processing can then be performed on the data within the Kafka ecosystem (e.g., using Kafka Streams or Spark Streaming).
- Finally, verify that the JSON data arrives correctly in Kafka using the `kafka-console-consumer.sh` script.
Step 1: Create json_nodejs_kafka.js
Create a file named json_nodejs_kafka.js
with the following NodeJS code. This script sets up a simple HTTP server that listens for incoming JSON
data and forwards it to Kafka.
/*
 Getting some 'http' power.
*/
var http = require('http');

/*
 Using kafka-node - really nice library.
 Create the producer once and connect to Zookeeper, instead of
 creating a new producer for every incoming request.
*/
var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    client = new kafka.Client('kafka:2181'),
    producer = new Producer(client);

/*
 producer 'on' ready to send payloads to Kafka.
*/
var producerReady = false;
producer.on('ready', function () {
    producerReady = true;
});

/*
 If we have some error, log it instead of swallowing it.
*/
producer.on('error', function (err) {
    console.error('Kafka producer error:', err);
});

/*
 Let's create a server to wait for requests at
 http://localhost:8125/upload
*/
http.createServer(function (request, response) {
    var body = '';

    /*
     request.on('data') fires as chunks of the request body arrive.
     For our request we are assuming it's going to be JSON data.
    */
    request.on('data', function (chunk) {
        body += chunk.toString('utf8');
    });

    request.on('end', function () {
        /*
         Print the received JSON on the console.
        */
        console.log(body);

        /*
         Creating a payload, which takes the below information:
         'topic'     --> the topic we have created in Kafka.
         'messages'  --> data which needs to be sent to Kafka (JSON in our case).
         'partition' --> which partition we should send the message to.
                         With multiple partitions we could optimize here and
                         spread messages across them.
        */
        var payloads = [
            { topic: 'test', messages: body, partition: 0 }
        ];

        if (producerReady) {
            producer.send(payloads, function (err, data) {
                console.log(err || data);
            });
        }

        /*
         Making sure the client knows we handled the JSON,
         then end the response.
        */
        response.writeHead(200, { 'Content-Type': 'application/json' });
        response.end();
    });
/*
 Listen on port 8125.
*/
}).listen(8125);
Explanation:
- `require('http')`: Imports the built-in NodeJS HTTP module.
- `http.createServer(...)`: Creates an HTTP server that listens for incoming requests.
- `response.writeHead(200, {"Content-Type": "application/json"})`: Sets the response status and header to indicate that the server handles JSON data. (`writeHead` is the standard API; `writeHeader` is a deprecated alias.)
- `request.on('data', function (chunk) { ... })`: This event listener is triggered when data is received in the request. The data arrives in chunks.
- `chunk.toString('utf8')`: Converts the received data chunk (typically a Buffer) to a UTF-8 string. This is essential for handling the JSON data.
- `kafka = require('kafka-node')`: Imports the `kafka-node` library. Make sure you have installed it using `npm install kafka-node`.
- `client = new kafka.Client('kafka:2181')`: Creates a Kafka client, connecting to the Zookeeper instance at `kafka:2181`. Important: Replace `kafka:2181` with the actual address of your Zookeeper instance. Using `kafka` assumes you have configured your environment (e.g., using Docker Compose) so that `kafka` resolves to the correct IP address.
- `producer = new Producer(client)`: Creates a Kafka producer, which will send messages to Kafka.
- `payloads = [{ topic: 'test', messages: ..., partition: 0 }]`: Defines the payload to be sent to Kafka.
  - `topic`: The Kafka topic to which the message will be sent (in this case, `test`). You'll need to create this topic in Kafka.
  - `messages`: The actual message content, which is the JSON data received from the client.
  - `partition`: The partition within the topic to which the message should be sent. Setting it to `0` sends all messages to the first partition. For production systems, you'll want to implement a partitioning strategy for better throughput.
- `producer.on('ready', function () { ... })`: This event listener is triggered when the producer is ready to send messages. The `producer.send(payloads, ...)` function actually sends the data to Kafka.
- `producer.on('error', function (err) { ... })`: Handles any errors that occur during message production. It's good practice to log these errors.
- `.listen(8125)`: Starts the HTTP server and listens for incoming requests on port 8125.
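The script forwards the raw request body to Kafka without checking that it is actually valid JSON. As a small, hedged addition (the `parseJsonBody` helper below is hypothetical, not part of the original script), the server could validate the body before producing, and skip the Kafka send when the payload is malformed:

```javascript
/*
 Hypothetical helper: returns the parsed object if the body is
 valid JSON, or null if it is not. The server could answer 400
 and skip the Kafka send when this returns null.
*/
function parseJsonBody(body) {
    try {
        return JSON.parse(body);
    } catch (e) {
        return null;
    }
}

console.log(parseJsonBody('{"username":"xyz","password":"xyz"}')); // a parsed object
console.log(parseJsonBody('not-json'));                            // null
```

This keeps garbage payloads out of the topic, so downstream consumers can assume every message parses cleanly.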
Step 2: Start the NodeJS Script
Open a terminal and navigate to the directory where you saved `json_nodejs_kafka.js`. Then, start the script using:
[nodejs-admin@nodejs nodejs]$ node json_nodejs_kafka.js
This will start the NodeJS server, and it will begin listening for incoming requests on port 8125.
Step 3: Send JSON Data using curl
Open another terminal and use the curl
command to send a JSON
payload to the NodeJS server:
[nodejs-admin@nodejs nodejs]$ curl -H "Content-Type: application/json" \
-d '{"username":"xyz","password":"xyz"}' http://localhost:8125/upload
Explanation:
- `-H "Content-Type: application/json"`: Sets the `Content-Type` header to `application/json`, indicating that the request body contains JSON data. This is crucial so the server knows how to interpret the data.
- `-d '{"username":"xyz","password":"xyz"}'`: Specifies the JSON data to be sent in the request body.
- `http://localhost:8125/upload`: The URL of the NodeJS server endpoint to which the request is sent.
Step 4: Verify Output on the NodeJS Console
In the terminal where you started the NodeJS script, you should see output similar to this:
[nodejs-admin@nodejs nodejs]$ node json_nodejs_kafka.js
{"username":"xyz","password":"xyz"}
{ test: { '0': 29 } }
- `{"username":"xyz","password":"xyz"}`: The JSON data received from the `curl` command. The `console.log(chunk.toString('utf8'))` line in the NodeJS script prints this.
- `{ test: { '0': 29 } }`: The response from the Kafka cluster, indicating that it has successfully received the JSON data. The `'0'` is the partition and `29` is the offset of the message within that partition.
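The acknowledgement object maps topic name to partition to offset. If you wanted to read the offset out programmatically, a small hedged helper (the `offsetFor` name is hypothetical) could look like this:

```javascript
/*
 Illustrative helper: extract the offset for a given topic and
 partition from a kafka-node send acknowledgement object of the
 shape { topicName: { 'partitionNumber': offset } }.
*/
function offsetFor(ack, topic, partition) {
    return ack[topic] ? ack[topic][String(partition)] : undefined;
}

console.log(offsetFor({ test: { '0': 29 } }, 'test', 0)); // → 29
```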
Step 5: Verify Output on the Kafka Consumer Side
Open a new terminal on the Kafka server and use the kafka-console-consumer.sh
script to consume messages from the test
topic:
[kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-console-consumer.sh \
--zookeeper localhost:2181 --topic test --from-beginning
{"username":"xyz","password":"xyz"}
Explanation:
- `bin/kafka-console-consumer.sh`: The path to the Kafka console consumer script.
- `--zookeeper localhost:2181`: Specifies the Zookeeper instance to connect to. Important: Replace `localhost:2181` with the actual address of your Zookeeper instance.
- `--topic test`: Specifies the Kafka topic to consume messages from.
- `--from-beginning`: Starts consuming messages from the beginning of the topic.
You should see the JSON data you sent from the `curl` command:
{"username":"xyz","password":"xyz"}
This confirms that the JSON data was successfully sent from the browser/`curl` to the NodeJS server and then on to the Kafka topic.
Important Considerations:
- Error Handling: The provided NodeJS script has minimal error handling. In a production environment, you should add robust error handling to catch exceptions and log errors. This is particularly important for Kafka producer errors.
- Kafka Configuration: Ensure your Kafka broker and Zookeeper instances are properly configured and accessible. The addresses used in the NodeJS script (`kafka:2181`) and the consumer command (`localhost:2181`) must be correct.
- Dependencies: Make sure you have the `kafka-node` package installed. Run `npm install kafka-node` in your NodeJS project directory.
- Asynchronous Nature: The Kafka producer operations are asynchronous. You should handle this correctly to avoid potential issues with message delivery.
- Partitioning: For production systems, carefully consider your partitioning strategy to distribute messages across multiple partitions for increased throughput and scalability. The current example uses a single partition.
- Kafka Version: Ensure the `kafka-node` library is compatible with your Kafka version.
- Security: For production systems, secure your Kafka cluster and NodeJS application.
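To make the partitioning point concrete, here is one common strategy sketched out: hash a key from the message (for example, the username) onto a partition number, so related messages always land on the same partition and stay ordered. The `pickPartition` function is illustrative, not part of kafka-node:

```javascript
/*
 Illustrative key-based partitioner: hash a string key and map it
 onto one of numPartitions partitions. The same key always maps
 to the same partition, so messages for one user stay ordered.
*/
function pickPartition(key, numPartitions) {
    var hash = 0;
    for (var i = 0; i < key.length; i++) {
        hash = (hash * 31 + key.charCodeAt(i)) >>> 0; // unsigned 32-bit
    }
    return hash % numPartitions;
}

// The payload from Step 1 could then become something like:
// { topic: 'test', messages: body, partition: pickPartition(username, 4) }
console.log(pickPartition('xyz', 4)); // → 1
```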
This detailed guide provides a solid foundation for sending JSON
data from a client to Kafka via NodeJS. Remember to adapt the code and configurations to your specific environment and requirements.