
接上一篇测试
- 在publisher服务定义测试方法,每秒产生50条消息,发送到simple.queue。
- 在consumer服务中定义两个消息监听者,都监听simple.queue队列。
- 消费者1每秒处理50条消息,消费者2每秒处理10条消息。
创建生产者
package com.yy.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue(){
String queueName="simple.queue";
String message="hello,string amqp!";
rabbitTemplate.convertAndSend(queueName,message);
}
@Test
public void testSendMessage2WorkQueue() throws InterruptedException {
String queueName="simple.queue";
String message="hello,message__";
for (int i = 1; i < 50; i++) {
rabbitTemplate.convertAndSend(queueName,message + i);
Thread.sleep(20);
}
}
}
创建消费者
package com.yy.comsumer.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.LocalTime;
@Component
//声明为bean,让Spring能够找到
public class SpringRabbitListener {
// @RabbitListener(queues = "simple.queue")
// //告诉spring要监听了队列
// public void listenerSimpleQueueMessage(String msg) throws InterruptedException{
// //String 是列表发的什么类型,此处就对应定义为什么消息
// System.out.println("Spring 消费者接收到消:【" + msg +"】");
// }
@RabbitListener(queues = "simple.queue")
//告诉spring要监听了队列
public void listenerWorkQueue(String msg) throws InterruptedException{
//String 是列表发的什么类型,此处就对应定义为什么消息
System.out.println("消费者1 接收到消:【" + msg +"】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
//告诉spring要监听了队列
public void listenerWorkQueue2(String msg) throws InterruptedException{
//String 是列表发的什么类型,此处就对应定义为什么消息
System.err.println("消费者2.......接收到消:【" + msg +"】" + LocalTime.now());
//err,标红方便查看
Thread.sleep(200);
}
}
结果并不是消费强的1消费得多,而是一样多。这是因为消息预取了。
修改application.yml中的preFetch值,控制预取值上限。
spring:
rabbitmq:
host: 192.168.0.106 # rabbitM的IP
port: 5672
username: yymq
password: 123456@
virtual-host: /
listener:
simple:
prefetch: 1 #每次只能取一条消息,处理完成才能获取下一个消息
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
最终达到能者多劳。
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)