
该文章代码来自黑马程序员全面深入学习Java并发编程,JUC并发编程全套教程_哔哩哔哩_bilibili
阻塞队列public class BlockingQueue线程池{ // 任务队列 private Queue queue = new linkedList (); // 锁 private ReentrantLock lock = new ReentrantLock(); // 生产者队列 private Condition fullWaitSet = lock.newCondition(); // 消费者队列 private Condition emptyWaitSet = lock.newCondition(); // 队列的最大容量 private int maxNum; public BlockingQueue(int maxNum){ this.maxNum = maxNum; } // 带超时时间的获取任务 public T poll(long timeout, TimeUnit timeUnit){ lock.lock(); try{ // 将timeout统一转换成纳秒 long nanos = timeUnit.tonanos(timeout); while(queue.isEmpty()){ // 返回的是剩余时间 if(nanos <= 0){ return null; } nanos = emptyWaitSet.awaitNanos(nanos); } T task = (T) queue.poll(); fullWaitSet.signal(); return task; } catch (Exception e){ e.printStackTrace(); return null; } finally { lock.unlock(); } } // 消费者获取任务 public T take() { lock.lock(); try{ while(queue.isEmpty()){ emptyWaitSet.await(); } T task = (T) queue.poll(); fullWaitSet.signal(); return task; } catch (Exception e){ e.printStackTrace(); return null; } finally { lock.unlock(); } } // 生产者添加任务 public void put(T task) { lock.lock(); try{ while(queue.size() == maxNum){ fullWaitSet.await(); } queue.add(task); emptyWaitSet.signal(); } catch (Exception e){ e.printStackTrace(); } finally { lock.unlock(); } } // 带超时时间的阻塞添加 public boolean offer(T task, long timeout, TimeUnit timeUnit){ lock.lock(); try{ long nanos = timeUnit.tonanos(timeout); while(queue.size() == maxNum){ if(nanos <= 0){ return false; } nanos = fullWaitSet.awaitNanos(nanos); } queue.add(task); emptyWaitSet.signal(); } catch (Exception e){ e.printStackTrace(); } finally { lock.unlock(); } return true; } // 拒绝策略 public void tryPut(RejectPolicy rejctPolicy, T task){ lock.lock(); try{ // 判断队列是否已满 if(queue.size() == maxNum){ System.out.println("[" + Thread.currentThread().getName() + "]" + "执行拒绝策略"); rejctPolicy.reject(this, task); } else { System.out.println("[" + Thread.currentThread().getName() + "]" + "加入任务队列"); queue.add(task); emptyWaitSet.signal(); } } finally { lock.unlock(); } } // 返回阻塞队列中的任务数 public int size(){ lock.lock(); try{ return queue.size(); } finally { lock.unlock(); } } }
public class ThreadPool {
// 阻塞队列
private BlockingQueue taskQueue;
// 线程集合
private HashSet workers = new HashSet<>();
// 核心线程数
private int coreSize;
// 获取任务的超时时间
private long timeout;
private TimeUnit timeUnit;
private RejectPolicy rejectPolicy;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueNum, RejectPolicy rejctPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueNum);
this.rejectPolicy = rejctPolicy;
}
// 执行任务
public void execute(Runnable task){
synchronized (workers) {
// 当任务数没有超过coreSize的时候,交给Worker对象执行
// 当任务数超过了coreSize,将其假如任务队列
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
System.out.println("[" + Thread.currentThread().getName() + "]" + "新增worker");
workers.add(worker);
worker.start();
} else {
//System.out.println("[" + Thread.currentThread().getName() + "]" + "加入到任务队列或执行拒绝策略");
//taskQueue.put(task);
taskQueue.tryPut(rejectPolicy, task);
}
}
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task){
this.task = task;
}
@Override
public void run() {
// 执行任务
// 1) 当task不为空,执行task
// 2) 当task执行完毕,看看任务队列是否为空,如果不为空就取出并执行
while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null){
try{
System.out.println("["+Thread.currentThread().getName()+"]"+"正在执行任务");
task.run();
} catch (Exception e){
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers){
System.out.println("["+Thread.currentThread().getName()+"]"+"任务执行完毕,将worker移除");
workers.remove(this);
}
}
}
}
测试类
public class Test {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(
2,
1000,
TimeUnit.MICROSECONDS,
5,
(queue, task) -> {
queue.put(task);
});
for(int i=0; i<15; i++){
int j = i;
threadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println("第"+j+"个task执行");
}
});
}
}
}
执行结果
[main]新增worker [main]新增worker [main]加入任务队列 [main]加入任务队列 [Thread-1]正在执行任务 [main]加入任务队列 [Thread-0]正在执行任务 第0个task执行 第1个task执行 [main]加入任务队列 [main]加入任务队列 [main]执行拒绝策略 [Thread-0]正在执行任务 [Thread-1]正在执行任务 [main]加入任务队列 第2个task执行 [main]执行拒绝策略 第3个task执行 [main]执行拒绝策略 [Thread-0]正在执行任务 第4个task执行 [Thread-1]正在执行任务 第5个task执行 [main]执行拒绝策略 [Thread-0]正在执行任务 第6个task执行 [Thread-1]正在执行任务 第7个task执行 [main]加入任务队列 [main]执行拒绝策略 [Thread-0]正在执行任务 [main]加入任务队列 [Thread-1]正在执行任务 第9个task执行 第8个task执行 [Thread-1]正在执行任务 第10个task执行 [Thread-0]正在执行任务 第11个task执行 [Thread-1]正在执行任务 第12个task执行 [Thread-0]正在执行任务 [Thread-1]正在执行任务 第13个task执行 第14个task执行 [Thread-1]任务执行完毕,将worker移除 [Thread-0]任务执行完毕,将worker移除
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)