1. 程式人生 > >Java處理多執行緒併發相關類

Java處理多執行緒併發相關類

    自從JDK5釋出以來,在java.util.concurrent包中提供了一些非常有用的輔助類來幫助我們進行併發程式設計,下面就介紹一下這些輔助類中的Semaphore、CyclicBarrier、CountDownLatch以及Exchanger的相關用法。

一、Semaphore

    Semaphore是計數訊號量,是作業系統中的一個概念,經常用於限制獲取某種資源的執行緒數量,在new 這個類的時候需要給這個類傳遞一個引數permits,這個引數是整數型別,這個引數的意思是同一時間內,最多允許多少個執行緒同時執行acquire方法和release方法之間的程式碼,如果方法acquire沒有引數則預設是一個許可。例如售票視窗,假如有兩個視窗,有三十個使用者需要買票,控制併發進行程式設計,程式碼如下:


public class SemaphoreTest {
	
	class SemaphoreRunnable implements Runnable{
		private Semaphore semaphore;
		private int user;
		public SemaphoreRunnable(Semaphore semaphore, int user) {
			this.semaphore = semaphore;
			this.user = user;
		}
		@Override
		public void run() {
			try {
				//獲取semaphore並且釋放
				semaphore.acquire();
				System.out.println("使用者"+user+"進入視窗買票。。。");
				Thread.sleep((long)(Math.random()*10000));
				System.out.println("使用者"+user+"已經買票。。。");
				Thread.sleep((long)(Math.random()*10000));
				System.out.println("使用者"+user+"離開視窗。。。");
				semaphore.release();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	private void execute(){
		final Semaphore semaphore = new Semaphore(2);
		ExecutorService threadPool = Executors.newCachedThreadPool();
		for(int i=0; i<20; i++){
			threadPool.execute(new SemaphoreRunnable(semaphore, i+1));
		}
		threadPool.shutdown();
	}
	public static void main(String[] args) {
		SemaphoreTest st = new SemaphoreTest();
		st.execute();
	}
}

    充分的模擬了現實售票視窗的售票業務,semaphore.acquire()相當於進入了其中一個視窗,買完票後semaphore.release()相當於買票者離開了售票視窗,這種情況下才允許下一個買票者進入售票視窗。

二、CyclicBarrier

CyclicBarrier直譯過來叫做記憶體屏障,它要做的事情是,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續下面的業務。CyclicBarrier預設的構造方法是CyclicBarrier(int parties),其引數表示屏障攔截的執行緒數量,每個執行緒呼叫await方法告訴CyclicBarrier我已經到達了屏障,然後當前執行緒被阻塞。與CountDownLatch不同的是該barrier在釋放等待執行緒後可以重用,所以稱它為迴圈(Cyclic)的屏障(Barrier)。業務場景:假設公司突然開竅了,出錢讓程式設計師出去團建,肯定是等所有人員到達指定地點時,才發車,發車之前可能要拍一個集體照,加入就三個程式設計師,業務程式碼如下:


public class CyclicBarrierTest {
	public static void main(String[] args) {
		final CyclicBarrier cb = new CyclicBarrier(3,new Runnable() {
			@Override
			public void run() {
				System.out.println("人員已經到齊,開始拍照。。。");
				try {
					Thread.sleep((long)(Math.random()*10000));
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		});
		ExecutorService threadPool = Executors.newCachedThreadPool();
		for (int i = 0; i < 3; i++) {
			final int coder = i+1;
			Runnable r = new Runnable() {
				@Override
				public void run() {
					try {
						Thread.sleep((long)(Math.random()*10000));
						System.out.println(coder+"到達集合地點,當前已有"+(cb.getNumberWaiting()+1)+"到達。。。");
						cb.await();
						System.out.println("拍照完畢,開始出發。。。");
						Thread.sleep((long)(Math.random()*1000));
						System.out.println("到達遊玩地,"+coder+"開始下車。。。");
					} catch (InterruptedException|BrokenBarrierException e) {
						e.printStackTrace();
					}
				}
			};
			threadPool.execute(r);
		}
		threadPool.shutdown();
	}
}

    其中cb.await()相當於發出屏障指令,要等所有程式設計師到達目的地之後才進行下面的安排。檢視CyclicBarrier的原始碼,有兩個構造方法:

/**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    CyclicBarrier(int parties)中的parties只是指定了需要等待的執行緒數,而CyclicBarrier(int parties, Runnable barrierAction)則是追加了一個執行緒,這個執行緒可以等parties數目執行緒都到達觸發一些業務處理,如上面那個例子當中的人員到達後拍照留念。

三、CountDownLatch

    CountDownLatch可以實現類似計數器的功能,計數器的初始值為指定的執行緒的數量,每當一個執行緒完成了自己的任務,計數器的值就會減1,當計數器的值達到了0時,它表示所有的執行緒都完成了任務,然後在閉鎖上等待的執行緒就可以恢復執行任務。構造器上的計數值實際上就是閉鎖需要等待的執行緒數量,這個值只能被設定一次,而且CountDownLatch沒有提供任何機制去重新設定這個值。

業務場景:運動員賽跑,要在同一時刻開始起跑,但是要等最後一個運動員到達終點時才會彙總成績,假如有8個運動員:

public class PlayerTest {
	public static void main(String[] args) {
		CountDownLatch beginLatch = new CountDownLatch(1);
		CountDownLatch endLatch = new CountDownLatch(8);
		for (int i = 0; i < 8; i++) {
			new Thread(new PlayerTest().new Work(i+1, beginLatch, endLatch)).start();
		}
		try {
			System.out.println("準備起跑。。。");
			beginLatch.countDown();
			endLatch.await();
			System.out.println("成績彙總。。。");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	class Work implements Runnable{
		private int id;
		private CountDownLatch beginLatch;
		private CountDownLatch endLatch;
		public Work(int id, CountDownLatch beginLatch, CountDownLatch endLatch) {
			super();
			this.id = id;
			this.beginLatch = beginLatch;
			this.endLatch = endLatch;
		}
		@Override
		public void run() {
			try {
				beginLatch.await();
				System.out.println("運動員{"+id+"}到達終點。。。");
				endLatch.countDown();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

四、Exchanger

    Exchanger是用於執行緒間協作的工具類,用於執行緒間的資料交換,它提供一個同步點,在這個同步點兩個執行緒可以交換彼此的資料。這兩個執行緒通過exchange方法交換資料, 如果第一個執行緒先執行exchange方法,它會一直等待第二個執行緒也執行exchange,當兩個執行緒都到達同步點時,這兩個執行緒就可以交換資料,將本執行緒生產出來的資料傳遞給對方。

    業務場景:我們經常看一些販毒的警匪片,最經典的臺詞就是“一手交錢,一手交貨”,程式碼如下:

public class ExchangerTest {
	public static void main(String[] args) {
		ExecutorService threadPool = Executors.newCachedThreadPool();
		final Exchanger<String> ex = new Exchanger<>();
		threadPool.execute(new Runnable() {
			@Override
			public void run() {
				try {
					String reStr = ex.exchange("一手交貨");
					System.out.println("吸毒者:"+reStr);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		});
		threadPool.execute(new Runnable() {
			@Override
			public void run() {
				try {
					String reStr = ex.exchange("一手交錢");
					System.out.println("販毒者:"+reStr);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		});
		threadPool.shutdown();
	}
}
    吸毒者要想獲得毒品,就要換取“一手交錢”的資訊,販毒者要想獲得金錢,就要換取“一手交貨”的資訊,當雙方到達一個小黑屋的同步點時,就可以交錢收貨和交貨收錢了!

    熟悉了各個併發程式設計提供的控制類的介紹和用法,在一些沒有必要使用重量級鎖的業務場景中,就可以應用併發揮它們的作用了。