1. 程式人生 > >二十九、併發程式設計之併發工具類Semaphore詳解

二十九、併發程式設計之併發工具類Semaphore詳解

一、簡介

Semaphore是一個計數訊號量,常用於限制可以訪問某些資源(物理或邏輯的)執行緒數目。
Semaphore是計數訊號量。Semaphore管理一系列許可證。每個acquire方法阻塞,直到有一個許可證可以獲得然後拿走一個許可證;每個release方法增加一個許可證,這可能會釋放一個阻塞的acquire方法。然而,其實並沒有實際的許可證這個物件,Semaphore只是維持了一個可獲得許可證的數量。
Semaphore經常用於限制獲取某種資源的執行緒數量。

二、原理

1、構造方法

Semaphore有兩個構造方法,如下:

    public Semaphore
(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }

從上面可以看到兩個構造方法,都必須提供許可的數量,第二個構造方法可以指定是公平模式還是非公平模式,預設非公平模式。
Semaphore內部基於AQS的共享模式,所以實現都委託給了Sync類。
這裡就看一下NonfairSync的構造方法:

Sync(int permits) {
  setState(permits);
}

可以看到呼叫了setState方法,也就是說AQS中的資源就是許可證的數量。

2、獲取許可

先從獲取一個許可看起,並且先看非公平模式下的實現。首先看acquire方法,acquire方法有幾個過載,但主要是下面這個方法

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly
(permits); }

從上面可以看到,呼叫了Sync的acquireSharedInterruptibly方法,該方法在父類AQS中,如下:

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  //如果執行緒被中斷了,丟擲異常
  if (Thread.interrupted())
      throw new InterruptedException();
  //獲取許可失敗,將執行緒加入到等待佇列中
  if (tryAcquireShared(arg) < 0)
      doAcquireSharedInterruptibly(arg);
}

AQS子類如果要使用共享模式的話,需要實現tryAcquireShared方法,下面看NonfairSync的該方法實現:

protected int tryAcquireShared(int acquires) {
   return nonfairTryAcquireShared(acquires);
}

該方法呼叫了父類中的nonfairTyAcquireShared方法,如下:

	final int nonfairTryAcquireShared(int acquires) {
		for (;;) {
			//獲取剩餘許可數量
			int available = getState();
			//計算給完這次許可數量後的個數
			int remaining = available - acquires;
			//如果許可不夠或者可以將許可數量重置的話,返回
			if (remaining < 0 ||
					compareAndSetState(available, remaining))
				return remaining;
		}
	}

從上面可以看到,只有在許可不夠時返回值才會小於0,其餘返回的都是剩餘許可數量,這也就解釋了,一旦許可不夠,後面的執行緒將會阻塞。看完了非公平的獲取,再看下公平的獲取,程式碼如下:

	protected int tryAcquireShared(int acquires) {
		for (;;) {
			//如果前面有執行緒再等待,直接返回-1
			if (hasQueuedPredecessors())
				return -1;
			//後面與非公平一樣
			int available = getState();
			int remaining = available - acquires;
			if (remaining < 0 ||
					compareAndSetState(available, remaining))
				return remaining;
		}
	}

從上面可以看到,FairSync與NonFairSync的區別就在於會首先判斷當前佇列中有沒有執行緒在等待,如果有,就老老實實進入到等待佇列;而不像NonfairSync一樣首先試一把,說不定就恰好獲得了一個許可,這樣就可以插隊了。
看完了獲取許可後,再看一下釋放許可。

3、釋放許可

釋放許可也有幾個過載方法,但都會呼叫下面這個帶引數的方法,

public void release(int permits) {
   if (permits < 0) throw new IllegalArgumentException();
   sync.releaseShared(permits);
}

releaseShared方法在AQS中,如下:

public final boolean releaseShared(int arg) {
 	//如果改變許可數量成功
   if (tryReleaseShared(arg)) {
      doReleaseShared();
      return true;
   }
   return false;
}

AQS子類實現共享模式的類需要實現tryReleaseShared類來判斷是否釋放成功,實現如下:

protected final boolean tryReleaseShared(int releases) {
		for (;;) {
			//獲取當前許可數量
			int current = getState();
			//計算回收後的數量
			int next = current + releases;
			if (next < current) // overflow
				throw new Error("Maximum permit count exceeded");
			//CAS改變許可數量成功,返回true
			if (compareAndSetState(current, next))
				return true;
		}
	}

從上面可以看到,一旦CAS改變許可數量成功,那麼就會呼叫doReleaseShared()方法釋放阻塞的執行緒。
減小許可數量。
Semaphore還有減小許可數量的方法,該方法可以用於用於當資源用完不能再用時,這時就可以減小許可證。程式碼如下:

	protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

可以看到,委託給了Sync,Sync的reducePermits方法如下:

 	final void reducePermits(int reductions) {
            for (;;) {
                //得到當前剩餘許可數量
                int current = getState();
                //得到減完之後的許可數量
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                //如果CAS改變成功
                if (compareAndSetState(current, next))
                    return;
            }
        }

從上面可以看到,就是CAS改變AQS中的state變數,因為該變數代表許可證的數量。
獲取剩餘許可數量  
Semaphore還可以一次將剩餘的許可數量全部取走,該方法是drain方法,如下:

	public int drainPermits() {
        return sync.drainPermits();
    }

Sync的實現如下:

	final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }

可以看到,就是CAS將許可數量置為0。

三、使用

操場上有5個跑道,一個跑道一次只能有一個學生在上面跑步,一旦所有跑道在使用,那麼後面的學生就需要等待,直到有一個學生不跑了。

/**
 * 操場,有5個跑道
 */
public class Playground {

	/**
	 * 跑道類
	 */
	static class Track {
		private int num;

		public Track(int num) {
			this.num = num;
		}

		@Override
		public String toString() {
			return "Track{" +
					"num=" + num +
					'}';
		}
	}

	private Track[] tracks = {new Track(1), new Track(2), new Track(3), new Track(4), new Track(5)};
	private volatile boolean[] used = new boolean[5];
	private Semaphore semaphore = new Semaphore(5, true);

	/**
	 * 獲取一個跑道
	 */
	public Track getTrack() throws InterruptedException {
		semaphore.acquire(1);
		return getNextAvailableTrack();
	}

	/**
	 * 返回一個跑道
	 */
	public void releaseTrack(Track track) {
		if (makeAsUsed(track))
			semaphore.release(1);
	}

	/**
	 * 遍歷,找到一個沒人用的跑道
	 */
	private Track getNextAvailableTrack() {
		for (int i = 0; i < used.length; i++) {
			if (!used[i]) {
				used[i] = true;
				return tracks[i];
			}
		}
		return null;
	}

	/**
	 * 返回一個跑道
	 */
	private boolean makeAsUsed(Track track) {
		for (int i = 0; i < used.length; i++) {
			if (tracks[i] == track) {
				if (used[i]) {
					used[i] = false;
					return true;
				} else {
					return false;
				}

			}
		}
		return false;
	}
}

建立了5個跑道物件,並使用一個boolean型別的陣列記錄每個跑道是否被使用了,初始化了5個許可證的Semaphore,在獲取跑道時首先呼叫acquire(1)獲取一個許可證,在歸還一個跑道是呼叫release(1)釋放一個許可證。接下來再看啟動程式:

public class SemaphoreDemo {
	static class Student implements Runnable {
		private int num;
		private Playground playground;

		public Student(int num, Playground playground) {
			this.num = num;
			this.playground = playground;
		}

		@Override
		public void run() {
			try {
				//獲取跑道
				Playground.Track track = playground.getTrack();
				if (track != null) {
					System.out.println("學生" + num + "在" + track.toString() + "上跑步");
					TimeUnit.SECONDS.sleep(2);
					System.out.println("學生" + num + "釋放" + track.toString());
					//釋放跑道
					playground.releaseTrack(track);
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args) {
		Executor executor = Executors.newCachedThreadPool();
		Playground playground = new Playground();
		for (int i = 0; i < 100; i++) {
			executor.execute(new Student(i+1,playground));
		}
	}
}

四、總結

Semaphore是訊號量,用於管理一組資源。其內部是基於AQS的共享模式,AQS的狀態表示許可證的數量,在許可證數量不夠時,執行緒將會被掛起;而一旦有一個執行緒釋放一個資源,那麼就有可能重新喚醒等待佇列中的執行緒繼續執行。

原文