Ready to integrate your NodeJS application with Kafka? This post provides a comprehensive guide to setting up a Kafka producer using the kafka-node library. We'll walk through the code, explain each step, and offer insights into best practices.
Before diving in, make sure you have Kafka and NodeJS installed and configured. Refer to our previous post on Installing Kafka and Setting up NodeJS if needed.
Our setup uses the following server details:
- nodejs: The server where your NodeJS application runs.
- kafka: The Kafka server (in this example, a single-node cluster).
Step 1: Creating the producer_nodejs.js Script
Create a file named producer_nodejs.js and copy the following script into it. This script leverages the kafka-node library to connect to your Kafka cluster and send messages.
/*
Basic producer to send data to kafka from nodejs.
More Information Here : https://www.npmjs.com/package/kafka-node
*/
// Using kafka-node - a robust and well-maintained library
// Creates a producer and connects to Zookeeper to send payloads.
var kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.Client('kafka:2181'),
producer = new Producer(client);
/*
Creating a payload, which takes below information
'topic' --> this is the topic we have created in kafka. (test)
'messages' --> data which needs to be sent to kafka. (JSON in our case)
'partition' --> which partition should we send the request to. (default)
example command to create a topic in kafka:
[kafka@kafka kafka]$ bin/kafka-topics.sh \
--create --zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic test
If there are multiple partitions, we can optimize the code here to
send requests to different partitions. Load balancing!
*/
var payloads = [
{ topic: 'test', messages: 'This is the First Message I am sending', partition: 0 },
];
// producer 'on' ready to send payload to kafka.
producer.on('ready', function(){
producer.send(payloads, function(err, data){
console.log(data)
});
});
producer.on('error', function(err){
console.error("Error connecting to Kafka:", err);
});
Explanation:
- require('kafka-node'): Imports the kafka-node library. Make sure you have installed it using npm install kafka-node.
- new kafka.Client('kafka:2181'): Creates a Kafka client, connecting to the Zookeeper instance running on kafka:2181. Important: Replace kafka:2181 with the correct address and port of your Zookeeper instance.
- new Producer(client): Creates a Kafka producer using the established client connection.
- payloads: An array of message payloads. Each payload specifies the topic, the message content, and (optionally) the partition. We are sending a simple string here, but the message can also be JSON. This is where you define what data you want to send to Kafka.
- producer.on('ready', ...): An event listener that triggers when the producer is successfully connected and ready to send messages. Inside it, producer.send(payloads, ...) sends the defined payloads to Kafka.
- producer.on('error', ...): Another event listener that catches any errors occurring during the producer's operation. It's crucial to implement error handling to keep your application resilient. We've added a console.error call here for basic logging.
Step 2: Start the Kafka Cluster
As mentioned in our Installation of Kafka guide, ensure your Kafka cluster is running. Also, verify that the test topic exists. If not, create it using the following command:
[kafka@kafka kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
This command creates a topic named test with a replication factor of 1 and a single partition.
Step 3: Start the Kafka Consumer
To verify that our producer is working correctly, we'll start a Kafka consumer in the console. This consumer will listen to the test topic and display any messages it receives.
[kafka-admin@kafka kafka]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
The --from-beginning flag ensures that the consumer reads all messages from the beginning of the topic, including messages published before the consumer started.
Step 4: Run the NodeJS Producer
Now, execute the producer_nodejs.js script from your NodeJS server:
[nodejs-admin@nodejs nodejs]$ node producer_nodejs.js
This will run the script, connect to Kafka, and send the message defined in the payloads array. You should see output in your console indicating that the message has been sent successfully. The console.log(data) call inside the producer.send callback will print details about the delivery, typically an object mapping the topic and partition to the offset of the new message.
Step 5: Verify the Message in the Consumer
Switch back to the Kafka consumer console. You should see the message “This is the First Message I am sending” displayed.
[kafka-admin@kafka kafka]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is the First Message I am sending
Congratulations! You’ve successfully sent a message to Kafka from your NodeJS application.
Further Exploration
This is a basic example, but kafka-node offers much more functionality:
- Sending JSON Data: Modify the payloads array to send JSON data instead of plain strings. For example: { topic: 'test', messages: JSON.stringify({ message: 'Hello Kafka!' }), partition: 0 }
- Multiple Partitions: If your topic has multiple partitions, you can distribute messages across them for increased throughput. You can either specify the partition directly in the payload or let Kafka automatically assign partitions based on a key.
- Asynchronous Producers: Explore asynchronous producers for improved performance. These producers buffer messages in memory and send them in batches, reducing the overhead of individual send requests.
- Compression: Enable compression (e.g., GZIP, Snappy) to reduce the size of messages sent to Kafka, saving bandwidth and storage space.
By understanding the fundamentals presented in this guide, you can start building robust and scalable applications that leverage the power of Kafka for real-time data streaming and processing. Remember to consult the kafka-node documentation for a complete overview of its features and options.