
消息队列 用户 消息发送消费 自研 rpcIO线程处理等 . 是多消息出入 单线程消费结构. (也支持多线程消费.但不是本代码的设计初衷)
大概流程
1.创建消息分组
2.添加一条消息
3.消息加入分组队列 并通知队列 执行消息
4.消息管理器获取消息运行
5.直到消息全部执行完毕 停止执行
6.等待消息管理器收到新的消息 再次运行
原版地址 https://blog.csdn.net/napcleon1/article/details/105402879
上一版存在一些问题
1.while (true) 循环
while(true)循环会一直占用资源导致服务器效率低下
2.单线程故障率高
3. 多任务模式下(类似kafka 的多个topic) 会创建多个while(true) 线程 及其占用资源.
下面直接上代码吧
首先写一个类里面有一个 线程安全的 list 用于存放消息
import com.github.niupengyu.core.exception.SysException;
import com.github.niupengyu.core.util.DateUtil;
import com.github.niupengyu.core.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.*;
消息管理器 提供 内存消息队列的 创建 添加任务 等 *** 作
public class CommonMessageService {
Map multipleMessageServiceMap=new HashMap<>();
private Logger logger= LoggerFactory.getLogger(CommonMessageService.class);
private ExecutorService pools;
//创建一个线程池 默认 3个线程
public CommonMessageService(int corePoolSize){
pools=new ThreadPoolExecutor(corePoolSize,Integer.MAX_VALUE,0l, TimeUnit.SECONDS,
new linkedBlockingQueue<>());
}
public CommonMessageService(){
pools=new ThreadPoolExecutor(3,Integer.MAX_VALUE,0l, TimeUnit.SECONDS,
new linkedBlockingQueue<>());
}
public CommonMessageService(ExecutorService pools){
this.pools=pools;
}
//创建一个 分钟 类似创建kafka的topic
public void addGroup(String key,DataRunner dataRunner,String name,int maxQueues){
if(multipleMessageServiceMap.containsKey(key)){
return;
}
MultipleMessageService multipleMessageService=new MultipleMessageService(name,maxQueues,dataRunner,pools);
multipleMessageServiceMap.put(key,multipleMessageService);
}
//结束一个线程管理器(这个方法多少有点小问题.)
public void end() {
multipleMessageServiceMap.clear();
pools.shutdown();
}
// 添加一个消息到任务队列(不一定立即执行) 任务分组名称 数据
public void add(String key,Object data){
if(multipleMessageServiceMap.containsKey(key)){
multipleMessageServiceMap.get(key).add(data);
}else{
throw new SysException("任务分组不存在"+key);
}
}
//添加一个消息 并立即执行
public void addNow(String key,Object data){
if(multipleMessageServiceMap.containsKey(key)){
multipleMessageServiceMap.get(key).addNow(data);
}else{
throw new SysException("任务分组不存在"+key);
}
}
// 添加一个消息到分组队列 制定处理器
public void add(String key,DataRunner dataRunner,Object data){
if(multipleMessageServiceMap.containsKey(key)){
multipleMessageServiceMap.get(key).add(dataRunner,data);
}else{
throw new SysException("任务分组不存在"+key);
}
}
//添加一个消息 制定处理器 并立即执行
public void addNow(String key,DataRunner dataRunner,Object data){
if(multipleMessageServiceMap.containsKey(key)){
multipleMessageServiceMap.get(key).addNow(dataRunner,data);
}else{
throw new SysException("任务分组不存在"+key);
}
}
//添加一条消息并自动创建分组
public void addNew(String key,DataRunner dataRunner,String name,int maxQueues,Object data){
addGroup(key, dataRunner, name,maxQueues);
add(key,dataRunner, data);
}
//返回线程队列信息
public Map messageInfo(){
Map data=new HashMap<>();
for(Map.Entry entry:multipleMessageServiceMap.entrySet()){
MultipleMessageService multipleMessageService=entry.getValue();
data.put(entry.getKey(), StringUtil.createMap("size",multipleMessageService.size(),"stop",multipleMessageService.isStop()));
}
return data;
}
}
import com.github.niupengyu.core.exception.SysException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
//数据管理类
public class DataManager implements Runnable{
private final Lock lock = new ReentrantLock();
private final Lock runLock = new ReentrantLock();
private Logger logger;
private String name;
private boolean stop=true;
private ExecutorService pools;
private DataQueues dataQueues;
private MessageBean messageBean;
private int maxQueues=0;
public DataManager() {
}
//初始化函数
public void init(String name, DataQueues dataQueues,int maxQueues,ExecutorService pools) {
this.name=name;
//this.dataRunner=dataRunner;
this.dataQueues=dataQueues;
this.maxQueues=maxQueues;
logger= LoggerFactory.getLogger(name);
this.pools=pools;
}
public void add(MessageBean messageobj) {
lock.lock();
try {
int size=dataQueues.messageSize();
while(maxQueues>0&&size>=maxQueues){
logger.debug("等待 {}/{} ",size,maxQueues);
Thread.sleep(500);
size=dataQueues.messageSize();
}
dataQueues.add(messageobj);
}catch(Exception e){
throw new SysException(e);
}finally{
lock.unlock();
}
}
public MessageBean getMessage(){
lock.lock();
MessageBean obj=null;
try {
obj=dataQueues.getMessage();
}catch(Exception e){
throw new SysException(e);
}finally{
lock.unlock();
}
return obj;
}
public int messageSize(){
lock.lock();
int size=0;
try {
size=dataQueues.messageSize();
}catch(Exception e){
throw new SysException(e);
}finally{
lock.unlock();
}
return size;
}
//判断任务是否执行完毕
public boolean isStop(){
runLock.lock();
try {
return stop;
}catch (Exception e){
throw new SysException(e);
}finally {
runLock.unlock();
}
}
//设置运行状态
public void setStop(boolean stop){
runLock.lock();
try {
this.stop = stop;
}catch (Exception e){
throw new SysException(e);
}finally {
runLock.unlock();
}
}
//处理多级实现类
@Override
public void run() {
while(true){
MessageBean messageBean=this.getMessage();
if(messageBean==null){
setStop(true);
return;
}
try {
messageBean.execute();
} catch (Exception e) {
logger.error("消息处理异常 ",messageBean.toString(),e);
}
}
}
//结束任务(这个多少有点问题)
public void end() {
//setStop(true);
pools.shutdown();
}
//开始消费任务队列
public void start() {
pools.execute(this);
}
//立即执行
public void addNow(MessageBean messageBean) {
pools.execute(new Runnable() {
@Override
public void run() {
try {
messageBean.execute();
} catch (Exception e) {
logger.error("消息处理异常 ",messageBean.toString(),e);
}
}
});
}
}
消息管理方法
import com.github.niupengyu.core.exception.SysException; import java.util.ArrayList; import java.util.List; public class DataQueues{ private List message=new ArrayList<>(); public DataQueues() { } public void add(T messageobj) { message.add(messageobj); } public void addList(List messageList) throws SysException { message.addAll(messageList); } public T getMessage(){ T obj=null; if(message.size()>0){ obj=message.get(0); message.remove(0); } return obj; } public List getMessageList(int size){ List list=null; int messageSize=message.size(); int length; if(messageSize>0){ if(messageSize>size){ length=size; list=new ArrayList<>(message.subList(0,length)); message=new ArrayList<>(message.subList(length,messageSize)); }else{ length=messageSize; list=new ArrayList<>(message.subList(0,length)); message=new ArrayList<>(); } } return list; } public int messageSize(){ int size=0; size=message.size(); return size; } }
处理器 需要实现的接口
public interface DataRunner{ void execute(T messageBean) throws Exception; }
封装消息实体
public class MessageBean{ private DataRunner dataRunner; private T value; public DataRunner getDataRunner() { return dataRunner; } public MessageBean(DataRunner dataRunner, T value) { this.dataRunner = dataRunner; this.value = value; } public void setDataRunner(DataRunner dataRunner) { this.dataRunner = dataRunner; } public T getValue() { return value; } public void setValue(T value) { this.value = value; } public void execute() throws Exception { dataRunner.execute(value); } @Override public String toString() { return "MessageBean{" + "dataRunner=" + dataRunner + ", value=" + value + '}'; } }
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; //消息管理器服务 封装 public class MultipleMessageService{ //private MessageManager messageManager; private Logger logger= LoggerFactory.getLogger(MultipleMessageService.class); private DataManager dataManager=new DataManager(); private DataRunner dataRunner; // 创建消息队列 public MultipleMessageService(DataRunner dataRunner,String name){ this.init(name,dataRunner,new DataQueues(),0, Executors.newSingleThreadExecutor()); } public MultipleMessageService(int count,DataRunner dataRunner,String name){ this.init(name,dataRunner,new DataQueues(),0, Executors.newFixedThreadPool(count)); } public MultipleMessageService(int count,int maxQueues,DataRunner dataRunner,String name){ this.init(name,dataRunner,new DataQueues(),maxQueues, Executors.newFixedThreadPool(count)); } public MultipleMessageService(String name, int maxQueues, DataRunner dataRunner, ExecutorService pools){ this.init(name,dataRunner,new DataQueues(),maxQueues,pools); } private void init(String name, DataRunner dataRunner, DataQueues dataQueues,int maxQueues,ExecutorService pools) { this.dataManager=new DataManager(); this.dataRunner=dataRunner; dataManager.init(name,dataQueues,maxQueues,pools); } //结束 public void end() { dataManager.end(); } //添加 public void add(T o){ dataManager.add(new MessageBean(dataRunner,o)); if(dataManager.isStop()){ dataManager.setStop(false); dataManager.start(); } } //立即执行 public void addNow(T o){ dataManager.addNow(new MessageBean(dataRunner,o)); } //添加任务 并制定处理器 public void add(DataRunner dataRunner,T o){ dataManager.add(new MessageBean(dataRunner,o)); if(dataManager.isStop()){ dataManager.setStop(false); dataManager.start(); } } //立即执行 并制定处理器 public void addNow(DataRunner dataRunner,T o){ dataManager.addNow(new MessageBean(dataRunner,o)); } //查看当前队列长度 public int size(){ return dataManager.messageSize(); } //查看任务是否运行中 public boolean isStop(){ return dataManager.isStop(); } }
测试类
public static void main(String[] args) {
创建队列管理器
CommonMessageService commonMessageService=
new CommonMessageService();
添加分组 test 并添加工作线程实现类
commonMessageService.addGroup("test", new DataRunner() {
@Override
public void execute(String messageBean) throws Exception {
System.out.println(DateUtil.dateFormat()+" "+messageBean);
Random random=new Random();
int i=random.nextInt(1);
Thread.sleep(1000);
}
},"test",0);
// 测试 给test中添加 10条消息
for(int i=0;i<10;i++){
commonMessageService.add("test","消息 "+i);
}
}
下面是运行结果
欢迎交流沟通
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)