Appearance
Producer生产者实战
一个 topic 有多个 partition分区,每个分区又有多个副本,那么使用消息生产者Producer发送消息,如何指定发送到那个分区呢?
Producer发送消息原理剖析如下图所示,Kafka客户端会将消息先暂存在本地,等待收集到足够的数量后再一次性发送到服务器。这样可以减少网络开销,提高整体性能,一般来说有两个决定性因素:缓冲区大小和缓冲时间,无论满足那一条都会触发发送逻辑;
md
功能:负责消息的创建与预处理。
Product:用户调用生产者API生成原始消息。
Interceptor拦截器:在消息发送前进行自定义处理(如日志、过滤)。
Serialize序列化器:将消息的Key和Value序列化为字节流,确保数据格式兼容。
Partitioner分区器:根据策略(如哈希、轮询)确定消息发往Topic的哪个分区。
md
功能:缓存待发送消息,实现批量传输优化。
容量**:默认32MB,减少频繁网络请求。
结构:按Topic分区维护消息队列(未直接标注),Sender线程按批次拉取。
md
异步发送线程(Sender线程):从RecordAccumulator拉取消息并异步发送至Kafka集群。
Sender读取消息:将缓存的消息按Broker分组,封装为ProducerBatch。
NetworkClient:管理网络通信,处理连接、请求发送及响应。
InflightBatches:控制每个Broker的并发请求数(默认最多5条未完成请求),防止网络拥塞。
Req1/Req2:待发送的请求批次,按优先级或顺序处理。
重试机制:若请求超时或失败(ACK未确认),自动重试(需配置retries参数)。
Selector:底层I/O多路复用器,监听网络通道状态,高效处理多个Broker连接。
md
ACK确认机制:根据生产者配置(acks=0/1/all),Broker返回写入成功确认:
`acks=0`:不等待确认,可能丢失数据。
`acks=1`:仅Leader副本确认即视为成功。
`acks=all`:需所有ISR副本确认,可靠性最高。
关键设计思想
- 异步解耦:主线程与Sender线程分离,避免阻塞用户业务逻辑。
- 批量压缩:RecordAccumulator减少小包传输,提升吞吐量。
- 流量控制:InflightBatches限制并发请求,平衡吞吐与资源消耗。
- 可靠性保障:重试机制与ACK策略确保消息不丢失(至少一次语义)。
::: tipp 核心关注点
- Kafka的分区确认是通过生产者确认的。
- Kafka内存缓冲池默认32M,会有多个队列,每个队列16k,默认积赞到16k才会发送消息,发送消息是异步的。
- kafka发送消息是按批次发送,不是按消息条数发送。
- 消息一个批次的大小,和最长等待发送时间是可配置的。
Producer发送消息原理剖析
生产者发送到broker有多重情况,如下所示:
- 如果指定
PartitionId
,则被发送到指定的分区; - 如果未指定
PartitionId
,但指定了Key,会按照hash(key)发送至对应分区; - 如果未指定
PartitionId
也没指定Key
,会按照默认轮训模式发送到每个分区; - 如果同时指定了
PartitionId
和Key
,会发送到指定的分区(Key不起作用); - 说明:分区有多个副本,但只有一个
replicationLeader
负责该分区和生产者消费者交互;
生产者常见配置
bootstrap.servers
:用于指定Kafka的地址,也就是Broker的地址。acks
:用于设置数据可靠性级别,可以选择0、1或者all。0表示不需要等待服务器的确认,1表示只需要等待leader节点的确认,all表示需要等待所有的副本节点都确认。retries
:设置请求失败时生产者自动重试的次数。如果设置为0,则不进行重试,如果启用重试,则可能会产生重复的消息。batch.size
:设置每个分区未发送消息的总字节大小。当消息达到该大小时,将触发发送。默认值为16KB。linger.ms
:设置消息发送的等待时间。当batch被填满或者等待时间达到上限时,将触发发送。默认值为0,表示立即发送。buffer.memory
:用于限制Kafka Producer可以使用的内存缓冲大小。默认值为32MB。如果设置得太小,可能导致内存缓冲快速写满而阻塞用户线程。key.serializer
:指定用于将消息中的key序列化为字节数组的序列化器。即使消息中没有指定key,也必须设置该参数。value.serializer
:指定用于将消息中的value序列化为字节数组的序列化器。
Producer使用JavaApi发送消息实战
核心配置说明
java
private Properties getKafkaConfigProperties() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:端口");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
return props;
}
java
private Properties getKafkaConfigProperties() {
Properties props = new Properties();
// Kafka 集群的地址列表,生产者通过这些地址连接到 Kafka 集群。
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:端口");
// 消息发送确认机制:
// "0":生产者不会等待任何确认。
// "1":默认值,生产者会等待 leader 确认收到消息。
// "all"(或 "−1"):生产者会等待 leader 和所有副本确认收到消息,建议设置成all
props.put(ProducerConfig.ACKS_CONFIG, "all");
// 发送失败时的重试次数, 默认值:2147483647(即 Integer.MAX_VALUE,理论上无限重试)
props.put(ProducerConfig.RETRIES_CONFIG, 0);
// 批量发送消息的大小(字节)。消息会被积累到这个大小后一起发送,默认值:16KB
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// 生产者在发送消息前等待的时间(毫秒),等待时间到达后即使批次未满也会发送,默认值:0 毫秒(立即发送)
/* 默认值为 0 时,消息会立即发送,即使 batch.size 未填满。设置 linger.ms > 0,消息会在缓冲区等待,直到达到 linger.ms 时间或 batch.size 被填满后发送,从而增加批量发送的可能性,减少请求数量,增加吞吐量。 */
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 生产者用来缓存消息的总内存大小(字节),默认值:33554432 字节(32 MB)
/* buffer.memory的用来约束Kafka Producer能够使用的内存缓冲的大小的,默认值32MB。
如果buffer.memory设置的太小,可能导致消息快速的写入内存缓冲里,但Sender线程来不及把消息发送到Kafka服务器,会造成内存缓冲很快就被写满,而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了;
buffer.memory要大于batch.size,否则会报申请内存不足的错误,不要超过物理内存,根据实际情况调整,需要结合实际业务情况压测进行配置 */
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// 用于将消息键序列化为字节数组的类。
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 用于将消息值序列化为字节数组的类。
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 消息压缩类型。可选值有 "none"(无压缩)、"gzip"、"snappy"、"lz4"、"zstd"。默认值:"none"
// props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
// 用于标识生产者的字符串,便于在日志和监控中识别。默认值:空字符串
// props.put(ProducerConfig.CLIENT_ID_CONFIG, "my-producer");
// 每个连接上未确认的请求数目。默认值:5;如果一定要保持顺序,则需要设置成1
// props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
return props;
}
发送消息
java
@Slf4j
public class KafkaProducerTest {
private Producer<String, String> kafkaProducer;
@PostConstruct // 所有实例注入后调用
public void init() {
this.kafkaProducer = new KafkaProducer<>(getKafkaConfigProperties());
}
@Test
public void send() {
try {
// 特点:获取结果,可靠性最高,吞吐量最低(等待响应),可设置超时时间
RecordMetadata recordMetadata = kafkaProducer.send(new ProducerRecord<>("xk857", "001", "消息内容")).get();
// RecordMetadata recordMetadata1 = kafkaProducer.send(new ProducerRecord<>("xk857", "001", "消息内容")).get(5, TimeUnit.SECONDS);
log.info("发送状态:{}", recordMetadata.toString());
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("消息发送失败");
throw new RuntimeException(e);
}
}
}
java
@Slf4j
public class KafkaProducerTest {
private Producer<String, String> kafkaProducer;
@PostConstruct
public void init() {
this.kafkaProducer = new KafkaProducer<>(getKafkaConfigProperties());
}
@Test
public void send() {
// 直接发送不处理结果,特点:最高吞吐量,最低可靠性(可能丢失消息)
kafkaProducer.send(new ProducerRecord<>("xk857", "001", "消息内容"));
}
}
java
@Slf4j
public class KafkaProducerTest {
private Producer<String, String> kafkaProducer;
@PostConstruct
public void init() {
this.kafkaProducer = new KafkaProducer<>(getKafkaConfigProperties());
}
@Test
public void send() {
// 特点:平衡吞吐量和可靠性(推荐生产环境使用)
kafkaProducer.send(new ProducerRecord<>("xk857", "001", "消息内容"), (metadata, e) -> {
if (e != null) {
log.error("异步发送失败", e);
} else {
log.info("异步发送成功,分区:{},偏移量:{}", metadata.partition(), metadata.offset());
}
});
}
}
key的作用
默认情况下,Kafka 的消息键(key)为 null
,此时使用 RoundRobin 算法将消息均匀分布到各个分区。如果设置了键,Kafka 会根据键的哈希值将消息写入特定分区,确保具有相同键的消息按顺序写入同一分区。
未设置键可能导致顺序问题。例如,订单的创建、支付和退款消息可能被分配到不同分区。如果某个分区因延迟导致消息未及时处理,消费者可能会先收到退款消息,再收到支付消息,导致顺序错误。
为避免此问题,可为消息指定键(如订单 ID),确保同一订单的所有消息被写入同一分区,从而保持消费顺序。
DANGER
如果要全局一致则只能有一个分区(一般没人用),这样会降低Kafka吞吐量,根据key保证消息消费顺序即可。
ProducerRecord参数说明
ProducerRecord(简称PR)发送给Kafka Broker的key/value 值对,封装基础数据信息。
- Topic (名字):消息要发送到的主题名称,必须指定;
- PartitionID (可选):分区 ID,如果指定了分区 ID,消息将直接发送到该分区;
- Key(可选):Kafka 使用键的哈希值来决定消息发送到哪个分区。相同键的消息会被发送到同一个分区,从而保证顺序性。
- Value:消息的实际内容,即要发送的数据。
使用回调函数方式处理
java
@Override
public boolean send(TopicEnums topicEnum, String key, String tag, String msg) {
kafkaProducer.send(new ProducerRecord<>(topicEnum.getValue(), key, msg), (recordMetadata, e) -> {
if (e == null) {
log.info("recordMetadata:{}", recordMetadata);
} else {
log.error("消息发送失败");
throw new RuntimeException(e);
}
});
return true;
}
扩展:自定义partition分区规则
默认分区器:org.apache.kafka.clients.producer.internals.DefaultPartitioner
自定义分区规则:①创建类,实现Partitioner接口,重写方法;②配置 partitioner.class 指定类即可;
java
// 功能:如果key是xk857则分配到9区,否则使用key取模方式进行分区
public class Xk857Partitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if("xk857".equals(key)) {
return 0;
}
//使用hash值取模,确定分区(默认的也是这个方式)
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
java
private Producer<String, String> myKafkaProducer;
@Override
public boolean send(TopicEnums topicEnum, String key, String tag, String msg) {
Properties props = getKafkaConfigProperties();
props.put("partitioner.class", "net.xdclass.xdclassredis.XdclassPartitioner");
this.myKafkaProducer = new KafkaProducer<>();
kafkaProducer.send(new ProducerRecord<>(topicEnum.getValue(), key, msg), (recordMetadata, e) -> {
if (e == null) {
log.info("recordMetadata:{}", recordMetadata);
} else {
log.error("消息发送失败");
throw new RuntimeException(e);
}
});
return true;
}
扩展:使用生产者拦截器
Kafka生产者拦截器(Interceptor)允许开发者在消息发送到Broker之前或之后插入自定义逻辑,常用于以下场景:
- 消息内容增强(添加统一头信息)
- 发送链路监控(统计发送成功率)
- 消息审计跟踪(记录消息轨迹)
- 异常统一处理
自定义拦截器实现
java
public class ProducerInterceptorDemo implements ProducerInterceptor<String, String> {
private static final String TIMESTAMP_HEADER = "produce-time";
// 发送到Broker前执行(可修改消息内容)
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 添加发送时间戳到消息头
Headers headers = record.headers();
headers.add(TIMESTAMP_HEADER, String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
// 可修改消息内容(示例:为value添加前缀)
String modifiedValue = "[Interceptor] " + record.value();
// 示例:统一添加消息追踪ID
headers.add("trace-id", UUID.randomUUID().toString().getBytes());
// 示例:敏感信息过滤
if (record.value().contains("password")) {
throw new RuntimeException("敏感信息禁止发送!");
}
return new ProducerRecord<>(
record.topic(),
record.partition(),
record.timestamp(),
record.key(),
modifiedValue, // 修改后的value
headers
);
}
// Broker确认成功后执行
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.printf("消息发送成功! topic:%s partition:%d offset:%d%n",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
System.err.println("消息发送失败: " + exception.getMessage());
}
}
// 关闭拦截器时调用
@Override
public void close() {
System.out.println("拦截器资源释放");
}
// 初始化配置时调用
@Override
public void configure(Map<String, ?> configs) {
System.out.println("拦截器初始化配置: " + configs);
}
}
java
private Properties getKafkaConfigProperties() {
Properties props = new Properties();
// ...其他配置保持不变
// 配置拦截器(支持多个逗号分隔)
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.example.ProducerInterceptorDemo");
return props;
}
使用说明
- 执行顺序:多个拦截器按配置顺序依次执行
onSend()
方法,onAcknowledgement()
按配置逆序执行。 - 异常处理:
onSend()
抛出异常会向上传递,中断发送流程,onAcknowledgement()
异常只会记录错误日志,不会影响后续拦截器执行。 - 性能影响:避免在拦截器中执行耗时操作,不要修改消息的
topic
和partition
(可能导致路由异常)。
完整调用流程
TIP
最佳实践生产环境建议添加以下拦截器:
- 监控拦截器:统计发送成功率/延迟等指标。
- 审计拦截器:记录关键消息的发送轨迹。
- 安全拦截器:进行敏感词过滤或权限校验。