Java 多執行緒工具類
阿新 • • 發佈:2019-02-17
一、CountDownLatch
CountDownLatch 可用於倒計數
getCount():獲取當前計數器剩餘計數
countDown():倒計數器釋放一次
await():用在多執行緒執行的後面,只有當 CountDownLatch 計數器全部釋放,及 getCount() == 0 時,才會喚醒,繼續執行
public class Test { public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newFixedThreadPool(10); CountDownLatch latch = new CountDownLatch(5); while(latch.getCount() > 0) { //for(int i = 0; i < 2; i++) { es.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } }); latch.countDown(); } latch.await(); es.shutdown(); } }
二、CyclicBarrier
用於迴圈執行,只有達到指定個數的執行緒 執行await() 之後,才會喚醒
await():run 中 await 達到指定數之後,才喚醒
await(long timeout, TimeUnit unit):可設定超時時間,超時後如果還沒有達到指定的 await() 數量,丟擲超時異常
有一點:如果await() 未達到指定數量,則會導致執行緒一直等待。此時不會釋放執行緒佔用
public class Test08 { public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newFixedThreadPool(3); CyclicBarrier cb = new CyclicBarrier(3); es.execute(new Runnable() { @Override public void run() { try { System.out.println("wait 【1】"); System.out.println(Thread.currentThread().getName()); cb.await(); System.out.println(1); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); es.execute(new Runnable() { @Override public void run() { try { System.out.println("wait 【2】"); System.out.println(Thread.currentThread().getName()); cb.await(); System.out.println(2); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); es.execute(new Runnable() { @Override public void run() { try { System.out.println("wait 【3】"); System.out.println(Thread.currentThread().getName()); cb.await(); System.out.println(3); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); es.execute(new Runnable() { @Override public void run() { try { System.out.println("wait 【4】"); System.out.println(Thread.currentThread().getName()); cb.await(); System.out.println(4); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); es.execute(new Runnable() { @Override public void run() { try { System.out.println("wait 【5】"); System.out.println(Thread.currentThread().getName()); cb.await(); System.out.println(5); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); for (int i = 0; i < 10; i++) { es.execute(new Runnable() { @Override public void run() { // 上面 4 5 佔用了兩個執行緒,所以執行緒池中只剩下一個執行緒可用 System.out.println(Thread.currentThread().getName()); } }); } System.out.println("43412432432432"); es.shutdown(); } }
三、Semaphore
Semaphore(int permits):permits 許可數目
Semaphore(int permits, boolean fair):fair 是否公平
acquire():獲取許可證 -- 如果當前沒有許可可以獲取,則阻塞等待
acquire(int permits):permits 許可數量
release():釋放許可證 -- 釋放前需要先有獲取的許可證,阻塞
release(int permits):permits 許可數量
有一點:如果指定數量的許可證都被佔用,則會阻塞,若要不阻塞,可使用 tryAcquire() 方法
public class Test08 { public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newFixedThreadPool(5); Semaphore sh = new Semaphore(4); for (int i = 0; i < 10; i++) { es.execute(new Work(i, sh)); } es.shutdown(); } } class Work implements Runnable { private int worker; private Semaphore semaphore; public Work(int worker, Semaphore semaphore) { this.worker = worker; this.semaphore = semaphore; } @Override public void run() { try { semaphore.acquire(1); System.out.println("當前工作人員 【" + worker + "】" + " 獲取許可"); semaphore.release(1); System.out.println("當前工作人員 【" + worker + "】" + " 釋放"); } catch (InterruptedException e) { e.printStackTrace(); } } }
四、Exchanger
進行兩個執行緒之間資料交換
exchange():如果呼叫交換方法的執行緒為單數,則會有一個執行緒進行等待,阻塞執行緒
public class Test08 {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(5);
Exchanger<String> change = new Exchanger<>();
es.execute(new Runnable() {
@Override
public void run() {
try {
String s1 = "1";
System.out.println(s1);
String s2 = change.exchange(s1);
System.out.println(s1 + " - " +s2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
es.execute(new Runnable() {
@Override
public void run() {
try {
String s1 = "2";
System.out.println(s1);
String s2 = change.exchange(s1);
System.out.println(s1 + " - " +s2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
es.execute(new Runnable() {
@Override
public void run() {
try {
String s1 = "3";
System.out.println(s1);
String s2 = change.exchange(s1);
System.out.println(s1 + " - " +s2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
es.execute(new Runnable() {
@Override
public void run() {
try {
String s1 = "4";
System.out.println(s1);
String s2 = change.exchange(s1);
System.out.println(s1 + " - " +s2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
es.shutdown();
}
}