1. 程式人生 > 其它 >多執行緒與高併發(5)

多執行緒與高併發(5)

執行緒不安全類與寫法

【執行緒不安全】:如果一個類類物件同時可以被多個執行緒訪問,如果沒有做同步或者特殊處理就會出現異常或者邏輯處理錯誤。
【1. 字串拼接】:
StringBuilder(執行緒不安全)、
StringBuffer(執行緒安全)
【2. 日期轉換】:
SimpleDateFormat(執行緒不安全,最好使用區域性變數[堆疊封閉]保證執行緒安全)
JodaTime推薦使用(執行緒安全)
【3. ArrayList、HashSet、HashMap等Collections】:
ArrayList(執行緒不安全)
HashSet(執行緒不安全)
HashMap(執行緒不安全)
【**同步容器**synchronized修飾】

Vector、Stack、HashTable
Collections.synchronizedXXX(List、Set、Map)
【**併發容器** J.U.C】
ArrayList->CopyOnWriteArrayList:(讀時不加鎖,寫時加鎖,避免複製多個副本出來將資料搞亂)寫操作時複製,當有新元素新增到CopyOnWriteArrayList中時,先從原有的陣列中拷貝一份出來,在新的陣列上進行寫操作,寫完之後再將原來的陣列指向新的陣列。

HashSet、TreeSet->CopyOnWriteArraySet、ConcurrentSkipListSet
HashMap、TreeMap

->ConcurrentHashMap、ConcurrentSkipListMap
相比ConcurrentHashMap,ConcurrentSkipListMap具有如下優勢:

  1. ConcurrentSkipListMap的存取速度是ConcurrentSkipListMap的4倍左右
  2. ConcurrentSkipListMap的key是有序的
  3. ConcurrentSkipListMap支援更高的併發(它的存取時間和執行緒數幾乎沒有關係,更高併發的場景下越能體現出優勢)

六、安全共享物件策略 - 總結

  1. 執行緒限制:一個被執行緒限制的物件,由執行緒獨佔,並且只能被佔有它的執行緒修改
  2. 共享只讀:一個共享只讀的物件,在沒有額外同步的情況下,可以被多個執行緒併發訪問,但是任何執行緒都不能修改它
  3. 執行緒安全物件:一個執行緒安全的物件或者容器,在內部通過同步機制來保證執行緒安全,所以其他執行緒無需額外的同步就可以通過公共介面隨意訪問它
  4. 被守護物件:被守護物件只能通過獲取特定鎖來訪問

七、J.U.C 之 AQS

7.1、 AQS

AQS:AbstractQueneSynchronizer

  1. 使用Node實現FIFO佇列,可以用於構建鎖或者其他同步裝置的基礎框架
  2. 利用int型別表示狀態
  3. 使用方法是繼承
  4. 子類通過繼承並通過實現它的方法管理其狀態{ acquire和release }的方法操縱狀態
  5. 可以同時實現排它鎖和共享鎖模式(獨佔、共享)

7.2、 AQS的同步元件如下:

7.2.1、CountDownLatch:閉鎖,通過計數來保證執行緒是否一直阻塞.
CountDownLatch是通過一個計數器來實現的,計數器的初始值為執行緒的數量。每當一個執行緒完成了自己的任務後,計數器的值就會減1。當計數器值到達0時,它表示所有的執行緒已經完成了任務,然後在閉鎖上等待的執行緒就可以恢復執行任務。構造器中的計數值(count)實際上就是閉鎖需要等待的執行緒數量。這個值只能被設定一次,而且CountDownLatch沒有提供任何機制去重新設定這個計數值。

與CountDownLatch的第一次互動是主執行緒等待其他執行緒。主執行緒必須在啟動其他執行緒後立即呼叫CountDownLatch.await()方法。這樣主執行緒的操作就會在這個方法上阻塞,直到其他執行緒完成各自的任務。

其他N 個執行緒必須引用閉鎖物件,因為他們需要通知CountDownLatch物件,他們已經完成了各自的任務。這種通知機制是通過 CountDownLatch.countDown()方法來完成的;每呼叫一次這個方法,在建構函式中初始化的count值就減1。所以當N個執行緒都調 用了這個方法,count的值等於0,然後主執行緒就能通過await()方法,恢復執行自己的任務。

解釋一下CountDownLatch概念?
`CountDownLatch`和 `CyclicBarrier`的不同之處?
給出一些CountDownLatch使用的例子?
 CountDownLatch類中主要的方法?

public class CountDownLatchExample1 {
    
    // 執行緒數
    private final static int threadCount = 200;
    
    public static void main(String[] args) throws InterruptedException{
        // 使用執行緒池進行排程
        ExecutorService exec = Executors.newCachedThreadPool();
        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    System.out.println("exception:" + e);
                }finally{
                    countDownLatch.countDown(); // 計數器減一
                }
            });
        }
        countDownLatch.await(10, TimeUnit.MILLISECONDS);
        System.out.println("===finished===");
        exec.shutdown();
    }
    
    
    private static void test(int threadNum) throws InterruptedException{
        Thread.sleep(100);
        System.out.println("threadNum:" + threadNum);
    }
    

}
7.2.2、Semaphore(訊號量):可以控制同一時間併發執行緒的數目
主要函式:acquire、release、tryAcquire
    
public class SemaphoreExample1 {
    
    // 執行緒數
    private final static int threadCount = 20;
    
    public static void main(String[] args) throws InterruptedException{
        // 使用執行緒池進行排程
        ExecutorService exec = Executors.newCachedThreadPool();
        //併發控制(允許併發數20)
        final Semaphore semaphore = new Semaphore(3);
        
        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    if(semaphore.tryAcquire(5, TimeUnit.SECONDS)){
                        test(threadNum);
                        semaphore.release();
                    }
                    /** 多個許可:在程式碼中一共有10個許可,每次執行semaphore.acquire(5);
                     * 程式碼時耗費掉5個,所以20/5=4,
                     * 說明同一時間只有4個執行緒允許執行acquire()和release()之間的程式碼。
                     * */
//                    semaphore.acquire(3); // 獲取許可
//                    test(threadNum);
//                    semaphore.release(3); // 釋放許可
                } catch (Exception e) {
                    System.out.println("exception:" + e);
                }finally{
                    countDownLatch.countDown(); // 計數器減一
                }
            });
        }
//        countDownLatch.await(100, TimeUnit.MILLISECONDS);
        System.out.println("===finished===");
        exec.shutdown();
    }
    
    
    private static void test(int threadNum) throws InterruptedException{
        System.out.println("threadNum:" + threadNum);
        Thread.sleep(1000);
    }
    

}
7.2.3、CyclicBarrier:可以完成多個執行緒之間相互等待,只有當每個執行緒都準備就緒後,才能各自繼續往下執行
應用場景:需要所有的子任務都完成時,才執行主任務,這個時候就可以選擇使用CyclicBarrier。

簡單理解【`人滿發車`】:
長途汽車站提供長途客運服務。
當等待坐車的乘客到達20人時,汽車站就會發出一輛長途汽車,讓這20個乘客上車走人。
等到下次等待的乘客又到達20人是,汽車站就會又發出一輛長途汽車。
public class CyclicBarrierExample1 {
    
    // 執行緒數
    private final static int threadCount = 10;
    
    // 屏障的執行緒數目 5 
    private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
        System.out.println("===continue===");
    });
    
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool(); 
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            Thread.sleep(500);
            executorService.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        
        
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        System.out.println("===" + threadNum + " is ready.");
        try{
            barrier.await(2000, TimeUnit.MILLISECONDS);
        }catch(Exception e){
            System.out.println("e:"+e);
        }
        System.out.println("===" + threadNum + " continue");
    }

}
7.2.4、ReentrantLock
1. api:
    - lock()
    - unlock()
    - tryLock()
   private static Lock lock = new ReentrantLock();
   private static void test(int threadNum){
           lock.lock();
           try{
               count++;
           }finally{
               lock.unlock();
           }
       }
2. ReentrantLock和synchronized的區別
    - 1. `可重入性`
    - 2. `鎖的實現`:synchronized是jvm實現,ReentrantLock是jdk實現
    - 3. `效能區別`
    - 4. `功能方面的區別`
3. ReentrantLock獨有的功能
    - 1. 可指定是公平鎖還是非公平鎖,synchronized只能是非公平鎖(公平鎖:先等待的執行緒先獲得鎖)
    - 2. 提供了一個Condition類,可以分組喚醒需要喚醒的執行緒
    - 3. 提供能夠中斷等待鎖的執行緒的機制,lock.lockInterruptibly()
4. ReentrantReadWriteLock
5. StampedLock
6. 鎖的使用
   - 當只有少量競爭者執行緒的時候,`synchronized`是一個很好的通用的鎖的實現(synchronized不會引發死鎖,jvm會自動解鎖)
   - 競爭者執行緒不少,但是執行緒增長的趨勢是可以預估的,這時候使用`ReentrantLock`是一個很好的通用的鎖的實現
7.2.5、Condition
public class LockExample3 {
public static void main(String[] args){
    ReentrantLock reentrantLock = new ReentrantLock();
    Condition condition = reentrantLock.newCondition();
    int u=1;
    
    
    new Thread(() -> {
        try{
            reentrantLock.lock();
            System.out.println("wait signal"); // 1
            condition.await();
        }catch(InterruptedException e){
            e.printStackTrace();
        }
        System.out.println("get signal");
        reentrantLock.unlock();
    }).start();
    
    new Thread(() -> {
        reentrantLock.lock();
        System.out.println("get lock");
        try{
            Thread.sleep(3000);
        }catch(InterruptedException e){
            e.printStackTrace();
        }
        condition.signalAll();
        System.out.println("send signal");
        reentrantLock.unlock();
    }).start();
    
    
}

}
7.2.6、FutureTask
    建立執行緒兩種方式繼承Thread,實現Runnable介面,這兩種方式,在任務執行完畢之後獲取不到執行結果
    FutureTask、Callable可以獲取到執行結果
    1. Callable和Runnable對比
    2. Future介面
    3. FutureTask
    ```
    public static void main(String[] args) throws InterruptedException, ExecutionException {
    FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
        @Override
        public String call() throws Exception {
            System.out.println("do something in callable...");
            Thread.sleep(3000);
            return "Done";
        }
    });
    
    new Thread(futureTask).start();
    System.out.println("do something in main...");
    Thread.sleep(1000);
    String result = futureTask.get();
    System.out.println("result:"+result);
}

}

7.2.7、Fork/Join框架:將大模組切分成多個小模組進行計算

八、執行緒池

初始化好執行緒池例項之後,將任務丟進去等待排程執行。

8.1、Thread弊端

  1. 每次new Thread都要新建物件,效能差
  2. 執行緒缺乏統一管理,可能無限制的新建執行緒,相互競爭,有可能佔用過多的系統資源導致宕機或者OOM
  3. 缺少更多功能,如更多執行,定期執行,執行緒中斷

8.2、執行緒池的好處

  1. 可以重用存在的執行緒,減少物件的建立、消亡的開銷,效能佳
  2. 可以有效的控制最大併發數,提供系統資源利用率,同時可以避免過多的資源競爭,避免阻塞
  3. 提供定時執行、定期執行、單執行緒、併發數控制等功能
  4. ThreadPoolExecutor的初始化引數】
    corePoolSize:核心執行緒數量
    maximumPoolSize:縣城最大執行緒數
    workQueue:阻塞佇列,儲存等待執行的任務,很重要,會對執行緒池執行過程產生重大影響
    keepAliveTime:執行緒沒有任務執行時,最多保持多久時間終止
    unit:keepAliveTime的時間單位
    hreadFactory:執行緒工廠,用來建立執行緒
    rejectHandler:當拒絕處理任務時的策略

執行緒池-ThreadPoolExecutor狀態

執行緒池-ThreadPoolExecutor方法

1. execute():提交任務,交給執行緒池執行
2. submit():提交任務能夠返回執行結果execute + Future
3. shutdown():關閉執行緒池,等待任務都執行完
4. shutdownNow():關閉執行緒池,不等待任務執行完
5. getTaskCount():執行緒池已執行和未執行的任務總數
6. getCompletedTaskCount():已完成的任務總數
7. getPoolSize():執行緒池當前的執行緒數量
8. getActiveCount:當前執行緒池中正在執行任務的執行緒數量

8.3、執行緒池 - Executors框架(建立執行緒池)

  1. Executors.newCachedThreadPool:建立一個可快取的執行緒池,如果執行緒池長度超過了處理的需要可以靈活回收空閒執行緒,如果沒有可以回收的,那麼就新建執行緒
public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 往執行緒池中放任務
        for (int i = 0; i < 10; i++) {
            final int index = i; // 任務的序號
            executorService.execute(() -> {
                System.out.println("===task:"+index);
            });
        }
        executorService.shutdown(); // 關閉執行緒池
    }
  1. Executors.newFixedThreadPool:建立的是一個定長的執行緒池,可以控制執行緒的最大併發數,超出的執行緒會在佇列中等待
  2. Executors.newScheduledThreadPool:建立的也是定長執行緒池,支援定時以及週期性的任務執行
public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        
        // 往執行緒池中放任務
        executorService.scheduleAtFixedRate(() -> {
            log.info("===sechedule run");
        }, 1, 3, TimeUnit.SECONDS); // 延遲一秒,每隔三秒執行任務
        
        
        executorService.schedule(() -> {
            log.info("===sechedule run");
        }, 3, TimeUnit.SECONDS);
        
        executorService.shutdown(); // 關閉執行緒池
    }
  1. Executors.newSingleThreadExecutor:建立的是一個單執行緒化的執行緒池,會用唯一的一個工作執行緒來執行任務,保證所有任務按照指令順序執行(指令順序可以指定它是按照先入先出,優先順序執行)

newSingleThreadExecutor列印結果是按照順序輸出

8.4、執行緒池 - 合理配置

1. CPU密集型任務,就需要儘量壓榨CPU,參考可以設定為NCPU+1
2. IO密集型任務,參考可以設定為2*NCPU
> NCPU = CPU的數量
> UCPU = 期望對CPU的使用率 0 ≤ UCPU ≤ 1
> W/C = 等待時間與計算時間的比率
> 如果希望處理器達到理想的使用率,那麼執行緒池的最優大小為:
> 執行緒池大小=NCPU *UCPU(1+W/C)