1. 程式人生 > >多執行緒集合佇列等等

多執行緒集合佇列等等

對於許多執行緒問題,可以通過使用一個或多個佇列以優雅且安全的方式將其形式化。當試圖向佇列新增元素而佇列已滿,或是想從佇列移出元素而佇列為空的時候, 阻塞佇列( blocking queue)導致執行緒阻塞。在協調多個執行緒之間的合作時,阻塞佇列是一個有用的工具。工作者執行緒可以週期性地將中間結果儲存在阻塞佇列中。其他的工作者執行緒移出中間結果並進一步加以修改。佇列會自動地平衡負載。

add 新增一個元素 如果佇列滿,則丟擲IllegalStateException異常
element 返回佇列的頭元素 如果佇列空,丟擲NoSuchElementException異常
offer 新增一個元素並返回true 如果佇列滿,返回false
peek 返回佇列的頭元素 如果佇列空,則返回null
poll 移出並返回佇列的頭元素 如果佇列空,則返回null
put 新增一個元素 如果佇列滿,則阻塞
remove 移出並返回頭元素 如果佇列空,則丟擲NoSuchElementException異常
take 移出並返回頭元素 如果佇列空,則阻塞

當試圖向滿的佇列中新增或從空的佇列中移出元素時, add、 remove和element操作丟擲異常。一定要使用offer、 poll和peek方法作為替代。向這些佇列中插入null值是非法的。
預設情況下,LinkedBlockingQueue的容量是沒有上邊界的,但是,也可以選擇指定最大容量。 LinkedBlockingDeque是一個雙端的版本。ArrayBlockingQueue在構造時需要指定容量,並且有一個可選的引數來指定是否需要公平性。
PriorityBlockingQueue是一個帶優先順序的佇列,而不是先進先出佇列。元素按照它們的優先級順序被移出。該佇列是沒有容量上限,但是,如果佇列是空的,取元素的操作會阻塞。
注意,不需要顯式的執行緒同步。我們使用佇列資料結構作為一種同步機制。

如果多執行緒要併發地修改一個數據結構,例如散列表,那麼很容易會破壞這個資料結構。可以通過提供鎖來保護共享資料結構,但是選擇執行緒安全的實現作為替代可能更容易些。阻塞佇列就是執行緒安全的集合。
java.util.concurrent包提供了映像、有序集和佇列的高效實現: ConcurrentHashMap、ConcurrentSkipListMap、 ConcurrentSkipListSet和ConcurrentLinkedQueue。這些集合使用複雜的演算法,通過允許併發地訪問資料結構的不同部分來使競爭極小化。與大多數集合不同, size方法不必在常量時間內操作。確定這樣的集合當前的大小通常需要遍歷。

CopyOnWriteArrayList和CopyOnWriteArraySet是執行緒安全的集合。

List<T> list =  Collections.synchronizedList(new ArrayList<T>());
Map<K, V> map = Collections.synchronizedMap(new HashMap<K, V>());

結果集合的方法使用鎖加以保護,提供了執行緒的安全訪問。應該確保沒有任何執行緒通過原始的非同步方法訪問資料結構。最便利的方法是確保不儲存任何指向原始物件的引用,簡單地構造一個集合並立即傳遞給包裝器。最好使用java.util.concurrent包中定義的集合,不使用同步包裝器中的。

Runnable封裝一個非同步執行的任務,可以把它想像成為一個沒有引數和返回值的非同步方法。Callable與Runnable類似,但是有返回值。 Callable介面是一個引數化的型別,只有一個方法call。

interface Callable<V>{
      V call() throws Exception;
} 
Future儲存非同步計算的結果。可以啟動一個計算,將Future物件交給某個執行緒,然後忘掉它。 Future物件的所有者在結果計算好之後就可以獲得它。

Future<V>{
 V get();
 v get(time,Timeunit);
 void cancel(boolean);
 boolean isCancelled();
 boolean isDone();
}
第一個get方法的呼叫被阻塞,直到計算完成。如果在計算完成之前,第二個方法的呼叫超時,丟擲一個TimeoutException異常。如果執行該計算的執行緒被中斷,兩個方法都將丟擲InterruptedException。如果計算已經完成,那麼get方法立即返回.

如果計算還在進行,isDone方法返回false;如果完成了,則返回true。可以用cancel方法取消該計算。如果計算還沒有開始,它被取消且不再開始。如果計算處於執行之中,那麼如果boolean引數為true,它就被中斷。

FutureTask包裝器是一種非常便利的機制,可將Callable轉換成Future和Runnable,它同時實現二者的介面。

Callable<V> a = new......;
FutureTask<v> b = new FutureTask<V>(a);
Thread c = new Thread(b);這裡是runnable
c.start();
.....
V d = b.get();//這裡是future

構建一個新的執行緒是有一定代價的,因為涉及與作業系統的互動。如果程式中建立了大量的生命期很短的執行緒,應該使用執行緒池(threadpool)。一個執行緒池中包含許多準備執行的空閒執行緒。將Runnable物件交給執行緒池,就會有一個執行緒呼叫run方法。當run方法退出時,執行緒不會死亡,而是在池中準備為下一個請求提供服務。
另一個使用執行緒池的理由是減少併發執行緒的數目。建立大量執行緒會大大降低效能甚至使虛擬機崩潰。如果有一個會建立許多執行緒的演算法,應該使用一個執行緒數“固定的”執行緒池以限制併發執行緒的總數。

執行器( Executor)類有許多靜態工廠方法用來構建執行緒池
newCachedThreadPool 必要時建立新執行緒;空閒執行緒會被保留60秒
newFixedThreadPool 該池包含固定數量的執行緒;空閒執行緒會一直被保留
newSingleThreadExecutor 只有一個執行緒的“池”,該執行緒順序執行每一個提交的任務
newScheduledThreadPool 用於預定執行而構建的固定執行緒池,替代java.util.Timer
newSingleThreadScheduledExecutor 用於預定執行而構建的單執行緒“池

newCachedThreadPool方法構建了一個執行緒池,對於每個任務,如果有空閒執行緒可用,立即讓它執行任務,如果沒有可用的空閒執行緒,則建立一個新執行緒。 newFixedThreadPool方法構建一個具有固定大小的執行緒池。如果提交的任務數多於空閒的執行緒數,那麼把得不到服務的任務放置到佇列中。當其他任務完成以後再執行它們。 newSingleThreadExecutor是一個退化了的大小為1的執行緒池:由一個執行緒執行提交的任務,一個接著一個。這 3個方法返回實現了ExecutorService介面的ThreadPoolExecutor類的物件。

可用下面的方法之一將一個Runnable物件或Callable物件提交給ExecutorService:

Future<?> submit(runnable);
Future<T> submit(runnable,T result);
Future<T> submit(Callable);

該池會在方便的時候儘早執行提交的任務。呼叫submit時,會得到一個Future物件,可用來查詢該任務的狀態。
第一個submit方法返回一個奇怪樣子的Future<?>。可以使用這樣一個物件來呼叫isDone、cancel或isCancelled。但是, get方法在完成的時候只是簡單地返回null。
第二個的Submit也提交一個Runnable,並且Future的get方法在完成的時候返回指定的result物件。
第三個的Submit提交一個Callable,並且返回的Future物件將在計算結果準備好的時候得到它。

當用完一個執行緒池的時候,呼叫shutdown。該方法啟動該池的關閉序列。被關閉的執行器不再接受新的任務。當所有任務都完成以後,執行緒池中的執行緒死亡。另一種方法是呼叫shutdownNow。該池取消尚未開始的所有任務並試圖中斷正在執行的執行緒。

下面總結了在使用連線池時應該做的事:
1)呼叫Executors類中靜態的方法newCachedThreadPool或newFixedThreadPool。
2)呼叫submit提交Runnable或Callable物件。
3)如果想要取消一個任務,或如果提交Callable物件,那就要儲存好返回的Future物件。
4)當不再提交任何任務時,呼叫 shutdown.


ExecutorService pool = Executors.newCachedThreadPool();
Future<V> a = pool.submit(new Runnable());
Future<V> b = pool.submit(new Runnable(),5);
Future<V> c = pool.submit(new Callable());
pool.shutdown();
((ThreadPoolExecutor)pool).getLargestPoolSize();//返回執行緒池在該執行器生命週期中的最大尺寸。

ScheduledExecutorService介面具有為預定執行( Scheduled Execution)或重複執行任務而設計的方法。它是一種允許使用執行緒池機制的java.util.Timer的泛化。Executors類的newScheduledThreadPool和newSingleThreadScheduledExecutor方法將返回實現了
ScheduledExecutorService介面的物件。

可以預定Runnable或Callable在初始的延遲之後只執行一次。也可以預定一個Runnable物件週期性地執行。

invokeAll方法提交所有物件到一個Callable物件的集合中,並返回一個Future物件的列表,代表所有任務的解決方案。當計算結果可獲得時,可以像下面這樣對結果進行處理:

List<Callable<V>>a=...;
List<Future<V>>list = pool.invokeAll(a);
for(){
    xxx.get();
}
這個方法的缺點是如果第一個任務恰巧花去了很多時間,則可能不得不進行等待。以結果按可獲得的順序儲存起來更有實際意義。可以用ExecutorCompletionService來進行排列.用常規的方法獲得一個執行器。然後,構建一個ExecutorCompletionService,提交任務給完成服務( completion service)。該服務管理Future物件的阻塞佇列,其中包含已經提交的任務的執行結果(當這些結果成為可用時)。

同步器

CyclicBarrier允許執行緒集等待直至其中預定數目的執行緒到達一個公共障柵( barrier),然後可以選擇執行一個處理障柵的動作,

什麼時候使用:當大量的執行緒需要在它們的結果可用之前完成時。

CountDownLatch允許執行緒集等待直到計數器減為0

使用:當一個或多個執行緒需要等待直到指定數目的事件發生

Exchanger允許兩個執行緒在要交換的物件準備好時交換物件

使用:當兩個執行緒工作在同一資料結構的兩個實例上的時候,一個向例項新增資料而另一個從例項清除資料

Semaphore允許執行緒集等待直到被允許繼續執行為止

使用:限制訪問資源的執行緒總數。如果許可數是1,常常阻塞執行緒直到另一個執行緒給出許可為止

SynchronousQueue允許一個執行緒把物件交給另一個執行緒
使用:在沒有顯式同步的情況下,當兩個執行緒準備好將一個物件從一個執行緒傳遞到另一個時

這些機制具有為執行緒之間的共用集結點模式( common rendezvous patterns)提供的“預置功能”(canned functionality)。如果有一個相互合作的執行緒集滿足這些行為模式之一,那麼應該直接重用合適的庫類而不要試圖提供手工的鎖與條件的集合。

相關推薦

執行集合佇列等等

對於許多執行緒問題,可以通過使用一個或多個佇列以優雅且安全的方式將其形式化。當試圖向佇列新增元素而佇列已滿,或是想從佇列移出元素而佇列為空的時候, 阻塞佇列( blocking queue)導致執行緒阻塞。在協調多個執行緒之間的合作時,阻塞佇列是一個有用的工具。工作者執行緒

Java執行 阻塞佇列和併發集合

本章主要探討在多執行緒程式中與集合相關的內容。在多執行緒程式中,如果使用普通集合往往會造成資料錯誤,甚至造成程式崩潰。Java為多執行緒專門提供了特有的執行緒安全的集合類,通過下面的學習,您需要掌握這些集合的特點是什麼,底層實現如何、在何時使用等問題。 3.1Blockin

PYTHON——執行佇列Queue資料結構

1、佇列模組簡介   佇列是一種資料結構,用於存放資料,類似列表。它是先進先出模式(FIFO模式),類似管道一般; 單執行緒不需要用到佇列Queue,它主要用在多執行緒之間的,Queue稱為多執行緒利器。 列表在多執行緒共享資源的話,與queue佇列比較,主要表現為列表在多執行緒中,資料不安全。多個執行

python 執行處理佇列

轉載自: https://blog.csdn.net/u011655220/article/details/79037032 from threading import Thread import time import random from queue import Queue from co

c# 執行使用佇列順序寫日誌的類 (需要再優化)

using System; using System.Collections.Generic; using System.Threading; public class LogManager { /// <summary> /// 建構函式 /// </su

Java執行---阻塞佇列詳解(舉例說明)

一. 前言   在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資料的問題。通過這些高效並且執行緒安全的佇列類,為我們快速搭建高質量的多執行緒程式帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的所有成員

執行佇列和同/非同步執行問題

佇列分為並行佇列和序列佇列,執行方式分為同步執行和非同步執行,那麼組合一下就有四種方式,下面我會用GCD來驗證和總結一下各個組合的特性。 併發佇列,同步執行 //併發佇列+同步執行 //不會開啟新執行緒,任務順序執行 -(void)test1{ NSLog(@"併發佇列+

java執行任務佇列模型

此篇文章將從任務佇列的設計;任務排程的方式(序列和並行)。程式碼很簡單,主要是設計的思想。 任務佇列 final class PendingPostQueue {     // 含有頭、尾指標的連結串列結構實現佇列     private PendingPost head;

python_day29_通過類建立執行_佇列

#Author:'haijing'#date:2018/12/20import threadingimport time#通過類建立多執行緒class MyThread(threading.Thread): #MyThread類繼承threading.Thread類 def __init__(self

PHP 高階程式設計之執行-訊息佇列

1. 多執行緒環境安裝 1.1. PHP 5.5.9 安裝PHP 5.5.9 https://github.com/oscm/shell/blob/master/php/5.5.9.sh ./configure --prefix=/srv/php-5.5.9 \ --wi

015.執行-併發佇列

在併發佇列上JDK提供了兩套實現, 一個是以ConcurrentLinkedQueue為代表的高效能佇列, 一個是以BlockingQueue介面為代表的阻塞佇列,無論哪種都繼承自Queue。 ConcurrentLinkedQueue ConcurrentLinkedQu

執行池使用ExecutorService 執行處理佇列任務

最近轉到銀行工作,在做最核心的財務賬務部分,對我來說是一個比較新的東西,工作也已經四年有餘,接觸一些新的東西,也是不錯,每天也累得像狗... 不說了。/捂臉 接下來說一種非常實用的多執行緒操作模式,此方式能夠應對大部分的多執行緒操作,稍微改一下往裡面套就可以滿足大部分的業務

python爬蟲入門八:程序/多執行緒 python佇列Queue Python多執行緒(2)——執行緒同步機制 python學習筆記——程序中共享記憶體Value & Array python 之 程序 Python程序 Python 使用multiprocessing 特別耗記

什麼是多執行緒/多程序 引用蟲師的解釋: 計算機程式只不過是磁碟中可執行的,二進位制(或其它型別)的資料。它們只有在被讀取到記憶體中,被作業系統呼叫的時候才開始它們的生命期。 程序(有時被稱為重量級程序)是程式的一次執行。每個程序都有自己的地址空間,記憶體,資料棧以及其它記錄其執行軌跡的輔助資料

tensorflow常用影象處理程式碼,執行佇列等python程式碼

TensorFlow學習筆記(五)影象資料處理 目錄:   一、TFRecord輸入資料格式   1.1 TFrecord格式介紹   1.2 TFRecord樣例程式   二、影象資料處理   

從零開始學Python(八):Python執行佇列

很久沒有更新博文啦,在家過春節已經變懶了-_-,不過答應大家更完這個python的入門系列,偶還是會繼續努力的!另外祝願大家新年快樂,事事順心! 執行緒的概念 我們學習的很多程式語言,比如java,oc等,都會有執行緒這個概念.執行緒的用途非常的廣泛,給

JAVA執行佇列

         JAVA 已經給我們提供了比較好的佇列實現Queue,繼承於Collection。 本次我使用的是BlockingQueue,繼承於Queue。              在Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資料的問題。通

Python 執行|Queue佇列|生產者消費者模式|

Queue佇列 Python中,佇列是執行緒間最常用的交換資料的形式。Queue模組是提供佇列操作的模組,雖然簡單易用,但是不小心的話,還是會出現一些意外。Queue是執行緒安全的,自帶鎖,使用的時候,不用對佇列加鎖操作。 1. 將一個值放入佇列中 q.get()呼叫佇列物件的get

執行佇列的坑

在多執行緒下,我們一般認為push和pop 2個執行緒是沒有衝突的。但事實上如果我們在判斷size為不為空而去判斷pop時就要注意一個問題。將push和pop上鎖了,因為有可能size不再是正確的siz

java執行佇列例項

第一步:建立一個無邊界自動回收的執行緒池,在此用 JDK提供的ExecutorService類 此執行緒池。如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。此執

刀哥執行序列佇列gcd-04-dispatch_queue_serial

序列佇列 特點 以先進先出的方式,順序排程佇列中的任務執行 無論佇列中所指定的執行任務函式是同步還是非同步,都會等待前一個任務執行完成後,再排程後面的任務 佇列建立 dispatch_queue_t queue = dispatch_qu