Apache Kafka – Example of Producer Consumer Code with Java

A simple Kafka project structure

kafka-project-structure-in-eclipse

5 Easy steps !

1. Add the dependency in pom.xml if it's a Maven project.

        
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.3.0</version>
</dependency>
    

2. First, let’s declare an interface holding the Kafka configuration parameters as constants.

        
package com.programmertoday.kafka;

/**
 * Shared Kafka configuration values used by the producer, consumer, and test client.
 * Fields declared in an interface are implicitly {@code public static final},
 * so the modifiers are omitted.
 */
public interface KafkaConstants {

  // Broker connection and client identity.
  String KAFKA_BROKERS = "localhost:9092";
  String CLIENT_ID = "client_1";

  // Topic and consumer-group names.
  String TOPIC_NAME = "demotopic";
  String GROUP_ID_CONFIG = "consumerGroup_1";

  // Producer: how many demo messages to send.
  Integer MESSAGE_COUNT = 10;

  // Consumer tuning: empty-poll threshold, offset-reset policies, poll batch size.
  Integer MAX_NO_MESSAGE_FOUND_COUNT = 10;
  String OFFSET_RESET_LATEST = "latest";
  String OFFSET_RESET_EARLIER = "earliest";
  Integer MAX_POLL_RECORDS = 1;

}
    

3. Create a producer class – let’s call it “KafkaProducerClass”

        
package com.programmertoday.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

/** Factory for the demo Kafka producer. */
public class KafkaProducerClass {

  /**
   * Creates a producer with Long keys and String values, connected to the
   * brokers configured in {@link KafkaConstants}.
   *
   * @return a ready-to-use {@code Producer<Long, String>}
   */
  public static Producer<Long, String> createProducer() {
    Properties config = new Properties();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKERS);
    config.put(ProducerConfig.CLIENT_ID_CONFIG, KafkaConstants.CLIENT_ID);
    // Keys are Longs, values are plain Strings.
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // To control partition assignment, set ProducerConfig.PARTITIONER_CLASS_CONFIG
    // to a custom Partitioner implementation here.
    return new KafkaProducer<>(config);
  }
}
        

4. Create a consumer class – let’s call it “KafkaConsumerClass”

        
package com.programmertoday.kafka;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

/** Factory for the demo Kafka consumer. */
public class KafkaConsumerClass {

  /**
   * Creates a consumer with Long keys and String values and subscribes it to
   * the demo topic. Auto-commit is disabled, so the caller is responsible for
   * committing offsets after processing.
   *
   * @return a subscribed {@code Consumer<Long, String>}
   */
  public static Consumer<Long, String> createConsumer() {
    Properties config = new Properties();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKERS);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.GROUP_ID_CONFIG);
    // Deserializers mirror the producer's serializers: Long keys, String values.
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, KafkaConstants.MAX_POLL_RECORDS);
    // Offsets are committed manually (see the client's commitAsync call).
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KafkaConstants.OFFSET_RESET_EARLIER);

    Consumer<Long, String> consumer = new KafkaConsumer<>(config);
    consumer.subscribe(Collections.singletonList(KafkaConstants.TOPIC_NAME));
    return consumer;
  }
}
    

5. Create a Kafka test client – a class with a main method that sends records and polls them back.

        
package com.programmertoday.kafka;

import java.time.Duration;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Demo driver: runs either the producer or the consumer side of the example.
 *
 * Fixes over the original:
 * - uses the non-deprecated {@code poll(Duration)} overload (poll(long) is
 *   deprecated since kafka-clients 2.0),
 * - actually sends the record key that the log message claims to send,
 * - restores the interrupt flag on InterruptedException instead of swallowing it,
 * - closes the producer when done (resource leak otherwise),
 * - collapses the duplicated catch blocks into a multi-catch.
 */
public class KafkaClient {

  public static void main(String[] args) {

    System.out.println("Kafka Test in progress...........");
    executeProducer(); // if running producer, please comment out the below line.
    //executeConsumer(); // if running consumer, please comment out the above line.
  }

  /**
   * Polls the demo topic and prints each record. Exits once
   * {@link KafkaConstants#MAX_NO_MESSAGE_FOUND_COUNT} consecutive polls
   * return nothing.
   */
  static void executeConsumer() {
    Consumer<Long, String> consumer = KafkaConsumerClass.createConsumer();
    int emptyPolls = 0;
    try {
      while (true) {
        // Wait up to 1 second for records if none are available at the broker.
        ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
        if (consumerRecords.count() == 0) {
          emptyPolls++;
          // Stop once the no-message threshold is reached.
          if (emptyPolls > KafkaConstants.MAX_NO_MESSAGE_FOUND_COUNT) {
            break;
          }
          continue;
        }
        emptyPolls = 0; // reset: we found records again
        consumerRecords.forEach(record -> {
          System.out.println("Record Key " + record.key());
          System.out.println("Record value " + record.value());
          System.out.println("Record partition " + record.partition());
          System.out.println("Record offset " + record.offset());
        });
        // Auto-commit is disabled in the consumer config, so commit manually.
        consumer.commitAsync();
      }
    } finally {
      consumer.close();
    }
  }

  /**
   * Sends {@link KafkaConstants#MESSAGE_COUNT} keyed records synchronously
   * and prints the partition/offset each one landed on.
   */
  static void executeProducer() {
    Producer<Long, String> producer = KafkaProducerClass.createProducer();
    try {
      for (int index = 0; index < KafkaConstants.MESSAGE_COUNT; index++) {
        // Send the index as the record key so the printed "key" is accurate
        // (the producer is configured with a LongSerializer for keys).
        ProducerRecord<Long, String> record = new ProducerRecord<>(
            KafkaConstants.TOPIC_NAME, (long) index, "This is a kafka record " + index);
        try {
          // .get() blocks until the broker acknowledges the send.
          RecordMetadata metadata = producer.send(record).get();
          System.out.println("Record sent with key " + index + " to partition " + metadata.partition()
              + " with offset " + metadata.offset());
        } catch (ExecutionException | InterruptedException e) {
          if (e instanceof InterruptedException) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
          }
          System.out.println("Error in sending record");
          System.out.println(e);
        }
      }
    } finally {
      producer.close(); // flush pending records and release network resources
    }
  }

}
    

Summary

Now you know how to create a Kafka producer and a Kafka consumer in Java. We hope you enjoyed learning this.


Leave a Comment