1. 程式人生 > >第五章 基礎構建模塊

第五章 基礎構建模塊

結構 間接 tac 也會 信息 吞吐量 非線程安全 返回 完成

5.1 同步容器類

  實現方式 : 將他們的狀態封裝起來,並對每個公有方法都進行同步, 使得每次只有一個線程可以訪問.

5.1.1 存在的問題

  復合操作 並非線程安全. 比如 叠代, 條件運算等.

  在對同步容器類的復合操作加鎖時一定要以容器對象為鎖對象, 保證復合操作的鎖對象容器使用的鎖對象一致.才能實現一個線程安全的復合操作

 public static void getLast(Vector<?> list) {
        // 此處的鎖對象必須和 Vector 內部的鎖對象一致
        synchronized (list) {
            
int lastIndex = list.size()-1; list.remove(lastIndex); } }

5.1.2 叠代器與ConcurrentModificationException

  在容器的叠代過程中被修改(結構上被改變)時會拋出 ConcurrentModificationException

  解決方法 : 在叠代過程中持有容器的鎖. 並在所有對共享容器進行叠代的地方加鎖

5.1.3 隱藏叠代器

  以下操作也會間接的進行容器的叠代操作

  toString() , hashCode() , equals() 等很多方法都出觸發容器的叠代操作.

5.2 並發容器

5.2.1 ConcurrentHashMap

  •   加鎖策略 : 分段鎖(粒度更細的加鎖機制), 同步方法塊
  •   支持多個線程並發的訪問 ConcurrentHashMap , 實現更高的吞吐量 .
  •   ConcurrentHashMap 返回的叠代器具有弱一致性 , 可以(但是不保證)將修改操作立即反映給容器
  •   叠代過程中不會加鎖 , 也不會拋出 ConcurrentModificationException ,
  •   將一些復合操作(putIfAbsent() 若沒有則添加) 實現為原子操作

5.2.3 CopyOnWriteArrayList

  • 每次修改容器時先復制數組, 引用依舊指向原來的數組 , 然後修改新的數組, 最後將引用指向新的數組.
  • 寫入時復制 也可理解為 修改時復制
  • 返回的叠代器不會拋出 ConcurrentModificationException
  • 使用場景 : 叠代操作遠遠多於修改操作. 復制數組的操作有一定的開銷.
  • 修改操作使用 ReentrantLock 進行加鎖

5.3 阻塞隊列

  • 提供阻塞的 put 和 take 方法
  • put 方法將阻塞到直到有空間可用 , take 方法將阻塞到直到有元素可用
  • 隊列可以有界, 也可以無界
  • 修改容器時統一使用創建隊列實例時創建的 ReentrantLock 對象

  BlockingQueue(阻塞隊列接口)

    LinkedBlockingQueue 類似與 LinkedList

    ArrayBlockingQueue 類似與 ArrayList

    PriorityBlockingQueue 按優先級排序的隊列

    SynchronousQueue 每個插入操作必須等待另一個線程的對應移除操作 ,反之亦然。非常適合做交換工作,生產者的線程和消費者的線程同步以傳遞某些信息、事件或者任務。

5.3.1 生產者與消費者的特點

  • 生產者和消費者只需要完成各自的任務
  • 阻塞隊列將負責所有的控制流
  • 每個功能的代碼和邏輯更加清楚
  • 他們整體的並行度取決於兩者中較低的並行度

  生產者和消費者設計也可以使用 Executor 任務執行框架來實現, 其本身也使用 生產者--消費者模式

5.3.2 串行線程封閉 

  • 線程封閉對象只能由單個對象擁有,可以通過安全的發該對象來轉移所有權.並且發布對象的線程不會再訪問它
  • 轉移所有權之後,只有另外一個線程獲得這個對象的訪問權限, 它可以對它做任意的修改,因為它有獨占訪問權.

5.3.3 雙端隊列和工作密取

  雙端隊列

  • ArrayDeque 和 LinkedBlockingDeque
  • 實現在隊列頭和隊列尾的高效插入和移除.

  工作密取

  • 每個消費者有自己的雙端隊列 , 當一個消費者完成自己隊列的所有任務後 , 那麽它可以從其他消費者的雙端隊列秘密的獲取任務 .

5.4 阻塞方法和中斷方法

  處理 InterruptedException

  • 將 InterruptedException 傳遞給方法的調用者
  • 捕獲這個異常, 並恢復中斷狀態

5.5 同步工具類

5.5.1 閉鎖(CountDownLatch)

  作用 : 用來確保某些活動直到其他活動都完成後才執行.

  • 計數器 : 表示需要等待的事件數量
  • await() : 阻塞調用此方法的線程直到計數器為0
  • countDown() : 計數器遞減

用法一 : 創建一定數量的線程,多個線程並發的執行任務

  • 使用兩個閉鎖, 分別表示起始門和結束門. 起始門初始值為1 , 結束門初始值為等待的事件數量.
  • 每個工作線程在起始門等待,
  • 所有線程就緒後起始門調用 countDown()直到起始門的值為0, 之後所有線程同時開始執行(因為線程之前都在起始門等待)
  • 主線程調用結束門的await(), 主線程阻塞直到所有工作線程結束
  • 工作線程全部執行完畢, 主線程運行
package com.pinnet.test;

import java.util.concurrent.CountDownLatch;

public class CountLatchTest {

    public void timeTask(int threadNumbers, Runnable task) throws InterruptedException {
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(threadNumbers);

        for (int i = 0; i < threadNumbers; i++) {
            new Thread() {
                public void run() {
                    try {
                        // 所有線程在起始門等待
                        start.await();
                        // 執行任務
                        task.run();
                        // 結束門遞減
                        end.countDown();
                    } catch (InterruptedException e) {

                    }
                }
            }.start();
        }
        // 所有工作線程開始執行
        start.countDown();
        // 所有工作線程啟動後主線程立即登待
        end.await();
        System.out.println("開始主線程");

    }

}

用法二: 創建一定數量的線程,多個線程依次的執行任務

  • 使用一個閉鎖
  • 線程依次啟動, 執行完成後countDown()
  • 主線程 await() , 直到計數器為0 ,主線程執行
public void timeTask2(int threadNumbers, Runnable task) throws InterruptedException {
        CountDownLatch start = new CountDownLatch(threadNumbers);
        // 任務一次執行
        for (int i = 0; i < threadNumbers; i++) {
            new Thread() {
                public void run() {
                        // 任務開始
                        task.run();
                        // 遞減
                        start.countDown();
                }
            }.start();
        }
        // 主線程阻塞直到計數器為0
        start.await();
        System.out.println("開始主線程");

    }

5.5.2 FutureTask

  一種可生成結果,可異步取消的計算.

  • 計算通過 Callable 實現
  • 可以將計算結果從執行任務的線程 傳遞 到獲取這個結果的線程 , 並保證其安全性
  • get() 獲取結果可阻塞到任務完成
public class FutureTaskTest {
    // 創建任務
    private final FutureTask<Integer> future = new FutureTask<>(new Callable<Integer>() {
        public Integer call() {
            return 123;
        }
    });
    // 創建線程
    private final Thread thread = new Thread(future);
    // 對外提供方法啟動線程
    public void start() {
        thread.start();
    }
    // 獲取計算結果
    public Integer get() {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            return null;
        }
    }
}

5.5.3 信號量(Semaphore)

  • 控制同時訪問某個資源或執行某個操作的數量.
  • Semaphore 管理一定數量的許可, 執行操作時先獲取許可,執行完操作後釋放許可.
  • 獲取許可時可阻塞.

5.5.4 柵欄

  閉鎖和柵欄的區別 :

  • 閉鎖是一次性對象 , 一旦進入終止狀態 , 就不能被重置
  • 柵欄是所有線程必須同時到達柵欄位置 , 才能繼續執行.
  • 閉鎖用於等待事件 , 柵欄用於等待線程.

  基本使用 :

  • 指定數量線程到達柵欄位置後,所有線程被釋放 . 柵欄將被重置
  • 成功通過柵欄,await()將會返回一個到達索引號
  • await() 超時 或 await()阻塞的線程被中斷 , 則柵欄被打破.所有阻塞的await() 調用被終止 , 並拋出 BrokenBarrierException.

  用法一 : CyclicBarrier(int number)

      await() 調用 number 次後所有調用 await() 的線程繼續執行 , 否則線程在await() 阻塞

public void barrier() {
        int number = 6;
        // 參數表示屏障攔截的線程數量 
        // barrier.await() 調用 number 次後所有線程的阻塞狀態解除
        CyclicBarrier barrier = new CyclicBarrier(number);
        
        for (int i = 0; i < number; i++) {
            new Thread(new Runnable() {
                
                @Override
                public void run() {
                    System.out.println("此線程任務已經完成");
                    try {
                        // 調用await方法告訴CyclicBarrier我已經到達了屏障,然後當前線程被阻塞。
                        barrier.await();
                        System.out.println("所有線程執行完成");
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }

用法二 : CyclicBarrier(int parties, Runnable runnable) 指定數量的線程到達屏障點後 執行 runnable

 public void barrier2() {
        int number = 6;
        // 參數表示屏障攔截的線程數量 
        // barrier.await() 調用 number 次後所有線程的阻塞狀態解除
        CyclicBarrier barrier = new CyclicBarrier(number, new Runnable() {
            
            @Override
            public void run() {
                //  指定數量的線程到達屏障點後執行  barrier()
                barrier();
            }
        });
        
        for (int i = 0; i < number; i++) {
            new Thread(new Runnable() {
                
                @Override
                public void run() {
                    System.out.println("此線程任務已經完成");
                    try {
                        // 調用await方法告訴CyclicBarrier我已經到達了屏障,然後當前線程被阻塞。
                        barrier.await();
                        System.out.println("所有線程執行完成");
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }

雙方形式的柵欄 Exchanger

第五章 基礎構建模塊