Skip to content

SpringBoot对Kafka提供了简单易用的集成,使其操作消息生产者和消费者更加方便。

xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

生产者发送消息

  • 配置文件修改增加生产者信息
yaml
spring:
  kafka:
    bootstrap-servers: ip1:9092,ip2:9092,ip3:9092
    producer:
      # 消息重发的次数。
      retries: 0
      #一个批次可以使用的内存大小
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all

发送消息

java
private static final String TOPIC_NAME = "user.register.topic";

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

@GetMapping("/api/user/{phone}")
public void sendMessage1(@PathVariable("phone") String phone) {
    kafkaTemplate.send(TOPIC_NAME, phone).addCallback(success -> {
        // 消息发送到的topic
        String topic = success.getRecordMetadata().topic();
        // 消息发送到的分区
        int partition = success.getRecordMetadata().partition();
        // 消息在分区内的offset
        long offset = success.getRecordMetadata().offset();
        System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
    }, failure -> {
        System.out.println("发送消息失败:" + failure.getMessage());
    });
}

消费者消费消息

yaml
spring:
  kafka:
    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      listener:
        #手工ack,调用ack后立刻提交offset
        ack-mode: manual_immediate
        #容器运行的线程数
        concurrency: 4

消费者监听

java
@Component
public class MQListener {

    /**
     *  消费监听
     * @param record
     */
    @KafkaListener(topics = {"user.register.topic"},groupId = "xdlcass-test-gp")
    public void onMessage1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){
        // 打印出消息内容
        System.out.println("消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
        ack.acknowledge();
    }
}

Kafka事务消息

配置修改如下:

yaml
spring:
  kafka:
    bootstrap-servers: ip1:9092,ip2:9092,ip3:9092
    producer:
      # 消息重发的次数。 配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0
      #retries: 1
      #一个批次可以使用的内存大小
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all
      #acks: all
      #事务id
      transaction-id-prefix: xdclass-tran
    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      listener:
        # 在侦听器容器中运行的线程数。
        concurrency: 4
        #listner负责ack,手动调用Acknowledgment.acknowledge()后立即提交
        ack-mode: manual_immediate
        #避免出现主题未创建报错
        missing-topics-fatal: false

事务代码编写

java
/**
  * 注解方式的事务
  * @param i
  */
@GetMapping("/kafka/transaction1")
@Transactional(rollbackFor = RuntimeException.class)
public void sendMessage1(int i) {
    kafkaTemplate.send(TOPIC_NAME, "这个是事务里面的消息:1  i="+i);
    if (i == 0) {
        throw new RuntimeException("fail");
    }
    kafkaTemplate.send(TOPIC_NAME, "这个是事务里面的消息:2  i="+i);
}

/**
  * 声明式事务支持
  * @param i
  */
@GetMapping("/kafka/transaction2")
public void sendMessage2(int i) {
    kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
        @Override
        public Object doInOperations(KafkaOperations kafkaOperations) {
            kafkaOperations.send(TOPIC_NAME,"这个是事务里面的消息:1  i="+i);
            if(i==0) {
                throw new RuntimeException("input is error");
            }
            kafkaOperations.send(TOPIC_NAME,"这个是事务里面的消息:2  i="+i);
            return true;
        }
    });
}