Kafka Java Implementation
Maven : https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
Doc : https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
Topic
Create a topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
1 partition, 1 replication factor
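The same topic can also be created from Java via the AdminClient that ships with kafka-clients. A minimal sketch, assuming the broker address and topic name from the command above (the class name CreateTopicDemo is only for illustration):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class CreateTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1 -- same as the CLI command above
            NewTopic topic = new NewTopic("test", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}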
Producer
Config
- bootstrap.servers: broker address list (host:port)
- key.serializer / value.serializer: serializer classes for record keys and values
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // Explicit generic types so key/value types match the configured serializers
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    try {
        for (int i = 0; i < 100; i++) {
            System.out.println(">>>" + i);
            // send() is asynchronous; the record is buffered and sent in the background
            kafkaProducer.send(new ProducerRecord<>("test", Integer.toString(i), "test msg - " + i));
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        kafkaProducer.close();  // flushes any buffered records before shutting down
    }
}
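send() only hands the record to an in-memory buffer, so the loop above gives no delivery feedback. A small sketch of the callback variant of send(), reusing kafkaProducer from the example above (the key/value strings are placeholders):

// Asynchronous send with a delivery callback; metadata is non-null on success.
kafkaProducer.send(new ProducerRecord<>("test", "key", "value"), (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.printf("Delivered to %s-%d@%d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});
// Alternatively, block on the returned Future to make the send effectively synchronous:
// kafkaProducer.send(record).get();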
Consumer
- enable.auto.commit: enable automatic offset commits
- auto.commit.interval.ms: interval between automatic commits (milliseconds)
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("enable.auto.commit", "true");
    properties.put("auto.commit.interval.ms", "1000");
    // Start from the earliest offset when the group has no committed offset yet
    properties.put("auto.offset.reset", "earliest");
    properties.put("group.id", "test-group-0");

    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    kafkaConsumer.subscribe(Collections.singletonList("test"));
    try {
        while (true) {
            System.out.println("Start polling msg...");
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            if (records.count() == 0) {
                System.out.println("No msg polled back...");
                continue;
            }
            records.forEach(record -> System.out.printf("Consumer Record:(%s, %s, %d, %d)\n",
                    record.key(), record.value(), record.partition(), record.offset()));
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        kafkaConsumer.close();
    }
}
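With enable.auto.commit=true, offsets are committed on a timer regardless of whether the polled records were actually processed. A sketch of the manual alternative (not part of the original notes): disable auto-commit and call commitSync() after processing. process(record) below stands in for your own handling logic:

properties.put("enable.auto.commit", "false");   // take over offset management
// ... create the consumer and subscribe as above ...
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
    records.forEach(record -> process(record));  // placeholder for application logic
    if (!records.isEmpty()) {
        kafkaConsumer.commitSync();              // commit only after successful processing
    }
}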
https://www.devglan.com/apache-kafka/apache-kafka-java-example
http://cloudurable.com/blog/kafka-tutorial-kafka-producer/index.html
http://cloudurable.com/blog/kafka-tutorial-kafka-consumer/index.html