RabbitMQ中消息确认机制

RabbitMQ中消息确认机制,第1张

RabbitMQ中消息确认机制

目录

1.发送端确认机制

2.消费端确认机制


概要:RabbitMQ中间件的引入对于整个系统来说是一把双刃剑,在对系统进行解耦的同时也降低了消息的可靠性,但是对于某些系统来说我们又必须保证我们的消息是不会丢失的,因此rabbitmq给提供了以下一些功能来保证消息的可靠性,本文我们主要讲解消息可靠性中的 发送端确认机制 以及 消费端确认机制

1.发送端确认机制

RabbitMQ通过 publisher confirm 机制来实现的消息发送端确认。生产者将信道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上⾯面发布的消息都会被指派一个唯一的ID(从 1 开始),一旦消息被投递到所有匹配的队列之后 (如果消息和队列是持久化的,那么确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确送达了。

        在生产者向rabbitmq发送消息的整个流程中,生产者首先是要将消息发送给 交换机,然后交换机 根据指定的 路由键 把消息路由到指定的消息队列中,然后消费者从对应的消息队列对消息进行消费,因此我们要实现生产端的消息确认就需要保证 消息发送到交换机 以及 交换机路由消息到队列 的时候消息是不会丢失的

消息发送到交换机

在配置文件中开启消息确认模式

# SIMPLE       禁用发布确认模式,是默认值
# CORRELATED   发布消息成功到交换器或失败后 会触发回调方法
# NONE         有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用 
               rabbitTemplate调用waitFor/confirm/is或waitFor/confirm/isOrDie方法等待broker节点返回 
               发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitFor/confirm/isOrDie方法如果 
               返回false则会关闭channel,则接下来无法发送消息到broker;

spring.rabbitmq.publisher-/confirm/i-type=CORRELATED

 通过实现 RabbitTemplate.ConfirmCallback 类来对消息发送结果进行处理

@Component
public class RabbitConfirmConfig implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        if (!ack) {
            # 根据具体的业务进行相应的处理
            System.out.println("【交换机】 生产者消息确认失败了====" + cause);
        } else {
            System.out.println("【交换机】 生产者消息确认成功====");
        }
    }
}

 对rabbitTemplate的callback进行设置

@Configuration
public class RabbitConfig {

    @Autowired
    private RabbitConfirmConfig rabbit/confirm/iConfig;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void initRabbitTemplate(){    
        rabbitTemplate.setConfirmCallback(rabbit/confirm/iConfig);
    }
}
    

 交换机路由消息到队列

在配置文件中开启消息异常重新入队

# 确保消息发送失败后可以重新返回到队列中
# 也可以通过 rabbitTemplate.setMandatory(true) 来设置
spring.rabbitmq.publisher-returns=true

 通过实现 RabbitTemplate.ConfirmCallback 类来对消息发送结果进行处理

@Component
public class RabbitConfirmConfig implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

        # 根据具体的业务对异常进行处理,自行判断是否消息可以丢弃
        if (AMQP.NO_ROUTE == replyCode){
            System.out.println("【队列】 交换机路由到队列失败====" + message);
        }

    }

}

对rabbitTemplate的 returnback 进行设置

@Configuration
public class RabbitConfig {

    @Autowired
    private RabbitConfirmConfig rabbit/confirm/iConfig;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void initRabbitTemplate(){
        rabbitTemplate.setReturnCallback(rabbit/confirm/iConfig);
    }
}
    

2.消费端确认机制

在配置文件中对消费者消息应答方式进行设置

# NONE   则只要收到消息后就立即确认(消息出列,标记已消费),有丢失数据的风险
# MANUAL 需要显式的调用当前channel的basicAck方法
# AUTO   看情况确认,如果此时消费者抛出异常则消息会返回到队列中
spring.rabbitmq.listener.direct.acknowledge-mode=MANUAL


SpringBoot项目中支持如下的一些配置:
#最大重试次数 
spring.rabbitmq.listener.simple.retry.max-attempts=5 

#是否开启消费者重试(为false时关闭消费者重试,意思不是“不重试”,而是一直收到消息直到jack 确认或者一直到超时) 
spring.rabbitmq.listener.simple.retry.enabled=true 

#重试间隔时间(单位毫秒) 
spring.rabbitmq.listener.simple.retry.initial-interval=5000 # 

重试超过最大次数后是否拒绝 
spring.rabbitmq.listener.simple.default-requeue-rejected=false 

#ack模式 
spring.rabbitmq.listener.simple.acknowledge-mode=manual

通过实现监听器的方式实现消费端消息的推送

@Component
public class OrderListener {

    @RabbitListener(queues = RabbitConfig.QUEUE_ORDER_NAME, ackMode = "MANUAL")
    public void toOrderMessage(Channel channel, Message message) throws IOException {

        try {
            
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e){
                    
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,  true);
        }

    }

}

欢迎分享,转载请注明来源:内存溢出

原文地址:https://www.54852.com/zaji/5716351.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-17
下一篇2022-12-17

发表评论

登录后才能评论

评论列表(0条)

    保存