1. 程式人生 > 其它 >4.集合類不安全

4.集合類不安全

1.List集合在多執行緒下是不安全的

1.測試程式碼:建立30個執行緒同時操作一個資源list
    public class NotSafeDemo {
        public static void main(String[] args) {
            List<String> list=new ArrayList<>();
            for(int i=0;i<30;i++){
                new Thread(()->{
                    list.add(UUID.randomUUID().toString().substring(0,5));
                    System.out.println(list);
                },"執行緒"+i).start();
            }
        }
    }
執行時會報錯誤(併發修改異常):java.util.ConcurrentModificationException

問題:、
    1.為什麼會這樣?
        因為 ArrayList 執行緒不安全。
        
    2.那為什麼 ArrayList 執行緒不安全?
        因為它的 add 方法沒有加鎖,多個執行緒併發過來add,就可能會出現異常。
原始碼如下:發現其add方法並沒有加鎖!

2.解決集合類不安全的方法

2.1Vector

ArrayList 和 Vector 的區別:
1.Vector 是 List 介面的古老實現類,ArrayList 是 List 介面後面新增的實現類。
2.除了執行緒安全問題與擴容方式不同,Vector 幾乎與 ArrayList 一樣。
3.可以把 Vector 作為解決 ArrayList 執行緒安全的一種方式(不過 Vector 效率太低),只是加上了synchronized關鍵字。
4.如果不需要執行緒安全性,推薦使用ArrayList替代Vector
測試程式碼如下:
    public class NotSafeDemo {
        public static void main(String[] args) {
            重點1:建立Vector
            List<String> list=new Vector<>();
            for(int i=0;i<30;i++){
                new Thread(()->{
                    list.add(UUID.randomUUID().toString().substring(0,5));
                    System.out.println(list);
                },"執行緒"+i).start();
            }
        }
    }
    執行不會報錯!
    
分析原因是:
    1.Vector 的 add 方法加了鎖,如下截圖:
    2.其實 Vector 讀方法也加了鎖,相當於讀的時候,同一時刻也只能有一個執行緒能讀!

2.2 Collections

Collections是Collection的工具類,其中就提供了一個方法,可以將執行緒不安全的 ArrayList 轉換成執行緒安全的!
具體測試程式碼如下:重點:List<String> list= Collections.synchronizedList(new ArrayList<>());
    public class NotSafeDemo {
        public static void main(String[] args) {
            重點1:使用Collections.synchronizedList(new ArrayList<>())將不安全的ArrayList改為執行緒安全的
            List<String> list= Collections.synchronizedList(new ArrayList<>());
            for(int i=0;i<30;i++){
                new Thread(()->{
                    list.add(UUID.randomUUID().toString().substring(0,5));
                    System.out.println(list);
                },"執行緒"+i).start();
            }
        }
    }
具體的原始碼如下:發現add方法也是加了鎖的,並且有個mutex物件,這個物件賦值為this,即鎖定的是呼叫者物件!
並且 Collections 工具類也支援將 HashMap, HashSet 等轉換成安全的。
這個地方要注意兩個地方:
    1.迭代操作必須加鎖,可以使用synchronized關鍵字修飾;
    2.synchronized持有的監視器物件必須是synchronized (list),即包裝後的list,
      使用其他物件如synchronized (new Object())會使add,remove等方法與迭代方法使用的鎖不一致,
      無法實現完全的執行緒安全性。
  原始碼迭代:
        //迭代操作並未加鎖,所以需要手動同步
        public ListIterator<E> listIterator() {
                return list.listIterator(); 
        }
所以在遍歷時需要手動加鎖:
    List list = Collections.synchronizedList(new ArrayList());
    //必須對list進行加鎖
    synchronized (list) {
      Iterator i = list.iterator();
      while (i.hasNext())
          foo(i.next());
    }

2.3 CopyOnWriteArrayList(寫時複製)

JUC的常用!
實現程式碼如下:
    List<String> list= new CopyOnWriteArrayList<>();
    
原始碼結構:(可以看到CopyOnWriteArrayList底層實現為Object[] array陣列。)
    1.先看一下 CopyOnWriteArrayList 的結構:
        public class CopyOnWriteArrayList<E>implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
             private static final long serialVersionUID = 8673264195747942595L;
            final transient ReentrantLock lock = new ReentrantLock();
            private transient volatile Object[] array;
            public CopyOnWriteArrayList() {
                setArray(new Object[0]);
            }
        }
    2.新增元素:(可以看到每次新增元素時都會進行Arrays.copyOf操作,代價非常昂貴,並且發下其加鎖方式是手動加鎖lock)
        public boolean add(E e) {
            final ReentrantLock lock = this.lock;
            //重點1:手動加鎖
            lock.lock();
            try {
                Object[] elements = getArray();
                int len = elements.length;
                重點2:陣列複製擴容//創建出一個新的陣列去操作,讀寫分離的思想
                Object[] newElements = Arrays.copyOf(elements, len + 1);
                newElements[len] = e;
                setArray(newElements);
                return true;
            } finally {
                lock.unlock();
            }
        }

重點:有兩點必須講一下。我認為CopyOnWriteArrayList這個併發元件,其實反映的是兩個十分重要的分散式理念:
    1.讀寫分離
        我們讀取CopyOnWriteArrayList的時候讀取的是CopyOnWriteArrayList中的Object[] array,
        但是修改的時候,操作的是一個新的Object[] array,讀和寫操作的不是同一個物件,這就是讀寫分離。
        這種技術資料庫用的非常多,在高併發下為了緩解資料庫的壓力,即使做了快取也要對資料庫做讀寫分離,
        讀的時候使用讀庫,寫的時候使用寫庫,然後讀庫、寫庫之間進行一定的同步,這樣就避免同一個庫上讀、寫的IO操作太多
    
    2.最終一致
        對CopyOnWriteArrayList來說,執行緒1讀取集合裡面的資料,未必是最新的資料。
        因為執行緒2、執行緒3、執行緒4四個執行緒都修改了CopyOnWriteArrayList裡面的資料,
        但是執行緒1拿到的還是最老的那個Object[] array,新新增進去的資料並沒有,
        所以執行緒1讀取的內容未必準確。不過這些資料雖然對於執行緒1是不一致的,
        但是對於之後的執行緒一定是一致的,它們拿到的Object[] array一定是三個執行緒都操作完畢之後的Object array[],
        這就是最終一致。最終一致對於分散式系統也非常重要,它通過容忍一定時間的資料不一致,提升整個分散式系統的可用性與分割槽容錯性。
        當然,最終一致並不是任何場景都適用的,像火車站售票這種系統使用者對於資料的實時性要求非常非常高,就必須做成強一致性的。

Vector/CopyOnWriteArrayList/Collections.synchronizedList的效能比較

通過前面的分析可知:
    1.Vector對所有操作進行了synchronized關鍵字修飾,效能應該比較差
    2.CopyOnWriteArrayList在寫操作時需要進行copy操作,讀效能較好,寫效能較差
    3.Collections.synchronizedList效能較均衡,但是迭代操作並未加鎖,所以需要時需要額外注意
併發測試結果如圖:
    1.可以看到隨著執行緒數的增加,三個類操作時間都有所增加。
    2.Vector的遍歷操作和CopyOnWriteArrayList的寫操作(圖片中標紅的部分)效能消耗尤其嚴重
    3.出乎意料的是Vector的讀寫操作和Collections.synchronizedList比起來並沒有什麼差別
     (印象中Vector效能很差,實際效能差的只是遍歷操作,看來還是紙上得來終覺淺,絕知此事要躬行啊)
   4.仔細分析了下程式碼,雖然Vector使用synchronized修飾方法,Collections.synchronizedList使用synchronized修飾語句塊,
      但實際鎖住內容並沒有什麼區別,效能相似也在情理之中

總結
    1.CopyOnWriteArrayList的寫操作與Vector的遍歷操作效能消耗尤其嚴重,不推薦使用。
    2.CopyOnWriteArrayList適用於讀操作遠遠多於寫操作的場景。
    3.Vector讀寫效能可以和Collections.synchronizedList比肩,但Collections.synchronizedList不僅可以包裝ArrayList,
      也可以包裝其他List,擴充套件性和相容性更好。

3.set安全問題

set:用於儲存無序(存入和取出的順序不一定相同)元素,值不能重複
測試程式碼如下:
    public class NotSafeDemo {
        public static void main(String[] args) {
            //重點1:建立一個執行緒不安全的set
            Set<String> set = new HashSet();
            for (int i=0;i<=30;i++){
                new Thread(()->{
                    set.add(UUID.randomUUID().toString().substring(0,5));
                    System.out.println(set);
                },"執行緒"+i).start();
            }
        }
    }
    執行會出現問題:(併發修改異常)java.util.ConcurrentModificationException
如何解決上述問題呢:
    1.Collections工具類:
        Set<String> set=Collections.synchronizedSet(new HashSet<>());
    2.CopyOnWriteArraySet:
        Set<String> set=new CopyOnWriteArraySet<>();
        底層是:CopyOnWriteArrayList
            CopyOnWriteArraySet的構造方法
            public CopyOnWriteArraySet() {
                al = new CopyOnWriteArrayList<E>();
            }
        調到set的add方法最終會呼叫到:
            public boolean addIfAbsent(E e) {
                Object[] snapshot = getArray();
                //會判斷list中是否有重複的,有的話就不放,沒有再放
                return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
                    addIfAbsent(e, snapshot);
            }
    
set特徵:
     1.無序性
     2.不能重複
     3.允許有null但是隻能有一個
 
 hashset的底層是什麼呢?
     set原始碼:發現其底層就是HashMap
         public HashSet() {
            map = new HashMap<>();
         }
     add方法:
         public boolean add(E e) {
            //發現其放入的是map的key,值是固定的,所以說set裡的值是hashmap的key, 
            return map.put(e, PRESENT)==null;
        }

4.HashMap的執行緒安全問題

不使用Map<String, Object> map = new HashMap<> ();
而是用:
    1.Map<String,Object> map=new ConcurrentHashMap<>();
    2.Map<String,Object> map= Collections.synchronizedMap(new HashMap<>());

5.Callable

Callable 介面類似於 Runnable,兩者都是為那些其例項可能被另一個執行緒執行的類設計的。但是 Runnable 不會返回結果,並且無法丟擲經過檢查的異常。
特點:
    1.Callable 可以丟擲異常
    2.Callable 可以返回結果
    3.Callable 呼叫get方法時會阻塞
    4.Callable 需要藉助FutureTask 去和Thread關聯啟動
樣例:
    public class Juc_Test_Lock {
        public static void main(String[] args) {
            //重點1:建立Callable介面例項
            MyThread myThread=new MyThread();
            //重點2:建立Callable關聯的FutureTask
            FutureTask futureTask=new FutureTask(myThread);
            new Thread(futureTask,"執行緒1").start();
            //重點3:這個方法會阻塞,因為要等待執行緒執行完畢拿到結果
            Integer result= (Integer) futureTask.get();
            System.out.println("執行緒返回:"+result);
        }
    }
    //重點4:實現Callable介面
    class MyThread implements Callable<Integer>{
        @Override
        public Integer call() throws Exception {
            System.out.println("call方法");
            return 1024;
        }
    }
    輸出:
        call方法
        執行緒返回:1024
    
問題:如果是兩個執行緒同時啟動呢?
    new Thread(futureTask,"執行緒1").start();
    new Thread(futureTask,"執行緒2").start();
輸出:發現也是呼叫了一次call方法,理由如下!
    call方法
    執行緒返回:1024

 FutureTask的構造器如下:會有一個state去標記,如果執行了一次,這個狀態會變化!下次就不會執行了
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

6.常用輔助類

1CountDownLatch(減法計數器)

1.CountDownLatch:減法記數器
    有三個重要方法:
        1.初始化,並確定計數器最大值
            CountDownLatch countDownLatch = new CountDownLatch(6);
        2.計數器數量-1
            countDownLatch.countDown();
        3.等待計數器歸0,然後再往下執行
            countDownLatch.await();  
    樣例程式碼如下:
        public class CountDownLatch_Test {
            public static void main(String[] args) throws InterruptedException {
                //重點1:建立CountDownLatch減法計數器,初始值為6
                CountDownLatch countDownLatch = new CountDownLatch(6);
                for (int i = 0; i < 6; i++) {
                    //重點2:每個執行緒間隔2秒啟動
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    new Thread(() -> {
                        countDownLatch.countDown();
                        System.out.println(Thread.currentThread().getName() + "當前時間:" + DateTime.now() + ":數量減一 當前數量:" + countDownLatch.getCount());
                    }, "執行緒" + i).start();
                }
                //重點3:等待記數器歸0,然後往下執行
                countDownLatch.await();
                System.out.println("所有執行緒等待");
            }
        }
輸出:
    執行緒0當前時間:2021-08-04 22:52:54:數量減一 當前數量:6
    執行緒1當前時間:2021-08-04 22:52:56:數量減一 當前數量:5
    執行緒2當前時間:2021-08-04 22:52:58:數量減一 當前數量:4
    執行緒3當前時間:2021-08-04 22:53:00:數量減一 當前數量:3
    執行緒4當前時間:2021-08-04 22:53:02:數量減一 當前數量:2
    執行緒5當前時間:2021-08-04 22:53:04:數量減一 當前數量:1
    所有執行緒等待
注意點:這裡不是等待所有執行緒都執行完畢後再執行countDownLatch.await();後的方法
而是等計數器歸0,即執行了countDownLatch.countDown();後,將記數器歸0,切記!
所以需要等執行緒都執行完畢後再執行,可以將countDownLatch.countDown()方法放在每個執行緒的最後!

場景:需要多個執行緒執行完畢/或者啟動某些執行緒後,才能執行後續程式碼!

2.CyclicBarrier(加法計數器)

加法記數器:
    主要方法:
        1.構造方法,第一個引數是從0加到多少時,會執行第二個引數中的方法
            CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{})
        2.該方法,底層會呼叫--count,但是也會堵塞該執行緒,cyclicBarrier.await()後面的程式碼會等CyclicBarrier條件滿足後再一起執行,看下執行結果!
            cyclicBarrier.await();
樣例程式碼如下:
    public class CyclicBarrier_Test {
        public static void main(String[] args) throws InterruptedException {
            //重點1:構造方法,如果加法計數器上達到最大值7時,會執行下面的輸出方法
            CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{
                System.out.println("召喚神龍成功!");
            });
            for (int i = 0; i < 7; i++){
                TimeUnit.SECONDS.sleep(3);
                new Thread(()->{
                    try {
                        System.out.println(Thread.currentThread().getName()+":時間:"+ DateTime.now()+" 當前數量:"+cyclicBarrier.getNumberWaiting());
                        //重點2:該方法底層會呼叫--count,但是會阻塞住該佇列,等條件滿足後,會一起執行後續方法!
                        cyclicBarrier.await();
                        System.out.println(Thread.currentThread().getName()+"等待完畢!");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                },"執行緒:"+i).start();
            }
        }
    }
輸出:
    執行緒:0:時間:2021-08-04 23:15:45 當前數量:0
    執行緒:1:時間:2021-08-04 23:15:48 當前數量:1
    執行緒:2:時間:2021-08-04 23:15:51 當前數量:2
    執行緒:3:時間:2021-08-04 23:15:54 當前數量:3
    執行緒:4:時間:2021-08-04 23:15:57 當前數量:4
    執行緒:5:時間:2021-08-04 23:16:00 當前數量:5
    執行緒:6:時間:2021-08-04 23:16:03 當前數量:6
    召喚神龍成功!
    執行緒:6等待完畢!
    執行緒:0等待完畢!
    執行緒:1等待完畢!
    執行緒:2等待完畢!
    執行緒:4等待完畢!
    執行緒:3等待完畢!
    執行緒:5等待完畢!

3.Semaphore(訊號量)

常用方法:類似於執行緒池的概念
    1.獲取執行緒資源
        semaphore.acquire();
    2.釋放執行緒資源:
        semaphore.release();

樣例程式碼如下:
    public class Semaphore_Test {
        public static void main(String[] args) {
            //類比停車場:3個停車位,有六輛車
            //重點1:規定同時訪問的執行緒數
            Semaphore semaphore = new Semaphore(3);
            for (int i =0; i < 6; i++){
                new Thread(()->{
                    //1.獲取停車位
                    try {
                        //重點2:獲取執行緒資源
                        semaphore.acquire();
                        System.out.println(Thread.currentThread().getName()+"得到車位,停2秒!");
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println(Thread.currentThread().getName()+"開走了..");
                        //重點3:釋放執行緒資源
                        semaphore.release();
                    }
                },"執行緒"+i).start();
            }
        }
    }
    輸出:
        執行緒2得到車位,停2秒!
        執行緒0得到車位,停2秒!
        執行緒1得到車位,停2秒!
        執行緒0開走了..
        執行緒3得到車位,停2秒!
        執行緒1開走了..
        執行緒4得到車位,停2秒!
        執行緒2開走了..
        執行緒5得到車位,停2秒!
        執行緒5開走了..
        執行緒4開走了..
        執行緒3開走了..

結論:
    發現只能有3個執行緒同時訪問,其他的等待!
    作用:
        1.多個共享資源互斥的使用!
        2.併發限流,控制最大額執行緒數!