Kafka: Reliable Delivery

**Apache Kafka guarantees**

  1. order guarantee of messages within a partition
  2. produced messages are considered "committed" when they are written to the partition and all of its in-sync replicas (depending on the producer's acks setting: 0, 1, or all)
  3. consumers only read messages that are committed

Reliable Delivery Considerations:

Trade-off - how important it is to reliably and consistently store messages versus considerations of availability, high throughput, and low latency


Replication

Replication is the core mechanism behind reliable delivery.

Each topic is broken down into partitions, and each partition can have multiple replicas, one of which is designated the leader; all events from producers & consumers go through the leader. The other replicas just need to stay in sync with the leader. If the leader becomes unavailable, one of the in-sync replicas becomes the new leader.


Broker Configs

  • Broker level
  • Topic level

replication.factor

E.g., setting it to 3 means that each partition is replicated three times on three different brokers.

How to decide N: based on how critical a topic is and how much you are willing to pay for higher availability.

A replication factor of 3 is recommended for any topic where availability is an issue.
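As a concrete illustration, here is a minimal AdminClient sketch that creates such a topic; the topic name ("important-topic"), partition count, and bootstrap address are placeholder assumptions.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateReplicatedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions, each replicated on 3 different brokers
            NewTopic topic = new NewTopic("important-topic", 6, (short) 3);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```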

Placement of replicas is also very important:

  • by default, Kafka will make sure each replica for a partition is on a separate broker.
  • To protect against rack-level misfortune, we recommend placing brokers in multiple racks and using the broker.rack broker configuration parameter to configure the rack name for each broker.

unclean.leader.election.enable

When the leader for a partition is no longer available, one of the in-sync replicas is chosen as the new leader. - clean election

But what do we do when no in-sync replica exists except for the leader that just became unavailable? If we allow out-of-sync replicas to become leaders, we risk data loss and data inconsistencies; if we don't, the partition stays offline until the old leader comes back. This trade-off is controlled by unclean.leader.election.enable (disabled by default).
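For topics where consistency matters more than availability, the setting can be pinned to false explicitly at the topic level. A minimal sketch using the AdminClient's incrementalAlterConfigs, reusing the hypothetical "important-topic" from above:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class DisableUncleanElection {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "important-topic");
            AlterConfigOp disable = new AlterConfigOp(
                    new ConfigEntry("unclean.leader.election.enable", "false"),
                    AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                    Collections.singletonMap(topic, Collections.singletonList(disable));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}
```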

min.insync.replicas

If a topic has three replicas and you set min.insync.replicas to 2, then you can only write to a partition in the topic if at least two out of the three replicas are in-sync.

E.g., 3 replicas and min.insync.replicas=2:

  • All replicas in sync - perfect
  • If 2 out of the three replicas are unavailable, producers receive a NotEnoughReplicasException and consumers can only read the existing data; the single remaining replica becomes read-only (see the producer sketch below)
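A minimal producer-side sketch of this scenario (topic name and address are placeholders, and the topic is assumed to have min.insync.replicas=2): the send callback surfaces NotEnoughReplicasException once the producer's internal retries are exhausted.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MinIsrProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("important-topic", "key", "value"),
                    (metadata, exception) -> {
                        if (exception instanceof NotEnoughReplicasException) {
                            // fewer in-sync replicas than min.insync.replicas; the producer
                            // retries this internally, so reaching here means retries ran out
                            System.err.println("Not enough in-sync replicas: " + exception);
                        } else if (exception != null) {
                            System.err.println("Send failed: " + exception);
                        }
                    });
            producer.flush();
        }
    }
}
```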

About producers

Use Case 1

  • Broker config: 3 replicas, unclean leader election disabled

  • Producer config: acks = 1

The message is written to the leader, but not yet to the in-sync replicas. The leader sends back a success response but crashes before the message is replicated to the other replicas.

The producer thinks the message was written, but consumers can never read it. - Message loss

Use Case 2

  • Broker config: 3 replicas, unclean leader election disabled

  • Producer config: acks = all

Attempting to write a message while the leader has crashed and an election is in progress yields a "Leader not available" error - the producer should handle the error and retry if needed to avoid message loss.

Key points (see the config sketch below):

  • the acks setting
  • producer error handling
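Putting these together, a minimal sketch of the producer-side reliability settings; the concrete values are illustrative assumptions, not universal recommendations.

```java
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class ReliableProducerConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");                  // wait for all in-sync replicas
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);   // retry retriable errors "endlessly"
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000); // overall bound on send + retries
        return props;
    }
}
```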

producer retries

Error categories

  • Errors that can be resolved by retrying (e.g., LEADER_NOT_AVAILABLE is a retriable error)

  • Errors that won't be resolved by retrying (e.g., a broker-related INVALID_CONFIG error)

The producer should retry on retriable errors; how many retries to configure depends on the scenario, e.g., retrying endlessly is fine if you never want to drop a message.

Issue:

Retries can lead to duplicates, e.g., when a network problem prevents the producer from receiving the broker's acknowledgement even though the broker wrote the message successfully and finished replicating it; the producer then retries and the message is stored twice. Application design should therefore account for idempotence.
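One way to address retry duplicates on the producer side is the idempotent producer; a minimal sketch (note this covers duplicates from the producer's internal retries, not application-level resends):

```java
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class IdempotentProducerConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // the broker deduplicates by producer id + sequence number;
        // this setting implies acks=all and retries > 0
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        return props;
    }
}
```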


About consumers

The only thing consumers need to do is make sure they keep track of which messages they have read and which they haven't. - In other words, what consumers should mainly care about is committing offsets.

When reading data from a partition, a consumer fetches a batch of events, checks the last offset in the batch, and then requests another batch of events starting from the last offset received.

When a consumer stops, another consumer needs to know where to pick up the work—what was the last offset that the previous consumer processed before it stopped?

Lost messages: the committed offset is greater than the offset of the last message actually processed.

Duplicate messages: the committed offset is smaller than the offset of the last message actually processed.

group.id

Multiple consumers with the same group.id subscribing to the same topic will each be assigned a subset of the partitions in the topic, and each individually reads only a subset of the messages (all messages will be read by the group as a whole).
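A minimal sketch of such a consumer (topic, group id, and address are placeholders); every consumer started with the same group.id shares the topic's partitions.

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class GroupedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); // same id => share the partitions
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("important-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```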

auto.offset.reset

Controls what the consumer will do when no offset has been committed:

  • when the consumer first starts
  • when the consumer asks for offsets that don't exist (e.g., after a rebalance)

earliest: start from the beginning of the partition whenever the consumer doesn't have a valid offset. This can lead to the consumer processing a lot of messages twice, but it minimizes data loss.

latest: the consumer will start at the end of the partition. This minimizes duplicate processing by the consumer but almost certainly leads to some messages getting missed by the consumer.
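A minimal sketch contrasting the two policies:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class OffsetResetConfig {
    public static Properties build(boolean preferNoLoss) {
        Properties props = new Properties();
        // "earliest": may reprocess duplicates, minimizes data loss.
        // "latest":   minimizes duplicates, may skip messages.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, preferNoLoss ? "earliest" : "latest");
        return props;
    }
}
```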

enable.auto.commit

With auto-commit, the consumer doesn't need to implement offset commits itself. The drawback is that you have no control over the number of duplicate records you may need to process, e.g., when the consumer stops after processing some records but before the automated commit kicks in.

Related config: auto.commit.interval.ms
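A minimal sketch of the auto-commit settings (5000 ms is the default interval):

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class AutoCommitConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // records processed since the last automatic commit will be
        // reprocessed if the consumer stops before the next one fires
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
        return props;
    }
}
```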

explicitly commit offsets

  • Always commit offsets after events have been processed

  • Commit frequency is a trade-off between performance and the number of duplicates in the event of a crash

  • Make sure you know exactly what offsets you are committing

    Commit the latest offset that was processed; committing an offset that came back from poll() but was not processed yet can lead to missed messages

  • Rebalance

    Rebalances will happen; commit before partitions are revoked (e.g., synchronously, or using a rebalance listener - see the sketch below)
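A minimal sketch combining these points (names are placeholders): track only offsets of records actually processed, commit the next offset to read, and commit synchronously before partitions are revoked.

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ExplicitCommitConsumer {
    private static final Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // we commit ourselves
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton("important-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                consumer.commitSync(toCommit); // commit before losing the partitions
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                process(record); // only track offsets of records actually processed
                toCommit.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1)); // commit the NEXT offset to read
            }
            if (!records.isEmpty()) {
                consumer.commitSync(toCommit);
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // placeholder for real processing
    }
}
```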

retry in consumer

You commit offsets rather than acking individual messages. This means that if you failed to process record #30 and succeeded in processing record #31, you should not commit offset #31 - this would result in committing all the records up to #31, including #30, which is usually not what you want.

solution:

When you hit a retriable error, commit the last record you processed successfully and store the remaining records in a buffer, then use pause() so that no new data is returned by subsequent poll() calls; because the consumer keeps polling, it is not considered to have left the consumer group (liveness/heartbeat).

Alternatively, write the unprocessed messages to a separate topic and let a dedicated consumer group handle the retries.
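A minimal sketch of the pause-based approach (process() and the exception type are placeholders):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;

public class PauseAndRetry {
    private final Deque<ConsumerRecord<String, String>> buffer = new ArrayDeque<>();

    void pollLoop(KafkaConsumer<String, String> consumer) {
        while (true) {
            // while paused, poll() returns no records for the paused
            // partitions but still keeps the consumer alive in the group
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            records.forEach(buffer::addLast);

            while (!buffer.isEmpty()) {
                ConsumerRecord<String, String> record = buffer.peekFirst();
                try {
                    process(record);                       // placeholder for real processing
                    buffer.pollFirst();                    // processed, drop from buffer
                    consumer.resume(consumer.paused());    // fetch new data again
                } catch (RetriableException e) {
                    consumer.pause(consumer.assignment()); // stop fetching new data
                    break;                                 // retry the same record next loop
                }
            }
        }
    }

    private void process(ConsumerRecord<String, String> record) throws RetriableException { }

    static class RetriableException extends Exception { }
}
```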

Handling long processing

Use a thread pool: hand the records off to worker threads, pause() the partitions after handing the processing over, and keep polling until the worker threads finish, then resume().
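A minimal sketch of this pattern (the worker logic is a placeholder):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LongProcessingConsumer {
    private final ExecutorService workers = Executors.newFixedThreadPool(8);

    void pollLoop(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            if (!records.isEmpty()) {
                List<Future<?>> inFlight = new ArrayList<>();
                records.forEach(record -> inFlight.add(workers.submit(() -> process(record))));

                consumer.pause(consumer.assignment()); // no new data while workers run
                while (!inFlight.stream().allMatch(Future::isDone)) {
                    consumer.poll(Duration.ofMillis(100)); // returns nothing, keeps us in the group
                }
                consumer.resume(consumer.paused());
            }
        }
    }

    private void process(ConsumerRecord<String, String> record) {
        // placeholder for the actual long-running processing
    }
}
```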


Exactly-once delivery

Create a unique key using topic + partition + offset.

Idempotent writes: write results to a system that supports unique keys, so replaying the same record overwrites instead of duplicating.
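A minimal sketch of such a key (the target store is assumed to deduplicate on it):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class RecordIds {
    static String uniqueId(ConsumerRecord<?, ?> record) {
        // the same record always maps to the same id, so writing it twice
        // under this key is an idempotent overwrite, not a duplicate
        return record.topic() + "-" + record.partition() + "-" + record.offset();
    }
}
```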


monitoring

JMX - error rate and retry rate (on the producer: record-error-rate and record-retry-rate)

Consumer lag - ideally, the lag would always be zero and the consumer would always read the latest message. Since most applications need near-real-time data, a lag that grows too large means the consumer is working on stale data that may no longer be meaningful for the business.
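A minimal sketch reading the same error/retry rates from the client's in-process metrics registry (assumes an existing KafkaProducer instance):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

import java.util.Map;

public class ProducerHealth {
    static void printReliabilityMetrics(KafkaProducer<?, ?> producer) {
        // the same values are published via JMX under the producer metric groups
        for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
            String name = entry.getKey().name();
            if (name.equals("record-error-rate") || name.equals("record-retry-rate")) {
                System.out.println(name + " = " + entry.getValue().metricValue());
            }
        }
    }
}
```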