Skip to content

Producer生产者实战

一个 topic 有多个 partition分区,每个分区又有多个副本,那么使用消息生产者Producer发送消息,如何指定发送到那个分区呢?

Producer发送消息原理剖析如下图所示,Kafka客户端会将消息先暂存在本地,等待收集到足够的数量后再一次性发送到服务器。这样可以减少网络开销,提高整体性能,一般来说有两个决定性因素:缓冲区大小和缓冲时间,无论满足那一条都会触发发送逻辑;

Kafka生产者消息发送流程图.drawio

md
功能:负责消息的创建与预处理。
Product:用户调用生产者API生成原始消息。
Interceptor拦截器:在消息发送前进行自定义处理(如日志、过滤)。
Serialize序列化器:将消息的Key和Value序列化为字节流,确保数据格式兼容。
Partitioner分区器:根据策略(如哈希、轮询)确定消息发往Topic的哪个分区。
md
功能:缓存待发送消息,实现批量传输优化。
容量**:默认32MB,减少频繁网络请求。
结构:按Topic分区维护消息队列(未直接标注),Sender线程按批次拉取。
md
异步发送线程(Sender线程):从RecordAccumulator拉取消息并异步发送至Kafka集群。
Sender读取消息:将缓存的消息按Broker分组,封装为ProducerBatch。
NetworkClient:管理网络通信,处理连接、请求发送及响应。
InflightBatches:控制每个Broker的并发请求数(默认最多5条未完成请求),防止网络拥塞。
Req1/Req2:待发送的请求批次,按优先级或顺序处理。
重试机制:若请求超时或失败(ACK未确认),自动重试(需配置retries参数)。
Selector:底层I/O多路复用器,监听网络通道状态,高效处理多个Broker连接。
md
ACK确认机制:根据生产者配置(acks=0/1/all),Broker返回写入成功确认:
`acks=0`:不等待确认,可能丢失数据。
`acks=1`:仅Leader副本确认即视为成功。
`acks=all`:需所有ISR副本确认,可靠性最高。

关键设计思想

  1. 异步解耦:主线程与Sender线程分离,避免阻塞用户业务逻辑。
  2. 批量压缩:RecordAccumulator减少小包传输,提升吞吐量。
  3. 流量控制:InflightBatches限制并发请求,平衡吞吐与资源消耗。
  4. 可靠性保障:重试机制与ACK策略确保消息不丢失(至少一次语义)。

::: tipp 核心关注点

  1. Kafka的分区确认是通过生产者确认的。
  2. Kafka内存缓冲池默认32M,会有多个队列,每个队列16k,默认积赞到16k才会发送消息,发送消息是异步的。
  3. kafka发送消息是按批次发送,不是按消息条数发送。
  4. 消息一个批次的大小,和最长等待发送时间是可配置的。

Producer发送消息原理剖析

生产者发送到broker有多重情况,如下所示:

  1. 如果指定PartitionId,则被发送到指定的分区;
  2. 如果未指定PartitionId,但指定了Key,会按照hash(key)发送至对应分区;
  3. 如果未指定PartitionId也没指定Key,会按照默认轮训模式发送到每个分区;
  4. 如果同时指定了PartitionIdKey,会发送到指定的分区(Key不起作用);
  5. 说明:分区有多个副本,但只有一个replicationLeader负责该分区和生产者消费者交互;

生产者常见配置

  • bootstrap.servers:用于指定Kafka的地址,也就是Broker的地址。
  • acks:用于设置数据可靠性级别,可以选择0、1或者all。0表示不需要等待服务器的确认,1表示只需要等待leader节点的确认,all表示需要等待所有的副本节点都确认。
  • retries:设置请求失败时生产者自动重试的次数。如果设置为0,则不进行重试,如果启用重试,则可能会产生重复的消息。
  • batch.size:设置每个分区未发送消息的总字节大小。当消息达到该大小时,将触发发送。默认值为16KB。
  • linger.ms:设置消息发送的等待时间。当batch被填满或者等待时间达到上限时,将触发发送。默认值为0,表示立即发送。
  • buffer.memory:用于限制Kafka Producer可以使用的内存缓冲大小。默认值为32MB。如果设置得太小,可能导致内存缓冲快速写满而阻塞用户线程。
  • key.serializer:指定用于将消息中的key序列化为字节数组的序列化器。即使消息中没有指定key,也必须设置该参数。
  • value.serializer:指定用于将消息中的value序列化为字节数组的序列化器。

Producer使用JavaApi发送消息实战

核心配置说明

java
private Properties getKafkaConfigProperties() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:端口");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    return props;
}
java
private Properties getKafkaConfigProperties() {
    Properties props = new Properties();

    // Kafka 集群的地址列表,生产者通过这些地址连接到 Kafka 集群。
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:端口");

    // 消息发送确认机制:
    // "0":生产者不会等待任何确认。
    // "1":默认值,生产者会等待 leader 确认收到消息。
    // "all"(或 "−1"):生产者会等待 leader 和所有副本确认收到消息,建议设置成all
    props.put(ProducerConfig.ACKS_CONFIG, "all");

    // 发送失败时的重试次数, 默认值:2147483647(即 Integer.MAX_VALUE,理论上无限重试)
    props.put(ProducerConfig.RETRIES_CONFIG, 0);

    // 批量发送消息的大小(字节)。消息会被积累到这个大小后一起发送,默认值:16KB
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

    // 生产者在发送消息前等待的时间(毫秒),等待时间到达后即使批次未满也会发送,默认值:0 毫秒(立即发送)
  	/* 默认值为 0 时,消息会立即发送,即使 batch.size 未填满。设置 linger.ms > 0,消息会在缓冲区等待,直到达到 linger.ms 时间或 batch.size 被填满后发送,从而增加批量发送的可能性,减少请求数量,增加吞吐量。 */
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);

    // 生产者用来缓存消息的总内存大小(字节),默认值:33554432 字节(32 MB)
  	/* buffer.memory的用来约束Kafka Producer能够使用的内存缓冲的大小的,默认值32MB。
       如果buffer.memory设置的太小,可能导致消息快速的写入内存缓冲里,但Sender线程来不及把消息发送到Kafka服务器,会造成内存缓冲很快就被写满,而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了;
       buffer.memory要大于batch.size,否则会报申请内存不足的错误,不要超过物理内存,根据实际情况调整,需要结合实际业务情况压测进行配置 */
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

    // 用于将消息键序列化为字节数组的类。
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    // 用于将消息值序列化为字节数组的类。
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    // 消息压缩类型。可选值有 "none"(无压缩)、"gzip"、"snappy"、"lz4"、"zstd"。默认值:"none"
    // props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");

    // 用于标识生产者的字符串,便于在日志和监控中识别。默认值:空字符串
    // props.put(ProducerConfig.CLIENT_ID_CONFIG, "my-producer");

    // 每个连接上未确认的请求数目。默认值:5;如果一定要保持顺序,则需要设置成1
    // props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    return props;
}

发送消息

java
@Slf4j
public class KafkaProducerTest {

    private Producer<String, String> kafkaProducer;

    @PostConstruct // 所有实例注入后调用
    public void init() {
        this.kafkaProducer = new KafkaProducer<>(getKafkaConfigProperties());
    }

    @Test
    public void send() {
        try {
            // 特点:获取结果,可靠性最高,吞吐量最低(等待响应),可设置超时时间
            RecordMetadata recordMetadata = kafkaProducer.send(new ProducerRecord<>("xk857", "001", "消息内容")).get();
            // RecordMetadata recordMetadata1 = kafkaProducer.send(new ProducerRecord<>("xk857", "001", "消息内容")).get(5, TimeUnit.SECONDS);
            log.info("发送状态:{}", recordMetadata.toString());
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.error("消息发送失败");
            throw new RuntimeException(e);
        }
    }
}
java
@Slf4j
public class KafkaProducerTest {

    private Producer<String, String> kafkaProducer;

    @PostConstruct
    public void init() {
        this.kafkaProducer = new KafkaProducer<>(getKafkaConfigProperties());
    }

		@Test
    public void send() {
      // 直接发送不处理结果,特点:最高吞吐量,最低可靠性(可能丢失消息)
        kafkaProducer.send(new ProducerRecord<>("xk857", "001", "消息内容"));
    }
}
java
@Slf4j
public class KafkaProducerTest {

    private Producer<String, String> kafkaProducer;

    @PostConstruct
    public void init() {
        this.kafkaProducer = new KafkaProducer<>(getKafkaConfigProperties());
    }

    @Test
    public void send() {
     	 	// 特点:平衡吞吐量和可靠性(推荐生产环境使用)
        kafkaProducer.send(new ProducerRecord<>("xk857", "001", "消息内容"), (metadata, e) -> {
            if (e != null) {
                log.error("异步发送失败", e);
            } else {
                log.info("异步发送成功,分区:{},偏移量:{}", metadata.partition(), metadata.offset());
            }
        });
    }
}

key的作用

默认情况下,Kafka 的消息键(key)为 null,此时使用 RoundRobin 算法将消息均匀分布到各个分区。如果设置了键,Kafka 会根据键的哈希值将消息写入特定分区,确保具有相同键的消息按顺序写入同一分区。

未设置键可能导致顺序问题。例如,订单的创建、支付和退款消息可能被分配到不同分区。如果某个分区因延迟导致消息未及时处理,消费者可能会先收到退款消息,再收到支付消息,导致顺序错误。

为避免此问题,可为消息指定键(如订单 ID),确保同一订单的所有消息被写入同一分区,从而保持消费顺序。

DANGER

如果要全局一致则只能有一个分区(一般没人用),这样会降低Kafka吞吐量,根据key保证消息消费顺序即可。

ProducerRecord参数说明

ProducerRecord(简称PR)发送给Kafka Broker的key/value 值对,封装基础数据信息。

  • Topic (名字):消息要发送到的主题名称,必须指定;
  • PartitionID (可选):分区 ID,如果指定了分区 ID,消息将直接发送到该分区;
  • Key(可选):Kafka 使用键的哈希值来决定消息发送到哪个分区。相同键的消息会被发送到同一个分区,从而保证顺序性。
  • Value:消息的实际内容,即要发送的数据。

使用回调函数方式处理

java
@Override
public boolean send(TopicEnums topicEnum, String key, String tag, String msg) {
    kafkaProducer.send(new ProducerRecord<>(topicEnum.getValue(), key, msg), (recordMetadata, e) -> {
        if (e == null) {
            log.info("recordMetadata:{}", recordMetadata);
        } else {
            log.error("消息发送失败");
            throw new RuntimeException(e);
        }
    });
    return true;
}

扩展:自定义partition分区规则

默认分区器:org.apache.kafka.clients.producer.internals.DefaultPartitioner

自定义分区规则:①创建类,实现Partitioner接口,重写方法;②配置 partitioner.class 指定类即可;

java
// 功能:如果key是xk857则分配到9区,否则使用key取模方式进行分区
public class Xk857Partitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if("xk857".equals(key)) {
            return 0;
        }
        //使用hash值取模,确定分区(默认的也是这个方式)
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
java
private Producer<String, String> myKafkaProducer;

@Override
public boolean send(TopicEnums topicEnum, String key, String tag, String msg) {
    Properties props = getKafkaConfigProperties();
    props.put("partitioner.class", "net.xdclass.xdclassredis.XdclassPartitioner");
    
    this.myKafkaProducer = new KafkaProducer<>();
    kafkaProducer.send(new ProducerRecord<>(topicEnum.getValue(), key, msg), (recordMetadata, e) -> {
        if (e == null) {
            log.info("recordMetadata:{}", recordMetadata);
        } else {
            log.error("消息发送失败");
            throw new RuntimeException(e);
        }
    });
    return true;
}

扩展:使用生产者拦截器

Kafka生产者拦截器(Interceptor)允许开发者在消息发送到Broker之前或之后插入自定义逻辑,常用于以下场景:

  • 消息内容增强(添加统一头信息)
  • 发送链路监控(统计发送成功率)
  • 消息审计跟踪(记录消息轨迹)
  • 异常统一处理

自定义拦截器实现

java
public class ProducerInterceptorDemo implements ProducerInterceptor<String, String> {
    
    private static final String TIMESTAMP_HEADER = "produce-time";

    // 发送到Broker前执行(可修改消息内容)
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // 添加发送时间戳到消息头
        Headers headers = record.headers();
        headers.add(TIMESTAMP_HEADER, String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
        
        // 可修改消息内容(示例:为value添加前缀)
        String modifiedValue = "[Interceptor] " + record.value();
      
        // 示例:统一添加消息追踪ID
        headers.add("trace-id", UUID.randomUUID().toString().getBytes());

        // 示例:敏感信息过滤
        if (record.value().contains("password")) {
            throw new RuntimeException("敏感信息禁止发送!");
        }
        
        return new ProducerRecord<>(
            record.topic(),
            record.partition(),
            record.timestamp(),
            record.key(),
            modifiedValue,  // 修改后的value
            headers
        );
    }

    // Broker确认成功后执行
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            System.out.printf("消息发送成功! topic:%s partition:%d offset:%d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
        } else {
            System.err.println("消息发送失败: " + exception.getMessage());
        }
    }

    // 关闭拦截器时调用
    @Override
    public void close() {
        System.out.println("拦截器资源释放");
    }

    // 初始化配置时调用
    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println("拦截器初始化配置: " + configs);
    }
}
java
private Properties getKafkaConfigProperties() {
    Properties props = new Properties();
    // ...其他配置保持不变
    
    // 配置拦截器(支持多个逗号分隔)
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.example.ProducerInterceptorDemo");
    return props;
}

使用说明

  1. 执行顺序:多个拦截器按配置顺序依次执行onSend()方法,onAcknowledgement()按配置逆序执行。
  2. 异常处理onSend()抛出异常会向上传递,中断发送流程,onAcknowledgement()异常只会记录错误日志,不会影响后续拦截器执行。
  3. 性能影响:避免在拦截器中执行耗时操作,不要修改消息的topicpartition(可能导致路由异常)。

完整调用流程

Kafka拦截器.drawio

TIP

最佳实践生产环境建议添加以下拦截器:

  1. 监控拦截器:统计发送成功率/延迟等指标。
  2. 审计拦截器:记录关键消息的发送轨迹。
  3. 安全拦截器:进行敏感词过滤或权限校验。