Skip to content

一个 topic 有多个 partition分区,每个分区又有多个副本,那么使用消息生产者Producer发送消息,如何指定发送到那个分区呢?

Producer发送消息原理剖析

生产者发送到broker有多重情况,如下所示:

  1. 如果指定PartitionId,则被发送到指定的分区;
  2. 如果未指定PartitionId,但指定了Key,会按照hash(key)发送至对应分区;
  3. 如果未指定PartitionId也没指定Key,会按照默认轮训模式发送到每个分区;
  4. 如果同时指定了PartitionIdKey,会发送到指定的分区(Key不起作用);
  5. 说明:分区有多个副本,但只有一个replicationLeader负责该分区和生产者消费者交互;

生产者到broker发送流程

Kafka客户端会将消息先暂存在本地,等待收集到足够的数量后再一次性发送到服务器。这样可以减少网络开销,提高整体性能,一般来说有两个决定性因素:缓冲区大小和缓冲时间,无论满足那一条都会触发发送逻辑;

生产者常见配置

  • bootstrap.servers:用于指定Kafka的地址,也就是Broker的地址。
  • acks:用于设置数据可靠性级别,可以选择0、1或者all。0表示不需要等待服务器的确认,1表示只需要等待leader节点的确认,all表示需要等待所有的副本节点都确认。
  • retries:设置请求失败时生产者自动重试的次数。如果设置为0,则不进行重试,如果启用重试,则可能会产生重复的消息。
  • batch.size:设置每个分区未发送消息的总字节大小。当消息达到该大小时,将触发发送。默认值为16KB。
  • linger.ms:设置消息发送的等待时间。当batch被填满或者等待时间达到上限时,将触发发送。默认值为0,表示立即发送。
  • buffer.memory:用于限制Kafka Producer可以使用的内存缓冲大小。默认值为32MB。如果设置得太小,可能导致内存缓冲快速写满而阻塞用户线程。
  • key.serializer:指定用于将消息中的key序列化为字节数组的序列化器。即使消息中没有指定key,也必须设置该参数。
  • value.serializer:指定用于将消息中的value序列化为字节数组的序列化器。
properties
bootstrap.servers  
acks
retries
batch.size
linger.ms
buffer.memory
key.serializer
value.serializer

Producer使用JavaApi发送消息实战

首先需要构建配置信息,如下所示:

java
private Properties getKafkaConfigProperties() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:端口");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    return props;
}

解释如下:

java
private Properties getKafkaConfigProperties() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getEndpoint());
    // 所有副本都确认后才认为消息发送成功
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    // 请求失败,生产者会自动重试,指定是0次,如果启用重试,则会有重复消息的可能性
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    // 生产者缓存每个分区未发送的消息,缓存的大小是通过 batch.size 配置指定的,默认值是16KB
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

    /* 默认值就是0,消息是立刻发送的,即便batch.size缓冲空间还没有满
      如果想减少请求的数量,可以设置 linger.ms 大于0,即消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
      通俗解释是,本该早就发出去的消息被迫至少等待了linger.ms时间,相对于这时间内积累了更多消息,批量发送减少请求
      如果batch被填满或者linger.ms达到上限,满足其中一个就会被发送 */
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);

    /* buffer.memory的用来约束Kafka Producer能够使用的内存缓冲的大小的,默认值32MB。
       如果buffer.memory设置的太小,可能导致消息快速的写入内存缓冲里,但Sender线程来不及把消息发送到Kafka服务器
       会造成内存缓冲很快就被写满,而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了
       buffer.memory要大于batch.size,否则会报申请内存不#足的错误,不要超过物理内存,根据实际情况调整
       需要结合实际业务情况压测进行配置 */
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

    /* key的序列化器,将用户提供的 key和value对象ProducerRecord 进行序列化处理,key.serializer必须被设置,即使消息中没有指定key,序列化器必须是一个实 org.apache.kafka.common.serialization.Serializer接口的类,key序列化成字节数组。 */
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    return props;
}

发送消息

java
private Producer<String, String> kafkaProducer;

// 所有实例注入后调用
@PostConstruct
public void init() {
    this.kafkaProducer = new KafkaProducer<>(getKafkaConfigProperties());
}


public void send(TopicEnums topicEnum, String key, String tag, String msg) {
    Future<RecordMetadata> future = kafkaProducer.send(new ProducerRecord<>(topicEnum.getValue(), key, msg));
    try {
        RecordMetadata recordMetadata = future.get();//不关心是否发送成功,则不需要这行
        System.out.println("发送状态:" + recordMetadata.toString());
    } catch (InterruptedException | ExecutionException e) {
        log.error("消息发送失败");
        throw new RuntimeException(e);
    }
}

key的作用

key默认是null,如果key为空,kafka使用默认的partitioner,使用RoundRobin算法将消息均衡地分布在各个partition上;如果key不为空,kafka使用自己实现的hash方法对key进行散列,决定消息该被写到Topic的哪个partition,拥有相同key的消息会被写到同一个partition,实现顺序消息。

ProducerRecord参数说明

ProducerRecord(简称PR)发送给Kafka Broker的key/value 值对, 封装基础数据信息

  • Topic (名字)
  • PartitionID (可选)
  • Key(可选)
  • Value

使用回调函数方式处理

java
@Override
public boolean send(TopicEnums topicEnum, String key, String tag, String msg) {
    kafkaProducer.send(new ProducerRecord<>(topicEnum.getValue(), key, msg), (recordMetadata, e) -> {
        if (e == null) {
            log.info("recordMetadata:{}", recordMetadata);
        } else {
            log.error("消息发送失败");
            throw new RuntimeException(e);
        }
    });
    return true;
}

扩展:自定义partition分区规则

默认分区器:org.apache.kafka.clients.producer.internals.DefaultPartitioner

自定义分区规则:①创建类,实现Partitioner接口,重写方法;②配置 partitioner.class 指定类即可;

java
// 功能:如果key是xk857则分配到9区,否则使用key取模方式进行分区
public class Xk857Partitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if("xk857".equals(key)) {
            return 0;
        }
        //使用hash值取模,确定分区(默认的也是这个方式)
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

使用的时候指定自定义类即可

java
private Producer<String, String> myKafkaProducer;

@Override
public boolean send(TopicEnums topicEnum, String key, String tag, String msg) {
    Properties props = getKafkaConfigProperties();
    props.put("partitioner.class", "net.xdclass.xdclassredis.XdclassPartitioner");
    
    this.myKafkaProducer = new KafkaProducer<>();
    kafkaProducer.send(new ProducerRecord<>(topicEnum.getValue(), key, msg), (recordMetadata, e) -> {
        if (e == null) {
            log.info("recordMetadata:{}", recordMetadata);
        } else {
            log.error("消息发送失败");
            throw new RuntimeException(e);
        }
    });
    return true;
}