1. 程式人生 > >『併發包入坑指北』之向大佬彙報任務

『併發包入坑指北』之向大佬彙報任務

boxed-water-is-better-1463964-unsplash.jpg

前言

目錄如下:

image

在面試過程中聊到併發相關的內容時,不少面試官都喜歡問這類問題:

當 N 個執行緒同時完成某項任務時,如何知道他們都已經執行完畢了。

這也是本次討論的話題之一,所以本篇為『併發包入坑指北』的第二篇;來聊聊常見的併發工具。

自己實現

其實這類問題的核心論點都是:如何在一個執行緒中得知其他執行緒是否執行完畢。

假設現在有 3 個執行緒在執行,需要在主執行緒中得知他們的執行結果;可以分為以下幾步:

  • 定義一個計數器為 3。
  • 每個執行緒完成任務後計數減一。
  • 一旦計數器減為 0 則通知等待的執行緒。

所以也很容易想到可以利用等待通知機制來實現,和上文的『併發包入坑指北』之阻塞佇列

的類似。

按照這個思路自定義了一個 MultipleThreadCountDownKit 工具,建構函式如下:

image

考慮到併發的前提,這個計數器自然需要保證執行緒安全,所以採用了 AtomicInteger

所以在初始化時需要根據執行緒數量來構建物件。

計數器減一

當其中一個業務執行緒完成後需要將這個計數器減一,直到減為0為止。

    /**
     * 執行緒完成後計數 -1
     */
    public void countDown(){

        if (counter.get() <= 0){
            return;
        }

        int count = this.counter.decrementAndGet();
        if (count < 0){
            throw new RuntimeException("concurrent error") ;
        }

        if (count == 0){
            synchronized (notify){
                notify.notify();
            }
        }

    }

利用 counter.decrementAndGet() 來保證多執行緒的原子性,當減為 0 時則利用等待通知機制來 notify 其他執行緒。

等待所有執行緒完成

而需要知道業務執行緒執行完畢的其他執行緒則需要在未完成之前一直處於等待狀態,直到上文提到的在計數器變為 0 時得到通知。

    /**
     * 等待所有的執行緒完成
     * @throws InterruptedException
     */
    public void await() throws InterruptedException {
        synchronized (notify){
            while (counter.get() > 0){
                notify.wait();
            }

            if (notifyListen != null){
                notifyListen.notifyListen();
            }

        }
    }

原理也很簡單,一旦計數器還存在時則會利用 notify 物件進行等待,直到被業務執行緒喚醒。

同時這裡新增了一個通知介面可以自定義實現喚醒後的一些業務邏輯,後文會做演示。

併發測試

主要就是這兩個函式,下面來做一個演示。

image.png

  • 初始化了三個計數器的併發工具 MultipleThreadCountDownKit
  • 建立了三個執行緒分別執行業務邏輯,完畢後執行 countDown()
  • 執行緒 3 休眠了 2s 用於模擬業務耗時。
  • 主執行緒執行 await() 等待他們三個執行緒執行完畢。

image.png

通過執行結果可以看出主執行緒會等待最後一個執行緒完成後才會退出;從而達到了主執行緒等待其餘執行緒的效果。

    MultipleThreadCountDownKit multipleThreadKit = new MultipleThreadCountDownKit(3);
    multipleThreadKit.setNotify(() -> LOGGER.info("三個執行緒完成了任務"));

也可以在初始化的時候指定一個回撥介面,用於接收業務執行緒執行完畢後的通知。

image.png

當然和在主執行緒中執行這段邏輯效果是一樣的(和執行 await() 方法處於同一個執行緒)。

CountDownLatch

當然我們自己實現的程式碼沒有經過大量生產環境的驗證,所以主要的目的還是嘗試窺探官方的實現原理。

所以我們現在來看看 juc 下的 CountDownLatch 是如何實現的。

image

通過建構函式會發現有一個 內部類 Sync,他是繼承於 AbstractQueuedSynchronizer ;這是 Java 併發包中的基礎框架,都可以單獨拿來講了,所以這次重點不是它,今後我們再著重介紹。

這裡就可以把他簡單理解為提供了和上文類似的一個計數器及執行緒通知工具就行了。

countDown

其實他的核心邏輯和我們自己實現的區別不大。

    public void countDown() {
        sync.releaseShared(1);
    }
    
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

利用這個內部類的 releaseShared 方法,我們可以理解為他想要將計數器減一。

image

看到這裡有沒有似曾相識的感覺。

image.png

沒錯,在 JDK1.7 中的 AtomicInteger 自減就是這樣實現的(利用 CAS 保證了執行緒安全)。

只是一旦計數器減為 0 時則會執行 doReleaseShared 喚醒其他的執行緒。

image image.png

這裡我們只需要關心紅框部分(其他的暫時不用關心,這裡涉及到了 AQS 中的佇列相關),最終會呼叫 LockSupport.unpark 來喚醒執行緒;就相當於上文呼叫 object.notify()

所以其實本質上還是相同的。

await

其中的 await() 也是借用 Sync 物件的方法實現的。

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //判斷計數器是否還未完成    
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

一旦還存在未完成的執行緒時,則會呼叫 doAcquireSharedInterruptibly 進入阻塞狀態。

image

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

同樣的由於這也是 AQS 中的方法,我們只需要關心紅框部分;其實最終就是呼叫了 LockSupport.park 方法,也就相當於執行了 object.wait()

  • 所有的業務執行緒執行完畢後會在計數器減為 0 時呼叫 LockSupport.unpark 來喚醒執行緒。
  • 等待執行緒一旦計數器 > 0 時則會利用 LockSupport.park 來等待喚醒。

這樣整個流程也就串起來了,它的使用方法也和上文的類似。

image

就不做過多介紹了。

實際案例

同樣的來看一個實際案例。

在上一篇《一次分表踩坑實踐的探討》提到了對於全表掃描的情況下,需要利用多執行緒來提高查詢效率。

比如我們這裡分為了 64 張表,計劃利用 8 個執行緒來分別處理這些表的資料,虛擬碼如下:

CountDownLatch count = new CountDownLatch(64);
ConcurrentHashMap total = new ConcurrentHashMap();
for(Integer i=0;i<=63;i++){
	executor.execute(new Runnable(){
		@Override
		public void run(){
			List value = queryTable(i);
			total.put(value,NULL);
			count.countDown();
		}
	}) ;
	
}

count.await();
System.out.println("查詢完畢");

這樣就可以實現所有資料都查詢完畢後再做統一彙總;程式碼挺簡單,也好理解(當然也可以使用執行緒池的 API)。

總結

CountDownLatch 算是 juc 中一個高頻使用的工具,學會和理解他的使用會幫助我們更容易編寫併發應用。

文中涉及到的原始碼:

https://github.com/crossoverJie/JCSprout/blob/master/src/main/java/com/crossoverjie/concurrent/communication/MultipleThreadCountDownKit.java

你的點贊與分享是對我最大的支援

相關推薦

併發彙報任務

前言 目錄如下: 在面試過程中聊到併發相關的內容時,不少面試官都喜歡問這類問題: 當 N 個執行緒同時完成某項任務時,如何知

PaddlePaddle踩系列——Linux安裝(一)

本週我們精選出社群問答進行整理彙總,開發者在使用PaddlePaddle過程中遇到任何技術難題,都可以到PaddlePaddle公眾號FAQ專欄上尋求解決方案,希望能幫助新使用者在Linux安裝過程中解答疑惑。   1.問題:cuda9.0需要安裝哪一個版本的paddle,安裝包在哪? •

.NET 跨平臺框架Avalonia UI: 填(一):熟悉UI操作

Avalonia 是一個跨平臺的 .NET UI 框架,支援 Windows、Linux、Mac OSX... (以及Android  IOS soon..) 本篇主要介紹Avalonia開發過程和Linux適配環節的幾個疑難點.. avalonia for vs安裝和.deb安裝包部署可參考:ht

Mint建站四 網站的打包上傳篇

OK,網站審批、環境搭建都弄好了,這時候就要編寫網站了。 編寫好網站,我們可以開始上傳了。 零、準備工作 編寫並測試好的網站、搭建完成的伺服器 一、打war包 以idea為例: 點選File→Project Structure→Artifacts,進入Artifa

Angular 從到挖坑 - Router 路由使用入門

### 一、Overview Angular 入坑記錄的筆記第五篇,因為一直在加班的緣故拖了有一個多月,主要是介紹在 Angular 中如何配置路由,完成重定向以及引數傳遞。至於路由守衛、路由懶載入等“高階”特性,並不會在本篇文章中呈現 對應官方文件地址: - [路由與導航](https://ang

兩個月,java.lang下的Thread類

java程序 edm r+ 阻塞 https color dex prime 好的 Thread類API中的英文描述: file:///D:/Java/JDK8/Docs/docs/api/index.html 英語不好,大致的意思是: 線程是程序執行時的線程,java虛

JDK併發--重

重入鎖基本使用:    使用java.util.concurrent.locks.ReentrantLock 類來實現,可以替代synchronized關鍵字。如下例,重入鎖有著顯示的操作過程,開發者可以手動指定在哪裡加鎖,在哪裡釋放(退出臨界區時必須釋放,不然其他執行緒沒有機會

c++讀與輸出的探索

對於讀入與輸出的探索和優化 ·cin和cout ·scanf和printf ·關閉同步流 ·freopen和fclose ·整型read ·浮點型與字串read ·快輸 ·fread <更新提示> <第一次更新> <正

JEECG快速開發平臺報錯解決第一波——pom.xml中jeecg的依賴jar沒匯入成功

不做多餘解釋,錯誤見上圖;jar包沒匯入成功,自然要想辦法匯入jar包,可選方法如下: 專案關閉,重新開啟,maven會自動下載; 選中專案,右鍵選擇RunAS,再選擇Maven install,控制檯會下載專案所依賴的包; 選中專案,右鍵選擇Maven,再選擇

git爬不完全:Permission to xxx.git denied to user的解決方案

  由於對之前github賬戶名不太滿意,又不像大神一樣需要兩個賬號繫結。所以今天註冊了新賬號,想把電腦繫結到新賬號上,結果就踩坑了……報錯如標題。   在網上按照這個題目可以搜尋出一大堆的解決方案,但都是一個思路,那就是兩個賬號繫結兩個金鑰。先不說操作過程稍微複雜點吧,關鍵在於這不是我想要的結果,我就要一個

JAVA多執行緒併發(六)死鎖

  在多執行緒的中,因為要保證執行緒安全,需要對一些操作進行加鎖,但是如果操作不當,會造成死鎖,導致程式無法執行下去。   形成死鎖的場景:如果有兩個執行緒,執行緒1和執行緒2,執行緒1執行,獲得鎖A,執行緒2執行,獲得B,執行緒1等待鎖B的釋放,執行緒2等待

JAVA多執行緒併發(五)生產者消費者模式

生產者消費者模式對於理解多執行緒是一個很經典,也很好的例子 資源類: class Resource{ //資源初始化個數 private int init; //資源最大個數 private int Max; p

JAVA多執行緒併發(二)執行緒的生命週期和常用方法

執行緒的生命週期大致分為五種狀態: 1. 新建: 新建一個執行緒物件。 2.可執行: 啟動執行緒,呼叫start方法或者呼叫執行緒池的excute方法等,此時執行緒會進入可執行執行緒池中,等待獲取CPU的時間片。 3.執行 執行狀態,也就

JAVA多執行緒併發(八)詳解ThreadLocal使用和原理

  ThreadLocal是一個用於儲存多執行緒變數的類,它可以把執行緒與設定的值對應起來,因為它為變數在每個執行緒都建立了一個副本。訪問的時候每個執行緒只能訪問到自己的副本變數。 例項 看如下程式碼: public class Main {

JAVA多執行緒併發(四)賣火車票瞭解一下

多執行緒最常用的兩個例子就是:火車票和生產者消費者問題了,本文簡單的實現一下賣火車票的例子, 首先建立車票類: class Ticket implements Runnable{ private int num; private int

JAVA多執行緒併發(九)CAS和ABA

  如果瞭解資料庫的悲觀鎖和樂觀鎖的話,對於理解CAS就很簡單了,因為CAS就是樂觀鎖的具體實現。   悲觀鎖:在操作資料庫時本能的覺得一定會有競爭,所以把資料鎖住,不讓其他事物對對應的資料進行操作,在本次操作之後把鎖釋放,其它事物才可以進行操作。這個在jav

java執行緒學習(六):JDK併發鎖-ReentrantLock

本章將學習重入鎖的以下方法: lock(): 獲得鎖,如果鎖已被佔用,則等待; lockInterruptbly(): 獲得鎖,但優先響應中斷; tryLock(): 嘗試獲得鎖,如果成功,返回true,反之返回false,該方法不會等待,執行則立即返回,可用tryL

vue踩不完全(1)

1.Vuejs元件 vuejs構建元件使用 Vue.component('componentName',{ /*component*/ }); 這裡注意一點,元件要先註冊再使用,也就是說: Vue.component('mine',{ templa

vue2隨記(一)

change 奇怪 變量 ble 全局 ole 獲取 cit push 都說Vue2簡單,上手容易,但小馬過河,自己試了才曉得,除了ES6語法和webpack的配置讓你感到陌生,重要的是思路的變換,以前隨便拿全局變量和修改dom的錘子不能用了,變換到關註數據本身。vue的

強力推薦:Atom必備插件

格式化 方便 tom 一個 rip 設置 等等 依靠 同步 強力推薦:Atom入坑必備插件 Atom作為Javascript/CSS/HTML等前端編輯器利器,其強大功能依靠各種插件,以下是筆者在入坑階段,精挑細選總結出的必不可少的插件,熟悉運用這些插件,一定成噸提高生產效