1. 程式人生 > 其它 >關於多執行緒的三兩事

關於多執行緒的三兩事

https://www.cnblogs.com/idiotgroup/p/15032036.html

多執行緒一直是程式設計中的重要的工具,它可以分充分的利用硬體資源,是我們用更少的時間去完成更多的事情。在之前的部落格中,我有介紹了OpenMP的基本使用,OpenMP可以理解為多執行緒的一個合理和高效的一套抽象工具。這次,打算仔細的介紹多執行緒程式設計中的常見的概念和典型的案例。

典型的案例

說到多執行緒,最核心的問題就是保證資料的讀寫安全。為了達到此目的,我們需要多很多常見的資料結構做一些改造,從而適應多執行緒的場景。以下是我工作中比較常見到的一些使用場景:

  1. 執行緒池
  2. 讀寫鎖
  3. 訊息佇列
  4. ConcurrentCache
  5. PingPang Buffer

在具體介紹這些使用場景之前,我們還是需要了解需要使用到的一些基本的工具:互斥量、條件變數、原子操作等。

互斥量

互斥量,顧名思義,就是互斥的資料,一個執行緒持有的時候,其他執行緒就必須等待。

在C++11中,使用<mutex>標頭檔案引入。以下是一個簡單的計數器的實現。

emit函式通過mutex_進行加鎖,使得同一時間僅有一個執行緒可以執行++ x_的操作,從而保證了計數的正確性。

std::lock_guard是個工具類,lck在構造時,呼叫了lock函式,在析構時呼叫了unlock,從而避免我們自行呼叫的時候忘記unlock。

#include <mutex>
#include <thread>
#include <iostream>

class Counter {
public:
    Counter(): x_(0) {}
    void emit() {
        mutex_.lock();
        ++ x_;
        mutex_.unlock();
        // or
        // std::lock_guard<std::mutex> lck(mutex_);
        // ++ x_;
    }
    int count() {
        return x_;
    }
private:
    int x_;
    std::mutex mutex_;
};

int main() {
    Counter c;
    std::thread t1([&c]{
        for (int i = 0; i < 10000000; ++ i) {
            c.emit();
        }
    });
    std::thread t2([&c]{
        for (int i = 0; i < 10000000; ++ i) {
            c.emit();
        }
    });
    t1.join();
    t2.join();
    std::cout << c.count() << std::endl; // 20000000
}

基於Mutex,我們可以方便的實現讀寫鎖。讀寫鎖的作用是,保證資料可以供多個執行緒併發讀,僅一個執行緒允許寫。在存線上程讀的情況下,寫的執行緒會阻塞,直到沒有任何執行緒有讀操作。

讀寫鎖

首先讀寫鎖會存在一個write_mutex,讀執行緒和寫執行緒都需要搶佔這個mutex,從而保證讀和寫不會同時進行。但是隻需要第一個讀執行緒搶佔write_mutex即可,其他的讀執行緒不需要再搶佔(搶佔的話,就不支援併發讀了)。當不存在讀執行緒的時候,需要釋放write_mutex,這才執行寫執行緒搶佔。

因此我們還需要一個計數器,記錄當前讀執行緒的個數,並使用另一個read_mutex保證計數器的準確。

#include <mutex>
#include <thread>
#include <iostream>
#include <vector>

class ReadWriteLock {
public:
    ReadWriteLock():reader_count_(0) {}
    void lock_read() {
        read_mutex_.lock();
        if (reader_count_ == 0) {
            write_mutex_.lock();
        }
        ++ reader_count_;
        read_mutex_.unlock();
    }
    void unlock_read() {
        read_mutex_.lock();
        -- reader_count_;
        if (reader_count_ == 0) {
            write_mutex_.unlock();
        }
        read_mutex_.unlock();
    }
    void lock_write() {
        write_mutex_.lock();
    }
    void unlock_write() {
        write_mutex_.unlock();
    }
private:
    std::mutex read_mutex_;
    std::mutex write_mutex_;
    int64_t reader_count_;
};

ReadWriteLock rw_lock;

void read_fn(int idx, int start, int end) {
    std::this_thread::sleep_for(std::chrono::seconds(start));
    rw_lock.lock_read();
    std::cout << "read thread #" << idx << ": read data" << std::endl;
    std::this_thread::sleep_for (std::chrono::seconds(end - start));
    std::cout << "read thread #" << idx << ": read over" << std::endl;
    rw_lock.unlock_read();
}

void write_fn(int idx, int start, int end) {
    std::this_thread::sleep_for(std::chrono::seconds(start));
    rw_lock.lock_write();
    std::cout << "write thread #" << idx << ": write data" << std::endl;
    std::this_thread::sleep_for (std::chrono::seconds(end - start));
    std::cout << "write thread #" << idx << ": write over" << std::endl;
    rw_lock.unlock_write();
}

int main() {

    std::vector<std::thread> threads;
    threads.push_back(std::thread([](){read_fn(1, 0, 3);}));
    threads.push_back(std::thread([](){read_fn(2, 2, 4);}));
    threads.push_back(std::thread([](){read_fn(3, 6, 10);}));

    threads.push_back(std::thread([](){write_fn(1, 1, 4);}));
    threads.push_back(std::thread([](){write_fn(2, 5, 7);}));
    
    for (auto &&t : threads) {
        t.join();
    }
}

// output
// read thread #1: read data
// read thread #2: read data
// read thread #1: read over
// read thread #2: read over
// write thread #1: write data
// write thread #1: write over
// write thread #2: write data
// write thread #2: write over
// read thread #3: read data
// read thread #3: read over

可以看到讀執行緒1和2同時進行的讀操作,而寫執行緒1在讀執行緒結束之後才進行。

條件變數

條件變數主要作用是是執行緒間進行通訊,用於一個執行緒告知其他執行緒當前的狀態。

一般可以用於控制執行緒的執行順序,告知其他執行緒資源是否可用等。

條件變數的使用需要搭配互斥量。

#include <mutex>
#include <iostream>
#include <thread>
#include <condition_variable>

std::mutex mutex_;
std::condition_variable cv_;
bool ready_ = false;

void print_id(int id) {
    std::unique_lock<std::mutex> lck(mutex_);
    while (!ready_) {
        cv_.wait(lck);
    }
    std::cout << "thread -- " << id << std::endl;
}

void go() {
    std::unique_lock<std::mutex> lck(mutex_);
    ready_ = true;
    cv_.notify_all();
}

int main() {
    std::thread threads[10];
    // spawn 10 threads:
    for (int i = 0; i < 10; ++i) {
        threads[i] = std::thread(print_id, i);
    }

    std::cout << "10 threads ready to race...\n";
    go(); // go!

    for (auto &th : threads) {
        th.join();
    }

    return 0;
}

這裡使用std::unique_lock來管理互斥量。相對於lock_guardunique_lock的功能更豐富,可以通過它來對mutex進行lockunlock,具體的使用可以檢視相關的文件。

condition_variable通過wait操作,可以等待喚醒。wait操作有兩個行為:

  1. 將當前執行緒加入條件變數的等待佇列
  2. 釋放鎖

喚醒條件變數的方法有兩個:notify_onenotify_all,分別喚醒一個和所有的執行緒。

當一個wait中的執行緒被喚醒時,它會搶佔住mutex,因此後續的操作均是執行緒安全的。

為什麼condition_variable需要一個mutex呢?

  1. 一方面是有些變數的訪問,我們需要保證它的互斥性,比如這裡的ready_欄位
  2. 保證wait的兩個操作(等待和鎖釋放)是原子的。

可以參考下面這篇文章:

C++面試問題:為什麼條件變數要和互斥鎖一起使用?

那麼使用條件變數,我們可以創造哪些有意思的工具呢?阻塞佇列就是一個巧妙的應用。

BlockQueue

阻塞佇列是一種非常常見的資料結構,它允許一個或多個生產者向Queue中寫入資料,如果Queue滿了,則阻塞住。允許一個或多個消費者讀取Queue的資料,如果Queue為空,則一直阻塞直至Queue中有資料。

根據BlockQueue的兩種阻塞行為,我們可以大膽的推測,這裡可以用兩個條件變數,分別控制寫入阻塞和讀取阻塞。

#include <deque>
#include <mutex>
#include <condition_variable>

template<typename TaskType>
class BlockQueue {
public:
    BlockQueue(size_t capacity): capacity_(capacity) {}
    size_t capacity() {
        std::lock_guard<std::mutex> lck(this->mutex_);
        return this->capacity_;
    }
    size_t size() {
        std::lock_guard<std::mutex> lck(this->mutex_);
        return this->task_queue_.size();
    }
    void push(TaskType *task) {
        std::unique_lock<std::mutex> lck(this->mutex_);
        while (this->task_queue_.size() >= this->capacity_) {
            this->full_cv_.wait(lck);
        }
        this->task_queue_.push_back(task);
        this->empty_cv_.notify_all();
    }
    void get(TaskType **task) {
        std::unique_lock<std::mutex> lck(this->mutex_);
        while (this->task_queue_.empty()) {
            this->empty_cv_.wait(lck);
        }
        *task = task_queue_.front();
        task_queue_.pop_front();
        this->full_cv_.notify_all();
    }

private:
    std::deque<TaskType *> task_queue_;
    size_t capacity_;

    std::mutex mutex_;
    std::condition_variable full_cv_;
    std::condition_variable empty_cv_;
};

上述的例子,如果將wait改為wait_for的話,還可以方便的實現帶timeout的BlockQueue,感興趣的同學可以自己嘗試一下。

原子型別與原子操作

C++中的原子型別的定義和使用十分簡單。僅需要包含標頭檔案<atomic>即可。使用std::atomic<T>的方式即可構造原子型別的變數。

#include <atomic>
std::atomic<int32_t> i32_count;
std::atomic<uint64_t> u64_count;

針對原子型別的變數,有許多的操作可用。最常用到的就是++用來計數。比如我們前面的使用mutex完成計數器的例子,其實使用原子型別會更加的簡單和高效。

#include <atomic>

class Counter {
public:
    Counter(): x_(0) {}
    void emit() {
        ++ x_;
    }
    int count() {
        return x_;
    }
private:
    std::atomic<int> x_;
};

以下是具體的幾個方法:

函式功能
store 用非原子物件替換當前物件的值。相等於執行緒安全的=操作
load 原子地獲取原子物件的值
fetch_add/fetch_sub 原子地對原子做加減操作,返回操作之前的值
+=/-= 同上
fetch_and/fetch_or/fetch_xor 原子地對原子物件做與/或/異或地操作,返回操作之前的值
&=/|=/^= 同上

另外,atomic型別的函式可以指定memory_order引數,用於約束atomic型別資料在多執行緒中的檢視。感興趣可以看這篇文章:https://zhuanlan.zhihu.com/p/31386431

一般我們使用預設的memory_order就已經足夠了。

之後我們再介紹一個複雜但十分有用的原子操作:CAS(Compare And Swap)

看名字就知道,他的作用是,比較兩個值,如果相同就交換。

百度上給了一個比較直觀的解釋:

  • compare and swap,解決多執行緒並行情況下使用鎖造成效能損耗的一種機制,CAS操作包含三個運算元——記憶體位置(V)、預期原值(A)和新值(B)。如果記憶體位置的值與預期原值相匹配,那麼處理器會自動將該位置值更新為新值。否則,處理器不做任何操作。無論哪種情況,它都會在CAS指令之前返回該位置的值。CAS有效地說明了“我認為位置V應該包含值A;如果包含該值,則將B放到這個位置;否則,不要更改該位置,只告訴我這個位置現在的值即可。

通過CAS操作,我們可以方便的實現無鎖的執行緒安全佇列:

#include <atomic>

class Node {
public:
    Node(int val): val_(val), next_(nullptr) {}
public:
    int val_;
    class Node *next_;
};

void push(std::atomic<Node *> &head, Node *new_node) {
    new_node->next_ = head;
    while (head.compare_exchange_weak(new_node->next_, new_node));
}

int main() {
    std::atomic<Node *> head;
    Node *new_node = new Node(100);
    push(head, new_node);
}

當我們插入一個節點的時候,首先嚐試加入它,也就是new_node->next_ = head;然後如果head沒有變化的話,那麼就更新head為我們新的節點,如果變化的話就不斷重試。也就是while (head.compare_exchange_weak(new_node->next_, new_node));的邏輯。

上面這個例子是所有的CAS介紹都會說到的,可以非常容易地幫助我們理解CAS地功能,但是對於POP操作,並不好實現。

另外其實還存在一個ABA地問題,需要解決。這裡就不展開了。感興趣地可以搜一下相關的資料,這裡僅做簡單地介紹。

其他

最後我們看幾個非常有意思地設計。

PingPang Buffer

PingPang Buffer也被稱為雙Buffer。它的核心是這樣地,由於一些系統配置需要不斷地更新,而更新地過程中也會被不斷地讀取。如果使用之前的讀寫鎖,就可能永遠都更新不了(讀執行緒一直佔著鎖),同時執行緒同步也是非常低效地一個過程。然後就誕生了PingPang Buffer這麼個結構。

它的核心是有兩塊記憶體,一塊用來給所有執行緒進行讀操作,另一塊用來給寫執行緒進行更新,在更新完畢之後,交換這兩個記憶體。新的記憶體變成了讀記憶體,舊記憶體變成了寫記憶體。

以下是一個簡單的實現,和網上的其他版本可能略有不同,看思路即可。

#include <atomic>
#include <memory>

template<typename T>
class PingPangBuffer {
public:
    PingPangBuffer(std::shared_ptr<T> read_buffer, std::shared_ptr<T> write_buffer) {
        data_[0] = read_buffer;
        data_[1] = write_buffer;
        read_idx_ = 0;
    }
    std::shared_ptr<T> read_data() {
        return data_[read_idx_];
    }
    std::shared_ptr<T> write_data() {
        int write_idx = 1 - read_idx_;
        while (data_[write_idx].use_count() <= 1) {
            // sleep 1s
            return data_[write_idx];
        }
    }
    bool update() {
        read_idx_ = 1 - read_idx_;
    }
private:
    std::shared_ptr<T> data_[2];
    std::atomic<int> read_idx_;
};

這裡read_data函式被多個讀執行緒去呼叫。而write_dataupdate只有一個寫執行緒進行呼叫。

使用一個read_idx_記錄讀的Buffer的下標(似乎沒有必要是atomic的?),那麼交換讀寫Buffer的操作就可以簡化為read_idx_ = 1 - read_idx_。不過下標切換之後,切換之前的讀執行緒還在讀舊資料。

而獲取寫資料的操作需要等待當前Buffer不再被使用了才可以再次被使用(反正早晚它都是可以被使用的),這裡就直接使用了shared_ptruse_count

執行緒安全的LRUCache

一般Cache是使用std::unordered_map來實現的。和前面的讀寫鎖類似,map支援多執行緒的讀,但是僅支援單執行緒寫入。這就會造成這個map的寫入效能可能會較差。因此這裡一般採用分shard的方式進行庫的拆分。

一個簡單的實現,先根據key分shard,然後每個分片都使用讀寫鎖。(多執行緒的測試不太好寫,這裡只測試了過期時間和容量)

#include <mutex>
#include <thread>
#include <iostream>
#include <chrono>
#include <vector>
#include <list>
#include <unordered_map>

// 讀寫鎖,就是前面原封不動的程式碼
class ReadWriteLock {
public:
    ReadWriteLock():reader_count_(0) {}
    void lock_read() {
        read_mutex_.lock();
        if (reader_count_ == 0) {
            write_mutex_.lock();
        }
        ++ reader_count_;
        read_mutex_.unlock();
    }
    void unlock_read() {
        read_mutex_.lock();
        -- reader_count_;
        if (reader_count_ == 0) {
            write_mutex_.unlock();
        }
        read_mutex_.unlock();
    }
    void lock_write() {
        write_mutex_.lock();
    }
    void unlock_write() {
        write_mutex_.unlock();
    }
private:
    std::mutex read_mutex_;
    std::mutex write_mutex_;
    int64_t reader_count_;
};

template<typename KeyType, typename ValType>
class ConcurrentLRUCache {
public:
    class Node {
    public:
        Node(const KeyType& key, const ValType& val, size_t time_ms): key_(key), val_(val), time_ms_(time_ms) {}
        KeyType key_;
        ValType val_;
        size_t time_ms_;
    };
    using node_iter_type = typename std::list<Node>::iterator;
public:
    ConcurrentLRUCache(size_t capacity, size_t shard, size_t expire_time /* ms */) {
        capacity_ = capacity;
        shard_ = shard;
        capacity_per_cache_ = capacity_ / shard_;
        expire_time_ = expire_time;
        cache_shard_list_.resize(shard_);
        node_data_list_shard_list_.resize(shard_);
    }
    bool get(const KeyType& key, ValType& val) {
        auto &cache = cache_shard_list_[get_shard_idx(key)];
        rw_lock_.lock_read();
        bool ok = false;
        do {
            auto iter = cache.find(key);
            if (iter == cache.end()) { // not found
                break;
            }
            size_t cur_ms = get_cur_time_ms();
            size_t record_ms = iter->second->time_ms_;
            if (cur_ms - record_ms > expire_time_) { // found but expired
                break;
            }
            val = iter->second->val_;
            ok = true;
        } while (0);
        rw_lock_.unlock_read();
        return ok;
    }
    void set(const KeyType& key, ValType& val) {
        size_t shard_idx = get_shard_idx(key);
        auto &cache = cache_shard_list_[shard_idx];
        auto &data_list = node_data_list_shard_list_[shard_idx];
        rw_lock_.lock_write();

        do {
            // when found, del the older
            auto iter = cache.find(key);
            if (iter != cache.end()) {
                data_list.erase(iter->second);
                cache.erase(iter);
            }

            // when cache full, del the oldest
            while (cache.size() >= capacity_per_cache_) {
                cache.erase(data_list.front().key_);
                data_list.pop_front();
            }

            size_t cur_ms = get_cur_time_ms();
            data_list.emplace_back(key, val, cur_ms);
            cache[key] = --data_list.end();
        } while (0);

        rw_lock_.unlock_write();
    }
private:
    static size_t get_cur_time_ms() {
        return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
    }
    size_t get_shard_idx(const KeyType& key) {
        static std::hash<KeyType> hash_func;
        return hash_func(key) % shard_;
    }

    ReadWriteLock rw_lock_;

    size_t capacity_;
    size_t shard_;
    size_t capacity_per_cache_;
    size_t expire_time_;

    std::vector<std::unordered_map<KeyType, node_iter_type>> cache_shard_list_;
    std::vector<std::list<Node>> node_data_list_shard_list_;
};

int main() {
    ConcurrentLRUCache<int, int> cache(20, 2, 1000 /* 1s */);
    for (int i = 0; i < 20; ++ i) {
        cache.set(i, i);
        std::cout << "set: (" << i << ", " << i << ") " << std::endl;
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    for (int i = 20; i < 30; ++ i) {
        cache.set(i, i);
        std::cout << "set: (" << i << ", " << i << ") " << std::endl;
    }

    // 此時0-9已經被覆蓋(容量),10-19已經過去500ms,20-29是最新時間

    for (int i = 0; i < 30; ++ i) {
        int data = -1;
        bool is_ok = cache.get(i, data);
		// 這裡只有10-29被查到了
        std::cout << "get: (" << i << ", " << data << ") " << is_ok << std::endl;
    }

    // 總共過去800ms,10-29都沒過期
	std::this_thread::sleep_for(std::chrono::milliseconds(300));
    for (int i = 0; i < 30; ++ i) {
        int data = -1;
        bool is_ok = cache.get(i, data);
        // 只有20-29被查到
        std::cout << "get: (" << i << ", " << data << ") " << is_ok << std::endl;
    }

    // 總共過去1100ms,20-29沒過期
    std::this_thread::sleep_for(std::chrono::milliseconds(300));
    for (int i = 0; i < 30; ++ i) {
        int data = -1;
        bool is_ok = cache.get(i, data);
        // 20-29
        std::cout << "get: (" << i << ", " << data << ") " << is_ok << std::endl;
    }
}

寫在最後

知識的總結一直是一件令人愉悅的事情,時隔1年多又一次撿起技術部落格。