1. 程式人生 > 實用技巧 >執行緒安全的集合筆記

執行緒安全的集合筆記

如果多個執行緒要併發地修改一個數據結構,例如散列表,那麼很容易破獲這個資料結構。例如,一個執行緒可能開始向表裡插入一個新元素。假定在調整散列表各個桶之間的連結關係的過程中,這個執行緒的控制權被搶佔。如果另一個執行緒開始遍歷同一個連結串列,可能使用無效的連結並造成混亂,可能會丟擲異常或者陷入無限迴圈。
可以通過鎖來保護共享的資料結構,但是選擇執行緒安全的實現可能更為容易。

1、阻塞佇列

很多執行緒問題可以使用一個或多個佇列以優雅而安全的方式來描述。生產者佇列向佇列插入元素,消費者執行緒則獲取元素。使用佇列,可以安全地從一個執行緒向另一個執行緒傳遞資料。
當試圖向佇列新增元素而佇列已滿,或是想從佇列移出元素而佇列為空的時候,阻塞佇列(blocking queue)將導致執行緒阻塞。在協調多個執行緒之間的合作時,阻塞佇列是一個有用的工具。工作執行緒可以週期性地將中間結果儲存在阻塞佇列中。其他工作執行緒移出中間結果,並進一步進行修改。
阻塞執行緒方法

方法 正常動作 特殊情況下的動作
add 新增一個元素 如果佇列滿,則丟擲IllegalStateException異常
element 返回隊頭元素 如果佇列空,則丟擲NoSuchElementException異常
offer 新增一個元素並返回true 如果佇列滿,則返回false
peek 返回隊頭元素 如果佇列空,則返回null
poll 移除並返回隊頭元素 如果佇列空,則返回null
put 新增一個元素 如果佇列滿,則阻塞
remove 移除並返回隊頭元素 如果佇列空,則丟擲NoSuchElementException異常
take 移除並返回隊頭元素 如果佇列空,則阻塞

java.util.concurrent包提供了阻塞佇列的幾個變體。預設情況下,LinkedBlockingQueue的容量沒有上界,但是,也可以選擇指定一個最大容量。LinkedBlockingDeque是一個雙端佇列。ArrayBlockingQueue在構造時需要指定容量,並且有一個可選的引數來指定是否需要公平性。若設定了公平引數,那麼等待了最長時間的執行緒會優先得到處理。通常,公平性會降低效能,只有在確實非常需要時才使用公平引數。
PriorityBlockingQueue是一個優先佇列,而不是先進先出佇列。元素按照它們的優先順序順序移除。這個佇列沒有容量上限,但是,如果佇列是空的,獲取元素的操作會阻塞。
接下來的程式展示瞭如何使用阻塞佇列來控制一組執行緒。程式在一個目錄及其所有子目錄下搜尋檔案,打印出包含指定關鍵字的行。

package com.company.Synchronize12.blockingQueue;


import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Created by kzm on 2020/11/3 7:44
 */
public class BlockingQueueTest {
    private static final int FILE_QUEUE_SIZE = 10;
    private static final int SEARCH_THREADS = 100;
    private static final Path DUMMY = Paths.get("");
    private static BlockingQueue<Path> queue = new ArrayBlockingQueue<>(FILE_QUEUE_SIZE);

    public static void main(String[] args) {
        try (Scanner in = new Scanner(System.in)){
            System.out.print("Enter base directory (e.g. /opt/jdk-9-src): ");;
            String directory = in.nextLine();
            System.out.print("Enter keyword (e.g. volatile): ");
            String keyword = in.nextLine();

            Runnable enumerator = () -> {
                try {
                    enumerate(Paths.get(directory));
                    queue.put(DUMMY);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            };

            new Thread(enumerator).start();
            for (int i = 1; i <= SEARCH_THREADS; i++){
                Runnable searcher = () -> {
                    try {
                        boolean done = false;
                        while (!done){
                            Path file = queue.take();
                            if (file == DUMMY){
                                queue.put(file);
                                done = true;
                            }
                            else search(file, keyword);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                };
                new Thread(searcher).start();
            }
        }
    }


    /**
     * 遞迴列舉給定目錄及其子目錄中的所有檔案
     * @param directory
     * @throws IOException
     * @throws InterruptedException
     */
    public static void enumerate(Path directory) throws IOException, InterruptedException {
        try (Stream<Path> children = Files.list(directory)){
            for (Path child : children.collect(Collectors.toList())){
                if (Files.isDirectory(child))
                    enumerate(child);
                else
                    queue.put(child);
            }
        }
    }

    /**
     * 在檔案中搜索給定的關鍵字並列印所有匹配的行
     * @param file
     * @param keyword
     * @throws IOException
     */
    public static void search(Path file, String keyword) throws IOException {
        try (Scanner in = new Scanner(file, String.valueOf(StandardCharsets.UTF_8))){
            int lineNumber = 0;
            while (in.hasNextLine()){
                lineNumber++;
                String line = in.nextLine();
                if (line.contains(keyword)){
                    System.out.printf("%s:%d:%s%n", file, lineNumber, line);
                }
            }
        }
    }
}

生產者執行緒列舉所有子目錄下的所有檔案並把它們放到一個阻塞佇列中。
同時啟動了大量搜尋執行緒。每個搜尋執行緒從佇列中取出一個檔案,開啟它,列印所有包含該關鍵字的行,然後取出下一個檔案。使用了一個小技巧在工作結束後終止這個應用。為了發出完成訊號,列舉執行緒會在佇列中放置一個虛擬物件。當搜尋執行緒取到了這個虛擬物件時,將其放回並終止。
注意,這裡不需要顯示的執行緒同步。在這個應用程式中,我們使用佇列資料結構作為一種同步機制。

2、高效的對映、集和佇列

java.util.concurrent包提供了對映、有序集和佇列的高效實現:ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、ConcurrentLinkedQueue。
這些集合使用複雜的演算法,通過允許併發地訪問資料結構的不同部分儘可能減少競爭。
集合返回弱一致性的迭代器。這意味著迭代器不一定能反映出它們構造之後的所有更改,但是,它們不會將同一個值返回兩次,也不會丟擲ConcrrentModificationException異常。與之形成對照的是,對於java.util包中的集合,如果集合在迭代器構造之後發生改變,集合的迭代器將丟擲一個ConcurrentModificationException異常。
併發雜湊對映可以高效地支援大量閱讀器和一定數量地書寫器。預設情況下認為可以有至多16個同時執行地書寫器程序。當然可以有更多的書寫器程序,但是,同一時間如果多於16個,其他執行緒將暫時阻塞。

3、對映條目的原子更新

在老版本的Java中,必須使用replace操作,它會以原子方式用一個新值替換原值,前提是沒有其他執行緒把原值替換為其他值。必須一直這麼做,直到替換成功:

do
{
      oldValue = map.get(word);
      newValue = oldValue == null ? 1 : oldValue + 1;
}
while (!map.replace(word, oldValue, newValue));

或者,可以使用一個ConcurrentHashMap<String, AtomicLong>,以及以下更新程式碼:

map.putIfAbsent(word, new AtomicLong());
map.get(word).incrementAndGet();

很遺憾,這會為每個自增構造一個新的AtomicLong,而不管是否需要。
如今,Java API提供了一些新方法,可以更方便地完成原子更新。呼叫compute方法時可以提供一個鍵和一個計算新值的函式。這個函式接受鍵和相關聯的值(如果沒有值,則為null),它會重新計算新值。例如,可以如下更新一個整數計數器的對映:

map.compute(word, (k,v) -> v == null ? 1 : v + 1);

另外還有computeIfPresent和computeIfAbsent方法,它們分別只在已經有原值的情況下計算新值,或者只在沒有原值的情況下計算新值。
首次增加一個鍵時通常需要做些特殊的處理。利用merge方法可以非常方便地做到這一點。這個方法有一個引數表示鍵不存在時使用的初始值。否則,就會呼叫你提供的函式來結合原值與初始值。

map.merge(word, 1L, (existingValue, newValue) -> existingValue + newValue);

或者,更簡單地可以寫為:

map.merge(word, 1L, Long::sum);

下面的程式使用了一個併發雜湊對映來統計一個目錄數的Java檔案中的所有單詞。

package com.company.Synchronize12.concurrentHashMap;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Created by kzm on 2020/11/4 12:26
 */
public class CHMDemo {
    public static ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();

    /**
     * 將給定檔案中的所有單詞新增到併發雜湊圖
     * @param file
     */
    public static void process(Path file){
        try (Scanner in = new Scanner(file)){
            while (in.hasNext()){
                String word = in.next();
                map.merge(word, 1L, Long::sum);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 返回給定目錄的所有後代
     * @param rootDir
     * @return
     * @throws IOException
     */
    public static Set<Path> descendants(Path rootDir) throws IOException {
        try (Stream<Path> entries = Files.walk(rootDir)){
            return entries.collect(Collectors.toSet());
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        int processors = Runtime.getRuntime().availableProcessors();
        ExecutorService executor = Executors.newFixedThreadPool(processors);
        Path pathToRoot = Paths.get(".");
        for (Path p : descendants(pathToRoot)){
            if (p.getFileName().toString().endsWith(".java"))
                executor.execute(() -> process(p));
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.MINUTES);
        map.forEach((k, v) -> {
            if (v >= 10)
                System.out.println(k + " occurs " + v + " times");
        });
    }
}
4、對併發雜湊對映的批操作

Java API為併發雜湊對映提供了批操作,即使有其他執行緒在處理對映,這些操作也能安全地執行。批操作會遍歷對映,處理遍歷過程中找到的元素。這裡不會凍結對映的當前快照。除非你恰好知道批操作執行時對映不會被修改,否則就要把結果看作是對映狀態的一個近似。
有3中不同的操作:

  • search(搜尋)為每個鍵或值應用一個函式,直到函式生成了一個非null的結果。然後搜尋終止,返回這個函式的結果。
  • reduce(歸約)組合所有鍵或值,這裡要使用所提供的一個累加函式。
  • forEach為所有鍵或值應用一個函式。
    每個操作都有4個版本:
  • operationKeys: 處理鍵。
  • operationValue: 處理值。
  • operation: 處理鍵和值。
  • operationEntries: 處理Map.Entry物件。
    對於上述各個操作,需要指定一個引數化閾值(parallelism threshold)。如果對映包含的元素多於這個閾值,就會並行完成批操作。如果希望批操作在一個執行緒中執行,可以使用閾值Long.MAX_VALUE。如果希望用盡可能多的執行緒執行批操作,可以使用閾值1。
    下面首先來看search方法。有以下版本:
U searchKeys(long threshold, BiFunction<? super K, ? extends U> f)
U searchVaiues(long threshold, BiFunction<? super V, ? extends U> f)
U search(long threshold, BiFunction<? super K, ? super V,? extends U> f)
U searchEntries(long threshold, BiFunction<Map.Entry<K, V>, ? extends U> f)

例如,假設我們希望找出第一個出現次數超過1000次的單詞。需要搜尋鍵和值:

String result = map.search(threshold, (k, v) -> v > 1000 ? k : null);

result會設定為第一個匹配的單詞,如果搜尋函式對所有輸人都返回null,則返回null。
forEach方法有兩種形式。第一個只為各個對映條目提供一個消費者函式,例如:

map.forEach(threshold, (k, v) -> System.out.println(k + "->" + v));

第二種形式還有一個轉換器函式,這個函式要先提供,其結果會傳遞到消費者:

map.forEach(threshold,
    (k, v) -> k + "->" + v,// transformer
    System.out::println);// consumer

轉換器可以用作為一個過濾器。只要轉換器返回null,這個值就會被悄無聲息地跳過。例如,下面只打印有大值的條目:

map.forEach(threshold,
    (k, v) -> v > 1000 ? k + "->" + v : null,// filter and transformer
    System.out::println);// the nulls are not passed to the consumer

reduce操作用一個累加函式組合其輸入。例如,可以如下計算所有值的總和:

Long sum = map.reduceValues(threshold, Long::sum);

與forEach類似,也可以提供一個轉換器函式。可以如下計算最長的鍵的長度:

Integer maxlength = map.reduceKeys(threshold,
String::length,// transformer
Integer::max); // accumulator

轉換器可以作為一個過濾器,通過返回null來排除不想要的輸入。在這裡,我們要統計多少個條目的值>1000:

Long count = map.reduceValues(threshold,
v -> v > 1000 ? 1L : null,
Long::sum);

對於int、long和double輸出還有相應的特殊化操作,分別有後綴Tolnt、ToLong和ToDouble。需要把輸入轉換為一個基本型別值,並指定一個預設值和一個累加器函式。對映為空時返回預設值。

long sum = map.reduceValuesToLong(threshold,
Long::longValue,// transformer to primitive type
0,// default value for empty map
Long::sura); // primitive type accumulator
5、併發集檢視

靜態newKeySet方法會生成一個Set,這實際上是ConcurrentHashMap<K,Boolean>的一個包裝器。

Set<String> words = ConcurrentHashMap.<String>newKeySet();

當然,如果原來有一個對映,keySet方法可以生成這個對映的鍵集。這個集是可變的。如果刪除這個集的元素,這個鍵(以及相應的值)會從對映中刪除。不過,不能向鍵集增加元素,因為沒有相應的值可以增加。JavaSE8為ConcurrentHashMap增加了第二個keySet方法,包含一個預設值,可以在為集增加元素時使用:

Set<String> words = map.keySet(1L);
words.add("Java");

如果"Java”在words中不存在,現在它會有一個值1。

6、寫陣列的拷貝

CopyOnWriteArrayList和CopyOnWriteArraySet是執行緒安全的集合,其中所有的修改執行緒對底層陣列進行復制。如果在集合上進行迭代的執行緒數超過修改執行緒數,這樣的安排是很有用的。當構建一個迭代器的時候,它包含一個對當前陣列的引用。如果陣列後來被修改了,迭代器仍然引用舊陣列,但是,集合的陣列已經被替換了。因而,舊的迭代器擁有一致的(可能過時的)檢視,訪問它無須任何同步開銷。

7、並行陣列演算法

Arrays類提供了大量並行化操作。靜態Arrays.parallelSort方法可以對一個基本型別值或物件的陣列排序。例如,

String contents = new String(Fi1es.readAllBytes(
Paths.get("alice.txt")), StandardCharsets.UTF_8);// read file into string
String[] words = contents.split("[\\P{L}]+");// split along nonletters
Arrays.parallelSort(words);

對物件排序時,可以提供一個Comparator。

Arrays,parallelSort(words,Comparator.comparing(String::length));

對於所有方法都可以提供一個範圍的邊界,如:

values,parallelSort(values, length/2, values,length);// 排序上半部分

parallelSetAll方法會用由一個函式計算得到的值填充一個數組。這個函式接收元素索引,然後計算相應位置上的值。

Arrays.parallelSetAll(values, i -> i % 10);// fills values with 0123456789012...

顯然,並行化對這個操作很有好處。這個操作對於所有基本型別陣列和物件陣列都有相應的版本。最後還有一個parallelPrefix方法,它會用對應一個給定結合操作的字首的累加結果替換各個陣列元素。

8、較早的執行緒安全集合

從Java的初始版本開始,Vector和Hashtable類就提供了執行緒安全的動態陣列和散列表的實現。現在這些類被棄用了,取而代之的是ArrayList和HashMap類。這些類不是執行緒安全的,而集合庫中提供了不同的機制。任何集合類都可以通過使用同步包裝器(synchronization wrapper)變成執行緒安全的:

List<E> synchArrayList = Collections.synchronizedList(new ArrayList<E>());
Map<K, V> synchHashMap = Col1ections.synchronizedMap(new HashMap<K, V>());

結果集合的方法使用鎖加以保護,提供了執行緒安全訪問。
如果在另一個執行緒可能進行修改時要對集合進行迭代,仍然需要使用"客戶端"鎖定:

synchronized (synchHashMap)
{
    Iterator<K> iter = synchHashMap.keySet().iterator();
    while (iter.hasNext()) ...;
}

如果使用"for each"迴圈必須使用同樣的程式碼,因為迴圈使用了迭代器。注意:如果在迭代過程中,別的執行緒修改集合,迭代器會失效,丟擲ConcurrentModificationException異常。同步仍然是需要的,因此併發的修改可以被可靠地檢測出來。

最好使用java.Util.Concurrent包中定義的集合,不使用同步包裝器中的。特別是,假如它們訪問的是不同的桶,由於ConcurrentHashMap已經精心地實現了,多執行緒可以訪問它而且不會彼此阻塞。有一個例外是經常被修改的陣列列表。在那種情況下,同步的ArrayList可以勝過CopyOnWriteArrayList。