In this article, we show how to integrate Apache Kafka with Elasticsearch for data ingestion and indexing. We give an overview of Kafka and its producer and consumer concepts, and then create a logs index into which messages received through Apache Kafka are indexed. The project is implemented in Python, and the code is available on GitHub.
Prerequisites
- Docker and Docker Compose: Ensure you have Docker and Docker Compose installed on your machine.
- Python 3.x: To run the Producer and Consumer scripts.
A brief introduction to Apache Kafka
Apache Kafka is a distributed streaming platform that offers high scalability, availability, and fault tolerance. In Kafka, data is managed by the following main components:
- Broker: responsible for storing and distributing messages between producers and consumers.
- ZooKeeper: manages and coordinates the Kafka brokers, tracking cluster state such as broker membership and partition leaders.
- Topics: channels where data is published and stored for consumption.
- Consumers and Producers: while producers send data to the topics, consumers retrieve that data.
These components work together to form the Kafka ecosystem, providing a robust framework for data streaming.
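To make these roles concrete, here is a toy, in-memory model written for illustration only. It is not how Kafka works internally: it has a single partition, no replication, and no ZooKeeper, and the Topic, Broker, Producer, and Consumer classes are hypothetical. It does capture two ideas used later in this article: a topic is an append-only log whose messages are addressed by offsets, and each consumer group keeps its own committed offset, which auto_offset_reset initializes when the group has none.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class Topic:
    """A single-partition topic: an append-only list of messages."""
    name: str
    log: list[Any] = field(default_factory=list)

    def append(self, value: Any) -> int:
        """Store a message and return its offset (its position in the log)."""
        self.log.append(value)
        return len(self.log) - 1


@dataclass
class Broker:
    """Holds topics plus the committed offset of each (group, topic) pair."""
    topics: dict[str, Topic] = field(default_factory=dict)
    committed: dict[tuple[str, str], int] = field(default_factory=dict)

    def topic(self, name: str) -> Topic:
        # Create the topic on first use.
        return self.topics.setdefault(name, Topic(name))


class Producer:
    def __init__(self, broker: Broker) -> None:
        self.broker = broker

    def send(self, topic: str, value: Any) -> int:
        return self.broker.topic(topic).append(value)


class Consumer:
    def __init__(self, broker: Broker, topic: str, group_id: str,
                 auto_offset_reset: str = "latest") -> None:
        self.broker, self.topic, self.group_id = broker, topic, group_id
        key = (group_id, topic)
        if key not in broker.committed:
            # No committed offset yet: start at the end ("latest") or at the beginning.
            log = broker.topic(topic).log
            broker.committed[key] = len(log) if auto_offset_reset == "latest" else 0

    def poll(self, max_records: int = 10) -> list[Any]:
        key = (self.group_id, self.topic)
        start = self.broker.committed[key]
        records = self.broker.topic(self.topic).log[start:start + max_records]
        # Commit right away, loosely mirroring auto-commit.
        self.broker.committed[key] = start + len(records)
        return records
```

Because each group tracks its own offset, a second consumer group can read the same messages from the beginning without affecting the first.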
Project structure
To understand the data ingestion process, we divided it into stages:
- Infrastructure Provisioning: setting up the Docker environment to support Kafka, Elasticsearch, and Kibana.
- Producer Creation: implementing the Kafka Producer, which sends data to the logs topic.
- Consumer Creation: developing the Kafka Consumer to read and index messages in Elasticsearch.
- Ingestion Validation: verifying and validating the sent and consumed data.
Infrastructure Configuration with Docker Compose
We used Docker Compose to configure and manage the necessary services. Below is the Docker Compose file that sets up each service required to integrate Apache Kafka, Elasticsearch, and Kibana for data ingestion.
version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # PLAINTEXT is used inside the Docker network; PLAINTEXT_HOST is advertised to clients on the host (set HOST_IP, for example in a .env file)
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://${HOST_IP}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.15.1
    container_name: elasticsearch-8.15.1
    environment:
      - node.name=elasticsearch
      - xpack.security.enabled=false
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - ./elasticsearch:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
  kibana:
    image: docker.elastic.co/kibana/kibana:8.15.1
    container_name: kibana-8.15.1
    ports:
      - 5601:5601
    environment:
      ELASTICSEARCH_URL: http://elasticsearch:9200
      ELASTICSEARCH_HOSTS: '["http://elasticsearch:9200"]'
You can access the file directly from the Elasticsearch Labs GitHub repo.
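Once the containers are up (for example with docker compose up -d), it helps to confirm that each service is reachable from the host before running the Python scripts. The following is a minimal sketch using only the Python standard library; it assumes the port mappings above (Kafka on 9092, Elasticsearch on 9200, Kibana on 5601), and the helper names are hypothetical. An open Kafka port only shows that the broker accepts TCP connections; it does not verify that the advertised listener (${HOST_IP}:9092) is reachable from your clients.

```python
import json
import socket
import urllib.request
from typing import Optional


def tcp_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def elasticsearch_version(url: str = "http://localhost:9200",
                          timeout: float = 5.0) -> Optional[str]:
    """Return the version reported by Elasticsearch's root endpoint, or None if unreachable."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return json.load(resp)["version"]["number"]
    except (OSError, ValueError, KeyError, TypeError):
        return None


if __name__ == "__main__":
    print("Kafka (localhost:9092) reachable:", tcp_port_open("localhost", 9092))
    print("Kibana (localhost:5601) reachable:", tcp_port_open("localhost", 5601))
    print("Elasticsearch version:", elasticsearch_version())
```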
Sending Data with the Kafka Producer
The producer is responsible for sending messages to the logs topic. Sending messages in batches uses the network more efficiently, and the batch_size and linger_ms settings tune this behavior: batch_size caps the size of a batch in bytes, and linger_ms sets how long the producer waits for more messages before sending a batch. The configuration acks='all' makes the producer wait until all in-sync replicas have acknowledged each message, which provides the strongest durability guarantee and is essential for important log data.
import json
import logging
import random
import time

from kafka import KafkaProducer

# Log at INFO level so batch progress (and kafka-python's own messages) are printed
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("log_producer")

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],  # specifies the Kafka server to connect to
    value_serializer=lambda x: json.dumps(x).encode('utf-8'),  # serializes data as JSON and encodes it to UTF-8 before sending
    batch_size=16384,  # sets the maximum batch size in bytes (here, 16 KB) for buffered messages before sending
    linger_ms=10,  # sets the maximum delay (in milliseconds) before sending the batch
    acks='all'  # specifies acknowledgment level; 'all' waits for all in-sync replicas to acknowledge
)


def generate_log_message():
    levels = ["INFO", "WARNING", "ERROR", "DEBUG"]
    messages = [
        "User login successful",
        "User login failed",
        "Database connection established",
        "Database connection failed",
        "Service started",
        "Service stopped",
        "Payment processed",
        "Payment failed"
    ]
    log_entry = {
        "level": random.choice(levels),
        "message": random.choice(messages),
        "timestamp": time.time()
    }
    return log_entry


def send_log_batches(topic, num_batches=5, batch_size=10):
    for i in range(num_batches):
        logger.info(f"Sending batch {i + 1}/{num_batches}")
        for _ in range(batch_size):
            log_message = generate_log_message()
            producer.send(topic, value=log_message)
        producer.flush()  # block until every buffered message of this batch has been sent


if __name__ == "__main__":
    topic = "logs"
    send_log_batches(topic)
    producer.close()
When starting the producer, messages are sent in batches to the topic, as shown below:
INFO:kafka.conn:Set configuration …
INFO:log_producer:Sending batch 1/5
INFO:log_producer:Sending batch 2/5
INFO:log_producer:Sending batch 3/5
INFO:log_producer:Sending batch 4/5
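To build intuition for how batch_size and linger_ms interact, the following is a deliberately simplified model. The ToyBatcher class is hypothetical and ignores compression, per-partition batches, record overhead, and the other details kafka-python handles internally; it only encodes the rule that a batch is sent when the next record would push it past batch_size bytes, or once it has waited linger_ms milliseconds, whichever comes first. Time is passed in explicitly so the behavior is deterministic.

```python
class ToyBatcher:
    """Toy model of size- and time-based batching; not kafka-python's implementation."""

    def __init__(self, batch_size: int = 16384, linger_ms: float = 10) -> None:
        self.batch_size = batch_size  # max bytes per batch
        self.linger_ms = linger_ms    # max time a non-full batch may wait
        self.sent = []                # batches "sent" so far, in order
        self._current = []
        self._current_bytes = 0
        self._opened_at_ms = 0.0

    def append(self, payload: bytes, now_ms: float) -> None:
        self.poll(now_ms)  # the open batch may already have lingered long enough
        if self._current and self._current_bytes + len(payload) > self.batch_size:
            self._send()   # batch is full: send it before starting a new one
        if not self._current:
            self._opened_at_ms = now_ms
        self._current.append(payload)
        self._current_bytes += len(payload)

    def poll(self, now_ms: float) -> None:
        """Send the open batch if it has waited at least linger_ms."""
        if self._current and now_ms - self._opened_at_ms >= self.linger_ms:
            self._send()

    def flush(self) -> None:
        """Send whatever is buffered, regardless of size or age."""
        if self._current:
            self._send()

    def _send(self) -> None:
        self.sent.append(self._current)
        self._current, self._current_bytes = [], 0


if __name__ == "__main__":
    batcher = ToyBatcher(batch_size=10, linger_ms=5)
    batcher.append(b"aaaa", now_ms=0)
    batcher.append(b"bbbb", now_ms=1)
    batcher.append(b"cccc", now_ms=2)  # 12 bytes would exceed 10, so the first batch is sent
    print(batcher.sent)
```

In the producer above, each batch of 10 small JSON messages is far smaller than 16 KB, so in this model it is the 10 ms linger timer or the explicit producer.flush() call, not batch_size, that would trigger each send.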
Consumption and Indexing of Data with the Kafka Consumer
The consumer is designed to process messages efficiently, consuming batches from the logs topic and indexing them into Elasticsearch. With auto_offset_reset='latest', a consumer group that has no committed offset yet starts from the most recent messages and skips older ones. max_poll_records=10 limits each poll to at most 10 messages, and fetch_max_wait_ms=2000 lets the broker hold a fetch request for up to 2 seconds while it waits for enough data (as defined by fetch_min_bytes) to answer it.
In its main loop, the consumer polls for log messages, then deserializes and indexes each batch into Elasticsearch, providing continuous data ingestion.
import json

from elasticsearch import Elasticsearch, helpers
from kafka import KafkaConsumer

# Elasticsearch client; security is disabled in the Docker Compose setup, so plain HTTP works
es = Elasticsearch("http://localhost:9200")

consumer = KafkaConsumer(
    'logs',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest',  # Ensures reading from the latest offset if the group has no offset stored
    enable_auto_commit=True,  # Periodically commits consumed offsets in the background
    group_id='log_consumer_group',  # Specifies the consumer group to manage offset tracking
    max_poll_records=10,  # Maximum number of messages per batch
    fetch_max_wait_ms=2000  # Maximum time (in ms) the broker waits for enough data before answering a fetch
)


def create_bulk_actions(logs):
    for log in logs:
        yield {
            "_index": "logs",
            "_source": {
                'level': log['level'],
                'message': log['message'],
                'timestamp': log['timestamp']
            }
        }


if __name__ == "__main__":
    try:
        print("Starting message processing…")
        while True:
            messages = consumer.poll(timeout_ms=1000)  # Wait up to 1 second for new messages
            # Process the batch of records received for each topic partition
            for _, records in messages.items():
                logs = [json.loads(record.value) for record in records]
                bulk_actions = create_bulk_actions(logs)
                response = helpers.bulk(es, bulk_actions)
                print(f"Indexed {response[0]} logs.")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        consumer.close()
        print("Finished")
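helpers.bulk hides the request format. As a rough sketch of what it sends, assuming actions shaped like those yielded by create_bulk_actions, the Elasticsearch _bulk API takes newline-delimited JSON: for each document, one action line naming the operation and target index, followed by the document source, with a final trailing newline. The to_bulk_ndjson function is hypothetical and not part of the elasticsearch client; the real helper also takes care of chunking and error reporting.

```python
import json
from typing import Any, Dict, Iterable


def to_bulk_ndjson(actions: Iterable[Dict[str, Any]]) -> str:
    """Serialize {"_index": ..., "_source": ...} actions into a _bulk request body."""
    lines = []
    for action in actions:
        # Action/metadata line: index the document into the given index.
        lines.append(json.dumps({"index": {"_index": action["_index"]}}))
        # Source line: the document itself.
        lines.append(json.dumps(action["_source"]))
    return "\n".join(lines) + "\n"  # the _bulk API requires a trailing newline


if __name__ == "__main__":
    sample = [{
        "_index": "logs",
        "_source": {"level": "INFO", "message": "Payment processed", "timestamp": 1.5},
    }]
    print(to_bulk_ndjson(sample), end="")
```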
Visualizing Data in Kibana
With Kibana, we can explore and validate the data ingested from Kafka and indexed in Elasticsearch. By opening Dev Tools in Kibana, you can view the indexed messages and confirm that the data is as expected. For example, if our Kafka producer sent 5 batches of 10 messages each while the consumer was running, we should see a total of 50 records in the index.
To verify the data, you can use the following query in the Dev Tools section:
GET /logs/_search
{
  "query": {
    "match_all": {}
  }
}
Additionally, Kibana lets you create visualizations and dashboards that make the analysis more intuitive and interactive. Below, you can see some examples of the dashboards and visualizations we created, which present the data in various formats.
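The expected count of 50 documents can also be checked from a script. This sketch uses only the standard library and the Elasticsearch _count API, assumes Elasticsearch is reachable at http://localhost:9200 as in the Docker Compose setup and that the logs index started empty, and its helper names are hypothetical. Newly indexed documents become visible only after an index refresh (typically about once per second by default), so a count taken immediately after ingestion may briefly lag.

```python
import json
import urllib.request


def expected_log_count(num_batches: int = 5, batch_size: int = 10) -> int:
    """Documents expected from send_log_batches with its default arguments."""
    return num_batches * batch_size


def parse_count(body: bytes) -> int:
    """Extract the document count from a _count API response body."""
    return json.loads(body)["count"]


def fetch_log_count(base_url: str = "http://localhost:9200", index: str = "logs") -> int:
    """Ask Elasticsearch how many documents the index currently holds."""
    with urllib.request.urlopen(f"{base_url}/{index}/_count", timeout=5) as resp:
        return parse_count(resp.read())


if __name__ == "__main__":
    expected = expected_log_count()
    try:
        actual = fetch_log_count()
    except OSError as exc:
        print(f"Could not reach Elasticsearch: {exc}")
    else:
        status = "OK" if actual == expected else "MISMATCH"
        print(f"{status}: {actual} documents indexed, {expected} expected")
```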
Data Ingestion with Kafka Connect
Kafka Connect is a service designed to facilitate integration between data sources and destinations (sinks), such as databases or file systems. It operates with predefined connectors that handle data movement automatically. In our case, Elasticsearch functions as the data sink.
Using Kafka Connect, we can simplify the data ingestion process, eliminating the need to manually implement the data ingestion workflow into Elasticsearch. With the appropriate connector, Kafka Connect allows data sent to a Kafka topic to be directly indexed in Elasticsearch with minimal setup and no additional coding required.
Working with Kafka Connect
To implement Kafka Connect, we’ll add the kafka-connect service to our Docker Compose setup. A key part of this configuration is installing the Elasticsearch connector, which will handle data indexing.
After configuring the service and creating the Kafka Connect container, a configuration file for the Elasticsearch connector will be needed. This file defines essential parameters such as:
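- connection.url: Connection URL for Elasticsearch.
- topics: The Kafka topic the connector will monitor (in this case, "logs").
- type.name: Document type in Elasticsearch (typically _doc).
- value.converter: The converter used to read message values; JsonConverter deserializes them as JSON.
- value.converter.schemas.enable: Whether the JSON messages are expected to carry an embedded schema (false here, since the producer sends plain JSON).
- schema.ignore and key.ignore: Settings that ignore Kafka schemas and message keys during indexing; with key.ignore enabled, document IDs are derived from the topic, partition, and offset.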
Below is the curl command to create the Elasticsearch connector in Kafka Connect:
curl --location '{{url}}/connectors' \
--header 'Content-Type: application/json' \
--data '{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "logs",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "schema.ignore": "true",
    "key.ignore": "true"
  }
}'
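The same request can be sent from Python with the standard library, which is convenient if the rest of your tooling is already in Python. This sketch assumes the Kafka Connect REST API is exposed on the host at http://localhost:8083 (8083 is Kafka Connect's default REST port; adjust it to match your kafka-connect service), and the build_create_request helper is hypothetical. If a connector with the same name already exists, Kafka Connect rejects the request with HTTP 409 Conflict, which urllib raises as an HTTPError.

```python
import json
import urllib.request

# Same connector definition as the curl command.
CONNECTOR = {
    "name": "elasticsearch-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": "logs",
        "connection.url": "http://elasticsearch:9200",
        "type.name": "_doc",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "schema.ignore": "true",
        "key.ignore": "true",
    },
}


def build_create_request(connect_url: str, connector: dict) -> urllib.request.Request:
    """Build a POST /connectors request equivalent to the curl command."""
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    request = build_create_request("http://localhost:8083", CONNECTOR)  # assumed REST endpoint
    try:
        with urllib.request.urlopen(request, timeout=10) as resp:
            print(resp.status, json.loads(resp.read()))
    except OSError as exc:  # includes HTTPError (e.g. 409) and connection errors
        print(f"Request failed: {exc}")
```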
With this configuration, Kafka Connect will automatically begin ingesting data sent to the "logs" topic and indexing it in Elasticsearch. This approach allows for fully automated data ingestion and indexing without requiring additional coding, thereby simplifying the entire integration process.
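To confirm that ingestion has actually started, you can ask Kafka Connect for the connector's status with GET /connectors/<name>/status, which reports the state of the connector and of each of its tasks. The following minimal sketch assumes the REST API is reachable at http://localhost:8083 (an assumption about your setup) and uses hypothetical fetch_status and is_running helpers; it treats the connector as healthy only when the connector and every assigned task report RUNNING.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Dict


def is_running(status: Dict[str, Any]) -> bool:
    """True if the connector and all of its (at least one) tasks are RUNNING."""
    tasks = status.get("tasks", [])
    return (
        status.get("connector", {}).get("state") == "RUNNING"
        and bool(tasks)
        and all(task.get("state") == "RUNNING" for task in tasks)
    )


def fetch_status(connect_url: str, name: str) -> Dict[str, Any]:
    """Fetch the connector status document from the Kafka Connect REST API."""
    url = f"{connect_url.rstrip('/')}/connectors/{urllib.parse.quote(name)}/status"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.loads(resp.read())


if __name__ == "__main__":
    try:
        status = fetch_status("http://localhost:8083", "elasticsearch-sink-connector")
    except OSError as exc:
        print(f"Could not reach Kafka Connect: {exc}")
    else:
        print("RUNNING" if is_running(status) else f"Not running yet: {status}")
```

A FAILED task usually includes a trace field in the same status document, which is the first place to look when documents are not arriving in the logs index.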
Conclusion
Integrating Kafka and Elasticsearch creates a powerful pipeline for real-time data ingestion and analysis. This guide provides a foundational approach for building a robust data ingestion architecture, with seamless visualization and analysis in Kibana, ready to adapt to more complex requirements in the future.
Furthermore, using Kafka Connect makes the integration between Kafka and Elasticsearch even more streamlined, eliminating the need for additional code to process and index data. Kafka Connect enables data sent to a specific topic to be automatically indexed in Elasticsearch with minimal configuration.