Appearance
一个 topic 有多个 partition分区,每个分区又有多个副本,那么使用消息生产者Producer发送消息,如何指定发送到那个分区呢?
Producer发送消息原理剖析
生产者发送到broker有多重情况,如下所示:
- 如果指定
PartitionId
,则被发送到指定的分区; - 如果未指定
PartitionId
,但指定了Key,会按照hash(key)发送至对应分区; - 如果未指定
PartitionId
也没指定Key
,会按照默认轮训模式发送到每个分区; - 如果同时指定了
PartitionId
和Key
,会发送到指定的分区(Key不起作用); - 说明:分区有多个副本,但只有一个
replicationLeader
负责该分区和生产者消费者交互;
生产者到broker发送流程
Kafka客户端会将消息先暂存在本地,等待收集到足够的数量后再一次性发送到服务器。这样可以减少网络开销,提高整体性能,一般来说有两个决定性因素:缓冲区大小和缓冲时间,无论满足那一条都会触发发送逻辑;
生产者常见配置
bootstrap.servers
:用于指定Kafka的地址,也就是Broker的地址。acks
:用于设置数据可靠性级别,可以选择0、1或者all。0表示不需要等待服务器的确认,1表示只需要等待leader节点的确认,all表示需要等待所有的副本节点都确认。retries
:设置请求失败时生产者自动重试的次数。如果设置为0,则不进行重试,如果启用重试,则可能会产生重复的消息。batch.size
:设置每个分区未发送消息的总字节大小。当消息达到该大小时,将触发发送。默认值为16KB。linger.ms
:设置消息发送的等待时间。当batch被填满或者等待时间达到上限时,将触发发送。默认值为0,表示立即发送。buffer.memory
:用于限制Kafka Producer可以使用的内存缓冲大小。默认值为32MB。如果设置得太小,可能导致内存缓冲快速写满而阻塞用户线程。key.serializer
:指定用于将消息中的key序列化为字节数组的序列化器。即使消息中没有指定key,也必须设置该参数。value.serializer
:指定用于将消息中的value序列化为字节数组的序列化器。
properties
bootstrap.servers
acks
retries
batch.size
linger.ms
buffer.memory
key.serializer
value.serializer
Producer使用JavaApi发送消息实战
首先需要构建配置信息,如下所示:
java
private Properties getKafkaConfigProperties() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:端口");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
return props;
}
解释如下:
java
private Properties getKafkaConfigProperties() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getEndpoint());
// 所有副本都确认后才认为消息发送成功
props.put(ProducerConfig.ACKS_CONFIG, "all");
// 请求失败,生产者会自动重试,指定是0次,如果启用重试,则会有重复消息的可能性
props.put(ProducerConfig.RETRIES_CONFIG, 0);
// 生产者缓存每个分区未发送的消息,缓存的大小是通过 batch.size 配置指定的,默认值是16KB
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
/* 默认值就是0,消息是立刻发送的,即便batch.size缓冲空间还没有满
如果想减少请求的数量,可以设置 linger.ms 大于0,即消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
通俗解释是,本该早就发出去的消息被迫至少等待了linger.ms时间,相对于这时间内积累了更多消息,批量发送减少请求
如果batch被填满或者linger.ms达到上限,满足其中一个就会被发送 */
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
/* buffer.memory的用来约束Kafka Producer能够使用的内存缓冲的大小的,默认值32MB。
如果buffer.memory设置的太小,可能导致消息快速的写入内存缓冲里,但Sender线程来不及把消息发送到Kafka服务器
会造成内存缓冲很快就被写满,而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了
buffer.memory要大于batch.size,否则会报申请内存不#足的错误,不要超过物理内存,根据实际情况调整
需要结合实际业务情况压测进行配置 */
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
/* key的序列化器,将用户提供的 key和value对象ProducerRecord 进行序列化处理,key.serializer必须被设置,即使消息中没有指定key,序列化器必须是一个实 org.apache.kafka.common.serialization.Serializer接口的类,key序列化成字节数组。 */
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
return props;
}
发送消息
java
private Producer<String, String> kafkaProducer;
// 所有实例注入后调用
@PostConstruct
public void init() {
this.kafkaProducer = new KafkaProducer<>(getKafkaConfigProperties());
}
public void send(TopicEnums topicEnum, String key, String tag, String msg) {
Future<RecordMetadata> future = kafkaProducer.send(new ProducerRecord<>(topicEnum.getValue(), key, msg));
try {
RecordMetadata recordMetadata = future.get();//不关心是否发送成功,则不需要这行
System.out.println("发送状态:" + recordMetadata.toString());
} catch (InterruptedException | ExecutionException e) {
log.error("消息发送失败");
throw new RuntimeException(e);
}
}
key的作用
key默认是null,如果key为空,kafka使用默认的partitioner,使用RoundRobin算法将消息均衡地分布在各个partition上;如果key不为空,kafka使用自己实现的hash方法对key进行散列,决定消息该被写到Topic的哪个partition,拥有相同key的消息会被写到同一个partition,实现顺序消息。
ProducerRecord参数说明
ProducerRecord(简称PR)发送给Kafka Broker的key/value 值对, 封装基础数据信息
- Topic (名字)
- PartitionID (可选)
- Key(可选)
- Value
使用回调函数方式处理
java
@Override
public boolean send(TopicEnums topicEnum, String key, String tag, String msg) {
kafkaProducer.send(new ProducerRecord<>(topicEnum.getValue(), key, msg), (recordMetadata, e) -> {
if (e == null) {
log.info("recordMetadata:{}", recordMetadata);
} else {
log.error("消息发送失败");
throw new RuntimeException(e);
}
});
return true;
}
扩展:自定义partition分区规则
默认分区器:org.apache.kafka.clients.producer.internals.DefaultPartitioner
自定义分区规则:①创建类,实现Partitioner接口,重写方法;②配置 partitioner.class 指定类即可;
java
// 功能:如果key是xk857则分配到9区,否则使用key取模方式进行分区
public class Xk857Partitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if("xk857".equals(key)) {
return 0;
}
//使用hash值取模,确定分区(默认的也是这个方式)
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
使用的时候指定自定义类即可
java
private Producer<String, String> myKafkaProducer;
@Override
public boolean send(TopicEnums topicEnum, String key, String tag, String msg) {
Properties props = getKafkaConfigProperties();
props.put("partitioner.class", "net.xdclass.xdclassredis.XdclassPartitioner");
this.myKafkaProducer = new KafkaProducer<>();
kafkaProducer.send(new ProducerRecord<>(topicEnum.getValue(), key, msg), (recordMetadata, e) -> {
if (e == null) {
log.info("recordMetadata:{}", recordMetadata);
} else {
log.error("消息发送失败");
throw new RuntimeException(e);
}
});
return true;
}