
1.创建namesrv服务
拉取镜像
docker pull rocketmqinc/rocketmq
创建nameServer存储路径
mkdir -p /docker/rocketmq/data/namesrv/logs /docker/rocketmq/data/namesrv/store
构建namesrv容器
docker run -d --restart=always --name rmqnamesrv -p 9876:9876 -v /docker/rocketmq/data/namesrv/logs:/root/logs -v /docker/rocketmq/data/namesrv/store:/root/store -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv
2.创建broker节点
创建broker数据存储路径
mkdir -p /docker/rocketmq/data/broker/logs /docker/rocketmq/data/broker/store /docker/rocketmq/conf
创建配置文件
vi /docker/rocketmq/conf/broker.conf
# 所属集群名称,如果节点较多可以配置多个 brokerClusterName = DefaultCluster #broker名称,master和slave使用相同的名称,表明他们的主从关系 brokerName = broker-a #0表示Master,大于0表示不同的slave brokerId = 0 #表示几点做消息删除动作,默认是凌晨4点 deleteWhen = 04 #在磁盘上保留消息的时长,单位是小时 fileReservedTime = 48 #有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制; brokerRole = ASYNC_MASTER #刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要; flushDiskType = ASYNC_FLUSH #设置broker节点所在服务器的ip地址(公网IP) brokerIP1 = 192.168.52.136
构建broker容器
docker run -d --restart=always --name rmqbroker --link rmqnamesrv:namesrv -p 10911:10911 -p 10909:10909 -v /docker/rocketmq/data/broker/logs:/root/logs -v /docker/rocketmq/data/broker/store:/root/store -v /docker/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
3.创建rockermq-console服务
拉取镜像
docker pull pangliang/rocketmq-console-ng
构建rockermq-console容器
docker run -d --restart=always --name rmqadmin -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.52.136:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 9999:8080 pangliang/rocketmq-console-ng IP为内网IP
4.开放端口
5.测试
6.编写代码
引入maven依赖
org.apache.rocketmq rocketmq-client4.5.1
生产者
public class SyncProducer {
public static void main(String[] args) throws Exception {
final DefaultMQProducer producer=new DefaultMQProducer("test_producer");
//这里需要设置NameServer地址
producer.setNamesrvAddr("101.43.12.115:9876");
producer.start();
for (int i = 0; i < 10; i++) {
new Thread(){
@Override
public void run() {
while (true){
try {
Message message=new Message("TopicTest","TagA",("Test").getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}.start();
}
while (true){
Thread.sleep(1000);
}
}
}
消费者
public class Consumer {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("test_consumer");
//这里需要设置NameServer地址
consumer.setNamesrvAddr("101.43.12.115:9876");
//订阅Topic,你要消费哪些Topic的消息
consumer.subscribe("TopicTest","*");
//这里注册一个回掉接口,去接收获取到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println(list);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
结果显示
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)