
- ①. SpringBoot案例 - 发布与订阅模式
- ②. SpringBoot案例 - 路由模式
- ③. SpringBoot案例 - 通配符模式
-
①. 生产和消费者工程如下
-
②. 导入依赖
org.springframework.boot spring-boot-starter-amqporg.springframework.boot spring-boot-starter-web
- ③. 编写yaml(生产者和消费者一样)
server:
port: 8080
spring:
rabbitmq:
host: 139.198.169.136
port: 5672
virtual-host: /myvitrualhost
username: tang
password: 9602111022yxTZ@
- ④. 生产者配置文件如下
@Configuration
public class FanoutRabbitConfig {
//1. 声明交换机
@Bean
public FanoutExchange fanoutOrderExchange() {
return new FanoutExchange("fanout_order_exchange", true, false);
}
//2. 声明队列
@Bean
public Queue emailQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("email.fanout.queue", true);
}
@Bean
public Queue smsQueue() {
return new Queue("sms.fanout.queue", true);
}
@Bean
public Queue weixinQueue() {
return new Queue("weixin.fanout.queue", true);
}
//3. 将队列和交换机绑定
//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
public Binding bindingEmail() {
return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange());
}
@Bean
public Binding bindingSms() {
return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange());
}
@Bean
public Binding bindingWeixin() {
return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange());
}
}
- ⑤. service代码以及启动后的效果如下
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
//1. 发布与订阅模式
public void makeOrder(Long userId, Long productId) {
// 1: 模拟用户下单
String orderNumer = UUID.randomUUID().toString();
System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
// 发送订单信息给RabbitMQ fanout
rabbitTemplate.convertAndSend("fanout_order_exchange", "", orderNumer);
}
}
@SpringBootTest
class ProducerApplicationTests {
@Autowired
private OrderService orderService;
@Test
void contextLoads() {
orderService.makeOrder(1L,1L);
}
}
- ⑥. 三个消费者代码如下
@Service
@RabbitListener(queues = {"email.fanout.queue"})
public class EmailService {
@RabbitHandler
public void messageRevice(String message){
System.out.println("email----------"+message);
}
}
@Component
@RabbitListener(queues = {"sms.fanout.queue"})
public class SmsService {
@RabbitHandler
public void messageRevice(String message){
System.out.println("SMS----------"+message);
}
}
@Component
@RabbitListener(queues = {"weixin.fanout.queue"})
public class WeixinService {
@RabbitHandler
public void messageRevice(String message){
System.out.println("Weixin----------"+message);
}
}
- ⑦. 先启动生产者,后启动消费者,可以看到如下演示:
- ①. 生产者代码如下
@Configuration
public class DirectRabbitConfig {
//1.声明交换机
@Bean
public DirectExchange directExchange(){
return new DirectExchange("direct_order_exchange",true,false);
}
//2.声明队列
@Bean
public Queue emailDirectQueue() {
return new Queue("email.direct.queue", true);
}
@Bean
public Queue smsDirectQueue() {
return new Queue("sms.direct.queue", true);
}
@Bean
public Queue weChatDirectQueue() {
return new Queue("weChat.direct.queue", true);
}
//3.队列和交换机绑定
@Bean
public Binding bingDirectEmail(){
return BindingBuilder.bind(emailDirectQueue()).to(directExchange()).with("email");
}
@Bean
public Binding bingDirectSms(){
return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with("sms");
}
@Bean
public Binding bindDirectWeChat(){
return BindingBuilder.bind(weChatDirectQueue()).to(directExchange()).with("weChat");
}
}
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
//2. direct模式
public void makeDirectOrder(Long userId, Long productId) {
// 1: 模拟用户下单
String orderNumer = UUID.randomUUID().toString();
System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
// 发送订单信息给RabbitMQ Sms和微信发送消息
rabbitTemplate.convertAndSend("direct_order_exchange", "sms", orderNumer);
rabbitTemplate.convertAndSend("direct_order_exchange", "weChat", orderNumer);
}
}
// Direct模式
@Test
public void DirectTest(){
orderService.makeDirectOrder(1L,1L);
}
- ②. 消费者代码如下
@Service
@RabbitListener(queues = {"email.direct.queue"})
public class EmailService {
@RabbitHandler
public void messageDirectRevice(String message){
System.out.println("email direct----------"+message);
}
}
@Component
@RabbitListener(queues = {"sms.direct.queue"})
public class SmsService {
@RabbitHandler
public void messageDirectRevice(String message){
System.out.println("SMS direct----------"+message);
}
}
@Component
@RabbitListener(queues = {"weChat.direct.queue"})
public class WeixinService {
@RabbitHandler
public void messageDirectRevice(String message){
System.out.println("weChat direct----------"+message);
}
}
③. SpringBoot案例 - 通配符模式
- ①. 生产者代码如下
//3. topic模式
public void makeTopicOrder(Long userId, Long productId){
// 1: 模拟用户下单
String orderNumer = UUID.randomUUID().toString();
System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
// 发送订单信息给RabbitMQ Sms和微信发送消息
// 匹配规则:#.sms.#、#.weChat.#
rabbitTemplate.convertAndSend("topic_order_exchange", "weChat.sms", orderNumer);
}
- ②. 消费者使用注解的方式替换配置类
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
value = @Queue(value = "email.topic.queue",autoDelete = "false"),
// order.fanout 交换机的名字 必须和生产者保持一致
exchange = @Exchange(value = "topic_order_exchange",
// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
type = ExchangeTypes.TOPIC),
key = "#.email.#"
))
@Component
public class TopicEmailConsumer {
@RabbitHandler
public void reviceMessage(String message){
System.out.println("email------topic模式:"+message);
}
}
@Component
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
value = @Queue(value = "sms.topic.queue",autoDelete = "false"),
// order.fanout 交换机的名字 必须和生产者保持一致
exchange = @Exchange(value = "topic_order_exchange",
// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
type = ExchangeTypes.TOPIC),
key = "#.sms.#"
))
public class TopicSmsConsumer{
@RabbitHandler
public void reviceMessage(String message){
System.out.println("sms------topic模式:"+message);
}
}
@Component
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
value = @Queue(value = "weChat.topic.queue",autoDelete = "false"),
// order.fanout 交换机的名字 必须和生产者保持一致
exchange = @Exchange(value = "topic_order_exchange",
// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
type = ExchangeTypes.TOPIC),
key = "#.sms.#"
))
public class TopicWeChatConsumer {
@RabbitHandler
public void reviceMessage(String message){
System.out.println("weChat------topic模式:"+message);
}
}
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)