馬士兵老師高併發同步容器
手寫固定同步容器
寫一個固定容量同步容器,擁有put和get方法,以及getCount方法,能夠支援2個生產者執行緒以及10個消費者執行緒的阻塞呼叫。
使用wait與notify
思路:使用一個集合來當做生產或者消費的中轉站,然後每當生產或者消費的時刻都判斷集合的容量,如果不滿足條件那麼就對這種操作進行阻塞也就是wait同時notify其它的所有執行緒。當其它執行緒啟動之後也會遇到“不合格的執行緒”這時候也會阻塞,直到合格的執行緒進行執行。
核心程式碼:
public class MyContainer1<T> { final private LinkedList<T> lists = new LinkedList<>(); final private int MAX = 10; //最多10個元素 private int count = 0; public synchronized void put(T t) { while(lists.size() == MAX) { //想想為什麼用while而不是用if? try { this.wait(); //effective java } catch (InterruptedException e) { e.printStackTrace(); } } lists.add(t); ++count; this.notifyAll(); //通知消費者執行緒進行消費 } public synchronized T get() { T t = null; while(lists.size() == 0) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } t = lists.removeFirst(); count --; this.notifyAll(); //通知生產者進行生產 return t; } public static void main(String[] args) { MyContainer1<String> c = new MyContainer1<>(); //啟動消費者執行緒 for(int i=0; i<10; i++) { new Thread(()->{ for(int j=0; j<5; j++) System.out.println(c.get()); }, "c" + i).start(); } try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //啟動生產者執行緒 for(int i=0; i<2; i++) { new Thread(()->{ for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j); }, "p" + i).start(); } } }
注意事項:
為什麼是while?
有下面的一個場景,當消費者消費的閾值不滿足條件那麼它將會被wait阻塞。此時還有其的9個消費者也處於阻塞狀態。當notifyAll的時候我們希望的是讓生產者來生產元素,但是這時候被喚醒的消費者會繼續去消費,程式碼會從wait()處直接向下執行。如果是一個if判斷,再加上這時的集合中沒有元素那麼此時一定會出異常。但是如果使用的是while的話那麼被喚醒的消費者就會迴圈檢測發現不滿足條件就繼續阻塞。整個程式順利進行。
使用signalAll喚醒對應條件的執行緒
signalAll與ReentrantLock共用達到只喚醒對應條件的執行緒。比如說當生產者生產的元素超過閾值的時候他就會呼叫signalAll這時所有的消費者被喚醒而所有的生產者則不受影響。這樣就可以避免喚醒不必要的執行緒節省資源。
此處注意要給每一種執行緒都定義一個Condition,在上鎖的時候就只用這個Condition的鎖去鎖定對應的執行緒。具體程式碼如下:
public class MyContainer2<T> { final private LinkedList<T> lists = new LinkedList<>(); final private int MAX = 10; //最多10個元素 private int count = 0; private Lock lock = new ReentrantLock(); private Condition producer = lock.newCondition(); private Condition consumer = lock.newCondition(); public void put(T t) { try { lock.lock(); while(lists.size() == MAX) { producer.await(); } lists.add(t); ++count; consumer.signalAll(); //通知消費者執行緒進行消費 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public T get() { T t = null; try { lock.lock(); while(lists.size() == 0) { consumer.await(); } t = lists.removeFirst(); count --; producer.signalAll(); //通知生產者進行生產 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return t; } public static void main(String[] args) { MyContainer2<String> c = new MyContainer2<>(); //啟動消費者執行緒 for(int i=0; i<10; i++) { new Thread(()->{ for(int j=0; j<5; j++) { System.out.println(c.get()); // System.out.println("c"); } }, "c" + i).start(); } try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //啟動生產者執行緒 for(int i=0; i<2; i++) { new Thread(()->{ for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j); }, "p" + i).start(); } } }
售票員案例
* 場景
* 有N張火車票,每張票都有一個編號
* 同時有10個視窗對外售票
* 請寫一個模擬程式
*
* 分析下面的程式可能會產生哪些問題?
* 重複銷售?超量銷售?
Solution1:使用執行緒不安全的集合而且不上鎖
public class TicketSeller1 {
static List<String> tickets = new ArrayList<>();
static {
for(int i=0; i<1000; i++) tickets.add("票編號:" + i);
}
public static void main(String[] args) {
for(int i=0; i<10; i++) {
new Thread(()->{
while(tickets.size() > 0) {
try {
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("銷售了--" + tickets.remove(0));
}
}).start();
}
}
}
為了使得問題呈現的效果明顯我們加上了睡眠時間。此程式會出現的問題:
總的來說會有兩個點出現問題:
①程式邏輯的執行緒不安全,有可能有多個執行緒湧入while迴圈中這是造成併發問題的根本。這個原因會導致在size為1的時候湧入很多的執行緒進而執行多次刪除操作下標越界。
②集合的執行緒不安全,remove的方法本來就不是執行緒安全的。為了說明問題我,我們把remove方法放大:
可以看出如果兩個執行緒同時執行remove方法的話,由於index一樣所以他們的remove的返回值就會得到同一個oldValue。也就是重複賣出。
Solution2:使用集合Vector但是程式碼邏輯不加鎖
程式碼上的邏輯與solution1是一樣的,但是Vector集合是執行緒安全的,所以它只會出現程式邏輯不安全帶來的併發問題。也就是會出現有可能有多個執行緒湧入while迴圈中這是造成併發問題的根本。這個原因會導致在size為1的時候湧入很多的執行緒進而執行多次刪除操作下標越界。但是絕對不會出現賣出同一張票的情況。我們把remove的程式碼放大:
這是一個同步的方法,每一個執行緒過來如果得不到鎖得話都會陷入等待。雖然都是remove(0)但是當下一個執行緒來到的時候0位置已經是一個全新的元素。
Solution3:給程式碼邏輯上鎖使用執行緒不安全的集合
不多說了無論如何都可以防止執行緒安全問題,因為在Solution1中已經提到過了程式碼的併發問題是一切問題的原因。直接上程式碼:
public class TicketSeller3 {
static List<String> tickets = new LinkedList<>();
static {
for(int i=0; i<1000; i++) tickets.add("票 編號:" + i);
}
public static void main(String[] args) {
for(int i=0; i<10; i++) {
new Thread(()->{
while(true) {
synchronized(tickets) {
if(tickets.size() <= 0) break;
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("銷售了--" + tickets.remove(0));
}
}
}).start();
}
}
}
Solution4使用併發集合不加鎖
首先說併發的集合是執行緒安全的,而且效率較高因為使用了局部鎖。這樣的話只在取值的時候加了鎖,而且如果是以下標來取值的話還可以同時取走多個地方的值這樣的話效率大大提高。而且這裡使用一種取值然後再判斷的邏輯巧妙的避免了下表越界的錯誤,而前面的案例中都是先判斷再取值這樣就造成了執行緒不安全:
public class TicketSeller4 {
static Queue<String> tickets = new ConcurrentLinkedQueue<>();
static {
for(int i=0; i<1000; i++) tickets.add("票 編號:" + i);
}
public static void main(String[] args) {
for(int i=0; i<10; i++) {
new Thread(()->{
while(true) {
String s = tickets.poll();
if(s == null) break;
else System.out.println("銷售了--" + s);
}
}).start();
}
}
}
高併發集合簡介
ConcurrectHashMap:使用了局部鎖,也就是細粒度的鎖來提高併發效果。
ConcurrectSkipListMap:使用了局部鎖,但是結果也排序的map集合。對比TreeMap一個元素排序的map集合。
CopyOnWriteArrayList:讀取時不加鎖,但是寫的時候回拷貝原有的資料然後對拷貝的資料進行操作最後將指標指向修改過的集合。這個集合適用於讀操作遠遠大於寫操作的情況。
BlockingQueue:阻塞佇列,當佇列中沒有元素的時候就會對取元素產生阻塞,當佇列中滿元素的時候就會對新增元素產生阻塞。而且不允許新增null的值而且在取值與新增值的情況下都會加鎖,換句話說它是一個執行緒安全的集合。以下為部分原始碼:
DelayQueue:執行定時任務,他的內部會裝有很多的task接受的task都實現了Delay介面,因此task內部也就維護了一個conpareTo的方法,如果按照時間排序的話那麼就能夠實現任務的定時執行。
public class T07_DelayQueue {
static BlockingQueue<MyTask> tasks = new DelayQueue<>();
static Random r = new Random();
static class MyTask implements Delayed {
long runningTime;
MyTask(long rt) {
this.runningTime = rt;
}
@Override
public int compareTo(Delayed o) {
if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
return -1;
else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
return 1;
else
return 0;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public String toString() {
return "" + runningTime;
}
}
public static void main(String[] args) throws InterruptedException {
long now = System.currentTimeMillis();
MyTask t1 = new MyTask(now + 1000);
MyTask t2 = new MyTask(now + 2000);
MyTask t3 = new MyTask(now + 1500);
MyTask t4 = new MyTask(now + 2500);
MyTask t5 = new MyTask(now + 500);
tasks.put(t1);
tasks.put(t2);
tasks.put(t3);
tasks.put(t4);
tasks.put(t5);
System.out.println(tasks);
for(int i=0; i<5; i++) {
System.out.println(tasks.take());
}
}
}
TransferQueue:當有消費者的話那麼就直接將生產出來的元素交給消費者,但是如果沒有消費者的話就會將生產的元素放到佇列中。當佇列中的元素消耗完的時候消費者就會阻塞。