1. 程式人生 > >Java 多執行緒工具類

Java 多執行緒工具類

一、CountDownLatch

CountDownLatch 可用於倒計數

getCount():獲取當前計數器剩餘計數

countDown():倒計數器釋放一次

await():用在多執行緒執行的後面,只有當 CountDownLatch 計數器全部釋放,及 getCount() == 0 時,才會喚醒,繼續執行

public class Test {

	public static void main(String[] args) throws InterruptedException {

		ExecutorService es = Executors.newFixedThreadPool(10);
		CountDownLatch latch = new CountDownLatch(5);
		while(latch.getCount() > 0) {
		//for(int i = 0; i < 2; i++) {
			es.execute(new Runnable() {
				@Override
				public void run() {
					System.out.println(Thread.currentThread().getName());
				}
			});
			latch.countDown();
		}
		latch.await();
		es.shutdown();
	}

}

二、CyclicBarrier

用於迴圈執行,只有達到指定個數的執行緒 執行await() 之後,才會喚醒

await():run 中 await 達到指定數之後,才喚醒

await(long timeout, TimeUnit unit):可設定超時時間,超時後如果還沒有達到指定的 await() 數量,丟擲超時異常

有一點:如果await() 未達到指定數量,則會導致執行緒一直等待。此時不會釋放執行緒佔用

public class Test08 {

	public static void main(String[] args) throws InterruptedException {

		ExecutorService es = Executors.newFixedThreadPool(3);
		CyclicBarrier cb = new CyclicBarrier(3);
		es.execute(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println("wait 【1】");
					System.out.println(Thread.currentThread().getName());
					cb.await();
					System.out.println(1);
				} catch (InterruptedException | BrokenBarrierException e) {
					e.printStackTrace();
				}
			}
		});
		es.execute(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println("wait 【2】");
					System.out.println(Thread.currentThread().getName());
					cb.await();
					System.out.println(2);
				} catch (InterruptedException | BrokenBarrierException e) {
					e.printStackTrace();
				}
			}
		});
		es.execute(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println("wait 【3】");
					System.out.println(Thread.currentThread().getName());
					cb.await();
					System.out.println(3);
				} catch (InterruptedException | BrokenBarrierException e) {
					e.printStackTrace();
				}
			}
		});
		es.execute(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println("wait 【4】");
					System.out.println(Thread.currentThread().getName());
					cb.await();
					System.out.println(4);
				} catch (InterruptedException | BrokenBarrierException e) {
					e.printStackTrace();
				}
			}
		});
		es.execute(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println("wait 【5】");
					System.out.println(Thread.currentThread().getName());
					cb.await();
					System.out.println(5);
				} catch (InterruptedException | BrokenBarrierException e) {
					e.printStackTrace();
				}
			}
		});

		for (int i = 0; i < 10; i++) {
			es.execute(new Runnable() {
				@Override
				public void run() {
					// 上面 4 5 佔用了兩個執行緒,所以執行緒池中只剩下一個執行緒可用
					System.out.println(Thread.currentThread().getName());
				}
			});
		}
		System.out.println("43412432432432");
		es.shutdown();
	}
}

三、Semaphore

Semaphore(int permits):permits 許可數目

Semaphore(int permits, boolean fair):fair 是否公平

acquire():獲取許可證 -- 如果當前沒有許可可以獲取,則阻塞等待

acquire(int permits):permits 許可數量

release():釋放許可證 -- 釋放前需要先有獲取的許可證,阻塞

release(int permits):permits 許可數量

有一點:如果指定數量的許可證都被佔用,則會阻塞,若要不阻塞,可使用 tryAcquire() 方法

public class Test08 {

	public static void main(String[] args) throws InterruptedException {

		ExecutorService es = Executors.newFixedThreadPool(5);
		Semaphore sh = new Semaphore(4);
		for (int i = 0; i < 10; i++) {
			es.execute(new Work(i, sh));
		}

		es.shutdown();
	}
}

class Work implements Runnable {
	private int worker;
	private Semaphore semaphore;

	public Work(int worker, Semaphore semaphore) {
		this.worker = worker;
		this.semaphore = semaphore;
	}

	@Override
	public void run() {
		try {
			semaphore.acquire(1);
			System.out.println("當前工作人員 【" + worker + "】" + " 獲取許可");
			semaphore.release(1);
			System.out.println("當前工作人員 【" + worker + "】" + " 釋放");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

四、Exchanger

進行兩個執行緒之間資料交換

exchange():如果呼叫交換方法的執行緒為單數,則會有一個執行緒進行等待,阻塞執行緒

public class Test08 {

	public static void main(String[] args) throws InterruptedException {

		ExecutorService es = Executors.newFixedThreadPool(5);
		Exchanger<String> change = new Exchanger<>();
		es.execute(new Runnable() {
			@Override
			public void run() {
				try {
					String s1 = "1";
					System.out.println(s1);
					String s2 = change.exchange(s1);
					System.out.println(s1 + " - " +s2);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		});
		es.execute(new Runnable() {
			@Override
			public void run() {
				try {
					String s1 = "2";
					System.out.println(s1);
					String s2 = change.exchange(s1);
					System.out.println(s1 + " - " +s2);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		});
		es.execute(new Runnable() {
			@Override
			public void run() {
				try {
					String s1 = "3";
					System.out.println(s1);
					String s2 = change.exchange(s1);
					System.out.println(s1 + " - " +s2);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		});
		es.execute(new Runnable() {
			@Override
			public void run() {
				try {
					String s1 = "4";
					System.out.println(s1);
					String s2 = change.exchange(s1);
					System.out.println(s1 + " - " +s2);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		});
		
		es.shutdown();
	}
}