Qt 之 Concurrent 框架
簡述
QtConcurrent 名稱空間提供了高階 API,使得可以在不使用低階執行緒原語(例如:互斥、讀寫鎖、等待條件或訊號量)的情況下編寫多執行緒程式,使用 QtConcurrent 編寫的程式根據可用的處理器核心數自動調整所使用的執行緒數。這意味著,當在未來部署多核系統時,現在編寫的應用程式將繼續適應。
|
用法
在 C++ API changes 有關於 Qt Concurrent 的更改說明:
Qt Concurrent has been moved from Qt Core to its own module
意思是說,Qt Concurrent 已經被從 Qt Core 中移到自己的模組中了。所以,要連結到 Qt Concurrent 模組,需要在 qmake 專案檔案中新增:
QT += concurrent
注意: QtConcurrent::Exception 類被重新命名為 QException,並且 QtConcurrent::UnhandledException 類被重新命名為 QUnhandledException,他們仍然位於 Qt Core 中。
Qt Concurrent
QtConcurrent 包含了函數語言程式設計風格 APIs 用於並行列表處理,包括用於共享記憶體(非分散式)系統的 MapReduce 和 FilterReduce 實現,以及用於管理 GUI 應用程式非同步計算的類:
Concurrent Map 和 Map-Reduce
- QtConcurrent::map():將一個函式應用於一個容器中的每一項,就地修改 items。
- QtConcurrent::mapped():和 map() 類似,只是它返回一個包含修改內容的新容器。
- QtConcurrent::mappedReduced():和 mapped() 類似,只是修改後的結果減少或組合成一個單一的結果。
Concurrent Filter 和 Filter-Reduce
- QtConcurrent::filter():從一個容器中刪除所有 items,基於一個 filter 函式的結果。
- QtConcurrent::filtered():和 filter() 類似,只是它返回一個包含過濾內容的新容器。
- QtConcurrent::filteredReduced():和 filtered() 類似,只是過濾後的結果減少或組合成一個單一的結果。
Concurrent Run
- QtConcurrent::run():在另一個執行緒中執行一個函式。
QFuture:表示非同步計算的結果
- QFutureIterator:允許通過 QFuture 遍歷可用的結果
- QFutureWatcher:允許使用訊號槽來監控一個 QFuture
- QFutureSynchronizer:是一個方便的類,用於一些 QFutures 的自動同步
Qt Concurrent 支援多種相容 STL 的容器和迭代器型別,但是最好使用具有隨機訪問迭代器的 Qt 容器,例如:QList 或 QVector。map 和 filter 函式都接受容器和 begin/end 迭代器。
STL 迭代器支援概述:
迭代器型別 | 示例類 | 支援狀態 |
---|---|---|
Input Iterator | 不支援 | |
Output Iterator | 不支援 | |
Forward Iterator | std::slist | 支援 |
Bidirectional Iterator | QLinkedList, std::list | 支援 |
Random Access Iterator | QList, QVector, std::vector | 支援和推薦 |
在 Qt Concurrent 迭代大量輕量級 items 的情況下,隨機訪問迭代器可以更快,因為它們允許跳過容器中的任何點。此外,使用隨機訪問迭代器允許 Qt Concurrent 通過 QFuture::progressValue() 和 QFutureWatcher::progressValueChanged() 來提供進度資訊。
非就地修改的函式(例如:mapped() 和 filtered()),在呼叫時會建立容器的副本。如果正在使用的是 STL 容器,此複製操作可能需要一段時間,在這種情況下,建議為容器指定 begin 和 end 迭代器。
單詞統計
厲害了 Concurrent,來看一個單詞統計的示例:
#include <QList>
#include <QMap>
#include <QTextStream>
#include <QString>
#include <QStringList>
#include <QDir>
#include <QTime>
#include <QApplication>
#include <QDebug>
#include <qtconcurrentmap.h>
using namespace QtConcurrent;
// 遞迴搜尋檔案
QStringList findFiles(const QString &startDir, QStringList filters)
{
QStringList names;
QDir dir(startDir);
foreach (QString file, dir.entryList(filters, QDir::Files))
names += startDir + '/' + file;
foreach (QString subdir, dir.entryList(QDir::AllDirs | QDir::NoDotAndDotDot))
names += findFiles(startDir + '/' + subdir, filters);
return names;
}
typedef QMap<QString, int> WordCount;
// 單執行緒單詞計數器函式
WordCount singleThreadedWordCount(QStringList files)
{
WordCount wordCount;
foreach (QString file, files) {
QFile f(file);
f.open(QIODevice::ReadOnly);
QTextStream textStream(&f);
while (textStream.atEnd() == false)
foreach (const QString &word, textStream.readLine().split(' '))
wordCount[word] += 1;
}
return wordCount;
}
// countWords 計算單個檔案的單詞數,該函式由多個執行緒並行呼叫,並且必須是執行緒安全的。
WordCount countWords(const QString &file)
{
QFile f(file);
f.open(QIODevice::ReadOnly);
QTextStream textStream(&f);
WordCount wordCount;
while (textStream.atEnd() == false)
foreach (const QString &word, textStream.readLine().split(' '))
wordCount[word] += 1;
return wordCount;
}
// reduce 將 map 的結果新增到最終結果,該函式只能由一個執行緒一次呼叫。
void reduce(WordCount &result, const WordCount &w)
{
QMapIterator<QString, int> i(w);
while (i.hasNext()) {
i.next();
result[i.key()] += i.value();
}
}
int main(int argc, char** argv)
{
QApplication app(argc, argv);
qDebug() << "finding files...";
QStringList files = findFiles("../../", QStringList() << "*.cpp" << "*.h");
qDebug() << files.count() << "files";
int singleThreadTime = 0;
{
QTime time;
time.start();
// 單執行緒統計,與 mapreduce 機制實現的作對比
WordCount total = singleThreadedWordCount(files);
singleThreadTime = time.elapsed();
// 打印出所耗費的時間
qDebug() << "single thread" << singleThreadTime;
}
int mapReduceTime = 0;
{
QTime time;
time.start();
// mappedReduced 方式進行統計
WordCount total = mappedReduced(files, countWords, reduce);
mapReduceTime = time.elapsed();
qDebug() << "MapReduce" << mapReduceTime;
}
// 輸出 mappedReduced 方式比單執行緒處理方式要快的倍數
qDebug() << "MapReduce speedup x" << ((double)singleThreadTime - (double)mapReduceTime) / (double)mapReduceTime + 1;
}
輸出如下:
finding files…
2185 files
single thread 5221
MapReduce 2256
MapReduce speedup x 2.31427
可以看出,共查找了 2185 個檔案,單執行緒使用了 5221 毫秒,MapReduce 方式使用了 2256 毫秒,比單執行緒要快 2.31427 倍。經過反覆嘗試,基本都在 2 倍以上。