Tutorial
Get hands-on experience with an IBM Event Streams Java sample application
Review the code and learn how to write client code that produces and consumes messages from Apache Kafka
To get some hands-on experience with a sample application, we are going to review the Java Console sample in the event-streams-samples repository.
In this tutorial, we focus specifically on the Local Development sample.
We will look at how to write client code, so that you learn how to produce and consume messages from Apache Kafka.
Prerequisites
- General overview of Apache Kafka
- IBM Cloud account
- For the sample application, you must also have installed Git, Gradle, and Java 8 or higher.
- Completed the steps for creating an Event Streams instance and running a sample application.
Overview of the Java sample
This tutorial does not walk through every line of the sample, but it is worth explaining how the code is structured.
The code can be found in the kafka-java-console-sample folder in the event-streams-samples GitHub repo.
Main method: EventStreamsConsoleSample.java
The EventStreamsConsoleSample.java file contains the main method. It does the following:
- Parses and validates the command line arguments.
- Checks that the topic that the sample is working against exists, and creates it if it does not exist.
- Starts the clients on different threads. The sample can run a producer, a consumer, or both at once. Once started, the clients run until the user cancels them (using Ctrl-C).
Producer
A producer is an application that publishes streams of messages to Kafka topics. You learned all about producers in our "Apache Kafka fundamentals" article.
Producer Configuration properties
For this sample, the producer configuration is built in EventStreamsConsoleSample.java in the getProducerConfigs() method, which builds on some common configuration that is used across all clients and sets a small number of producer-specific configuration properties.
These configuration properties are worth noting:
Serializers (and deserializers)
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // key.serializer
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value.serializer
These are the serializers used for the key and value of each message that is produced. In the sample, a simple string is used for both, so the Kafka-provided StringSerializer gives us what we need. Note that the consumer must have matching deserializers.
Acknowledgements (acks)
configs.put(ProducerConfig.ACKS_CONFIG, "all"); // acks
With acks set to all, the producer requires all in-sync replicas to have received the message. The leader sends the acknowledgement only when all in-sync replicas have confirmed that the message has been safely written. This is the most durable option, but it comes at the cost of increased latency.
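As a rough illustration of how producer-specific settings can be layered on top of common client configuration, the sketch below builds the same three properties using their plain property names. It uses only the Java standard library, so it is not the sample's getProducerConfigs() method; the class name, the method name, and the localhost:9092 address are assumptions made for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: not the sample's code, just the same idea with plain property names.
final class ProducerConfigSketch {

    // Copies the common client configuration and adds the producer-specific properties.
    static Map<String, Object> producerConfigs(Map<String, Object> common) {
        Map<String, Object> configs = new HashMap<>(common);
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("acks", "all"); // wait for all in-sync replicas
        return configs;
    }

    public static void main(String[] args) {
        Map<String, Object> common = new HashMap<>();
        common.put("bootstrap.servers", "localhost:9092"); // placeholder address (assumption)
        System.out.println(producerConfigs(common));
    }
}
```

Copying the common map rather than mutating it keeps the shared configuration reusable when the consumer configuration is built from it as well.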
Read more about important producer configuration settings in the Event Streams on IBM Cloud documentation. For the complete documentation of all producer configuration see the Apache Kafka documentation. But, be warned, there are loads of configuration options that you might be tempted to change. We suggest that you stick to a few until you are comfortable with the behavior of the application.
ProducerRunnable.java
ProducerRunnable.java implements Runnable and is therefore run in its own thread.
The constructor creates a new instance of KafkaProducer based on the provided configuration.
// Create a Kafka producer with the provided client configuration
kafkaProducer = new KafkaProducer<>(producerConfigs);
The run() function is where the actual work is done. You will notice that the thread runs in a while loop, checking whether the application is shutting down via the closing variable.
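The loop-until-closing pattern described above can be sketched without any Kafka dependency. The class below is a hypothetical stand-in, not the sample's ProducerRunnable: the names ClosableWorker, shutdown(), and runBriefly() are invented for illustration, and marking the flag volatile is an assumption about how such a flag is typically made visible across threads.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a client runnable; it counts iterations instead of producing messages.
final class ClosableWorker implements Runnable {
    // volatile so that a write from the shutting-down thread is seen by the worker thread
    private volatile boolean closing = false;
    private final AtomicLong iterations = new AtomicLong();

    @Override
    public void run() {
        while (!closing) {
            iterations.incrementAndGet(); // a real client would send or poll here
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Asks the loop to finish after its current iteration.
    void shutdown() {
        closing = true;
    }

    long iterations() {
        return iterations.get();
    }

    // Runs a worker for roughly the given time, then reports whether it stopped cleanly.
    static boolean runBriefly(long millis) {
        ClosableWorker worker = new ClosableWorker();
        Thread thread = new Thread(worker);
        thread.start();
        try {
            Thread.sleep(millis);
            worker.shutdown();
            thread.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            worker.shutdown();
        }
        return !thread.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Stopped cleanly: " + runBriefly(100));
    }
}
```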
A ProducerRecord is constructed to represent the message to be produced. In the comment, it notes that the sample application uses the default partitioner.
// If a partition is not specified, the client will use the default partitioner to choose one.
ProducerRecord<String, String> record = new ProducerRecord<>(topic,key,message);
In other cases, you may want to be in control of determining the partition yourself, which you can see an example of in the Kafka javadoc.
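To give a feel for choosing the partition yourself, here is a minimal sketch that maps a key to a partition number so that the same key always lands on the same partition. The hash used here is a simple illustrative one, not the algorithm used by Kafka's default partitioner, and the class name, method name, and partition count are assumptions for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper: picks a partition from the message key.
final class PartitionChooser {

    // Returns a partition in the range [0, numPartitions) that is stable for a given key.
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        // floorMod keeps the result non-negative even when the hash is negative
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        int partition = partitionFor("order-42", 3);
        // With the Kafka client on the classpath, the chosen partition could be passed to
        // the ProducerRecord constructor that takes (topic, partition, key, value).
        System.out.println("Chosen partition: " + partition);
    }
}
```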
In the sample, the ProducerRecord is sent asynchronously, and the code then immediately blocks while it waits for the acknowledgement. This is sufficient for demonstration purposes, but it is unlikely to be the required behavior in a real-world application because waiting on every message limits throughput. When looking at the requirements for your application, you should consider how you want your producer to behave when sending messages and processing the acknowledgement.
// Send record asynchronously
Future<RecordMetadata> future = kafkaProducer.send(record);
// Synchronously wait for a response from Event Streams / Kafka on every message produced.
// For high throughput the future should be handled asynchronously.
RecordMetadata recordMetadata = future.get(5000, TimeUnit.MILLISECONDS);
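One way to avoid blocking on every message is to register a callback that runs when the acknowledgement arrives; KafkaProducer.send() has an overload that accepts a Callback for this purpose. The sketch below shows the shape of that approach using only the Java standard library, with a CompletableFuture standing in for the send. The fakeSend() method, the class name, and the offsets it returns are all invented for illustration and do not talk to Kafka.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: handles acknowledgements in callbacks instead of blocking per message.
final class AsyncAckSketch {

    // Stand-in for a send: completes on another thread with the offset of the record.
    static CompletableFuture<Long> fakeSend(String message, long offset) {
        return CompletableFuture.supplyAsync(() -> offset);
    }

    // Sends every message without waiting in between, then waits once at the end.
    static List<Long> sendAll(List<String> messages) {
        Queue<Long> acknowledged = new ConcurrentLinkedQueue<>();
        List<CompletableFuture<Long>> pending = new ArrayList<>();
        long nextOffset = 0;
        for (String message : messages) {
            long offset = nextOffset++;
            pending.add(fakeSend(message, offset).whenComplete((ackedOffset, error) -> {
                if (error == null) {
                    acknowledged.add(ackedOffset); // success path
                } else {
                    System.err.println("Send failed: " + error); // failure path
                }
            }));
        }
        // A single wait for everything outstanding, rather than one wait per message.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        List<Long> result = new ArrayList<>(acknowledged);
        Collections.sort(result); // callbacks may complete in any order
        return result;
    }

    public static void main(String[] args) {
        System.out.println("Acknowledged offsets: " + sendAll(Arrays.asList("a", "b", "c")));
    }
}
```

The trade-off is that error handling moves into the callback, so the application has to decide there whether to retry, log, or stop.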
Consumer
A consumer reads messages from one or more topics and processes them. You learned all about consumers in our "Apache Kafka fundamentals" article.
Consumer Configuration
For this sample, the consumer configuration is built in EventStreamsConsoleSample.java in the getConsumerConfigs() method. This method builds on some common configuration that is used across all clients. It also has configuration that mirrors the producer, such as the deserializers for the key and value. In addition, the method sets a small number of consumer-specific configuration properties, such as:
group.id
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-java-console-sample-group"); // group.id
The group.id property controls the consumer group that this consumer is part of. It will either join an existing group or create a new one as required.
auto.offset.reset
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // auto.offset.reset
The auto.offset.reset property determines what to do when the current offset for this consumer is no longer present on the server, or when there is no initial offset. latest means that the current offset is automatically set to the latest offset on the partition, that is, the consumer starts at the end of the partition and reads only newer records.
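The decision that auto.offset.reset controls can be sketched as a small function. This is a simplified model written for this tutorial, not Kafka's implementation: the class and method names are invented, and the earliest and none values come from the Apache Kafka documentation rather than from the sample, which only uses latest.

```java
// Hypothetical model of where a consumer starts reading on a partition.
final class OffsetResetSketch {

    // committed may be null when the group has no stored offset for the partition.
    static long startingOffset(Long committed, long beginning, long end, String policy) {
        boolean usable = committed != null && committed >= beginning && committed <= end;
        if (usable) {
            return committed; // a valid stored offset always wins; the policy is not consulted
        }
        switch (policy) {
            case "earliest":
                return beginning; // replay everything still retained on the partition
            case "latest":
                return end; // skip to the end and read only newer records
            case "none":
                throw new IllegalStateException("No valid offset and auto.offset.reset is none");
            default:
                throw new IllegalArgumentException("Unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(null, 0L, 42L, "latest"));
        System.out.println(startingOffset(10L, 0L, 42L, "latest"));
    }
}
```

Note that the policy only applies when there is no usable offset; a consumer group that has already committed offsets resumes from them regardless of this setting.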
Read more about important consumer configuration settings in the Event Streams on IBM Cloud documentation. For full documentation of all consumer configuration, see the Apache Kafka documentation. But, be warned that there are loads of configuration options that you might be tempted to change. We suggest that you stick to a few until you are comfortable with the behavior of the application.
ConsumerRunnable.java
Like the producer, ConsumerRunnable.java implements Runnable and is therefore run in its own thread.
The constructor creates a new instance of KafkaConsumer based on the provided configuration.
// Create a Kafka consumer with the provided client configuration
kafkaConsumer = new KafkaConsumer<>(consumerConfigs);
Again, like the producer, most of the logic is inside the run() function, which also checks whether the application is being shut down.
The consumer polls to see if there are any ConsumerRecords available. This is a collection and all available messages will be returned. If nothing is received within 3 seconds, the consumer finishes the poll() and logs that there were no messages consumed.
// Poll on the Kafka consumer, waiting up to 3 secs if there's nothing to consume.
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(3000L));
if (records.isEmpty()) {
    logger.info("No messages consumed");
}
If the consumer did receive some messages, the sample application simply loops over each message and prints the contents of each one.
for (ConsumerRecord<String, String> record : records) {
    logger.info("Message consumed: {}", record);
}
Time to get creative?!
Now that you understand more about the Java sample and how the clients work, it is time to play around with the code a bit. Clone the event-streams-samples code and navigate into the kafka-java-console-sample folder, explore the docs, and get ready to play with the code.
Start by experimenting with starting and stopping the clients independently. What happens if you stop consuming messages for a period then start again?
How about you try modifying the client code? For example, what would happen if the consumer was doing some lengthy processing of each message it reads? You could recreate this by adding a sleep to see what happens.
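Before adding the sleep, it can help to estimate whether the processing time for one batch would exceed the time the consumer is allowed between calls to poll(), which is governed by the max.poll.interval.ms property. The sketch below is a back-of-the-envelope check only; the class and method names are invented, and the figures in main() use the defaults listed in the Apache Kafka documentation (500 for max.poll.records and 300000 for max.poll.interval.ms), which your configuration may override.

```java
// Hypothetical helper: estimates whether one batch takes longer than the allowed poll interval.
final class PollBudgetSketch {

    // True when processing a full batch would take longer than max.poll.interval.ms.
    static boolean exceedsPollInterval(int recordsPerPoll, long perRecordMillis, long maxPollIntervalMs) {
        long batchMillis = recordsPerPoll * perRecordMillis;
        return batchMillis > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // 500 records at 1 second each is 500,000 ms, which is more than 300,000 ms.
        System.out.println(exceedsPollInterval(500, 1000L, 300_000L));
        // 500 records at 100 ms each is 50,000 ms, which fits within the interval.
        System.out.println(exceedsPollInterval(500, 100L, 300_000L));
    }
}
```

When the estimate comes out true, watch the consumer logs during your experiment to see how the consumer group reacts.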
While you are experimenting, be sure to refer to the client configuration documentation to understand what options are available to you as a developer of Kafka-based applications.
Summary and next steps
In this tutorial, we walked through the Java sample application and learned what it does. Hopefully, you also played around with the sample code and gained more understanding about how producer and consumer code works.
You're now ready to take on the IBM Event Streams coding challenge and write a consumer app.
Or, perhaps you're ready to learn how to debug your application?