Kafka: Spring Kafka
Spring Initializr dependency: Spring for Apache Kafka (under Messaging)
Topic
With a KafkaAdmin bean in the application context, topics are created automatically (if they do not already exist) from the NewTopic beans you declare.
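A minimal sketch of such a configuration. The broker address `localhost:9092`, the class name `KafkaTopicConfig`, and the topic `demo-topic` (one partition, replication factor 1) are assumptions chosen for illustration, not values from these notes:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class KafkaTopicConfig {

    // KafkaAdmin picks up the NewTopic beans and creates any topic that is missing.
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        // Assumed broker address; replace with your own.
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaAdmin(configs);
    }

    // Hypothetical topic: name, number of partitions, replication factor.
    @Bean
    public NewTopic demoTopic() {
        return new NewTopic("demo-topic", 1, (short) 1);
    }
}
```

With Spring Boot auto-configuration a KafkaAdmin bean is already registered, so declaring only the NewTopic bean is usually enough.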
Producer
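The key class is KafkaTemplate, which wraps a Producer instance and provides convenience methods for sending messages to Kafka topics. Setting it up involves three pieces:
- Producer config: the producer's configuration properties
- ProducerFactory: creates the Producer instances
- KafkaTemplate: the bean that other objects inject to send messages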
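The three pieces above can be wired together roughly as follows. This is a sketch that assumes String keys and values and a broker at `localhost:9092`; the class name `KafkaProducerConfig` is hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    // Producer config + ProducerFactory: the factory creates Producer instances from these properties.
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Assumed broker address; replace with your own.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    // KafkaTemplate: the bean other components inject to send messages.
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```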
Some other configuration options:
// Number of retries; 0 disables the retry mechanism
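As an illustration, extra producer options can be collected in a plain map and merged into the producer properties. Only `retries` is mentioned in these notes; `acks`, `batch.size`, and `linger.ms`, and all the values, are assumptions picked for the example. The string keys are the ones behind the `ProducerConfig` constants:

```java
import java.util.HashMap;
import java.util.Map;

final class ProducerOptionsSketch {

    // Builds a map of optional producer settings, keyed by Kafka property name.
    static Map<String, Object> extraProducerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("retries", 0);        // ProducerConfig.RETRIES_CONFIG: 0 disables retries
        props.put("acks", "all");       // ProducerConfig.ACKS_CONFIG: wait for all in-sync replicas
        props.put("batch.size", 16384); // ProducerConfig.BATCH_SIZE_CONFIG: batch size in bytes
        props.put("linger.ms", 1);      // ProducerConfig.LINGER_MS_CONFIG: wait up to 1 ms to fill a batch
        return props;
    }
}
```

The returned map could be added to the producer properties with `props.putAll(ProducerOptionsSketch.extraProducerProps())` before the factory is created.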
KafkaTemplate
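// Inject the KafkaTemplate
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
// Synchronous send: block until the broker acknowledges or the timeout expires
public void sendMessageSync(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException {
    kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
}
// Asynchronous send: register a callback instead of blocking
// (addCallback belongs to the ListenableFuture returned by Spring Kafka 2.x; from 3.0 on, send() returns a CompletableFuture)
public void sendMessageAsync(String topic, String message) {
    kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onFailure(Throwable throwable) {
            System.out.println("failure");
        }
        @Override
        public void onSuccess(SendResult<String, String> result) {
            System.out.println("success");
        }
    });
}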
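Since Spring Kafka 3.0 returns a `CompletableFuture` from `send()`, the same two patterns can be sketched with the JDK alone. Here `fakeSend` is a hypothetical stand-in for `kafkaTemplate.send(topic, message)`, and `handle` is used so the outcome can be returned (with a real template, `whenComplete` is the usual choice when nothing needs to be returned):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SendPatternsSketch {

    // Hypothetical stand-in for kafkaTemplate.send(...); a blank message fails.
    static CompletableFuture<String> fakeSend(String topic, String message) {
        if (message == null || message.isBlank()) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("empty message"));
        }
        return CompletableFuture.supplyAsync(() -> topic + ":" + message);
    }

    // Synchronous: block for up to 10 seconds and surface any failure to the caller.
    static String sendSync(String topic, String message) {
        try {
            return fakeSend(topic, message).get(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while sending", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("send failed", e);
        }
    }

    // Asynchronous: return immediately; the callback maps the outcome to a status string.
    static CompletableFuture<String> sendAsync(String topic, String message) {
        return fakeSend(topic, message)
                .handle((result, ex) -> ex == null ? "success" : "failure");
    }
}
```

The synchronous variant is simpler to reason about but ties up the calling thread; the asynchronous variant keeps the caller free and reports the result later.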
Consumer
- ConsumerFactory
- KafkaListenerContainerFactory
consumerFactory
public ConsumerFactory<String, String> consumerFactory() { ... }
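A consumerFactory bean along these lines might look as follows. This is a sketch assuming String keys and values; the broker address `localhost:9092`, the group id `demo-group`, and the class name `KafkaConsumerConfig` are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka // enables detection of @KafkaListener methods
@Configuration
public class KafkaConsumerConfig {

    // The factory creates Consumer instances from these properties.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Assumed broker address and group id; replace with your own.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}
```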
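Other configuration options:
// Whether offsets are committed automatically (Kafka's default is true; Spring Kafka 2.3+ sets it to false unless it is configured explicitly)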
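For illustration, optional consumer settings can be gathered in a plain map keyed by Kafka property name. Only the auto-commit flag comes from these notes; the other two options and all values are assumptions for the example. The string keys are the ones behind the `ConsumerConfig` constants:

```java
import java.util.HashMap;
import java.util.Map;

final class ConsumerOptionsSketch {

    // Builds a map of optional consumer settings.
    static Map<String, Object> extraConsumerProps() {
        Map<String, Object> props = new HashMap<>();
        // ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG: false leaves offset commits to the listener container
        props.put("enable.auto.commit", false);
        // ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG: only used when auto-commit is enabled
        props.put("auto.commit.interval.ms", 5000);
        // ConsumerConfig.AUTO_OFFSET_RESET_CONFIG: where to start when the group has no committed offset
        props.put("auto.offset.reset", "earliest");
        return props;
    }
}
```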
kafkaListenerContainerFactory
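The Kafka listener container factory creates the listener containers; it must be provided when @KafkaListener is used.
There are two message listener container implementations: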
- KafkaMessageListenerContainer
- ConcurrentMessageListenerContainer
The KafkaMessageListenerContainer receives all messages from all topics or partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.
ConcurrentMessageListenerContainer
It also has a concurrency property. For example, container.setConcurrency(3) creates three KafkaMessageListenerContainer instances.
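When listeners are declared with @KafkaListener, the concurrency is usually set on the container factory rather than on a container directly. A sketch, assuming a `ConsumerFactory<String, String>` bean already exists and using the hypothetical class name `KafkaListenerConfig`:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaListenerConfig {

    // Every container built by this factory delegates to three KafkaMessageListenerContainer instances.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);
        return factory;
    }
}
```

A concurrency higher than the number of partitions brings no benefit, because a partition is consumed by at most one consumer in a group and the extra consumers stay idle.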
PartitionAssignor

Listener
// Multiple listeners can be implemented for a topic, each with a different group ID.
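A sketch of two listeners on the same topic in different consumer groups. The topic `demo-topic`, the group ids, and the class name `DemoListeners` are hypothetical; a listener container factory bean is assumed to be configured:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DemoListeners {

    // Each consumer group receives its own copy of every record on the topic.
    @KafkaListener(topics = "demo-topic", groupId = "group-a")
    public void listenGroupA(String message) {
        System.out.println("group-a received: " + message);
    }

    @KafkaListener(topics = "demo-topic", groupId = "group-b")
    public void listenGroupB(String message) {
        System.out.println("group-b received: " + message);
    }
}
```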
RecordFilter
https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/adapter/RecordFilterStrategy.html
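// RecordFilterStrategy.filter returns true for records that should be discarded,
// so this factory drops every record whose value contains "World".
ConcurrentKafkaListenerContainerFactory<String, String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(
    record -> record.value().contains("World"));
return factory;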
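The direction of the filter is easy to get backwards: a RecordFilterStrategy returns true for records to discard, not records to keep. A JDK-only sketch of that semantics, using a plain `Predicate<String>` as a stand-in for the strategy (the class and method names are hypothetical):

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class RecordFilterSketch {

    // Mirrors the lambda passed to setRecordFilterStrategy: true means "discard this record".
    static final Predicate<String> DISCARD = value -> value.contains("World");

    // Returns the values that would still reach the listener.
    static List<String> delivered(List<String> values) {
        return values.stream()
                .filter(DISCARD.negate())
                .collect(Collectors.toList());
    }
}
```

Given the values "Hello World", "Hello Kafka", and "World cup", only "Hello Kafka" would reach the listener.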