
目录
1.发送端确认机制
2.消费端确认机制
1.发送端确认机制概要:RabbitMQ中间件的引入对于整个系统来说是一把双刃剑,在对系统进行解耦的同时也降低了消息的可靠性,但是对于某些系统来说我们又必须保证我们的消息是不会丢失的,因此rabbitmq给提供了以下一些功能来保证消息的可靠性,本文我们主要讲解消息可靠性中的 发送端确认机制 以及 消费端确认机制
RabbitMQ通过 publisher confirm 机制来实现的消息发送端确认。生产者将信道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上⾯面发布的消息都会被指派一个唯一的ID(从 1 开始),一旦消息被投递到所有匹配的队列之后 (如果消息和队列是持久化的,那么确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确送达了。
在生产者向rabbitmq发送消息的整个流程中,生产者首先是要将消息发送给 交换机,然后交换机 根据指定的 路由键 把消息路由到指定的消息队列中,然后消费者从对应的消息队列对消息进行消费,因此我们要实现生产端的消息确认就需要保证 消息发送到交换机 以及 交换机路由消息到队列 的时候消息是不会丢失的
消息发送到交换机
在配置文件中开启消息确认模式
# SIMPLE 禁用发布确认模式,是默认值
# CORRELATED 发布消息成功到交换器或失败后 会触发回调方法
# NONE 有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用
rabbitTemplate调用waitFor/confirm/is或waitFor/confirm/isOrDie方法等待broker节点返回
发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitFor/confirm/isOrDie方法如果
返回false则会关闭channel,则接下来无法发送消息到broker;
spring.rabbitmq.publisher-/confirm/i-type=CORRELATED
通过实现 RabbitTemplate.ConfirmCallback 类来对消息发送结果进行处理
@Component
public class RabbitConfirmConfig implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
# 根据具体的业务进行相应的处理
System.out.println("【交换机】 生产者消息确认失败了====" + cause);
} else {
System.out.println("【交换机】 生产者消息确认成功====");
}
}
}
对rabbitTemplate的callback进行设置
@Configuration
public class RabbitConfig {
@Autowired
private RabbitConfirmConfig rabbit/confirm/iConfig;
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate(){
rabbitTemplate.setConfirmCallback(rabbit/confirm/iConfig);
}
}
交换机路由消息到队列
在配置文件中开启消息异常重新入队
# 确保消息发送失败后可以重新返回到队列中 # 也可以通过 rabbitTemplate.setMandatory(true) 来设置 spring.rabbitmq.publisher-returns=true
通过实现 RabbitTemplate.ConfirmCallback 类来对消息发送结果进行处理
@Component
public class RabbitConfirmConfig implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
# 根据具体的业务对异常进行处理,自行判断是否消息可以丢弃
if (AMQP.NO_ROUTE == replyCode){
System.out.println("【队列】 交换机路由到队列失败====" + message);
}
}
}
对rabbitTemplate的 returnback 进行设置
@Configuration
public class RabbitConfig {
@Autowired
private RabbitConfirmConfig rabbit/confirm/iConfig;
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate(){
rabbitTemplate.setReturnCallback(rabbit/confirm/iConfig);
}
}
2.消费端确认机制
在配置文件中对消费者消息应答方式进行设置
# NONE 则只要收到消息后就立即确认(消息出列,标记已消费),有丢失数据的风险 # MANUAL 需要显式的调用当前channel的basicAck方法 # AUTO 看情况确认,如果此时消费者抛出异常则消息会返回到队列中 spring.rabbitmq.listener.direct.acknowledge-mode=MANUAL SpringBoot项目中支持如下的一些配置: #最大重试次数 spring.rabbitmq.listener.simple.retry.max-attempts=5 #是否开启消费者重试(为false时关闭消费者重试,意思不是“不重试”,而是一直收到消息直到jack 确认或者一直到超时) spring.rabbitmq.listener.simple.retry.enabled=true #重试间隔时间(单位毫秒) spring.rabbitmq.listener.simple.retry.initial-interval=5000 # 重试超过最大次数后是否拒绝 spring.rabbitmq.listener.simple.default-requeue-rejected=false #ack模式 spring.rabbitmq.listener.simple.acknowledge-mode=manual
通过实现监听器的方式实现消费端消息的推送
@Component
public class OrderListener {
@RabbitListener(queues = RabbitConfig.QUEUE_ORDER_NAME, ackMode = "MANUAL")
public void toOrderMessage(Channel channel, Message message) throws IOException {
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e){
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)