
Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了 *** 作系统建立 TCP connection 的开销
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
1、rabbitmq如何保证消息可靠首先我们知道一个完整的结构涉及到生产者,mq,消费者这三部分,mq解除了系统之间的耦合,但是会出现一些问题,比如现在是三部分,它们都是独立的,每一部分都会存在消息丢失的情况,所以要从这三部分一起解决此问题。如下图。
1.1 mq保证消息不丢失对交换机,队列,消息进行持久化,持久化之后mq断电重启,消息还会被继续消费
1.1.1交换机、队列持久化@Configuration
public class MQConfig {
//队列
public final static String /confirm/i_QUEUE = "/confirm/i1_queue";
//交换机
public final static String /confirm/i_EXCHANGE = "/confirm/i1_exchange";
//routingKey
public final static String ROUTINGKEY = "routing_key";
@Bean
public Queue /confirm/iQueue() {
return new Queue(/confirm/i_QUEUE, true, false, false);
}
@Bean
DirectExchange /confirm/iExchange() {
return new DirectExchange(MQConfig./confirm/i_EXCHANGE);
}
@Bean
Binding bindingDirectExchangeCA() {
return BindingBuilder.bind(/confirm/iQueue()).to(/confirm/iExchange()).with(ROUTINGKEY);
}
}
1.1.2 消息持久化
springboot集成的rabbitmq的持久化,其实默认就实现了,且看源码:
@Override
public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
convertAndSend(exchange, routingKey, object, (CorrelationData) null);
}
//点进去
@Override
public void convertAndSend(String exchange, String routingKey, final Object object,
@Nullable CorrelationData correlationData) throws AmqpException {
send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
}
protected Message convertMessageIfNecessary(final Object object) {
if (object instanceof Message) {
return (Message) object;
}
return getRequiredMessageConverter().toMessage(object, new MessageProperties());
}
此时发现消息转换的时候,传入了一个MessageProperties对象
public MessageProperties() {
this.deliveryMode = DEFAULT_DELIVERY_MODE;
this.priority = DEFAULT_PRIORITY;
}
public MessageProperties() {
this.deliveryMode = DEFAULT_DELIVERY_MODE;
this.priority = DEFAULT_PRIORITY;
}
static {
//持久化
DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
DEFAULT_PRIORITY = 0;
}
从 DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;看出springboot默认已经对消息进行了持久化
1.2 消费者消息不丢失通过手动ack机制,当消费者成功将消息消费成功后,返回消息给mq,告诉mq你发送的消息我已经消费成功了,mq将队列中的消息进行删除
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
Channel.basicAck(用于肯定确认)
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
yaml
rabbitmq:
template:
mandatory: true #指定消息在没有被队列接收时是否强行退回还是直接丢弃
listener:
simple:
retry:
####开启消费者异常重试
enabled: true
####最大重试次数
max-attempts: 5
####重试间隔次数
initial-interval: 5000
####开启手动ack
acknowledge-mode: manual
1.3 生产者消息不丢失
发布确认机制,生产者通过回调可以得知发送的消息是否发送到交换机
yml文件:
publisher-/confirm/i-type: correlated # 发布消息成功到交换器后会触发回调方法 publisher-returns: true #publisher-return模式可以在消息没有被路由到指定的queue时将消息返回,而不是丢弃 template: mandatory: true #指定消息在没有被队列接收时是否强行退回还是直接丢弃
相关业务代码:
@Configuration
@Slf4j
public class RabbitConfigStudy implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnCallback {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.set/confirm/iCallback(this);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(this);
return rabbitTemplate;
}
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("交换机已经收到 id 为:{}的消息", id);
} else {
log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error(" 消 息 {}, 被 交 换 机 {} 退 回 , 退 回 原 因 :{}, 路 由 key:{}", new
String(message.getBody()), exchange, replyText, routingKey);
// spring_returned_message_correlation:该属性是指退回待确认消息的唯一标识 ,也就是id,可以作为更新表
System.out.println("消息:" + message.getMessageProperties().getHeader("spring_returned_message_correlation").toString());
}
}
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)