Kafka Producer
Workflow
A message is produced by creating a ProducerRecord, which contains:
- topic
- message value (the record payload)
- (optional) key and/or partition
The producer serializes the key and value to byte arrays so they can be sent over the network.
The data is then passed to the partitioner. If no partition was specified, the partitioner chooses one based on the message key.
Once the partition is decided, the producer adds the record to a batch of records that will be sent to the same topic and partition.
When the broker receives the messages, it sends back a response. On success, the returned RecordMetadata contains the topic, the partition, and the offset of the record within the partition.
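The steps above can be modeled in a few lines of plain Python. This is only a sketch of the flow, not the Kafka client: the ProducerRecord class, the send function, the partition count of 3, and the use of zlib.crc32 as the key hash are all assumptions made for illustration, and keyless records are simply sent to partition 0 here.

```python
import zlib
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional

NUM_PARTITIONS = 3  # assumed partition count for every topic in this sketch


@dataclass
class ProducerRecord:
    topic: str
    value: str
    key: Optional[str] = None
    partition: Optional[int] = None


def serialize(text: Optional[str]) -> Optional[bytes]:
    """Turn a key or value into bytes so it can travel over the network."""
    return None if text is None else text.encode("utf-8")


def choose_partition(record: ProducerRecord, key_bytes: Optional[bytes]) -> int:
    """Use the explicit partition if given, otherwise derive one from the key."""
    if record.partition is not None:
        return record.partition
    if key_bytes is None:
        return 0  # simplification: keyless records all go to partition 0 here
    return zlib.crc32(key_bytes) % NUM_PARTITIONS  # stand-in for Kafka's own hash


batches = defaultdict(list)  # (topic, partition) -> records waiting to be sent


def send(record: ProducerRecord) -> tuple:
    """Serialize, pick a partition, and append the record to that partition's batch."""
    key_bytes = serialize(record.key)
    value_bytes = serialize(record.value)
    partition = choose_partition(record, key_bytes)
    batches[(record.topic, partition)].append((key_bytes, value_bytes))
    return (record.topic, partition)


send(ProducerRecord("orders", "order-1", key="customer-42"))
send(ProducerRecord("orders", "order-2", key="customer-42"))
send(ProducerRecord("orders", "order-3", partition=2))
```

The two records with the same key end up in the same batch, while the third goes to the partition it names explicitly.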
Construct Producer
Basic Config:
bootstrap.servers
: host:port pairs of brokers.
key.serializer
: serialize key to byte array
value.serializer
: serialize value to byte array
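As an illustration, the three required settings could be collected and rendered as a properties file like this. The broker addresses are hypothetical, the to_properties helper is made up for this sketch, and the serializer class names are the string serializers that ship with the Java client.

```python
# Hypothetical broker addresses; replace with the real cluster.
producer_config = {
    "bootstrap.servers": "broker1:9092,broker2:9092",
    "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer": "org.apache.kafka.common.serialization.StringSerializer",
}

REQUIRED = ("bootstrap.servers", "key.serializer", "value.serializer")


def to_properties(config: dict) -> str:
    """Check the required settings and render the config as key=value lines."""
    missing = [name for name in REQUIRED if name not in config]
    if missing:
        raise ValueError(f"missing required settings: {missing}")
    return "\n".join(f"{name}={value}" for name, value in config.items())


print(to_properties(producer_config))
```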
Sending modes
Fire and Forget
Send the message and do not check the result. Messages can be lost.
Synchronous Sending
send() returns a Future object; call get() on it, which blocks, to check whether the send succeeded.

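get() throws an exception in these cases:
- a non-retriable error occurred
- retries were exhausted on a retriable error
Retriable errors, for example a "no leader" error while a new leader is being elected for a partition, can be resolved by retrying. The producer can be configured to retry automatically.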
Asynchronous Sending
Call send() with a callback, which is invoked when the broker's response arrives.
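The three modes can be compared with a small stand-in built on Python's concurrent.futures. FakeProducer is hypothetical and only imitates a send() that returns a future; it talks to no broker, always reports partition 0, and treats a missing value as a non-retriable error purely for demonstration.

```python
from concurrent.futures import Future, ThreadPoolExecutor


class FakeProducer:
    """Stand-in for a producer; only models a Future-returning send()."""

    def __init__(self):
        self._pool = ThreadPoolExecutor(max_workers=1)  # one worker keeps order
        self._next_offset = 0

    def _deliver(self, topic, value):
        if value is None:
            raise ValueError("non-retriable: record has no value")
        offset = self._next_offset
        self._next_offset += 1
        return {"topic": topic, "partition": 0, "offset": offset}

    def send(self, topic, value) -> Future:
        return self._pool.submit(self._deliver, topic, value)

    def close(self):
        self._pool.shutdown(wait=True)


producer = FakeProducer()

# Fire and forget: the returned future is ignored, so a failure goes unnoticed.
producer.send("orders", b"a")

# Synchronous: block on the future; an exception would be raised here.
metadata = producer.send("orders", b"b").result()

# Asynchronous: register a callback that runs once the outcome is known.
outcomes = []


def on_completion(future):
    error = future.exception()
    if error is not None:
        outcomes.append(("error", repr(error)))
    else:
        outcomes.append(("ok", future.result()))


producer.send("orders", b"c").add_done_callback(on_completion)
producer.send("orders", None).add_done_callback(on_completion)
producer.close()
```

The synchronous call pays for its certainty by waiting on every message, while the callback variant keeps sending and still learns about failures.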

Producer Configuration
client.id
Used by the brokers to identify messages sent from this client.
acks
Controls how many partition replicas must receive the record before the producer can consider the write successful.
acks=0 (high throughput, but messages may be lost)
The producer does not wait for a reply from the broker.
acks=1
The producer receives a success response once the leader replica has received the message.
If the leader is not available (the leader crashed and a new leader has not been elected yet), the producer receives an error response and can retry. Data loss is still possible: for example, the leader receives the message and then crashes, and a replica without this message is elected as the new leader.
acks=all
The producer receives a success response once all in-sync replicas have received the message.
This is the safest mode (the message survives a broker crash), but latency is higher because the producer waits for more than one broker to receive the message.
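The three settings can be summarized as a small decision function. This is a simplified model with a made-up function name; it ignores timeouts and broker-side settings and only answers the question of when the producer treats a write as successful.

```python
def write_succeeded(acks: str, leader_received: bool, followers_received: list) -> bool:
    """Return True when the producer would treat the write as successful.

    followers_received holds one flag per in-sync follower replica.
    """
    if acks == "0":
        return True  # no reply is awaited, so the producer assumes success
    if acks == "1":
        return leader_received  # the leader alone decides
    if acks == "all":
        return leader_received and all(followers_received)
    raise ValueError(f"unsupported acks value: {acks!r}")


# The leader has the message, one follower does not yet.
print(write_succeeded("1", True, [True, False]))    # acknowledged, yet still at risk
print(write_succeeded("all", True, [True, False]))  # not acknowledged yet
```

The first case is exactly the data-loss window described for acks=1: the write is reported as successful while a replica that could become leader is still missing the message.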
retries
For transient errors, retries controls how many times the producer retries a send, and retry.backoff.ms controls how long it waits between retries.
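The interplay of the two settings can be sketched as a retry loop. The exception classes and send_with_retries are hypothetical names for this illustration; the real producer performs its retries internally.

```python
import time


class RetriableError(Exception):
    """Transient failure, for example no leader for the partition."""


class NonRetriableError(Exception):
    """Failure that retrying cannot fix."""


def send_with_retries(send_once, retries: int, retry_backoff_ms: int, sleep=time.sleep):
    """Call send_once, retrying retriable errors up to `retries` times."""
    attempt = 0
    while True:
        try:
            return send_once()
        except RetriableError:
            if attempt >= retries:
                raise  # retries exhausted: surface the error to the caller
            attempt += 1
            sleep(retry_backoff_ms / 1000)  # wait before the next attempt
        # NonRetriableError is not caught, so it propagates immediately.


attempts = []


def flaky_send():
    attempts.append(1)
    if len(attempts) < 3:
        raise RetriableError("no leader for partition")
    return "offset=7"


waits = []
result = send_with_retries(flaky_send, retries=5, retry_backoff_ms=100, sleep=waits.append)
```

Here the first two attempts fail, the loop backs off twice, and the third attempt succeeds.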
batch.size
When multiple records are sent to the same partition, the producer sends them together as a batch. batch.size controls the amount of memory, in bytes, used for each batch.
linger.ms
Controls the amount of time to wait for additional messages before sending the current batch.
A batch is sent when either batch.size is reached or linger.ms has elapsed.
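The "whichever comes first" rule can be written down as a small class. Batch is a hypothetical model that takes time as an explicit argument; the real producer's accounting of batch memory is more involved.

```python
class Batch:
    """Records for one topic partition, waiting to be sent together."""

    def __init__(self, batch_size: int, linger_ms: int, created_at_ms: int):
        self.batch_size = batch_size        # size limit in bytes
        self.linger_ms = linger_ms          # maximum wait in milliseconds
        self.created_at_ms = created_at_ms
        self.records = []
        self.size_bytes = 0

    def append(self, record: bytes) -> None:
        self.records.append(record)
        self.size_bytes += len(record)

    def ready_to_send(self, now_ms: int) -> bool:
        """Send when the batch is full or has waited long enough."""
        full = self.size_bytes >= self.batch_size
        lingered = now_ms - self.created_at_ms >= self.linger_ms
        return full or lingered


batch = Batch(batch_size=16, linger_ms=5, created_at_ms=0)
batch.append(b"12345678")
print(batch.ready_to_send(now_ms=1))  # neither limit reached yet
print(batch.ready_to_send(now_ms=5))  # linger.ms has elapsed
```

A larger linger.ms trades latency for fuller batches, which usually means better throughput.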
max.in.flight.requests.per.connection
Controls how many messages the producer will send to the server without receiving responses.
If ordering matters, set it to 1, so that a retried batch cannot be overtaken by a later one.
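Why a value above 1 can break ordering is easiest to see in a toy simulation. The simulate function below is an assumption-laden model, not broker behavior: responses arrive in send order, a failed batch is retried as soon as a slot is free, and every retry succeeds.

```python
from collections import deque


def simulate(batches, max_in_flight, fails_once):
    """Return the order in which batches end up in the partition log.

    fails_once is the set of batches whose first attempt fails with a
    retriable error; the retry is assumed to succeed.
    """
    pending = deque(batches)   # not yet sent
    in_flight = deque()        # sent, response not yet received
    failed_before = set()
    log = []
    while pending or in_flight:
        # Fill the connection up to the in-flight limit.
        while pending and len(in_flight) < max_in_flight:
            in_flight.append(pending.popleft())
        # The broker answers the oldest in-flight request.
        batch = in_flight.popleft()
        if batch in fails_once and batch not in failed_before:
            failed_before.add(batch)
            pending.appendleft(batch)  # retry it as soon as a slot frees up
        else:
            log.append(batch)
    return log


print(simulate(["b1", "b2"], max_in_flight=2, fails_once={"b1"}))  # ['b2', 'b1']
print(simulate(["b1", "b2"], max_in_flight=1, fails_once={"b1"}))  # ['b1', 'b2']
```

With two requests in flight, b2 is already on the wire when b1 fails, so the retried b1 lands after it. With a limit of 1, b2 is not sent until b1 has succeeded.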
Others
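buffer.memory / block.on.buffer.full
: buffer.memory sets the size of the memory buffer the producer uses for messages waiting to be sent. When records are produced faster than they can be delivered to the broker, the buffer can run out of space; the producer then either blocks or throws an exception, depending on block.on.buffer.full (replaced by max.block.ms in newer client versions).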
compression.type
: by default, messages are sent uncompressed. Setting this parameter to a codec such as snappy, gzip, or lz4 enables compression.
Serializer
Kafka provides built-in serializers/deserializers for:
String
Long
Double
Integer
Bytes
Custom serializers can be implemented.
Apache Avro can be used:
- language-independent schema
- can be combined with a schema registry (which stores all the schemas used to write data to Kafka)
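A hand-written serializer might look like the following sketch. The Customer type and its byte layout (4-byte id, 4-byte name length, UTF-8 name) are invented for illustration and are not a Kafka or Avro format.

```python
import struct
from dataclasses import dataclass


@dataclass
class Customer:
    customer_id: int
    name: str


def serialize_customer(customer: Customer) -> bytes:
    """Encode as: 4-byte big-endian id, 4-byte name length, then the name."""
    name_bytes = customer.name.encode("utf-8")
    return struct.pack(">ii", customer.customer_id, len(name_bytes)) + name_bytes


def deserialize_customer(data: bytes) -> Customer:
    """Reverse of serialize_customer; assumes the same byte layout."""
    customer_id, name_length = struct.unpack(">ii", data[:8])
    name = data[8:8 + name_length].decode("utf-8")
    return Customer(customer_id, name)


payload = serialize_customer(Customer(1, "Al"))
restored = deserialize_customer(payload)
```

A layout like this is fragile: adding or changing a field breaks every consumer that still expects the old bytes, which is the problem schema-based formats such as Avro address.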

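Partitions
Purpose of the key:
- carries additional information about the message
- is used by the partitioner to decide which partition the message is written to
When the key is null, the default partitioner assigns the record to one of the available partitions, using a round-robin algorithm to balance messages among partitions.
When the key is not null, the default partitioner hashes the key and maps the hash to a specific partition, so records with the same key go to the same partition as long as the number of partitions does not change.
Custom partitioner: useful when, for example, a B2B vendor does most of its transactions with one specific customer. That customer can be given its own partition while the remaining keys are hashed across the others.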
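Both the default behavior and a custom rule for one large customer can be sketched in a single class. SketchPartitioner and the vip_key parameter are hypothetical names, and zlib.crc32 stands in for the hash function Kafka actually uses, so the partition numbers will not match a real cluster.

```python
import itertools
import zlib
from typing import Optional


class SketchPartitioner:
    """Round robin for null keys, hashing otherwise, optional dedicated partition."""

    def __init__(self, num_partitions: int, vip_key: Optional[bytes] = None):
        if vip_key is not None and num_partitions < 2:
            raise ValueError("a dedicated partition needs at least 2 partitions")
        self.num_partitions = num_partitions
        self.vip_key = vip_key
        self._round_robin = itertools.count()

    def partition(self, key: Optional[bytes]) -> int:
        # With a dedicated customer, the last partition is reserved for it.
        shared = self.num_partitions - 1 if self.vip_key is not None else self.num_partitions
        if key is not None and key == self.vip_key:
            return self.num_partitions - 1
        if key is None:
            return next(self._round_robin) % shared  # balance keyless records
        return zlib.crc32(key) % shared  # same key, same partition


default = SketchPartitioner(num_partitions=3)
keyless = [default.partition(None) for _ in range(4)]  # cycles through 0, 1, 2, 0

custom = SketchPartitioner(num_partitions=4, vip_key=b"big-customer")
vip_partition = custom.partition(b"big-customer")      # always the last partition
```

Reserving a partition keeps the one heavy key from crowding whichever shared partition its hash would otherwise select.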