Kafka Consumer


Consumer and ConsumerGroup

Normally consumers work as part of a consumer group: when multiple consumers subscribe to a topic and belong to the same consumer group, each consumer in the group receives messages from a different subset of the partitions in the topic.


Data consumption scales by adding more members to a consumer group (but when there are more consumers than partitions, some consumers will sit idle). This is a good reason to create topics with a large number of partitions: it allows adding more consumers when the load increases.

Multiple consumer groups: one of the main design goals in Kafka was to make the data produced to Kafka topics available for many use cases throughout the organization.


partition rebalance

Moving partition ownership from one consumer to another is called a rebalance. During a rebalance, consumers can't consume messages from the broker, so a rebalance is basically a short window of unavailability for the entire consumer group.

Consumers maintain membership in a consumer group, and ownership of the partitions assigned to them, by sending heartbeats. If a consumer stops sending heartbeats to the group coordinator, the session times out, the group coordinator considers it dead, and a rebalance is triggered.

construct consumer

Three mandatory properties: bootstrap.servers, key.deserializer, and value.deserializer.

group.id (not strictly mandatory) - specifies the consumer group the KafkaConsumer instance belongs to.
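
A minimal construction sketch, assuming String keys and values; the broker addresses, group id, and topic name are illustrative placeholders:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");

// Create the consumer and subscribe it to one topic
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("customerCountries"));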

Poll Loop

The poll loop handles all the details of coordination, partition rebalances, heartbeats, and data fetching.

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            log.debug("topic = %s, partition = %d, offset = %d, "
                + "customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());

            int updatedCount = 1;
            if (custCountryMap.containsKey(record.value())) {
                updatedCount = custCountryMap.get(record.value()) + 1;
            }
            custCountryMap.put(record.value(), updatedCount);

            JSONObject json = new JSONObject(custCountryMap);
            System.out.println(json.toString(4));
        }
    }
} finally {
    consumer.close();
}

poll() returns a list of records. Each record contains:

  • topic and partition this record comes from
  • offset within the partition
  • key and value

Thread Safety

One consumer per thread is the rule. To run multiple consumers in the same group in one application, you need to run each in its own thread, for example via an ExecutorService, as sketched below.
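
A minimal sketch of that pattern; ConsumerRunnable here is a hypothetical Runnable whose run() creates its own KafkaConsumer and runs its own poll loop:

int numConsumers = 3; // no point in exceeding the partition count
ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
for (int i = 0; i < numConsumers; i++) {
    // Each task owns a separate KafkaConsumer instance;
    // consumer instances are never shared across threads.
    executor.submit(new ConsumerRunnable("my-group", "my-topic"));
}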


Configuration

fetch.min.bytes

Specifies the minimum amount of data that the consumer wants to receive from the broker when fetching records.

For example, if the broker receives a fetch request from the consumer and the amount of new records is smaller than fetch.min.bytes, the broker will wait before responding.

fetch.max.wait.ms

While fetch.min.bytes tells Kafka to wait until it has enough data to send before responding to the consumer, fetch.max.wait.ms controls how long to wait.

session.timeout.ms

The amount of time a consumer can be out of contact with the brokers while still being considered alive; defaults to 10 seconds.

A related property, heartbeat.interval.ms, controls how frequently the consumer sends heartbeats to the group coordinator; it is usually set to one-third of session.timeout.ms.

auto.offset.reset

Controls the behavior of the consumer when it starts reading a partition for which it doesn’t have a committed offset or if the committed offset it has is invalid

  • Latest: start reading from the newest records.
  • Earliest: start reading from the very beginning of the partition.

enable.auto.commit

Whether the consumer commits offsets automatically; defaults to true.

When set to true, you may also want to control how frequently offsets will be committed via `auto.commit.interval.ms`.

partition.assignment.strategy

  • Range (default)

    e.g., consumers C1 and C2 are subscribed to two topics, T1 and T2, each with 3 partitions. Partitions 0 and 1 from both T1 and T2 are assigned to C1, and partition 2 from both T1 and T2 is assigned to C2.

  • round robin

    Takes all the partitions from all subscribed topics and assigns them to consumers sequentially, one by one.

    Same example: C1 gets partitions 0 and 2 from T1 and partition 1 from T2; C2 gets partition 1 from T1 and partitions 0 and 2 from T2.

client.id - any string; the brokers use it to identify requests coming from this client, in logs and metrics.

max.poll.records - Maximum number of records that a single call to poll() will return
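
To show where these settings go, a hedged sketch of a consumer Properties object with illustrative (not recommended) values for the options above:

Properties props = new Properties();
props.put("fetch.min.bytes", "1024");        // broker waits for at least 1 KB of new data...
props.put("fetch.max.wait.ms", "500");       // ...but responds after at most 500 ms
props.put("session.timeout.ms", "10000");    // considered dead after 10 s without heartbeats
props.put("heartbeat.interval.ms", "3000");  // roughly 1/3 of session.timeout.ms
props.put("auto.offset.reset", "earliest");  // read from the beginning when no valid committed offset exists
props.put("enable.auto.commit", "false");    // commit offsets manually
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.RoundRobinAssignor");
props.put("client.id", "country-counter-1"); // any string identifying this client
props.put("max.poll.records", "500");        // cap on records returned by a single poll()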


Commits and Offsets

One of Kafka’s unique characteristics is that it does not track acknowledgments from consumers the way many JMS queues do. Instead, it allows consumers to use Kafka to track their position (offset) in each partition.

Commit - updating the current position in the partition (special topic __consumer_offsets)

rebalance

If a consumer crashes or a new consumer joins the group, a rebalance is triggered. After a rebalance, a consumer may be assigned new partition(s), and the new owner uses the last committed offset to decide where to start reading messages.

If the committed offset is smaller than the offset of the last message the client actually processed, the messages in between will be processed twice (duplicates).


If the committed offset is larger than the offset of the last message the client actually processed, the messages in between will be missed.


automatic commit

enable.auto.commit=true
auto.commit.interval.ms // default 5 seconds

Every five seconds the consumer will commit the largest offset your client received from poll().

Problems with auto commit - duplicates

Auto commit is convenient, but because developers have no control over when offsets are committed it can cause problems. For example, offsets are committed every 5 seconds; suppose a rebalance happens 3 seconds after the last commit. The new consumer will reprocess the messages from that 3-second window, because their offsets were never committed.

Reducing auto.commit.interval.ms shrinks this time window, but it cannot be eliminated completely.

commit current offset

enable.auto.commit=false

Call commitSync() manually.

It is important to remember that commitSync() will commit the latest offset returned by poll(), so make sure you call commitSync() after you are done processing all the records in the collection, or you risk missing messages as described previously.

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %d, offset = %d, "
            + "customer = %s, country = %s\n",
            record.topic(), record.partition(),
            record.offset(), record.key(), record.value());
    }
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        log.error("commit failed", e);
    }
}

async commit

The problem with synchronous commits is that the consumer is blocked until the broker responds.

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s, "
            + "offset = %d, customer = %s, country = %s\n",
            record.topic(), record.partition(), record.offset(),
            record.key(), record.value());
    }
    consumer.commitAsync();
}

The difference: commitSync() retries automatically, commitAsync() does not. Async commit does support a callback:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s, "
            + "offset = %d, customer = %s, country = %s\n",
            record.topic(), record.partition(), record.offset(),
            record.key(), record.value());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition,
                OffsetAndMetadata> offsets, Exception e) {
            if (e != null)
                log.error("Commit failed for offsets {}", offsets, e);
        }
    });
}

Why doesn't the async commit retry? Because the consumer does not wait for the response and keeps processing new messages. Suppose the commit of offset 2000 gets no response (a network issue), but a later commit of offset 3000 succeeds. If the earlier commit were retried, offset 2000 could overwrite 3000, and messages would be reprocessed after a rebalance.
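
One way to make async retries safe (a sketch of the common sequence-number pattern, not code from the text above; the field and method names are illustrative) is to increment a counter before every commit and retry in the callback only if no newer commit has been attempted since:

private final AtomicLong commitSequence = new AtomicLong();

private void commitAsyncWithRetryGuard() {
    final long seqAtCommit = commitSequence.incrementAndGet();
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null && seqAtCommit == commitSequence.get()) {
            // No newer commit has been attempted since this one,
            // so retrying cannot move the committed offsets backwards.
            consumer.commitAsync();
        }
    });
}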

combine sync and async commit

Normally, an occasional failure to commit without retrying is not a huge problem. But if we know that this is the last commit before we close the consumer, or before a rebalance, we want to make extra sure the commit succeeds.

a common pattern is to combine commitAsync() with commitSync() just before shutdown.

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %s, offset = %d, "
                + "customer = %s, country = %s\n",
                record.topic(), record.partition(),
                record.offset(), record.key(), record.value());
        }
        consumer.commitAsync();
    }
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}


Rebalance Listeners

If you know your consumer is about to lose ownership of a partition, you will want to commit offsets of the last event you’ve processed. Perhaps you also need to close file handles, database connections, and such.

private Map<TopicPartition, OffsetAndMetadata> currentOffsets =
    new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Lost partitions in rebalance. " +
            "Committing current offsets:" + currentOffsets);
        consumer.commitSync(currentOffsets);
    }
}

try {
    consumer.subscribe(topics, new HandleRebalance());

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %s, offset = %d, "
                + "customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());
            currentOffsets.put(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1, null));
        }
        consumer.commitAsync(currentOffsets, null);
    }
} catch (WakeupException e) {
    // ignore, we're closing
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync(currentOffsets);
    } finally {
        consumer.close();
        System.out.println("Closed consumer and we are done");
    }
}

Key: consumer.subscribe(topics, new HandleRebalance());


Exit poll loop

consumer.wakeup() is the only consumer method that is safe to call from a different thread; it causes poll() to exit with a WakeupException.

The WakeupException itself does not need to be handled (it is just a way to break out of the poll loop), but before exiting the thread you must commit offsets and call consumer.close() (the final commit is usually synchronous, since commitSync() retries).
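
A minimal shutdown sketch using a JVM shutdown hook; mainThread refers to the thread running the poll loop:

final Thread mainThread = Thread.currentThread();

Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        System.out.println("Starting exit...");
        consumer.wakeup();     // makes the blocked poll() throw WakeupException
        try {
            mainThread.join(); // wait for the poll loop to commit and close
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});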


Standalone Consumer:

The usual approach is to use a consumer group: partitions are assigned automatically and rebalanced automatically when consumers are added to or removed from the group. But in some scenarios a single consumer should read all partitions of a topic, or a specific partition. In that case you can use a single consumer and assign the partitions directly.

The difference is consumer.assign(partitions): you specify the topic's partitions directly instead of subscribing to the topic.

When you know exactly which partitions the consumer should read, you don’t subscribe to a topic—instead, you assign yourself a few partitions. A consumer can either subscribe to topics (and be part of a consumer group), or assign itself partitions, but not both at the same time.

A consumer can assign itself all partitions of a specific topic and consume from them:

List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic");
List<TopicPartition> partitions = new ArrayList<>();

if (partitionInfos != null) {
    for (PartitionInfo partition : partitionInfos)
        partitions.add(new TopicPartition(partition.topic(),
            partition.partition()));
    consumer.assign(partitions);

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);

        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %s, offset = %d, "
                + "customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());
        }
        consumer.commitSync();
    }
}
