第六十七條 避免過度同步
上一條,我們知道了併發中要對資料進行安全性保護,防止資料出錯,一般是進行同步保護,但做事情不能從一個極端走向另外一個極端,在同步保護的同時,要注意防止過度同步,因為它會導致效能下降、死鎖,甚至是其他的一些不確定行為。本條目中的一個醒目觀點就是,如果一個方法被同步了,那麼,這個同步方法裡涉及的有其他方法,那麼這些方法不應該是回撥,或者可以被重寫。舉個例子,我們在 16 條中舉例個 ForwardingSet 集合,那麼以它為藍本,舉例
public class ObservableSet<E> extends ForwardingSet<E> {
public ObservableSet(Set<E> s) {
super(s);
}
private final List<SetObserver<E>> observers =
new ArrayList<SetObserver<E>>();
public void addObserver(SetObserver<E> observer){
synchronized (observer) {
observers.add(observer);
}
}
public boolean removeObserver(SetObserver<E> observer){
synchronized (observer) {
return observers.remove(observer);
}
}
private void notifyElemetnAdded(E element){
synchronized (observers) {
for(SetObserver<E> observer : observers){
observer.added(this, element);
}
}
}
@Override
public boolean add(E element) {
boolean added = super.add(element);
if(added)
notifyElemetnAdded(element);
return added;
}
@Override
public boolean addAll(Collection<? extends E> c) {
boolean result = false;
for(E element : c){
result |= add(element);
}
return result;
}
}
public interface SetObserver<E> {
void added(ObservableSet<E> set, E element);
}
測試
public class Test {
public static void main(String[] args) {
ObservableSet<Integer> set =
new ObservableSet<Integer>(new HashSet<Integer>());
// add 方法呼叫後,新增元素成功,會呼叫notifyElemetnAdded(E element)方法,執行 SetObserver added方法
set.addObserver(new SetObserver<Integer>() {
public void added(ObservableSet<Integer> set, Integer element) {
System.out.println(element);
}
});
for(int i = 0; i < 100; i++){
set.add(i);
}
}
}
執行後,沒有問題,會列印新增的數字。因為我們重寫了集合的add()方法,在新增元素的基礎上,如果新增成功,就會呼叫 notifyElemetnAdded(E element) 方法,注意看這個方法,它裡面是個同步鎖,功能為遍歷新增的監聽集合,呼叫監聽,這妥妥的觀察者模式啊,我們測試程式碼中的監聽只添加了一個,並且裡面的程式碼是列印這個物件,所以就有了輸出臺上的資訊。上述程式碼沒問題,我們修改一下,加入我們修改 addObserver()方法,比如我們在元素新增到23時,把當前監聽取消,會如何呢?
public class Test {
public static void main(String[] args) {
ObservableSet<Integer> set =
new ObservableSet<Integer>(new HashSet<Integer>());
// add 方法呼叫後,新增元素成功,會呼叫notifyElemetnAdded(E element)方法,執行 SetObserver added方法
set.addObserver(new SetObserver<Integer>() {
public void added(ObservableSet<Integer> set, Integer element) {
System.out.println(element);
if(element == 23)
set.removeObserver(this);
}
});
for(int i = 0; i < 100; i++){
set.add(i);
}
}
}
這次以,出錯了,看看控制檯輸出
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Exception in thread "main" java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
at java.util.ArrayList$Itr.next(ArrayList.java:851)
at com.duan.rand.ObservableSetTest$ObservableSet.notifyElemetnAdded(ObservableSetTest.java:69)
at com.duan.rand.ObservableSetTest$ObservableSet.add(ObservableSetTest.java:79)
at com.duan.rand.ObservableSetTest.main(ObservableSetTest.java:32)
本意是我們列印數字,列印到23時,把這個列印的觀察者取消,意思是列印0-23為止,停止列印,程式正常,但此時程式明顯與預想不一樣,問題出在哪了?我們在 set.add(i); 時,是會呼叫 notifyElemetnAdded(element) 方法,這個方法,本身就是一個遍歷 observers 集合,遍歷集合有回撥 observer.added(this, element);,我們在這個回撥裡又呼叫了set.removeObserver(this); ,意思就是 observers集合把這個監聽remove掉,這就變成了同一個ArrayList集合,用增強for迴圈遍歷元素時,又呼叫remove()方法,但然就會出問題。雖然我們使用了同步鎖,可以防止併發修改,但我們無法防止單一執行緒中本身的同時操作,一邊增強for遍歷一邊刪除,所以就出錯了。
我們再次做出嘗試
public class Test {
public static void main(String[] args) {
ObservableSet<Integer> set =
new ObservableSet<Integer>(new HashSet<Integer>());
// add 方法呼叫後,新增元素成功,會呼叫notifyElemetnAdded(E element)方法,執行 SetObserver added方法
set.addObserver(new SetObserver<Integer>() {
public void added(final ObservableSet<Integer> set, Integer element) {
System.out.println(element);
if(element == 23){
ExecutorService executor = Executors.newSingleThreadExecutor();
final SetObserver<Integer> observer = this;
try {
executor.submit(new Runnable(){
public void run() {
set.removeObserver(observer);
}
}).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}finally{
executor.shutdown();
}
}
}
});
for(int i = 0; i < 100; i++){
set.add(i);
}
}
}
在我的電腦上,仍舊報錯;在以前舊版本的jvm上,會遇到死鎖,後臺執行緒呼叫set.removeObserver,企圖鎖定observers,但它無法獲得該鎖,因為主執行緒已經有鎖了,主執行緒一直等待後臺執行緒完成對它的刪除。對於上述問題,怎麼辦?兩種解決方法
一、我們 notifyElemetnAdded(E element) 方法中,對集合observers做個轉換,相當於臨時變數,每次呼叫notifyElemetnAdded(E element) 方法時,都會檢查一下observers集合,然後把集合observers的元素通過clone的形式給予snapshot集合,遍歷snapshot集合,這樣就把遍歷和remove()拆分了開來,互不影響
private void notifyElemetnAdded(E element){
List<SetObserver<E>> snapshot = null;
synchronized (observers) {
snapshot = new ArrayList<SetObserver<E>>(observers);
}
for(SetObserver<E> observer : snapshot){
observer.added(this, element);
}
}
二、用系統 CopyOnWriteArrayList 集合代替 ArrayList 集合,這個類通過clone底層陣列,實現元素的增刪,可以在併發的情況下,保持元素的正確增刪。我們可以看一下它的add()實現方法
android 版本原始碼:
public synchronized boolean add(E e) {
Object[] newElements = new Object[elements.length + 1];
System.arraycopy(elements, 0, newElements, 0, elements.length);
newElements[elements.length] = e;
elements = newElements;
return true;
}
public synchronized void add(int index, E e) {
Object[] newElements = new Object[elements.length + 1];
System.arraycopy(elements, 0, newElements, 0, index);
newElements[index] = e;
System.arraycopy(elements, index, newElements, index + 1, elements.length - index);
elements = newElements;
}
public synchronized E remove(int index) {
@SuppressWarnings("unchecked")
E removed = (E) elements[index];
removeRange(index, index + 1);
return removed;
}
java 版本原始碼
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
public void add(int index, E element) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
if (index > len || index < 0)
throw new IndexOutOfBoundsException("Index: "+index+
", Size: "+len);
Object[] newElements;
int numMoved = len - index;
if (numMoved == 0)
newElements = Arrays.copyOf(elements, len + 1);
else {
newElements = new Object[len + 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index, newElements, index + 1,
numMoved);
}
newElements[index] = element;
setArray(newElements);
} finally {
lock.unlock();
}
}
public E remove(int index) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
int numMoved = len - index - 1;
if (numMoved == 0)
setArray(Arrays.copyOf(elements, len - 1));
else {
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
setArray(newElements);
}
return oldValue;
} finally {
lock.unlock();
}
}
不難看出,android 和 java 雖然實現細節不同,但原理一樣,都是對這個方法加上鎖,然後clone元素到新陣列,不同的是,android 用的是關鍵字 synchronized ,而java用的是ReentrantLock 這個lock鎖,在 jdk1.5 之前,synchronized 效率比 lock鎖 效率低,但1.5以後,jvm大大優化了 synchronized ,效率大大提高,現在兩個的效率不相上下。
由此我們可以得出結論,外部呼叫的內部集合應避免上鎖,而是建立一個快照,然後對快照進行上鎖。