Skip to content

Consumer消费者实战

Kafka中的消费者组

Kafka以分区纬度消费消息

  1. 一个分区只可以被消费组中的一个消费者所消费。
  2. 一个消费组中的一个消费者可以消费多个分区,例如 C1 消费了P0,P3
  3. 所有消费者一起消费所有的分区,例如 C1 和 C2 共同完成了对 P0、P1、P2、P3 的消费。
  4. 在不同消费组中,每个消费组都会消费所有的分区,例如消费组A、消费组B都消费了P0、P1、P2、P3。
  5. 同一个消费组里面的消费者对分区是互斥的,例如 C1 和 C2 不会消费同一个分区;而分区在不同的消费组间是共享的。
  6. 如果分区数小于消费者数量,多余的消费者将处于空闲状态,不会消费消息。

TIP

消费者从Broker获取数据通常采用的是"pull"(拉取)模式。在这种模式下,消费者通过主动向Broker发送请求来获取数据。

为什么使用pull模式?

首先,pull模式能够根据消费者的消费能力进行调整,比如有三台机器,A机器配置高处理能力强,另外两台弱一些,在消费完本次数据后就主动向Broker“要”数据,实现机器消费能力自适应;

如果使用broker主动push的方式,当消息量非常大时,消费者可能会无法及时处理这些推送过来的消息。这样会导致消息堆积,影响系统的稳定性和性能。而使用pull模式,消费者可以根据自身的处理能力合理地控制数据的获取速度,避免消息的积压。

使用push模式等于是Broker向消费者发送消息,Broker没法知道消费者对之前消息的处理情况,也就无法实现自适应功能。

此外,如果broker没有数据可供消费者获取时,消费者可以通过配置超时时间来进行阻塞等待。当有新的消息到达时,消费者会立即获取到这些消息并进行处理。

Kafka消费者两种模式

SpringBoot关闭Kafka日志

logback.xml

xml
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <!-- 格式化输出: %d表示日期, %thread表示线程名, %-5level: 级别从左显示5个字符宽度 %msg:日志消息, %n是换行符 -->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n</pattern>
        </encoder>
    </appender>
    <root level="info">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>

application.yml

yaml
logging:
  config: classpath:logback.xml

消费者配置

properties
#消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
group.id

#为true则自动提交偏移量
enable.auto.commit

#自动提交offset周期
auto.commit.interval.ms

#重置消费偏移量策略,消费者在读取一个没有偏移量的分区或者偏移量无效情况下(因消费者长时间失效、包含偏移量的记录已经过时并被删除)该如何处理,
#默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更 才可以
auto.offset.reset

#序列化器
key.deserializer

JavaApi配置如下

java
private Properties getKafkaConfigProperties() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, xk857ProducerMqService.getEndpoint());
    // 消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
    props.put("group.id", "xk857-g1");

    // 开启自动提交offset
    props.put("enable.auto.commit", "true");

    // 自动提交offset延迟时间
    props.put("auto.commit.interval.ms", "1000");

    // 反序列化
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    return props;
}

消费者消费消息

java
@Test
public void simpleConsumerTest(){
    Properties props = getKafkaConfigProperties();
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    // 订阅topic主题
    consumer.subscribe(Arrays.asList(TopicEnums.GE_TUI_MESSAGE.getValue()));

    while (true) {
        //拉取时间控制,阻塞超时时间
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            System.err.printf("topic = %s, offset = %d, key = %s, value = %s%n",record.topic(), record.offset(), record.key(), record.value());
        }
    }
}

消费者分配策略

  • Range(默认):范围分配,将每个 Topic 的分区按顺序划分范围。
    • 简单易实现。
    • 可能导致分配不均(尤其 Topic 数量多时,前面的消费者负载偏高)。
  • RoundRobin:轮询分配,将所有 Topic 的分区按哈希排序后轮询分配给消费者。
    • 分配更均衡。
    • 要求所有消费者订阅相同的 Topic 列表。
  • Sticky:粘性分配。
    • 初始分配类似 RoundRobin,但再平衡(Rebalance)时尽量保留原有分配,减少分区迁移。
    • 减少再平衡时的数据迁移,提升稳定性。
    • 需配合 partition.assignment.strategy 使用。
java
Properties props = new Properties();

// 设置 Range 分配策略
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("topic1", "topic2")); // 订阅主题列表
java
Properties props = new Properties();

// 设置 RoundRobin 分配策略
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 所有消费者必须订阅相同的Topic
consumer.subscribe(List.of("topic1","topic2"));
java
Properties props = new Properties();

// 设置 Sticky 分配策略
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.StickyAssignor");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("topic1", "topic2"));

DANGER

同一个消费者组的分配策略必须是一致的,