Kafka之java实现

Kafka之java实现

Maven : https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients

Doc : https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Topic

创建topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

1 partition , 1 replication factor

producer

config

  • bootstrap.servers: broker
  • Key serializer, Value serializer
public static void main(String[] args) {

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer kafkaProducer = new KafkaProducer(properties);

try {
for (int i = 0; i < 100; i++) {
System.out.println(">>>" + i);
kafkaProducer.send(new ProducerRecord("test", Integer.toString(i), "test msg - " + i));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaProducer.close();
}
}

Consumer

  • enable.auto.commit: enable 自动提交 offset
  • auto.commit.interval.ms: 自动提交的间隔(毫秒)

public static void main(String[] args) {

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("auto.offset.reset", "earliest");
properties.put("group.id", "test-group-0");


KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Collections.singletonList("test"));

try {
while (true) {
System.out.println("Start polling msg...");
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));

if (records.count() == 0) {
System.out.println("No msg polled back...");
continue;
}
records.forEach(record -> {
System.out.printf("Consumer Record:(%s, %s, %d, %d)\n",
record.key(), record.value(),
record.partition(), record.offset());
});

}
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaConsumer.close();
}
}

https://www.devglan.com/apache-kafka/apache-kafka-java-example

http://cloudurable.com/blog/kafka-tutorial-kafka-producer/index.html

http://cloudurable.com/blog/kafka-tutorial-kafka-consumer/index.html