总的来说,Kafka Producer是将数据发送到kafka集群的客户端。其组成部分如下图所示:
基本组件:
kafka producer有三个必须指定的参数:
Kafka Producer异步发送消息,返回一个Future,代表发送结果。 此外,用户可以选择提供在 Kafka broker确认记录时调用的回调。 虽然它看起来很简单,但在背后却发生了一些事情。
此时,消息还在内存中,并没有发送给Kafka broker。 Record Accumulator 按主题和分区对内存中的消息进行分组。
Sender线程将具有相同broker的多个批次作为领导者分组到请求中并发送它们。 此时,消息被发送到Kafka。
Kafka Producer 提供配置参数来控制在各个阶段花费的时间:
用户可以通过 acks 配置参数控制写入 Kafka 的消息的持久性。 允许的值为:
使用 acks=all 时,对于同步副本有一些细微差别需要澄清。 在 Kafka 方面,两个设置和当前状态会影响行为:
min.insync.replicas 指定 acks=all 请求的同步副本的最小阈值。 如果不能满足这个要求,Broker 会拒绝 producer 的请求,甚至不尝试写等待确认。 下表说明了可能的情况。
在瞬时故障期间,同步副本数可能低于副本总数,但只要它大于或等于 min.insync.replicas——带有 acks=all 的请求就会成功。
用户可以通过重新发送失败的请求来减轻暂时性故障并增加持久性。 这可以通过重试(默认 MAX_INT)和 delivery.timeout.ms(默认 120000)设置来实现。 重试可能会导致消息重复并改变消息的顺序。 这些副作用可以通过设置 enable.idempotence=true 来减轻,但它是以降低吞吐量为代价的。
主题中的消息被组织成分区。 用户可以通过消息键或可插入的 ProducerPartitioner 实现来控制分区分配。 Partitioner 可以使用 partitioner.class 配置来设置,它应该是一个实现 org.apache.kafka.clients.producer.Partitioner 接口的完全限定类名。
Kafka 提供了三种开箱即用的实现:DefaultPartitioner、RoundRobinPartitioner 和 UniformStickyPartitioner。
DefaultPartitioner——如果消息键为空——使用当前分区,并在下一批中更改。 对于非空键,它使用以下公式计算:murmur2hash(key) % total nr of topic partitions。
RoundRobinPartitioner — 忽略消息键,以循环方式在所有活动分区之间平均分配消息。 如果分区有一个指定的代理作为领导者,则分区被认为是活跃的。
UniformStickyPartitioner——忽略消息键,使用当前分区,并在下一批更改分区。