A simple Kafka project structure

5 Easy steps !
1. Add the dependency in pom.xml if its a maven project.
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
2. At first let’s declare an interface having Kafka configuration parameters as Constants.
package com.programmertoday.kafka;
public interface KafkaConstants {
public static String KAFKA_BROKERS = "localhost:9092";
public static Integer MESSAGE_COUNT = 10;
public static String CLIENT_ID = "client_1";
public static String TOPIC_NAME = "demotopic";
public static String GROUP_ID_CONFIG = "consumerGroup_1";
public static Integer MAX_NO_MESSAGE_FOUND_COUNT = 10;
public static String OFFSET_RESET_LATEST = "latest";
public static String OFFSET_RESET_EARLIER = "earliest";
public static Integer MAX_POLL_RECORDS = 1;
}
3. Create a producer class – let’s call it “KafkaProducerClass”
package com.programmertoday.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
public class KafkaProducerClass {
public static Producer<Long, String> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, KafkaConstants.CLIENT_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName()); // for custom partitions
return new KafkaProducer<>(props);
}
}
4. Create a consumer class – let’s call it “KafkaConsumerClass”
package com.programmertoday.kafka;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class KafkaConsumerClass {
public static Consumer<Long, String> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.GROUP_ID_CONFIG);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, KafkaConstants.MAX_POLL_RECORDS);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KafkaConstants.OFFSET_RESET_EARLIER);
Consumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(KafkaConstants.TOPIC_NAME));
return consumer;
}
}
5. Create a Kafka test client – with main method in it and we poll the records.
package com.programmertoday.kafka;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class KafkaClient {
public static void main(String[] args) {
System.out.println("Kakfa Test in progress...........");
executeProducer(); // if running producer, please commit the below line.
//executeConsumer(); // if running consumer, please comment to above line.
}
static void executeConsumer() {
Consumer<Long, String> consumer = KafkaConsumerClass.createConsumer();
int noMessageFound = 0;
while (true) { // condition set to true so that it keeps on running
ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);
// 1000 is the time in milliseconds consumer will wait if no record is found at
// broker.
if (consumerRecords.count() == 0) {
noMessageFound++;
if (noMessageFound > KafkaConstants.MAX_NO_MESSAGE_FOUND_COUNT)
// If no message found count is reached to threshold exit loop.
break;
else
continue;
}
// to print each record.
consumerRecords.forEach(record -> {
System.out.println("Record Key " + record.key());
System.out.println("Record value " + record.value());
System.out.println("Record partition " + record.partition());
System.out.println("Record offset " + record.offset());
});
// commits the offset of record to broker.
consumer.commitAsync();
}
consumer.close(); // to close the consumer
}
static void executeProducer() {
Producer<Long, String> producer = KafkaProducerClass.createProducer();
for (int index = 0; index < KafkaConstants.MESSAGE_COUNT; index++) {
ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(KafkaConstants.TOPIC_NAME,
"This is a kafka record " + index);
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("Record sent with key " + index + " to partition " + metadata.partition()
+ " with offset " + metadata.offset());
} catch (ExecutionException e) {
System.out.println("Error in sending record");
System.out.println(e);
} catch (InterruptedException e) {
System.out.println("Error in sending record");
System.out.println(e);
}
}
}
}
Summary
Now you know how to create kafka producer, kafka consumer in java, we hope you enjoyed learning this.