Kafka Producer


Workflow

A message is produced by creating a ProducerRecord, which contains:

  • the topic
  • the value (the message itself)
  • optionally, a key and/or a partition

The producer serializes the key and value into byte arrays so they can be sent over the network.

The data is then passed to the partitioner. If no partition was specified, the partitioner chooses one, typically based on the record's key.

Once the partition is chosen, the producer adds the record to a batch of records that will be sent to the same topic and partition. A separate thread sends these batches to the appropriate brokers.

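When the broker receives the messages, it sends back a response. On success, it returns a RecordMetadata object containing the topic, the partition, and the record's offset within the partition. On failure, it returns an error, and the producer may retry a few times before giving up and reporting the error.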
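The workflow can be modeled in plain Java to make the steps concrete. This is a toy sketch, not KafkaProducer: the class name ProducerPipelineSketch is hypothetical, it hashes keys with Arrays.hashCode instead of the murmur2 hash Kafka's default partitioner uses, and it only collects batches in memory rather than sending them.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the producer pipeline: serialize -> partition -> batch.
final class ProducerPipelineSketch {
    private final int numPartitions;
    // One batch per "topic-partition", like the producer's per-partition batches.
    private final Map<String, List<byte[]>> batches = new HashMap<>();
    private int roundRobin = 0;

    ProducerPipelineSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    /** Serializes, partitions and batches one record; returns the chosen partition. */
    int append(String topic, String key, String value, Integer partition) {
        // 1. Serialize key and value to byte arrays (plain UTF-8 strings here).
        byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
        byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);

        // 2. Partition: an explicit partition wins; otherwise hash the key,
        //    or rotate through the partitions when there is no key.
        int p;
        if (partition != null) {
            p = partition;
        } else if (keyBytes != null) {
            p = (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
        } else {
            p = roundRobin++ % numPartitions;
        }

        // 3. Add the serialized value to the batch for this topic-partition.
        batches.computeIfAbsent(topic + "-" + p, k -> new ArrayList<>()).add(valueBytes);
        return p;
    }

    int batchSize(String topic, int partition) {
        return batches.getOrDefault(topic + "-" + partition, List.of()).size();
    }

    public static void main(String[] args) {
        ProducerPipelineSketch producer = new ProducerPipelineSketch(3);
        int p1 = producer.append("orders", "customer-42", "order #1", null);
        int p2 = producer.append("orders", "customer-42", "order #2", null);
        System.out.println(p1 == p2);                            // same key, same partition
        System.out.println(producer.batchSize("orders", p1));    // both records in one batch
    }
}
```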


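Constructing a Producer

Required configuration:

bootstrap.servers: list of host:port pairs of brokers the producer uses for the initial connection to the cluster. It does not need to list every broker, but listing at least two helps in case one is down.

key.serializer: class that serializes the key into a byte array.

value.serializer: class that serializes the value into a byte array.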
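A minimal sketch of the required settings as a java.util.Properties object, assuming String keys and values. The broker addresses are placeholders; with the kafka-clients library on the classpath, these properties would be passed to the KafkaProducer constructor.

```java
import java.util.Properties;

final class BasicProducerConfig {
    static Properties build() {
        Properties props = new Properties();
        // Brokers for the initial connection; these host names are placeholders.
        props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");
        // Kafka's built-in serializers for String keys and values.
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // With kafka-clients available: new KafkaProducer<String, String>(build())
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```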

Sending Modes

Fire and Forget

Send the message without checking whether it arrived. Messages can be lost without the application noticing.

Synchronous Sending

send() returns a Future object; calling get() on it blocks until the broker responds, which tells you whether the send succeeded.
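In the real client, producer.send(record) returns a Future<RecordMetadata>, and producer.send(record).get() is the synchronous pattern. The sketch below shows the same blocking pattern with a CompletableFuture so it runs without a broker; fakeSend is a hypothetical stand-in that returns an offset, or fails when the value is longer than a given limit.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class SyncSendSketch {
    private static long nextOffset = 0;

    // Hypothetical stand-in for KafkaProducer.send(record): a Future of the
    // record's offset, or a failed Future if the "broker" rejects it.
    static Future<Long> fakeSend(String value, int maxLength) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        if (value.length() > maxLength) {
            // Models a non-retriable error such as "record too large".
            result.completeExceptionally(new IllegalStateException("record too large"));
        } else {
            result.complete(nextOffset++);
        }
        return result;
    }

    // Synchronous send: block on get() until the response arrives.
    static String sendSync(String value, int maxLength) {
        try {
            long offset = fakeSend(value, maxLength).get();
            return "ok offset=" + offset;
        } catch (ExecutionException e) {
            // get() wraps the failure; getCause() is the original exception.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(sendSync("hello", 100));          // ok offset=0
        System.out.println(sendSync("x".repeat(200), 100));  // failed: record too large
    }
}
```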


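Exceptions thrown:

  • Non-retriable errors (e.g. the message is too large)
  • Retriable errors whose retries have been exhausted

Retriable errors can be resolved by sending the message again, e.g. a "No leader" error while a new leader is being elected for the partition. The producer can be configured to retry them automatically (see retries below).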

Asynchronous Sending

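send() with a callback: the call returns immediately, and the callback is invoked when the broker responds, so the application can handle failures without blocking on each message.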
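The asynchronous pattern can be sketched the same way. Kafka's Callback interface has a single method, onCompletion(RecordMetadata metadata, Exception exception); the SendCallback interface and AsyncSendSketch class here are hypothetical stand-ins that mimic it, with a single background thread playing the role of the producer's I/O thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for Kafka's Callback; in this sketch exactly one
// argument is non-null (the offset on success, the exception on failure).
interface SendCallback {
    void onCompletion(Long offset, Exception exception);
}

final class AsyncSendSketch {
    // A single background thread plays the role of the producer's I/O thread.
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();
    private final AtomicLong nextOffset = new AtomicLong();

    // Returns immediately; the callback runs later on the background thread.
    void send(String value, SendCallback callback) {
        ioThread.submit(() -> {
            if (value.isEmpty()) {
                callback.onCompletion(null, new IllegalArgumentException("empty value"));
            } else {
                callback.onCompletion(nextOffset.getAndIncrement(), null);
            }
        });
    }

    // Waits for pending sends and their callbacks, then stops the thread.
    void close() {
        ioThread.shutdown();
        try {
            ioThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AsyncSendSketch producer = new AsyncSendSketch();
        List<String> results = new CopyOnWriteArrayList<>();
        SendCallback callback = (offset, e) ->
                results.add(e == null ? "ok offset=" + offset : "failed: " + e.getMessage());
        producer.send("hello", callback);
        producer.send("", callback);
        producer.close();
        System.out.println(results); // [ok offset=0, failed: empty value]
    }
}
```

Because callbacks run on the producer's background thread, they should be quick; slow work inside a callback delays other sends.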


Producer Configuration

client.id

An arbitrary string that brokers use to identify messages sent from this client, e.g. in logs and metrics.

acks

How many partition replicas must receive the record before the producer considers the write successful.

  • acks=0 (high throughput, but messages can be lost)

    The producer does not wait for any reply from the broker.

  • acks=1

    The producer receives a success response as soon as the leader replica has received the message.

    If the leader is not available (it crashed and a new leader has not been elected yet), the producer receives an error response and retries. Data loss is still possible, e.g. the leader receives the message and then crashes, and a replica without the message is elected leader.

  • acks=all

    The producer receives a success response only after all in-sync replicas have received the message.

    This is the safest mode (the message survives a broker crash), but it has higher latency, since the producer waits for more than one broker to receive the message.
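A toy simulation of the data-loss scenario above, assuming one leader and a list of followers that receive a copy only when acks=all makes the producer wait for them. AcksSketch is hypothetical and is not Kafka's replication protocol; in particular, acks=0 is modeled like acks=1 here, although the real producer does not wait for the leader at all.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one partition's replicas under different acks settings.
final class AcksSketch {
    enum Acks { ZERO, ONE, ALL }

    private List<String> leader = new ArrayList<>();
    private final List<List<String>> followers = new ArrayList<>();

    AcksSketch(int followerCount) {
        for (int i = 0; i < followerCount; i++) followers.add(new ArrayList<>());
    }

    // Returns at the point where the producer would see "success".
    void produce(String record, Acks acks) {
        leader.add(record); // the leader writes the record to its log
        if (acks == Acks.ALL) {
            // acks=all: success only after every in-sync follower has a copy.
            for (List<String> follower : followers) follower.add(record);
        }
        // acks=1 (and acks=0 in this model): success before followers copy it.
    }

    // The leader crashes right after the ack; the first follower becomes leader.
    void crashLeader() {
        if (followers.isEmpty()) throw new IllegalStateException("no replica left to elect");
        leader = followers.remove(0);
    }

    boolean leaderHas(String record) {
        return leader.contains(record);
    }

    public static void main(String[] args) {
        for (Acks acks : new Acks[] {Acks.ONE, Acks.ALL}) {
            AcksSketch partition = new AcksSketch(2);
            partition.produce("order #1", acks);
            partition.crashLeader();
            System.out.println(acks + " -> survives crash: " + partition.leaderHas("order #1"));
        }
    }
}
```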

retries

Retries on transient errors. retries sets how many times the producer resends a message; retry.backoff.ms sets how long it waits between retries (100 ms by default).
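The producer performs these retries internally, so application code does not write this loop; the sketch only illustrates how retries and retry.backoff.ms interact with the two kinds of error. RetriableError is a hypothetical stand-in for Kafka's org.apache.kafka.common.errors.RetriableException.

```java
import java.util.function.Supplier;

final class RetrySketch {
    // Hypothetical marker for errors that may go away on retry (e.g. "no leader").
    static final class RetriableError extends RuntimeException {
        RetriableError(String message) { super(message); }
    }

    /**
     * Runs attempt up to 1 + retries times, sleeping backoffMs between attempts.
     * Non-retriable errors propagate immediately; if every attempt fails with a
     * retriable error, the last one is rethrown (retries exhausted).
     */
    static <T> T sendWithRetries(Supplier<T> attempt, int retries, long backoffMs) {
        if (retries < 0) throw new IllegalArgumentException("retries must be >= 0");
        RetriableError last = null;
        for (int i = 0; i <= retries; i++) {
            try {
                return attempt.get();
            } catch (RetriableError e) {
                last = e;
                if (i < retries) backOff(backoffMs);
            }
        }
        throw last;
    }

    private static void backOff(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while backing off", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice with "no leader", then succeeds on the third attempt.
        String result = sendWithRetries(() -> {
            if (++calls[0] < 3) throw new RetriableError("no leader");
            return "ok";
        }, 3, 100);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```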

batch.size

When multiple records are sent to the same partition, the producer sends them as a batch. batch.size controls the amount of memory, in bytes (not messages), used for each batch.

linger.ms

Controls how long the producer waits for additional messages before sending the current batch.

A batch is sent as soon as it reaches batch.size or linger.ms has elapsed, whichever comes first.
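A toy model of that rule for a single partition, assuming the caller passes in the current time. The class name BatchSketch is hypothetical; unlike the real producer, it lets a batch grow past batch.size instead of starting a new batch when the next record would not fit.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchSketch {
    private final int batchSizeBytes; // plays the role of batch.size
    private final long lingerMs;      // plays the role of linger.ms
    private final List<byte[]> records = new ArrayList<>();
    private int bytes = 0;
    private long firstAppendMs = -1;

    BatchSketch(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    void append(byte[] record, long nowMs) {
        if (records.isEmpty()) firstAppendMs = nowMs; // linger clock starts with the first record
        records.add(record);
        bytes += record.length;
    }

    // Ready when the batch is full or has lingered long enough, whichever comes first.
    boolean ready(long nowMs) {
        if (records.isEmpty()) return false;
        return bytes >= batchSizeBytes || nowMs - firstAppendMs >= lingerMs;
    }

    // Hands the records to the sender and starts a fresh batch.
    List<byte[]> drain() {
        List<byte[]> out = new ArrayList<>(records);
        records.clear();
        bytes = 0;
        firstAppendMs = -1;
        return out;
    }

    public static void main(String[] args) {
        BatchSketch batch = new BatchSketch(100, 10);
        batch.append(new byte[40], 0);
        System.out.println(batch.ready(5));  // false: 40 bytes, only 5 ms waited
        System.out.println(batch.ready(10)); // true: linger.ms reached
    }
}
```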

max.in.flight.requests.per.connection

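How many messages the producer sends to the server without receiving responses.

If ordering matters and retries are enabled, set this to 1. Otherwise a failed batch may be retried after a later batch has already been written, so messages end up out of order.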
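A toy simulation of how ordering breaks: batches 1..n are sent in order, the first attempt of batch 1 fails with a retriable error, and responses arrive in send order. The method name brokerOrder is hypothetical and the model ignores real timing.

```java
import java.util.ArrayList;
import java.util.List;

final class InFlightSketch {
    // Returns the order in which the broker ends up storing the batches.
    static List<Integer> brokerOrder(int batches, int maxInFlight) {
        List<Integer> log = new ArrayList<>();
        List<Integer> inFlight = new ArrayList<>();
        int next = 1;
        boolean firstAttemptOfBatch1 = true;
        while (log.size() < batches) {
            // Fill the pipeline up to maxInFlight unacknowledged requests.
            while (inFlight.size() < maxInFlight && next <= batches) inFlight.add(next++);
            // Process responses in send order; a failed batch is queued for resend.
            List<Integer> retry = new ArrayList<>();
            for (int b : inFlight) {
                if (b == 1 && firstAttemptOfBatch1) {
                    firstAttemptOfBatch1 = false;
                    retry.add(b); // retriable failure on the first attempt
                } else {
                    log.add(b);
                }
            }
            inFlight = retry;
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(brokerOrder(3, 1)); // [1, 2, 3]
        System.out.println(brokerOrder(3, 5)); // [2, 3, 1]
    }
}
```

With one request in flight, the retry of batch 1 must succeed before batch 2 is sent, so order is kept at the cost of throughput. Newer clients can also keep ordering with up to 5 in-flight requests when enable.idempotence is turned on.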

Others

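buffer.memory sets the size of the memory buffer the producer uses for messages waiting to be sent. If the application sends messages faster than they can be delivered to the broker, the buffer fills up and send() either blocks or throws an exception. Older clients choose between the two with block.on.buffer.full; newer clients block for up to max.block.ms and then throw.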

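compression.type: by default, messages are sent uncompressed. Setting it to gzip, snappy, lz4, or zstd compresses batches, reducing network and storage usage at the cost of some CPU.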
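A sketch of the tuning options from this section as Properties, meant to be merged with the three required settings. The values are illustrative assumptions, not recommendations, and the client.id "order-service" is a hypothetical name.

```java
import java.util.Properties;

final class ProducerTuning {
    static Properties tuning() {
        Properties props = new Properties();
        props.setProperty("client.id", "order-service");        // hypothetical client name
        props.setProperty("acks", "all");                       // wait for all in-sync replicas
        props.setProperty("retries", "3");                      // resend on transient errors
        props.setProperty("retry.backoff.ms", "100");           // pause between retries
        props.setProperty("batch.size", "16384");               // bytes per partition batch
        props.setProperty("linger.ms", "5");                    // wait up to 5 ms to fill a batch
        props.setProperty("max.in.flight.requests.per.connection", "1"); // keep order across retries
        props.setProperty("buffer.memory", "33554432");         // 32 MiB for records not yet sent
        props.setProperty("max.block.ms", "60000");             // how long send() may block when full
        props.setProperty("compression.type", "lz4");           // compress each batch
        return props;
    }

    public static void main(String[] args) {
        Properties props = tuning();
        // Merge with the required settings (bootstrap.servers, serializers) before use.
        props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder host
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```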


Serializer

Kafka ships serializers/deserializers for:

  • String
  • Long
  • Double
  • Integer
  • Bytes

Custom serializers can also be written.
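A custom serializer sketch for a hypothetical Customer type. A real one would implement org.apache.kafka.common.serialization.Serializer<Customer>, whose core method is serialize(String topic, T data); this version has the same method shape but no Kafka dependency. The byte layout (4-byte id, 4-byte name length, UTF-8 name) is an assumption made for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical value type for the example.
final class Customer {
    final int id;
    final String name;

    Customer(int id, String name) {
        this.id = id;
        this.name = name;
    }
}

final class CustomerSerializer {
    // Layout: 4-byte id, 4-byte name length, then the UTF-8 name bytes.
    byte[] serialize(String topic, Customer data) {
        if (data == null) return null;
        // A null name is written as an empty string in this sketch.
        byte[] name = data.name == null ? new byte[0] : data.name.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + name.length);
        buf.putInt(data.id);
        buf.putInt(name.length);
        buf.put(name);
        return buf.array();
    }

    // Matching deserializer, to show the format round-trips.
    Customer deserialize(String topic, byte[] bytes) {
        if (bytes == null) return null;
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int id = buf.getInt();
        byte[] name = new byte[buf.getInt()];
        buf.get(name);
        return new Customer(id, new String(name, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        CustomerSerializer s = new CustomerSerializer();
        byte[] bytes = s.serialize("customers", new Customer(7, "Ann"));
        Customer back = s.deserialize("customers", bytes);
        System.out.println(bytes.length + " bytes -> " + back.id + " " + back.name);
    }
}
```

Hand-written formats like this break easily when fields are added or changed, which is one motivation for a schema-based format such as Avro.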

Apache Avro can also be used:

  • language-independent schema
  • a Schema Registry can store all the schemas used to write data to Kafka

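Partitioning

What the key is for:

  • it carries additional information stored with the message
  • the partitioner uses it to decide which partition the message is written to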

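When the key is null, the default partitioner spreads messages across the available partitions in round-robin fashion to balance the load (since Kafka 2.4 it uses a "sticky" variant that fills a batch for one partition before switching to the next).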

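When the key is not null, the default partitioner hashes the key and maps it to a specific partition, so all messages with the same key go to the same partition (as long as the number of partitions does not change).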

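Custom partitioner: e.g. a B2B vendor where most transactions are with one specific customer. Hashing would put that customer on the same partition as other customers, making that partition much larger than the rest; a custom partitioner can give the big customer its own partition and hash everyone else over the remaining ones.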
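A sketch of that custom partitioner, assuming every record carries a customer key. In Kafka, a real one would implement org.apache.kafka.clients.producer.Partitioner (its partition method receives the topic, key, value, their serialized bytes, and the Cluster) and be registered with the partitioner.class setting. BigCustomerPartitioner and the customer name are hypothetical, and Arrays.hashCode stands in for Kafka's murmur2 hash.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// The dominant customer gets the last partition to itself;
// all other keys are hashed over the remaining partitions.
final class BigCustomerPartitioner {
    private final String bigCustomer;

    BigCustomerPartitioner(String bigCustomer) {
        this.bigCustomer = bigCustomer;
    }

    int partition(String key, int numPartitions) {
        if (numPartitions < 2) throw new IllegalArgumentException("need at least 2 partitions");
        if (key == null) throw new IllegalArgumentException("records must have a customer key");
        if (key.equals(bigCustomer)) return numPartitions - 1;
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Non-negative hash, mapped onto partitions 0 .. numPartitions - 2.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % (numPartitions - 1);
    }

    public static void main(String[] args) {
        BigCustomerPartitioner partitioner = new BigCustomerPartitioner("BigCo");
        System.out.println(partitioner.partition("BigCo", 4)); // always the last partition
        System.out.println(partitioner.partition("acme", 4));  // somewhere in 0..2
    }
}
```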