Work Queue

Work Queue,第1张

Work Queue

接上一篇测试

  1. 在publisher服务定义测试方法,每秒产生50条消息,发送到simple.queue。
  2. 在consumer服务中定义两个消息监听者,都监听simple.queue队列。
  3. 消费者1每秒处理50条消息,消费者2每秒处理10条消息。

创建生产者

package com.yy.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue(){
        String queueName="simple.queue";
        String message="hello,string amqp!";
        rabbitTemplate.convertAndSend(queueName,message);
    }

    @Test
    public void testSendMessage2WorkQueue() throws InterruptedException {
        String queueName="simple.queue";
        String message="hello,message__";
        for (int i = 1; i < 50; i++) {
            rabbitTemplate.convertAndSend(queueName,message + i);
            Thread.sleep(20);
        }

    }

}

创建消费者

package com.yy.comsumer.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.LocalTime;

@Component
//声明为bean,让Spring能够找到
public class SpringRabbitListener {

//    @RabbitListener(queues = "simple.queue")
//    //告诉spring要监听了队列
//    public void listenerSimpleQueueMessage(String msg) throws InterruptedException{
//        //String 是列表发的什么类型,此处就对应定义为什么消息
//        System.out.println("Spring 消费者接收到消:【" + msg +"】");
//    }

    @RabbitListener(queues = "simple.queue")
    //告诉spring要监听了队列
    public void listenerWorkQueue(String msg) throws InterruptedException{
        //String 是列表发的什么类型,此处就对应定义为什么消息
        System.out.println("消费者1 接收到消:【" + msg +"】" + LocalTime.now());
        Thread.sleep(20);
    }

    @RabbitListener(queues = "simple.queue")
    //告诉spring要监听了队列
    public void listenerWorkQueue2(String msg) throws InterruptedException{
        //String 是列表发的什么类型,此处就对应定义为什么消息
        System.err.println("消费者2.......接收到消:【" + msg +"】" + LocalTime.now());
       //err,标红方便查看
        Thread.sleep(200);
    }


}

结果并不是消费强的1消费得多,而是一样多。这是因为消息预取了。

修改application.yml中的preFetch值,控制预取值上限。

spring:
  rabbitmq:
    host: 192.168.0.106 # rabbitM的IP
    port: 5672
    username: yymq
    password: 123456@
    virtual-host: /
    listener:
      simple:
        prefetch: 1 #每次只能取一条消息,处理完成才能获取下一个消息
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS


最终达到能者多劳。

欢迎分享,转载请注明来源:内存溢出

原文地址:https://www.54852.com/zaji/5677384.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-16
下一篇2022-12-17

发表评论

登录后才能评论

评论列表(0条)

    保存