Appearance
消费者从Broker获取数据通常采用的是"pull"(拉取)模式。在这种模式下,消费者通过主动向Broker发送请求来获取数据。
为什么使用pull模式?
首先,pull模式能够根据消费者的消费能力进行调整,比如有三台机器,A机器配置高处理能力强,另外两台弱一些,在消费完本次数据后就主动向Broker“要”数据,实现机器消费能力自适应;
如果使用broker主动push的方式,当消息量非常大时,消费者可能会无法及时处理这些推送过来的消息。这样会导致消息堆积,影响系统的稳定性和性能。而使用pull模式,消费者可以根据自身的处理能力合理地控制数据的获取速度,避免消息的积压。
使用push模式等于是Broker向消费者发送消息,Broker没法知道消费者对之前消息的处理情况,也就无法实现自适应功能。
此外,如果broker没有数据可供消费者获取时,消费者可以通过配置超时时间来进行阻塞等待。当有新的消息到达时,消费者会立即获取到这些消息并进行处理。
SpringBoot关闭Kafka日志
logback.xml
xml
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<!-- 格式化输出: %d表示日期, %thread表示线程名, %-5level: 级别从左显示5个字符宽度 %msg:日志消息, %n是换行符 -->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
application.yml
yaml
logging:
config: classpath:logback.xml
消费者配置
properties
#消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
group.id
#为true则自动提交偏移量
enable.auto.commit
#自动提交offset周期
auto.commit.interval.ms
#重置消费偏移量策略,消费者在读取一个没有偏移量的分区或者偏移量无效情况下(因消费者长时间失效、包含偏移量的记录已经过时并被删除)该如何处理,
#默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更 才可以
auto.offset.reset
#序列化器
key.deserializer
JavaApi配置如下
java
private Properties getKafkaConfigProperties() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, xk857ProducerMqService.getEndpoint());
// 消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
props.put("group.id", "xk857-g1");
// 开启自动提交offset
props.put("enable.auto.commit", "true");
// 自动提交offset延迟时间
props.put("auto.commit.interval.ms", "1000");
// 反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
消费者消费消息
java
@Test
public void simpleConsumerTest(){
Properties props = getKafkaConfigProperties();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅topic主题
consumer.subscribe(Arrays.asList(TopicEnums.GE_TUI_MESSAGE.getValue()));
while (true) {
//拉取时间控制,阻塞超时时间
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
System.err.printf("topic = %s, offset = %d, key = %s, value = %s%n",record.topic(), record.offset(), record.key(), record.value());
}
}
}