1. 程式人生 > >第六十七條 避免過度同步

第六十七條 避免過度同步

上一條,我們知道了併發中要對資料進行安全性保護,防止資料出錯,一般是進行同步保護,但做事情不能從一個極端走向另外一個極端,在同步保護的同時,要注意防止過度同步,因為它會導致效能下降、死鎖,甚至是其他的一些不確定行為。本條目中的一個醒目觀點就是,如果一個方法被同步了,那麼,這個同步方法裡涉及的有其他方法,那麼這些方法不應該是回撥,或者可以被重寫。舉個例子,我們在 16 條中舉例個 ForwardingSet 集合,那麼以它為藍本,舉例

public class ObservableSet<E> extends ForwardingSet<E> {

    public ObservableSet(Set<E> s) {
        super(s);
    }

    private final List<SetObserver<E>> observers =
            new ArrayList<SetObserver<E>>();

    public void addObserver(SetObserver<E> observer){
        synchronized (observer) {
            observers.add(observer);
        }
    }

    public boolean removeObserver(SetObserver<E> observer){
        synchronized (observer) {
            return observers.remove(observer);
        }
    }

    private void notifyElemetnAdded(E element){
        synchronized (observers) {
            for(SetObserver<E> observer : observers){
                observer.added(this, element);
            }
        }
    }

    @Override
    public boolean add(E element) {
        boolean added = super.add(element);
        if(added)
            notifyElemetnAdded(element);
        return added;
    }

    @Override
    public boolean addAll(Collection<? extends E> c) {
        boolean result = false;
        for(E element : c){
            result |= add(element);
        }
        return result;
    }
}


public interface SetObserver<E> {
    void added(ObservableSet<E> set, E element);
}

測試 

public class Test {
    public static void main(String[] args) {
        ObservableSet<Integer> set =
                new ObservableSet<Integer>(new HashSet<Integer>());

        // add 方法呼叫後,新增元素成功,會呼叫notifyElemetnAdded(E element)方法,執行 SetObserver added方法
        set.addObserver(new SetObserver<Integer>() {
            public void added(ObservableSet<Integer> set, Integer element) {
                System.out.println(element);
            }
        });

        for(int i = 0; i < 100; i++){
            set.add(i);
        }
    }
}

執行後,沒有問題,會列印新增的數字。因為我們重寫了集合的add()方法,在新增元素的基礎上,如果新增成功,就會呼叫 notifyElemetnAdded(E element) 方法,注意看這個方法,它裡面是個同步鎖,功能為遍歷新增的監聽集合,呼叫監聽,這妥妥的觀察者模式啊,我們測試程式碼中的監聽只添加了一個,並且裡面的程式碼是列印這個物件,所以就有了輸出臺上的資訊。上述程式碼沒問題,我們修改一下,加入我們修改 addObserver()方法,比如我們在元素新增到23時,把當前監聽取消,會如何呢?

public class Test {
    public static void main(String[] args) {
        ObservableSet<Integer> set =
                new ObservableSet<Integer>(new HashSet<Integer>());

        // add 方法呼叫後,新增元素成功,會呼叫notifyElemetnAdded(E element)方法,執行 SetObserver added方法
        set.addObserver(new SetObserver<Integer>() {
            public void added(ObservableSet<Integer> set, Integer element) {
                System.out.println(element);
                if(element == 23)
                    set.removeObserver(this);
            }
        });

        for(int i = 0; i < 100; i++){
            set.add(i);
        }
    }
}


這次以,出錯了,看看控制檯輸出

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    Exception in thread "main" java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
    at java.util.ArrayList$Itr.next(ArrayList.java:851)
    at com.duan.rand.ObservableSetTest$ObservableSet.notifyElemetnAdded(ObservableSetTest.java:69)
    at com.duan.rand.ObservableSetTest$ObservableSet.add(ObservableSetTest.java:79)
    at com.duan.rand.ObservableSetTest.main(ObservableSetTest.java:32)

本意是我們列印數字,列印到23時,把這個列印的觀察者取消,意思是列印0-23為止,停止列印,程式正常,但此時程式明顯與預想不一樣,問題出在哪了?我們在 set.add(i); 時,是會呼叫 notifyElemetnAdded(element) 方法,這個方法,本身就是一個遍歷 observers 集合,遍歷集合有回撥 observer.added(this, element);,我們在這個回撥裡又呼叫了set.removeObserver(this); ,意思就是 observers集合把這個監聽remove掉,這就變成了同一個ArrayList集合,用增強for迴圈遍歷元素時,又呼叫remove()方法,但然就會出問題。雖然我們使用了同步鎖,可以防止併發修改,但我們無法防止單一執行緒中本身的同時操作,一邊增強for遍歷一邊刪除,所以就出錯了。

我們再次做出嘗試

public class Test {
    public static void main(String[] args) {
        ObservableSet<Integer> set =
                new ObservableSet<Integer>(new HashSet<Integer>());

        //  add 方法呼叫後,新增元素成功,會呼叫notifyElemetnAdded(E element)方法,執行 SetObserver added方法
        set.addObserver(new SetObserver<Integer>() {
            public void added(final ObservableSet<Integer> set, Integer element) {
                System.out.println(element);
                if(element == 23){
                    ExecutorService executor = Executors.newSingleThreadExecutor();
                    final SetObserver<Integer> observer = this;
                    try {
                        executor.submit(new Runnable(){
                            public void run() {
                                set.removeObserver(observer);
                            }
                        }).get();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }finally{
            executor.shutdown();
            }
                }
            }
        });

        for(int i = 0; i < 100; i++){
            set.add(i);
        }
    }
}

在我的電腦上,仍舊報錯;在以前舊版本的jvm上,會遇到死鎖,後臺執行緒呼叫set.removeObserver,企圖鎖定observers,但它無法獲得該鎖,因為主執行緒已經有鎖了,主執行緒一直等待後臺執行緒完成對它的刪除。對於上述問題,怎麼辦?兩種解決方法

一、我們  notifyElemetnAdded(E element) 方法中,對集合observers做個轉換,相當於臨時變數,每次呼叫notifyElemetnAdded(E element) 方法時,都會檢查一下observers集合,然後把集合observers的元素通過clone的形式給予snapshot集合,遍歷snapshot集合,這樣就把遍歷和remove()拆分了開來,互不影響

    private void notifyElemetnAdded(E element){
        List<SetObserver<E>> snapshot = null;
        synchronized (observers) {
            snapshot = new ArrayList<SetObserver<E>>(observers);
        }
        for(SetObserver<E> observer : snapshot){
            observer.added(this, element);
        }
    }

二、用系統 CopyOnWriteArrayList 集合代替 ArrayList 集合,這個類通過clone底層陣列,實現元素的增刪,可以在併發的情況下,保持元素的正確增刪。我們可以看一下它的add()實現方法

android 版本原始碼:
    public synchronized boolean add(E e) {
        Object[] newElements = new Object[elements.length + 1];
        System.arraycopy(elements, 0, newElements, 0, elements.length);
        newElements[elements.length] = e;
        elements = newElements;
        return true;
    }

    public synchronized void add(int index, E e) {
        Object[] newElements = new Object[elements.length + 1];
        System.arraycopy(elements, 0, newElements, 0, index);
        newElements[index] = e;
        System.arraycopy(elements, index, newElements, index + 1, elements.length - index);
        elements = newElements;
    }

    public synchronized E remove(int index) {
        @SuppressWarnings("unchecked")
        E removed = (E) elements[index];
        removeRange(index, index + 1);
        return removed;
    }

java 版本原始碼

    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

    public void add(int index, E element) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            if (index > len || index < 0)
                throw new IndexOutOfBoundsException("Index: "+index+
                                                    ", Size: "+len);
            Object[] newElements;
            int numMoved = len - index;
            if (numMoved == 0)
                newElements = Arrays.copyOf(elements, len + 1);
            else {
                newElements = new Object[len + 1];
                System.arraycopy(elements, 0, newElements, 0, index);
                System.arraycopy(elements, index, newElements, index + 1,
                                 numMoved);
            }
            newElements[index] = element;
            setArray(newElements);
        } finally {
            lock.unlock();
        }
    }

    public E remove(int index) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            E oldValue = get(elements, index);
            int numMoved = len - index - 1;
            if (numMoved == 0)
                setArray(Arrays.copyOf(elements, len - 1));
            else {
                Object[] newElements = new Object[len - 1];
                System.arraycopy(elements, 0, newElements, 0, index);
                System.arraycopy(elements, index + 1, newElements, index,
                                 numMoved);
                setArray(newElements);
            }
            return oldValue;
        } finally {
            lock.unlock();
        }
    }

不難看出,android 和 java 雖然實現細節不同,但原理一樣,都是對這個方法加上鎖,然後clone元素到新陣列,不同的是,android 用的是關鍵字 synchronized ,而java用的是ReentrantLock 這個lock鎖,在 jdk1.5 之前,synchronized 效率比 lock鎖 效率低,但1.5以後,jvm大大優化了 synchronized ,效率大大提高,現在兩個的效率不相上下。


由此我們可以得出結論,外部呼叫的內部集合應避免上鎖,而是建立一個快照,然後對快照進行上鎖。