Kafka: Spring Kafka

Spring Initializr dependency: Spring for Apache Kafka (Messaging)

Topic

With a KafkaAdmin bean, topics declared as NewTopic beans are created automatically (if they do not already exist).

@Configuration
public class KafkaTopicConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic("baeldung", 1, (short) 1);
    }
}
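Since Spring Kafka 2.6, a NewTopic bean can also be built with the fluent TopicBuilder API. A minimal sketch (the topic name, partition count, and compaction choice here are illustrative, not from the original notes):

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;

// Equivalent NewTopic declaration using the fluent builder instead of the constructor.
@Bean
public NewTopic topic2() {
    return TopicBuilder.name("baeldung-compact") // hypothetical topic name
            .partitions(3)
            .replicas(1)
            .compact() // sets cleanup.policy=compact
            .build();
}
```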

Producer

Key class: KafkaTemplate, which wraps a Producer instance and provides convenience methods for sending messages to Kafka topics.

  • producer config: the producer configuration parameters
  • ProducerFactory: creates and configures Producer instances
  • KafkaTemplate: other beans can inject this bean to send messages
@Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
          bootstrapAddress);
        configProps.put(
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          StringSerializer.class);
        configProps.put(
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Other common producer settings

// Number of retries; 0 disables the retry mechanism
props.put(ProducerConfig.RETRIES_CONFIG, 0);
// Replica acknowledgement; default is "1"
// acks=0   the send is considered successful as soon as the message leaves the producer
// acks=1   the send is considered successful once the leader partition has written the message to disk
// acks=all the send is considered successful only after the leader's follower replicas have also replicated the message
props.put(ProducerConfig.ACKS_CONFIG, "1");
// How long send() blocks when the producer's buffer is full; default 60s
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
// Batch size, in bytes
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
// Batching delay of 1 ms; this reduces the number of send requests and improves throughput
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// Total memory (bytes) the producer may use to buffer records waiting to be sent to the server
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
// Maximum size of a single request; messages passed to send() must not exceed it; default 1048576 (1 MB)
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);
// Key serializer
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Value serializer
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Compression: one of none, lz4, gzip, snappy; default none.
// Consumers decompress automatically, so compression is configured only on the producer side.
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");

KafkaTemplate

// Inject the KafkaTemplate
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

// Synchronous send: block until the broker acknowledges (or the timeout expires)
public void sendMessageSync(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException {
    kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
}

// Asynchronous send: register a callback instead of blocking
public void sendMessageAsync(String topic, String message) {
    kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onFailure(Throwable throwable) {
            System.out.println("failure");
        }

        @Override
        public void onSuccess(SendResult<String, String> result) {
            System.out.println("success");
        }
    });
}
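Note that in Spring Kafka 3.0+, KafkaTemplate.send() returns a CompletableFuture rather than a ListenableFuture, so addCallback is no longer available there. A hedged sketch of the equivalent asynchronous handling under that version (the log messages are illustrative):

```java
// Spring Kafka 3.0+: send() returns CompletableFuture<SendResult<K, V>>.
public void sendMessageAsync(String topic, String message) {
    kafkaTemplate.send(topic, message).whenComplete((result, ex) -> {
        if (ex != null) {
            System.out.println("failure: " + ex.getMessage());
        } else {
            System.out.println("success, offset=" + result.getRecordMetadata().offset());
        }
    });
}
```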

Consumer

  • ConsumerFactory
  • KafkaListenerContainerFactory

consumerFactory

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
      bootstrapAddress);
    props.put(
      ConsumerConfig.GROUP_ID_CONFIG,
      groupId);
    props.put(
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      StringDeserializer.class);
    props.put(
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

Other settings:

// Whether to auto-commit offsets (default true)
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// Auto-commit interval (ms)
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
// Session timeout
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
// Offset reset policy:
// (1) earliest: if a partition has a committed offset, resume from it; otherwise consume from the beginning
// (2) latest: if a partition has a committed offset, resume from it; otherwise consume only newly produced records
// (3) none: if every partition has a committed offset, resume from those offsets; if any partition lacks one, throw an exception
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
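When auto-commit is disabled, offsets can instead be committed manually from the listener. A minimal sketch, assuming ENABLE_AUTO_COMMIT_CONFIG is set to false in consumerFactory(); the bean name and topic are assumptions:

```java
// Container factory that requires explicit acknowledgements.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> manualAckContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // Offsets are committed only when the listener calls Acknowledgment.acknowledge().
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}

@KafkaListener(topics = "topicName", containerFactory = "manualAckContainerFactory")
public void listen(String message, Acknowledgment ack) {
    System.out.println("Received: " + message);
    ack.acknowledge(); // commit the offset after successful processing
}
```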

kafkaListenerContainerFactory

The container factory responsible for creating listener containers; a bean of this type must be provided when using @KafkaListener.

Two message-listener container implementations:

  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer

The KafkaMessageListenerContainer receives all messages from all topics or partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.

ConcurrentMessageListenerContainer

It also has a concurrency property. For example, container.setConcurrency(3) creates three KafkaMessageListenerContainer instances.
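A sketch of a container factory bean with concurrency set; the bean name follows the default @KafkaListener lookup, and the value 3 is illustrative:

```java
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // Creates three KafkaMessageListenerContainer instances;
    // more consumers than partitions would leave some idle.
    factory.setConcurrency(3);
    return factory;
}
```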

PartitionAssignor

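The partition assignment strategy is configured on the consumer. A hedged sketch using the strategies shipped with the Kafka client library (RangeAssignor is the historical default; RoundRobinAssignor here is just an example choice):

```java
// Controls how partitions are distributed among consumers in the same group.
// Built-in strategies: RangeAssignor (default), RoundRobinAssignor,
// StickyAssignor, CooperativeStickyAssignor.
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
          RoundRobinAssignor.class.getName());
```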

Listener

// Multiple listeners can be implemented for a topic, each with a different group id.
@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group foo: " + message);
}

// One consumer can listen for messages from several topics:
@KafkaListener(topics = { "topic1", "topic2" }, groupId = "foo")

// Headers: access record metadata, such as the partition, alongside the payload
@KafkaListener(topics = "topicName")
public void listenWithHeaders(
    @Payload String message,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    System.out.println(
        "Received Message: " + message
        + " from partition: " + partition);
}


RecordFilter

https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/adapter/RecordFilterStrategy.html

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // Records for which the strategy returns true are discarded before reaching the listener.
    factory.setRecordFilterStrategy(
        record -> record.value().contains("World"));
    return factory;
}
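A hedged usage sketch: a listener opts into the filter by referencing the factory through the containerFactory attribute (the bean name "filterKafkaListenerContainerFactory" and the topic are assumptions):

```java
@KafkaListener(
    topics = "topicName",
    containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
    // Records whose value contains "World" are filtered out and never reach this method.
    System.out.println("Received Message in filtered listener: " + message);
}
```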

References

https://www.jianshu.com/p/92487ba9052f

https://www.confluent.io/blog/apache-kafka-spring-boot-application/

https://thepracticaldeveloper.com/spring-boot-kafka-config/

https://www.baeldung.com/spring-kafka