
1.通过事务机制
RabbitMQ的事务机制 *** 作过程与事务型数据库有些类似:
1.channel.txSelect()用于开启事务
2.channel.txCommit()用于提交事务
3.channel.txRollback()用于回滚事务
--------------------------------------------------------------------------------
示例:
try {
channel.txSelect();
String message = "";
channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
channel.txCommite();
}catch (Exception e) {
e.printStackTrace();
channel.toRollback();
}
-----------------------------------------------------详细例子--------------------------------------------
package com.dfyang.rabbitmq.tx;
import com.dfyang.rabbitmq.RabbitConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
public class TXProducer {
private static final String EXCHANGE_NAME = "tx.exchange";
private static final String QUEUE_NAME = "tx.queue";
private static final String ROUTING_KEY = "tx";
public static void main(String[] args) throws Exception {
Connection connection = RabbitConnectionFactory.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
String message = "test!";
try {
channel.txSelect();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
}
channel.close();
connection.close();
}
}
1.客户端发送Tx.Select 将信道置为事务
2.Broker回复Tx.Select-Ok 确认已将信道置为事务模式
3.在发送完消息之后,客户端发送Tx.Commit提交事务
4.Broker回复Tx.Commit-Ok确认事务提交。
2.生产者/confirm/i机制
1.单条/confirm/i(发送一条等待确认一条)
channel./confirm/iSelect();//将信道置为/confirm/i
String message = "";
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAN,message.getBytes());
if(!channel.waitFor/confirm/is()){
System.out.println("消息发送失败");
}
System.out.println("消息发送成功");
单条/confirm/i模式的效率仅仅比事务模式高一点,这种模式是阻塞的。
2.批量confirm
批量/confirm/i模式就是先开启/confirm/i模式,发送多条之后再调用waitFor/confirm/is()方法确认,这样发送多条之后才会等待一次确认消息,效率比单条/confirm/i模式高了许多。但是如果返回false或者超时,这一批次的消息就要全部重发,如果经常丢消息,效率并不比单条/confirm/i高。。
-----------------------------------详细代码--------------------------------
package com.dfyang.rabbitmq.tx;
import com.dfyang.rabbitmq.RabbitConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import java.util.Queue;
public class TXProducer {
private static final String EXCHANGE_NAME = "tx.exchange";
private static final String QUEUE_NAME = "tx.queue";
private static final String ROUTING_KEY = "tx";
public static void main(String[] args) throws Exception {
Connection connection = RabbitConnectionFactory.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
String message = "test!";
channel./confirm/iSelect();
for (int i = 0; i < 10000; i++) {
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
}
channel.waitFor/confirm/isOrDie();
channel.close();
connection.close();
}
}
3.异步/confirm/i模式:采用异步模式将不用阻塞等待borker服务器确认接收到消息就可以继续发送消息
package com.springrabbitmq.comfirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client./confirm/iListener;
import com.rabbitmq.client.Connection;
import com.springrabbitmq.util.RabbitMQConnectionUtil;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeoutException;
public class Send3 {
private static final String QUEUE_NAME="test_queue_/confirm/i1";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = RabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//开启/confirm/i模式
//注意已经定义队列为AMQP的事务机制的话,就不能再改成/confirm/i
channel./confirm/iSelect();
//未确认的消息标识
final SortedSet confirmSet = Collections.synchronizedSortedSet(new TreeSet());
///confirm/i监听
channel.add/confirm/iListener(new /confirm/iListener() {
//没有问题的handleAck,从un/confirm/ied集合里移除元素表示确认收到了
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple){
System.out.println("------handleAck----multiple------");
//multiple为true时,移除(deliveryTag+1)之前的多个元素
/confirm/iSet.headSet(deliveryTag+1).clear();
}else {
System.out.println("------handleAck----multiple------false");
//multiple为false时,移除一个
/confirm/iSet.remove(deliveryTag);
}
}
//RabbitMQ异常时没有收到消息,/confirm/i会回执一条Nack给生产者
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple){
System.out.println("------handleNack----multiple------");
//multiple为true时,移除(deliveryTag+1)之前的多个元素
/confirm/iSet.headSet(deliveryTag+1).clear();
}else {
System.out.println("------handleNack----multiple------false");
//multiple为false时,移除一个
/confirm/iSet.remove(deliveryTag);
}
}
});
String msg="Hello Confirm Message!![异步]";
for (int i=0;i<20;i++){
//channel为每次发布的消息指派一个ID
long seqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("Message Send Success!"+i);
//每次发布后将ID添加到un/confirm/ied未确定是否发送成功的集合Set中
/confirm/iSet.add(seqNo);
System.out.println(/confirm/iSet.size());
}
}
}
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)