1. 程式人生 > >【JAVA】簡單實現一個阻塞任務佇列

【JAVA】簡單實現一個阻塞任務佇列

package p18.juc;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

public class
MyBlockingTaskQueen<T> {
private ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>(); private Lock lock = new ReentrantLock(); private Condition cond = lock.newCondition(); private int taskCapacity=10; public MyBlockingTaskQueen(int taskCapacity){ this
.taskCapacity = taskCapacity; } /* * 新增任務 */ public void put(T task) { try{ lock.lock(); while(queue.size()>taskCapacity) { System.out.println(Thread.currentThread().getName()+":任務數量達到上限,當前執行緒被掛起"); cond.await(); } queue.add(task); cond.signalAll(); } catch
(InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } /* * 處理任務 */ public void handle(Consumer<T>cons) { try { lock.lock(); while(queue.size()<=0) { System.out.println(Thread.currentThread().getName()+":當前沒有任務,執行緒被掛起"); cond.await(); } cons.accept(queue.poll()); cond.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } /* * 測試 */ public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(2); MyBlockingTaskQueen<Task> queue = new MyBlockingTaskQueen<>(1); /* * 模擬兩個新增任務的執行緒,共新增2000條任務 */ Thread supplier1 = new Thread(()->{ for (int i = 0; i < 1000; i++) { queue.put(new Task("task"+System.currentTimeMillis())); } System.out.println("==============="); },"生產者1"); Thread supplier2 = new Thread(()->{ for (int i = 0; i < 1000; i++) { queue.put(new Task("task"+System.currentTimeMillis())); } System.out.println("======================"); },"生產者2"); /* * 模擬兩個處理任務的執行緒,每個執行緒分1000條 */ Thread consumer = new Thread(()->{ for (int i = 0; i < 1000; i++) { queue.handle((task)->System.out.println(Thread.currentThread().getName()+":處理"+task)); } System.out.println("c1執行結束"); latch.countDown(); },"消費者1"); Thread consumer2 = new Thread(()->{ for (int i = 0; i < 1000; i++) { queue.handle((task)->System.out.println(Thread.currentThread().getName()+":處理"+task)); } System.out.println("c2執行結束"); System.out.println(consumer.getState()); latch.countDown(); },"消費者2"); long startT = System.currentTimeMillis(); consumer.start(); consumer2.start(); supplier1.start(); supplier2.start(); latch.await(); long endT = System.currentTimeMillis(); System.out.println("耗費時間:"+(endT-startT)); System.out.println("c1:"+consumer.getState()); System.out.println("c2:"+consumer2.getState()); System.out.println("s1:"+supplier1.getState()); System.out.println("s2:"+supplier2.getState()); System.out.println("佇列剩餘數量:"+queue.size()); } } class Task{ private String name; public Task(String name) { this.name=name; } @Override public String toString() { return "Task [name=" + name + "]"; } }