This guide demonstrates how to build a Node.js application that receives JSON data via HTTP and routes it to different Apache Kafka topics based on the URL path. We’ll be using the kafka-node library, a popular Node.js client for Kafka, to achieve this.

The kafka-node library integrates with Zookeeper, which provides two crucial functions:

  • Metadata Loading: Retrieves broker metadata from Zookeeper, essential for communication with the Kafka cluster.
  • Broker State Monitoring: Actively monitors broker state changes and automatically refreshes the client’s stored broker and topic metadata, ensuring resilience and up-to-date information.
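
For illustration, a minimal client setup might look like this (the connection string and client id are placeholders; adjust them for your cluster):

var kafka = require('kafka-node');

// The client only needs the Zookeeper connection string; broker
// addresses are discovered from the metadata stored in Zookeeper.
var client = new kafka.Client('localhost:2181', 'example-client');

client.on('ready', function () {
    console.log('Broker metadata loaded from Zookeeper');
});
client.on('error', function (err) {
    console.error('Kafka client error:', err);
});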

The Goal:

  1. Accept JSON data sent from a browser or via curl to a Node.js server.
  2. Implement routing logic in Node.js to direct the received JSON to specific Kafka topics based on the request URL. For example, a request to /upload/topic/A will send the JSON payload to the Kafka topic topic_a.
  3. Rely on Kafka to durably store the messages and make them available to consumers.
  4. Verify successful JSON delivery to Kafka using the kafka-console-consumer.sh script.

Step 1: Retrieve the json_nodejs_multiple_topics.js Script

Obtain the necessary Node.js script from the GitHub repository:

json_nodejs_multiple_topics.js

This script contains the core logic for receiving HTTP requests, extracting the target Kafka topic from the URL, and publishing the JSON payload to that topic. Examine the script to understand how it utilizes kafka-node to interact with your Kafka cluster. Key parts to look at are the HTTP server setup, the URL parsing, and the Kafka producer configuration.
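
For orientation before reading the repository version, here is a sketch of the same pattern. The route table, port, and log messages are modeled on the output shown in Step 4, and the actual script may differ in its details:

var http = require('http');
var kafka = require('kafka-node');

var client = new kafka.Client('localhost:2181');
var producer = new kafka.HighLevelProducer(client);

// Map URL paths to Kafka topics; /upload/topic/D is deliberately
// absent, which triggers the error branch exercised in Step 3.
var routes = {
    '/upload/topic/A': 'topic_a',
    '/upload/topic/B': 'topic_b',
    '/upload/topic/C': 'topic_c'
};

// Only start accepting HTTP traffic once the producer is connected.
producer.on('ready', function () {
    http.createServer(function (req, res) {
        var body = '';
        req.on('data', function (chunk) { body += chunk; });
        req.on('end', function () {
            var topic = routes[req.url];
            if (!topic) {
                console.log('ERROR: Could not Process this URL :' + req.url);
                console.log(body);
                res.writeHead(404);
                return res.end();
            }
            console.log('For Topic ' + req.url.split('/').pop());
            console.log(body);
            // The acknowledgment maps topic -> partition -> offset,
            // e.g. { topic_a: { '0': 16 } } as seen in Step 4.
            producer.send([{ topic: topic, messages: body }], function (err, ack) {
                if (err) console.error(err);
                else console.log(ack);
                res.writeHead(err ? 500 : 200);
                res.end();
            });
        });
    }).listen(8125);
});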

Step 2: Start the Node.js Server

Deploy and execute the script on your Node.js server. Ensure you have Node.js and npm installed.
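
The script depends on the kafka-node package; if it is not already installed alongside the script, add it first:

[nodejs-admin@nodejs nodejs]$ npm install kafka-node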

[nodejs-admin@nodejs nodejs]$ vim json_nodejs_multiple_topics.js
[nodejs-admin@nodejs nodejs]$ node json_nodejs_multiple_topics.js

This command starts the Node.js server, which listens for incoming HTTP requests on the configured port (8125 in this example).

Step 3: Send JSON Payloads with curl

Use curl commands to simulate sending JSON data to different Kafka topics:

[nodejs-admin@nodejs nodejs]$ curl -H "Content-Type: application/json" \
                    -d '{"username":"xyz","password":"xyz"}' http://localhost:8125/upload/topic/A
[nodejs-admin@nodejs nodejs]$ curl -H "Content-Type: application/json" \
                    -d '{"username":"abc","password":"xyz"}' http://localhost:8125/upload/topic/B
[nodejs-admin@nodejs nodejs]$ curl -H "Content-Type: application/json" \
                    -d '{"username":"efg","password":"xyz"}' http://localhost:8125/upload/topic/C
[nodejs-admin@nodejs nodejs]$ curl -H "Content-Type: application/json" \
                    -d '{"username":"efg","password":"xyz"}' http://localhost:8125/upload/topic/D

These commands send POST requests with a JSON payload to the Node.js server; the URL path determines the target Kafka topic. Note that the last request, to /upload/topic/D, deliberately exercises the error path: the script has no route for that URL, and its would-be topic topic_d is never created.
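
The same request can also be sent from a browser, for example with the fetch API (a page served from a different origin would additionally need CORS headers on the Node.js server):

fetch('http://localhost:8125/upload/topic/A', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ username: 'xyz', password: 'xyz' })
}).then(function (res) {
    console.log('Server responded with status ' + res.status);
});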

Step 4: Node.js Server Output

Observe the output on the Node.js console. This output confirms that the server received the JSON data and attempted to send it to the specified Kafka topic.

[nodejs-admin@nodejs nodejs]$ node json_nodejs_multiple_topics.js
For Topic A
{"username":"xyz","password":"xyz"}
{ topic_a: { '0': 16 } }
For Topic B
{"username":"abc","password":"xyz"}
{ topic_b: { '0': 1 } }
For Topic C
{"username":"efg","password":"xyz"}
{ topic_c: { '0': 0 } }
ERROR: Could not Process this URL :/upload/topic/D
{"username":"efg","password":"xyz"}
  • {"username":"xyz","password":"xyz"} represents the JSON request received from the curl command.
  • { topic_a: { '0': 16 } } indicates the response from the Kafka cluster, confirming that the JSON message has been received for topic_a. The '0' represents the partition and 16 the offset.
  • The “ERROR” message demonstrates error handling for invalid topic routes. This highlights the importance of robust error handling in a production environment.
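
As one example of such hardening, the payload could be validated before it reaches Kafka. This hypothetical variation of the request handler parses the body and rejects malformed JSON with a 400 response instead of forwarding it:

var http = require('http');

// Collect the request body, then hand back either a parse error
// or the decoded JSON payload.
function readJsonBody(req, callback) {
    var body = '';
    req.on('data', function (chunk) { body += chunk; });
    req.on('end', function () {
        try {
            callback(null, JSON.parse(body));
        } catch (e) {
            callback(e);
        }
    });
}

http.createServer(function (req, res) {
    readJsonBody(req, function (err, payload) {
        if (err) {
            res.writeHead(400, { 'Content-Type': 'application/json' });
            return res.end(JSON.stringify({ error: 'invalid JSON' }));
        }
        // ...route the payload and produce to Kafka as before...
        res.writeHead(200);
        res.end();
    });
}).listen(8126); // a separate port so it can run alongside the example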

Step 5: Verify Data in Kafka using the Consumer

Before proceeding, ensure that the necessary Kafka topics have been created:

[kafka-admin@kafka kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 \
                                            --replication-factor 1 --partitions 1 --topic topic_a
[kafka-admin@kafka kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 \
                                            --replication-factor 1 --partitions 1 --topic topic_b
[kafka-admin@kafka kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 \
                                            --replication-factor 1 --partitions 1 --topic topic_c
[kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
topic_a
topic_b
topic_c
[kafka-admin@kafka kafka_2.9.2-0.8.2.0]$

This creates three topics: topic_a, topic_b, and topic_c. Unless the broker is configured to create topics automatically (auto.create.topics.enable), topics must exist before data is sent to them, or you may encounter errors.
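
You can also inspect a topic's partition and replica assignment with the same script's --describe option:

[kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_a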

Now, use the kafka-console-consumer.sh script to verify the data within each topic:

[kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-console-consumer.sh \
                                        --zookeeper localhost:2181 --topic topic_a --from-beginning
{"username":"xyz","password":"xyz"}

[kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-console-consumer.sh \
                                        --zookeeper localhost:2181 --topic topic_b --from-beginning
{"username":"abc","password":"xyz"}

[kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-console-consumer.sh \
                                        --zookeeper localhost:2181 --topic topic_c --from-beginning
{"username":"efg","password":"xyz"}

The output confirms that the JSON data sent from the Node.js server has been successfully received and stored in the respective Kafka topics. The --from-beginning flag ensures you read all messages from the beginning of the topic.
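
If you prefer to verify from Node.js rather than the shell, kafka-node's consumer can do the same job. A minimal sketch, with the topic and partition hard-coded for this example:

var kafka = require('kafka-node');
var client = new kafka.Client('localhost:2181');

// Read partition 0 of topic_a from offset 0, mirroring --from-beginning.
var consumer = new kafka.Consumer(client,
    [{ topic: 'topic_a', partition: 0, offset: 0 }],
    { fromOffset: true });

consumer.on('message', function (message) {
    console.log(message.value);  // e.g. {"username":"xyz","password":"xyz"}
});
consumer.on('error', function (err) {
    console.error('Consumer error:', err);
});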

Explanation of Commands and Parameters:

  • bin/kafka-topics.sh: This script, included with Kafka, is used to manage Kafka topics.
    • --create: Specifies that you want to create a new topic.
    • --zookeeper localhost:2181: Specifies the Zookeeper connection string (host and port). Kafka uses Zookeeper for cluster management and configuration.
    • --replication-factor 1: Sets the replication factor for the topic to 1. This means each message will be stored on only one broker. For production environments, a higher replication factor is recommended for fault tolerance.
    • --partitions 1: Sets the number of partitions for the topic to 1. Partitions allow you to parallelize consumption of messages from a topic.
    • --topic topic_a: Specifies the name of the topic to create.
    • --list: Lists all the topics available in the cluster.
  • bin/kafka-console-consumer.sh: This script, also included with Kafka, allows you to consume messages from a Kafka topic and print them to the console.
    • --zookeeper localhost:2181: Specifies the Zookeeper connection string.
    • --topic topic_a: Specifies the name of the topic to consume from.
    • --from-beginning: Tells the consumer to start reading messages from the beginning of the topic, rather than only new messages. This is useful for verifying that previously sent messages were successfully stored.
Additional Resources