Appearance
由于MQ经常处于复杂的分布式系统中,考虑⽹络波动,服务宕机,程序异常因素,很有可能出现消息发送或者消费失败的问题。因此,消息的重试就是所有MQ中间件必须考虑到的⼀个关键点。如果没有消息重试,就可能产⽣消息丢失的问题,可能对系统产⽣很⼤的影响。所以,秉承宁可多发消息,也不可丢失消息的原则。 MQ消费者的消费逻辑失败时,可以通过设置返回状态达到消息重试的结果。
TIP
生产者Producer重试(异步和SendOneWay下配置无效)
- 消息重投(保证数据的高可靠性),本身内部支持重试,默认次数是2,
- 如果网络情况比较差,或者跨集群则建改多几次
java
// com.example.rocketmq.jms.PayProducer
@Component
public class PayProducer {
private String producerGroup = "pay_group";
private DefaultMQProducer producer;
public PayProducer() {
producer = new DefaultMQProducer(producerGroup);
// 生产者投递消息重试次数
producer.setRetryTimesWhenSendFailed(3);
producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
start();
}
}
com.example.rocketmq.controller.PayController
java
@RestController
@RequestMapping("/api/pay")
public class PayController {
@Autowired
private PayProducer payProducer;
@RequestMapping("/pay_cb")
public Object callback(String text) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
// key是唯一的,一般是订单号等,这里仅做测试,生产者根据key进行消息重投,默认次数为2
Message message = new Message(JmsConfig.TOPIC, "tag1","1234", ("Hello rocketmq = " + text).getBytes());
// 发送消息
SendResult sendResult = payProducer.getProducer().send(message, 10000);
System.out.println(sendResult);
return new HashMap<>();
}
}
DANGER
消费端重试原因:消息处理异常、broker端到consumer端各种问题,如网络原因闪断,消费处理失败,ACK返回失败等问题。
WARNING
注意: 重试间隔时间配置 ,默认每条消息最多重试 16 次 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
- 超过重试次数人工补偿
- 消费端去重
- 一条消息无论重试多少次,这些重试消息的 Message ID,key 不会改变。
- 消费重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息,
com.example.rocketmq.jms.PayConsumer