
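First, get the RabbitMQ basics straight:
Producer: publishes messages.
Consumer: listens on a specified queue and consumes the messages that arrive on it.
Configuration class: declares the exchange and the queue and binds them together.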
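To make the three roles concrete, here is a minimal in-memory sketch of how a message travels from a producer through a binding into a queue and on to a consumer. ToyBroker and its methods are hypothetical names invented for illustration; this is not the RabbitMQ client or Spring AMQP API, and it only models exact routing-key matches.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory model of exchange -> binding -> queue; not the RabbitMQ client API. */
final class ToyBroker {
    // routing key -> queue name (one binding per key keeps the sketch small)
    private final Map<String, String> bindings = new HashMap<>();
    // queue name -> pending messages
    private final Map<String, Queue<String>> queues = new HashMap<>();

    /** Configuration role: declare a queue and bind it under a routing key. */
    void bind(String queueName, String routingKey) {
        queues.putIfAbsent(queueName, new ArrayDeque<>());
        bindings.put(routingKey, queueName);
    }

    /** Producer role: publish a message; returns false if no binding matches. */
    boolean publish(String routingKey, String message) {
        String queueName = bindings.get(routingKey);
        if (queueName == null) {
            return false; // unroutable: nothing is bound under this key
        }
        queues.get(queueName).add(message);
        return true;
    }

    /** Consumer role: take the next message from a queue, or null if empty or unknown. */
    String consume(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }
}
```

The point of the sketch is the separation of concerns: the producer only knows a routing key, the consumer only knows a queue name, and the binding declared by the configuration is what connects the two.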
Add the dependencies:
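<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.4.5</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
    <version>1.18.16</version>
</dependency>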
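Create the producer:
import cn.hutool.json.JSONUtil;
import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3+
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
// MqConstant and SendUserIntegralDTO are project classes; import them from your own packages.

@Component
@Slf4j
public class SendUserIntegralRabbitMqProducer {
    @Resource
    private RabbitTemplate rabbitTemplate;

    /** Serializes the DTO to JSON and publishes it to the user-points topic exchange. */
    public void send(SendUserIntegralDTO dto) {
        try {
            rabbitTemplate.convertAndSend(MqConstant.CHECK_EXCHANGE_TOPIC_USER_INTEGRAL,
                    MqConstant.CHECK_ROUTING_KEY_USER_INTEGRAL,
                    JSONUtil.toJsonStr(dto));
        } catch (Exception e) {
            // The failure is only logged, so the caller is not told that sending failed.
            log.error("Failed to send the user points (integral) message to RabbitMQ:", e);
        }
    }
}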
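Use the producer:
// Publish the message only when query.getIsPass() equals CommonConstant.TWO.
if (CommonConstant.TWO.equals(query.getIsPass())) {
    SendUserIntegralDTO dto = new SendUserIntegralDTO();
    dto.setIntegral(examineIntegralPO.getGrantIntegral()); // points to grant
    dto.setExamineId(examineIntegralPO.getId());           // ID of the review record
    sendUserIntegralRabbitMqProducer.send(dto);
}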
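Consumer code:
import cn.hutool.json.JSONUtil;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3+
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class UserIntegralRabbitMqConsumer {
    @Resource
    private IntegralService integralService;

    // Subscribes this method (the consumer) to the queue. Manual ack assumes the listener
    // container runs in MANUAL mode, e.g. spring.rabbitmq.listener.simple.acknowledge-mode=manual.
    @RabbitListener(queues = {MqConstant.CHECK_QUEUE_USER_INTEGRAL})
    public void checkUserIntegrallistener(Message message, Channel channel) {
        // The producer already sent a JSON string, so decode the body directly as UTF-8.
        String jsonStr = new String(message.getBody(), StandardCharsets.UTF_8);
        try {
            SendUserIntegralDTO dto = JSONUtil.toBean(jsonStr, SendUserIntegralDTO.class);
            integralService.addUserIntegral(dto);
        } catch (BizException e) {
            log.error("Business exception while adjusting user points (integral):", e);
        } catch (Exception e) {
            log.error("Unexpected exception while adjusting user points (integral):", e);
        } finally {
            // Acknowledge manually. Because this runs in finally, the message is acked even
            // when processing failed, so a failed message is not redelivered.
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e) {
                log.error("User points (integral) consumer failed to ack:", e);
            }
        }
    }
}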
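The listener above acknowledges in its finally block, so a message whose processing failed is acknowledged as well. If that is not the intended behavior, one option is to separate the decision from the channel call. The sketch below is only an illustration of such a policy: AckAction and AckPolicy are hypothetical names, IllegalArgumentException stands in for a business exception such as BizException, and the "retry once, then discard" rule is an assumption rather than anything the original code does.

```java
/** Possible outcomes for a delivery; hypothetical names, not part of any RabbitMQ API. */
enum AckAction { ACK, REQUEUE, DISCARD }

final class AckPolicy {
    private AckPolicy() {}

    /**
     * Decides what to do with a delivery after the handler has run.
     *
     * @param failure     exception thrown while processing, or null on success
     * @param redelivered whether the broker flagged the delivery as a redelivery
     */
    static AckAction decide(Exception failure, boolean redelivered) {
        if (failure == null) {
            return AckAction.ACK; // processed successfully
        }
        if (failure instanceof IllegalArgumentException) {
            // Stand-in for a business exception: retrying the same payload will not help.
            return AckAction.DISCARD;
        }
        // Unexpected failure: allow one retry, then give up to avoid an endless redelivery loop.
        return redelivered ? AckAction.DISCARD : AckAction.REQUEUE;
    }
}
```

In a real listener, ACK would map to channel.basicAck(tag, false), REQUEUE to channel.basicNack(tag, false, true), and DISCARD to channel.basicNack(tag, false, false); the redelivered flag is available from the message properties.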
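Configuration class:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {
    // Declares the queue the consumer listens on; the bean name equals the queue name.
    @Bean(MqConstant.CHECK_QUEUE_USER_INTEGRAL)
    public Queue checkQueueUserIntegral() {
        return new Queue(MqConstant.CHECK_QUEUE_USER_INTEGRAL);
    }

    // Binds the queue to the topic exchange under the routing key the producer uses.
    @Bean
    public Binding bindingTopicByCheckUserIntegral(
            @Qualifier(MqConstant.CHECK_QUEUE_USER_INTEGRAL) Queue queue,
            @Qualifier(MqConstant.CHECK_EXCHANGE_TOPIC_USER_INTEGRAL) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange)
                .with(MqConstant.CHECK_ROUTING_KEY_USER_INTEGRAL).noargs();
    }

    // Declares a durable topic exchange, which survives a broker restart.
    @Bean(MqConstant.CHECK_EXCHANGE_TOPIC_USER_INTEGRAL)
    public Exchange exchangeTopicByCheckUserIntegral() {
        return ExchangeBuilder.topicExchange(MqConstant.CHECK_EXCHANGE_TOPIC_USER_INTEGRAL)
                .durable(true).build();
    }
}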
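The configuration binds the queue to a topic exchange, which compares the routing key of each message with the binding key word by word: words are separated by dots, * stands for exactly one word, and # stands for zero or more words. The following is a simplified model of that matching rule, written only to illustrate it; TopicMatcher is a hypothetical helper, and the real matching happens inside the broker.

```java
/** Simplified model of topic-exchange matching; the real matching happens inside the broker. */
final class TopicMatcher {
    private TopicMatcher() {}

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matches(p, 0, k, 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // key exhausted but the pattern still expects a word
        }
        // '*' matches exactly one word; any other pattern word must match literally
        return (p[i].equals("*") || p[i].equals(k[j])) && matches(p, i + 1, k, j + 1);
    }
}
```

A binding key without wildcards, as a fixed routing-key constant would typically be, behaves like an exact match, so the topic exchange here routes the same way a direct exchange would.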