This is a cache of https://www.elastic.co/search-labs/blog/elasticsearch-apache-kafka-ingest-data. It is a snapshot of the page at 2025-01-15T00:58:07.720+0000.
How to inge<strong>s</strong>t data to Ela<strong>s</strong>tic<strong>s</strong>earch through Kafka - Ela<strong>s</strong>tic<strong>s</strong>earch Lab<strong>s</strong>

How to ingest data to Elasticsearch through Kafka

A step-by-step guide to integrating Apache Kafka with Elasticsearch for efficient data ingestion, indexing, and visualization using Python, Docker Compose, and Kafka Connect.

In this article, we show how to integrate Apache Kafka with Elasticsearch for data ingestion and indexing. We will provide an overview of Kafka, its concept of producers and consumers, and we will create a logs index where messages will be received and indexed through Apache Kafka. The project is implemented in Python, and the code is available on GitHub.

Prerequisites

  • Docker and Docker Compose: Ensure you have Docker and Docker Compose installed on your machine.
  • Python 3.x: To run the Producer and Consumer scripts.

A brief introduction to Apache Kafka

Apache Kafka is a distributed streaming platform that enables high scalability and availability, as well as fault tolerance. In Kafka, data management occurs through the main components:

  • Broker: responsible for storing and distributing messages between producers and consumers.
  • Zookeeper: manages and coordinates the Kafka brokers, controlling the state of the cluster, partition leaders, and consumer information.
  • Topics: channels where data is published and stored for consumption.
  • Consumers and Producers: while producers send data to the topics, consumers retrieve that data.

These components work together to form the Kafka ecosystem, providing a robust framework for data streaming.

Project structure

To understand the data ingestion process, we divided it into stages:

  • Infrastructure Provisioning: setting up the Docker environment to support Kafka, Elasticsearch, and Kibana.
  • Producer Creation: implementing the Kafka Producer, which sends data to the logs topic.
  • Consumer Creation: developing the Kafka Consumer to read and index messages in Elasticsearch.
  • Ingestion Validation: verifying and validating the sent and consumed data.

Infrastructure Configuration with Docker Compose

We utilized Docker Compose to configure and manage the necessary services. Below, you will find the Docker Compose code that sets up each service required for the integration of Apache Kafka, Elasticsearch, and Kibana, ensuring a data ingestion process.

version: "3"

services:

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTIsED_LIsTENERs: PLAINTEXT://kafka:29092,PLAINTEXT_HOsT:${HOsT_IP}:9092
      KAFKA_LIsTENER_sECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOsT:PLAINTEXT
      KAFKA_INTER_BROKER_LIsTENER_NAME: PLAINTEXT
      KAFKA_OFFsETs_TOPIC_REPLICATION_FACTOR: 1

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.15.1
    container_name: elasticsearch-8.15.1
    environment:
      - node.name=elasticsearch
      - xpack.security.enabled=false
      - discovery.type=single-node
      - "Es_JAVA_OPTs=-Xms512m -Xmx512m"
    volumes:
      - ./elasticsearch:/usr/share/elasticsearch/data
    ports:
      - 9200:9200

  kibana:
    image: docker.elastic.co/kibana/kibana:8.15.1
    container_name: kibana-8.15.1
    ports:
      - 5601:5601
    environment:
      ELAsTICsEARCH_URL: http://elasticsearch:9200
      ELAsTICsEARCH_HOsTs: '["http://elasticsearch:9200"]'

You can access the file directly from the Elasticsearch Labs GitHub repo.

Data sending with the Kafka Producer

The producer is responsible for sending messages to the logs topic. By sending messages in batches, it increases network usage efficiency, allowing optimizations with the batch_size and linger_ms settings, which control the quantity and latency of the batches, respectively. The configuration acks='all' ensures that messages are stored durably, which is essential for important log data.

producer = KafkaProducer(
   bootstrap_servers=['localhost:9092'],  # specifies the Kafka server to connect
   value_serializer=lambda x: json.dumps(x).encode('utf-8'),  # serializes data as JsON and encodes it to UTF-8 before sending
   batch_size=16384,     # sets the maximum batch size in bytes (here, 16 KB) for buffered messages before sending
   linger_ms=10,         # sets the maximum delay (in milliseconds) before sending the batch
   acks='all'            # specifies acknowledgment level; 'all' ensures message durability by waiting for all replicas to acknowledge
)


def generate_log_message():
   levels = ["INFO", "WARNING", "ERROR", "DEBUG"]
   messages = [
       "User login successful",
       "User login failed",
       "Database connection established",
       "Database connection failed",
       "service started",
       "service stopped",
       "Payment processed",
       "Payment failed"
   ]
   log_entry = {
       "level": random.choice(levels),
       "message": random.choice(messages),
       "timestamp": time.time()
   }
   return log_entry

def send_log_batches(topic, num_batches=5, batch_size=10):
   for i in range(num_batches):
       logger.info(f"sending batch {i + 1}/{num_batches}")
       for _ in range(batch_size):
           log_message = generate_log_message()
           producer.send(topic, value=log_message)
       producer.flush()


if __name__ == "__main__":
   topic = "logs"
   send_log_batches(topic)
   producer.close()

When starting the producer, messages are sent in batches to the topic, as shown below:

INFO:kafka.conn:set configuration …
INFO:log_producer:sending batch 1/5 
INFO:log_producer:sending batch 2/5
INFO:log_producer:sending batch 3/5
INFO:log_producer:sending batch 4/5

Consumption and Indexing of Data with the Kafka Consumer

The consumer is designed to process messages efficiently, consuming batches from the logs topic and indexing them into Elasticsearch. With auto_offset_reset='latest', it ensures that the consumer starts processing the most recent messages, ignoring the older ones, and max_poll_records=10 limits the batch to 10 messages. With fetch_max_wait_ms=2000, the consumer waits up to 2 seconds to accumulate enough messages before processing the batch.

In its main loop, the consumer consumes log messages, processes, and indexes each batch into Elasticsearch, ensuring continuous data ingestion.

consumer = KafkaConsumer(
   'logs',                               
   bootstrap_servers=['localhost:9092'],
   auto_offset_reset='latest',            # Ensures reading from the latest offset if the group has no offset stored
   enable_auto_commit=True,               # Automatically commits the offset after processing
   group_id='log_consumer_group',         # specifies the consumer group to manage offset tracking
   max_poll_records=10,                   # Maximum number of messages per batch
   fetch_max_wait_ms=2000                 # Maximum wait time to form a batch (in ms)
)

def create_bulk_actions(logs):
   for log in logs:
       yield {
           "_index": "logs",
           "_source": {
               'level': log['level'],
               'message': log['message'],
               'timestamp': log['timestamp']
           }
       }

if __name__ == "__main__":
   try:
       print("starting message processing…")
       while True:

           messages = consumer.poll(timeout_ms=1000)  # Poll receive messages

           # process each batch messages
           for _, records in messages.items():
               logs = [json.loads(record.value) for record in records]
               bulk_actions = create_bulk_actions(logs)
               response = helpers.bulk(es, bulk_actions)
               print(f"Indexed {response[0]} logs.")
   except Exception as e:
       print(f"Erro: {e}")
   finally:
       consumer.close()
       print(f"Finish")

Visualizing Data in Kibana

With Kibana, we can explore and validate the data ingested from Kafka and indexed in Elasticsearch. By accessing Dev Tools in Kibana, you can view the indexed messages and confirm that the data is as expected. For example, if our Kafka producer sent 5 batches of 10 messages each, we should see a total of 50 records in the index.

To verify the data, you can use the following query in the Dev Tools section:

GET /logs/_search
{
  "query": {
    "match_all": {}
  }
}

Response:

Additionally, Kibana provides the ability to create visualizations and dashboards that can help make the analysis more intuitive and interactive. Below, you can see some examples of the dashboards and visualizations we created, which illustrate the data in various formats, enhancing our understanding of the information processed.

Data Ingestion with Kafka Connect

Kafka Connect is a service designed to facilitate integration between data sources and destinations (sinks), such as databases or file systems. It operates with predefined connectors that handle data movement automatically. In our case, Elasticsearch functions as the data sink.

Using Kafka Connect, we can simplify the data ingestion process, eliminating the need to manually implement the data ingestion workflow into Elasticsearch. With the appropriate connector, Kafka Connect allows data sent to a Kafka topic to be directly indexed in Elasticsearch with minimal setup and no additional coding required.

Working with Kafka Connect

To implement Kafka Connect, we’ll add the kafka-connect service to our Docker Compose setup. A key part of this configuration is installing the Elasticsearch connector, which will handle data indexing.

After configuring the service and creating the Kafka Connect container, a configuration file for the Elasticsearch connector will be needed. This file defines essential parameters such as:

  • connection.url: Connection URL for Elasticsearch.
  • topics: The Kafka topic the connector will monitor (in this case, "logs").
  • type.name: Document type in Elasticsearch (typically _doc).
  • value.converter: Converts Kafka messages to JsON format.
  • value.converter.schemas.enable: specifies whether the schema should be included.
  • schema.ignore and key.ignore: settings to ignore Kafka schemas and keys during indexing.

Below is the curl command to create the Elasticsearch connector in Kafka Connect:

curl --location '{{url}}/connectors' \
--header 'Content-Type: application/json' \
--data '{
    "name": "elasticsearch-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchsinkConnector",
        "topics": "logs",
        "connection.url": "http://elasticsearch:9200",
        "type.name": "_doc",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "schema.ignore": "true",
        "key.ignore": "true"
    }
}'

With this configuration, Kafka Connect will automatically begin ingesting data sent to the "logs" topic and indexing it in Elasticsearch. This approach allows for fully automated data ingestion and indexing without requiring additional coding, thereby simplifying the entire integration process.

Conclusion

Integrating Kafka and Elasticsearch creates a powerful pipeline for real-time data ingestion and analysis. This guide provides a foundational approach for building a robust data ingestion architecture, with seamless visualization and analysis in Kibana, ready to adapt to more complex requirements in the future.

Furthermore, using Kafka Connect makes the integration between Kafka and Elasticsearch even more streamlined, eliminating the need for additional code to process and index data. Kafka Connect enables data sent to a specific topic to be automatically indexed in Elasticsearch with minimal configuration.

Want to get Elastic certified? Find out when the next Elasticsearch Engineer training is running!

Elasticsearch is packed with new features to help you build the best search solutions for your use case. Dive into our sample notebooks to learn more, start a free cloud trial, or try Elastic on your local machine now.

Ready to build state of the art search experiences?

sufficiently advanced search isn’t achieved with the efforts of one. Elasticsearch is powered by data scientists, ML ops, engineers, and many more who are just as passionate about search as your are. Let’s connect and work together to build the magical search experience that will get you the results you want.

Try it yourself