
上一篇文章有讲到rabbitmq的安装、web管理端和springboot简单集成rabbitmq
本文重点介绍rabbitmq相关api的使用
按照官网常用的五种模式的顺序:HelloWorld、Work queues、Publish/Subscribe、Routing、Topics
模式简单介绍 HelloWorld一个demo,实际很少使用。
Work queues在多个消费者之间分配任务,竞争消费模式。
Publish/Subscribe发布订阅模式,同时向多个消费者发送消息。
Routing选择性的接收消息
Topics基于表达式接收消息
模式具体使用(rabbitmqclient) HelloWorld创建maven项目并且引入依赖
com.rabbitmq amqp-client5.9.0 junit junit4.13.2
创建工具类,用于处理连接和信道的创建,以及他们的关闭
package org.cc;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionUtils {
public static Connection createConnection() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
// connectionFactory.setHost("localhost");//默认主机:localhost
// connectionFactory.setPort(5672);//默认端口5672
// connectionFactory.setUsername("guest");//默认用户名:guest
// connectionFactory.setPassword("guest");//默认密码:guest
// connectionFactory.setVirtualHost("/");//默认虚拟主机:/
return connectionFactory.newConnection();
}
public static void closeConnection(Connection connection, Channel channel) throws IOException, TimeoutException {
channel.close();
connection.close();
}
}
创建消费者
package org.cc;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class HelloWorldConsumer {
@Test
public void sendMsg() throws IOException, TimeoutException {
Connection connection = ConnectionUtils.createConnection();
Channel channel = connection.createChannel();
//若队列已存在,这些参数必须与队列一致,若队列不存在则创建
channel.queueDeclare("my helloworld queue",true,false,false,null);
channel.basicConsume("my helloworld queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("helloworld consumer接收到消息:"+new String(body));
}
});
System.in.read();//保持消费者一直监听队列
}
}
创建生产者
package org.cc;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class HelloWorldProducer {
@Test
public void sendMsg() throws IOException, TimeoutException {
Connection connection = ConnectionUtils.createConnection();
Channel channel = connection.createChannel();
//若队列已存在,这些参数必须与队列一致,若队列不存在则创建
channel.queueDeclare("my helloworld queue",true,false,false,null);
channel.basicPublish("","my helloworld queue",null,"helloworld消息内容".getBytes(StandardCharsets.UTF_8));
ConnectionUtils.closeConnection(connection,channel);
}
}
若要保证rabbitmq重启后消息仍然存在,生产者发送消息时需要设置props参数
channel.basicPublish("","my helloworld queue", MessageProperties.PERSISTENT_TEXT_PLAIN,"helloworld消息内容".getBytes(StandardCharsets.UTF_8));
开启手动ack,消费者接收到消息时,需要手动发送ack确认后消息才会真正从队列中删除
channel.basicConsume("my helloworld queue",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("helloworld consumer接收到消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
Work queues
创建消费者,与上面helloworld模式代码基本一致,将原有的创建消费者的代码重复一遍
package org.cc;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class HelloWorldConsumer {
@Test
public void sendMsg() throws IOException, TimeoutException {
Connection connection = ConnectionUtils.createConnection();
Channel channel = connection.createChannel();
//若队列已存在,这些参数必须与队列一致,若队列不存在则创建
channel.queueDeclare("my helloworld queue",true,false,false,null);
channel.basicConsume("my helloworld queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("helloworld consumer接收到消息:"+new String(body));
}
});
System.in.read();//保持消费者一直监听队列
}
}
创建生产者,与上面helloworld模式代码基本一致,这里连续发送10条消息
package org.cc;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class WorkProducer {
@Test
public void sendMsg() throws IOException, TimeoutException {
Connection connection = ConnectionUtils.createConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("my work queue",true,false,false,null);
for (int i = 0; i < 10; i++) {
channel.basicPublish("","my work queue", MessageProperties.PERSISTENT_TEXT_PLAIN,("work消息内容"+i).getBytes(StandardCharsets.UTF_8));
}
ConnectionUtils.closeConnection(connection,channel);
}
}
从消费者的控制台可以看到两个消费者轮流接收到消息
Publish/Subscribe消费者
package org.cc;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Subscriber {
@Test
public void receive() throws IOException, TimeoutException {
Connection connection = ConnectionUtils.createConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("fanout exchange", BuiltinExchangeType.FANOUT);
channel.queueDeclare("my fanout queue1",true,false,false,null);
channel.queueBind("my fanout queue1","fanout exchange","");
channel.basicConsume("my fanout queue1",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("my fanout queue1 consumer接收到消息:"+new String(body));
}
});
Connection connection1 = ConnectionUtils.createConnection();
Channel channel1 = connection1.createChannel();
channel1.queueDeclare("my fanout queue2",true,false,false,null);
channel.queueBind("my fanout queue2","fanout exchange","");
channel1.basicConsume("my fanout queue2",true,new DefaultConsumer(channel1){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("my fanout queue2 consumer接收到消息:"+new String(body));
}
});
System.in.read();//保持消费者一直监听队列
}
}
生产者
package org.cc;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Publisher {
@Test
public void sendMsg() throws IOException, TimeoutException {
Connection connection = ConnectionUtils.createConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("fanout exchange", BuiltinExchangeType.FANOUT);
channel.basicPublish("fanout exchange","", MessageProperties.PERSISTENT_TEXT_PLAIN,"fanout exchange消息内容".getBytes(StandardCharsets.UTF_8));
ConnectionUtils.closeConnection(connection,channel);
}
}
队列需要同交换机绑定,生产者向交换机发送消息
Routing消费者
package org.cc;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RoutingKeyConsumer {
@Test
public void receive() throws IOException, TimeoutException {
Connection connection = ConnectionUtils.createConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct exchange", BuiltinExchangeType.DIRECT);
channel.queueDeclare("my direct queue1",true,false,false,null);
channel.queueBind("my direct queue1","direct exchange","info");
channel.basicConsume("my direct queue1",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("my direct queue1 consumer接收到消息:"+new String(body));
}
});
Connection connection1 = ConnectionUtils.createConnection();
Channel channel1 = connection1.createChannel();
channel1.queueDeclare("my direct queue2",true,false,false,null);
channel.queueBind("my direct queue2","direct exchange","info");
channel.queueBind("my direct queue2","direct exchange","error");
channel1.basicConsume("my direct queue2",true,new DefaultConsumer(channel1){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("my direct queue2 consumer接收到消息:"+new String(body));
}
});
System.in.read();//保持消费者一直监听队列
}
}
生产者
package org.cc;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class RoutingProducer {
@Test
public void sendMsg() throws IOException, TimeoutException {
Connection connection = ConnectionUtils.createConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct exchange", BuiltinExchangeType.DIRECT);
channel.basicPublish("direct exchange","info", MessageProperties.PERSISTENT_TEXT_PLAIN,"direct exchange info消息内容".getBytes(StandardCharsets.UTF_8));
channel.basicPublish("direct exchange","error", MessageProperties.PERSISTENT_TEXT_PLAIN,"direct exchange error消息内容".getBytes(StandardCharsets.UTF_8));
ConnectionUtils.closeConnection(connection,channel);
}
}
Topics
交换机路由消息给队列时基于表达式,*匹配1个,#配置0个或1个或多个
例如:当队列1的路由值设置user.*,队列2的路由值设置user.#时,向交换机分别发送四条消息,消息的路由值分别为user.insert、user.insert.a、user.、user
此时队列1会收到路由值为user.insert和user.的消息,队列1能收到上面全部四条消息
消费者代码
package org.cc;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TopicsConsumer {
@Test
public void receive() throws IOException, TimeoutException {
Connection connection = ConnectionUtils.createConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topic exchange", BuiltinExchangeType.TOPIC);
channel.queueDeclare("my topic queue1",true,false,false,null);
channel.queueBind("my topic queue1","topic exchange","user.*");
channel.basicConsume("my topic queue1",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("my topic queue1 consumer接收到消息:"+new String(body));
}
});
Connection connection1 = ConnectionUtils.createConnection();
Channel channel1 = connection1.createChannel();
channel1.queueDeclare("my topic queue2",true,false,false,null);
channel.queueBind("my topic queue2","topic exchange","user.#");
channel1.basicConsume("my topic queue2",true,new DefaultConsumer(channel1){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("my topic queue2 consumer接收到消息:"+new String(body));
}
});
System.in.read();//保持消费者一直监听队列
}
}
生产者代码
package org.cc;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class TopicsProducer {
@Test
public void sendMsg() throws IOException, TimeoutException {
Connection connection = ConnectionUtils.createConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topic exchange", BuiltinExchangeType.TOPIC);
channel.basicPublish("topic exchange","user.insert", MessageProperties.PERSISTENT_TEXT_PLAIN,"topic exchange user.insert消息内容".getBytes(StandardCharsets.UTF_8));
channel.basicPublish("topic exchange","user.insert.a", MessageProperties.PERSISTENT_TEXT_PLAIN,"topic exchange user.insert.a消息内容".getBytes(StandardCharsets.UTF_8));
channel.basicPublish("topic exchange","user.", MessageProperties.PERSISTENT_TEXT_PLAIN,"topic exchange user.消息内容".getBytes(StandardCharsets.UTF_8));
channel.basicPublish("topic exchange","user", MessageProperties.PERSISTENT_TEXT_PLAIN,"topic exchange user消息内容".getBytes(StandardCharsets.UTF_8));
ConnectionUtils.closeConnection(connection,channel);
}
}
模式具体使用(springboot集成rabbitmq)
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)