1. 程式人生 > >C++多核高階程式設計

C++多核高階程式設計

一,什麼是執行緒

       執行緒是程序中可執行程式碼流的序列,它被作業系統呼叫,並在處理器或核心上執行。所有程序都有一個主執行緒,主執行緒是程序的控制流或執行路線。執行緒分為使用者執行緒和系統執行緒,執行緒在建立,維護和管理方面給系統帶來的負擔要輕得多。執行緒用於執行系統中的併發任務,可以簡化系統中固有的併發的程式的結構。

使用者級執行緒和核心級執行緒:

執行緒有三種實現模型: 使用者級或應用程式級,核心級,使用者級和核心級混合執行緒。

他們之間主要的區別在於他們的模式以及要指派給處理器的執行緒的能力。

使用者模式下,執行緒駐留在使用者空間,是執行程式或連線庫中的指令,由庫排程器進行排程。

核心模式下,執行緒駐留在系統空間,可以進行系統呼叫,由作業系統排程器排程。

使用者執行緒在執行時,任意給定時刻,每個程序只有一個執行緒在執行,而且只有一個處理器核心被分配給該程序。執行時排程庫從程序的多個執行緒中選擇一個,然後該執行緒和程序允許的一個核心執行緒關聯起來,使用者級執行緒是一種多對一的執行緒對映。

user_thread

核心級執行緒駐留在核心空間,它們是核心物件,由作業系統排程器管理。有了核心執行緒,每個使用者執行緒被對映或繫結到一個核心執行緒。 使用者執行緒在其生命期內都會繫結到該核心執行緒。一旦使用者執行緒終止,兩執行緒都將離開系統。從核心執行緒到使用者執行緒是一種一對一對映。

sys_thread

 混合執行緒是使用者執行緒和系統執行緒的交叉,使得執行庫和作業系統都可以管理執行緒。在這種實現中程序有自己的核心執行緒池。可執行的使用者執行緒由執行時庫分派並標記為準備好執行的可用執行緒。作業系統選擇使用者執行緒並將它對映到執行緒池中可用的核心執行緒。

hy_thread

執行緒的上下文

作業系統管理很多程序的執行。它們來著不同的程式或系統。當一個程序從核心中移出,另一個程序成為活動的,這些程序之間便發生了上下文切換。作業系統必須記錄重啟程序和啟動新程序使之活動所需要的所有資訊。執行緒也有相同的處理方式。上下文儲存的內容。

上下文內容進    程線    程
指向可執行檔案的指標X
XX
記憶體(資料段和棧)X
狀態XX
優先順序XX
程式 IO 的狀態X
授予許可權X
排程資訊X
審計資訊X
有關資源的資訊
-        檔案描述符
-        讀/寫 指標
X
有關事件和訊號的資訊X
暫存器組
-        棧指標
-        指令計數器
-        諸如此類
XX

二, 執行緒和程序的比較

執行緒和程序都能提供併發的程式執行,在決定使用程序或執行緒時可以從上下文切換開銷,吞吐量,實體間通訊,程式簡化等方面進行考慮。

-    上下文切換:如果只有一個處理器,執行緒的上下文切換的開銷較小。

-    吞吐量:使用多個執行緒可以增加程式的吞吐量,否則只有一個執行緒時,執行緒的IO將使整個程式被阻塞。

-    實體間通訊:執行緒與同一程序間其他執行緒通訊時不要求特殊的通訊機制,可以直接進行資料的傳輸。程序間則必須建立和維護它們之間的通訊機制。

-    破壞程序的資料:執行緒可以很輕鬆的破壞整個程序的資料。程序有自己的地址空間,相互隔離,資料也受到保護。

-    刪除整個程序:執行緒出錯時可以導致整個程序的終止,它導致的錯誤往往比程序導致的錯誤代價更大。

-    重用性:執行緒依賴於程序,不能從它所屬的程序分離,不可以直接被重用,程序則更加的獨立。

執行緒與程序的類似執行緒與程序的差別
都有ID 暫存器 狀態優先順序和排程策略執行緒共享建立它的程序的地址空間,程序有自己獨立的地址空間
都有用於為作業系統描述的實體屬性執行緒能對所屬程序的資料段進行直接訪問,程序有著父程序的資料段的自己的副本
都包含一個資訊塊執行緒可以同所屬程序的執行緒直接進行通訊,程序必須使用程序間通訊才能通兄弟程序通訊
都與父程序共享資源執行緒幾乎沒有開銷,程序的開銷相當大
都可以作為與父程序獨立的實體建立新執行緒很容易,建立新程序則需複製父程序
建立者可以對執行緒或程序進行一些控制執行緒可以對相同的程序或其他執行緒進行相當大的控制,程序只能對子程序進行控制
都可以改變屬性對主執行緒的改動(取消,優先順序等改動)可能影響到程序中其他執行緒的行為,對父程序的改動不會影響到子程序
都可以建立新的資源
都不能訪問另一個程序的資源

三, 設定執行緒屬性

有些執行緒的資訊可以描述執行緒的上下文切換,這些資訊可以用於重建執行緒的環境。

同一程序中不同執行緒的主要區別是 id,定義執行緒狀態的暫存器組,優先順序和棧。

POSIX定義了執行緒屬性物件,它封裝了執行緒屬性的一個子集,使使用者可以方便的建立訪問和更改。

-        競爭範圍  (系統範圍,程序範圍)

-        棧大小

-        棧地址

-        分離狀態 (是否從它的建立者中分離出來,在終止或退出時是否同其他對等執行緒或主執行緒同步)

-        優先順序

-        排程策略和引數

四, 執行緒的結構

與程序一樣,執行緒也有自己的上下文和屬性。程序有程式碼段,資料段和棧段,執行緒同進程共享程式碼段和棧段。程序通常從記憶體的高地址開始,向下增長,執行緒則以下一個執行緒開始的地址為邊界。

thread_system_constructor

執行緒有著和程序一樣的狀態和狀態轉換關係。主執行緒可以決定整個程序的狀態。對於多個執行緒的程序,只有所有的執行緒都處於休眠狀態整個程序才處於休眠狀態。

執行緒在系統中有著兩種競爭範圍:程序競爭和系統競爭。

程序競爭:執行緒與相同程序內的其他執行緒進行競爭。

系統競爭:執行緒與系統範圍內的程序的執行緒進行競爭。

舉例:程序A有4個執行緒,程序B有2個執行緒,系統有8個CPU,程序A的前3個執行緒使用CPU 0,1 在程序範圍內競爭。程序B的一個執行緒和程序A的一個執行緒公用一個CPU,在系統範圍內進行競爭。

system_and_process

由於競爭,執行緒也有著相關的排程策略。程序的排程策略和優先順序屬於主執行緒。每個執行緒可以有著和主執行緒不同的排程策略和優先順序。
排程策略分為輪詢(RR)和先進先出(FIFO)兩種。

五, 建立執行緒

下面將用一個簡單的示例來說明執行緒的建立和管理的方式。

這種時序圖可以較好的描述執行緒的啟動和終止的時間上的關係。

thread_work

下面的程式將建立一個多線陣列,並使其執行。

using namespace std;

#include <iostream>
#include <pthread.h>


void *task(void* X)
{
    int *Tmp;
    Tmp = static_cast<int *>(X);
    
    for(int Count = 0; Count < *Tmp; Count++)
    {
        cout << "Work from thread: " << Count << endl;
    }
    cout << "Thread complete" << endl;
    return NULL;
}


int main(int argc, char *argv[])
{
    int N = 10;
    pthread_t  MyThread[10];
    
    for(int Count = 0; Count < 10; Count++)
    {
        pthread_create(&MyThread[Count], NULL, task, &N);
    }
    
    for(int Count = 0; Count < 10; Count++)
    {
        pthread_join(MyThread[Count], NULL);
    }
    
    return 0;
}


六, 管理執行緒

這裡將介紹pthread中關於執行緒管理的一些函式和管理方式。

(1)  建立執行緒

int pthread_create(pthread_t* restrict thread, const pthread_attr_t *restrict attr, void*(*start_routine), void *restrict arg)

pthread_t* restrict thread: 描述執行緒ID

const pthread_attr_t *restrict attr: 新執行緒的專有屬性,如果設定為NULL則使用預設值

void*(*start_routine): 新執行緒將要執行的指令函式

void *restrict arg: 指令函式需要的引數

關鍵字restrict: 與之前的IEEE標準保持一致

(2)  結合線程

int pthread_join(pthread_t thread, void** value_ptr)

用於結合或再次結合程序中的控制流,pthread_join呼叫將導致呼叫執行緒執行掛起,直到目標執行緒終止。它類似於程序中的wait()函式,該函式由執行緒的建立者呼叫,該呼叫執行緒等待新執行緒終止並返回,然後再次結合到呼叫執行緒的控制流中。如果執行緒控制控制代碼可以公共訪問,該函式也可以被對等執行緒呼叫。使得任何執行緒可以將控制流同進程中的任何其他執行緒結合。如果呼叫執行緒目標執行緒返回前被取消,則會導致目標執行緒成為僵死執行緒。

pthread_t thread: 描述執行緒ID

void** value_ptr: 如果目標執行緒返回成功,該引數儲存執行緒的退出狀態。

(3)  獲得執行緒ID

pthead_t pthread_self(void)

執行緒被建立後,該函式返回執行緒ID。

(4)  終止執行緒

執行緒終止方式:-     通過從它被分配的任務返回-     顯示終止自身並提供一個退出狀態
-     被相同地址空間的其他執行緒終止

自終止:int pthread_exit(void** value_ptr)

當終止執行緒呼叫pthread_exit()後,它在 value_ptr中得到該執行緒的退出狀態。退出狀態被返回到pthread_join中。當呼叫這個函式時,執行緒所使用的資源不會被釋放。

終止對等執行緒:int pthread_cancel(pthread_t thread)

應用程式中可能有執行緒監視其他執行緒的工作,如果發現有的執行緒執行不力或不再需要時,有必要使其終止。pthread_cancle的呼叫是取消一個對等執行緒的請求。這個請求可能立刻被同意,稍後被同意,甚至被忽略。目標執行緒可能立刻終止,稍後終止,或拒絕終止。

在取消一個對等執行緒的請求被同意時,會有一個取消過程,目標執行緒的取消狀態(可取消 or 不可取消)和 取消型別(取消何時發生)決定了收到取消請求後執行緒繼續執行的能力。

設定取消狀態和取消型別:

pthread_setcancelstate(int stat, int *oldstat);

pthread_setcanceltype(int type, int* oldtype);

使用取消點

當推遲取消請求時,執行緒的終止會推遲到執行緒的函式執行後期。當取消發生時,應該是安全的,因為它不處於互斥量加鎖,執行關鍵程式碼,令資料處於某種不可用狀態的情況中。程式碼執行中,這些位置是很好的取消點。取消點是一個檢查點,執行緒在這裡檢查是否有任何取消請求未決,如果有,則終止。

void pthread_testcancel(void);

另外可利用安全取消的庫函式和系統呼叫

-     pthread_cond_wait()

-     pthread_timewait()

-     pthread_join()

當執行緒因呼叫這些函式而阻塞時,取消執行緒是安全的。

(4)  管理執行緒棧
int pthread_attr_setstacksize(pthread_attr_t *attr, size_t stacksize);
int pthread_attr_getstacksize(pthread_attr_t *attr, size_t *stacksize);

int pthread_attr_setstackaddr(pthread_attr_t *attr, void *stackaddr);
int pthread_attr_getstackaddr(pthread_attr_t *attr, void **stackaddr);

int pthread_attr_setstack(pthread_attr_t *attr, void *stackaddr, size_t stacksize);
int pthread_attr_getstack(pthread_attr_t *attr, void **stackaddr, size_t *stacksize);

(5)  管理執行緒排程和優先順序
int pthread_attr_setinheritsched(pthread_attr_t *attr, int inheritsched);
int pthread_attr_getinheritsched(pthread_attr_t *attr, int *inheritsched);

int pthread_attr_setschedpolicy(pthread_attr_t *attr, int policy);
int pthread_attr_getschedpolicy(pthread_attr_t *attr,int *policy);

int pthread_attr_setschedparam(pthread_attr_t *attr,const struct sched_param *param);

int pthread_attr_getschedparam(pthread_attr_t *attr,struct sched_param *param);

(6)  管理執行緒競爭範圍

int pthread_attr_setscope(pthread_attr_t *attr, int contentionscope);

int pthread_attr_getscope(const pthread_attr_t *restrict attr, int *restrict contentionscope);

七, 擴充套件執行緒介面

執行緒的介面類將把執行緒管理等功能進行封裝,使得執行緒更易使用,功能更強,也更加靈活。

程式共4部分:

thread_object.h:定義介面類。

thread_object.cpp:介面類的實現

thread_filter.cpp:派生自介面類基類,實現簡單的測試功能。

main.cpp:測試入口程式。

thread_object.h

#ifndef __THREAD_OBJECT_H
#define __THREAD_OBJECT_H

using namespace std;

#include <iostream>
#include <pthread.h>
#include <string>

class thread_object
{
public:
    thread_object();
    ~thread_object();

    void setPriority(int nPriority);
    void setSchedPolicy(int nPolicy);
    void setContentionScope(int nScope);
    void setDetached(void);
    void setJoinable(void);

    void name(string strName);
    void run(void);
    void join(void);
    friend void *thread(void *buff);
private:
    pthread_t  m_Tid;

protected:
    virtual void do_something(void) = 0;
    pthread_attr_t       m_attr_SchedAttr;
    struct sched_param   m_attr_SchedParam;
    string               m_Name;
    int                  m_nNewPolicy;
    int                  m_nNewState;
    int                  m_nNewScope;
};

class filter_thread: public thread_object
{
protected:
    void do_something(void);

public:
    filter_thread(void);
    ~filter_thread(void);
};

#endif

thread_object.cpp
#include "thread_object.h"


thread_object::thread_object()
{
    pthread_attr_init(&m_attr_SchedAttr);
    pthread_attr_setinheritsched(&m_attr_SchedAttr, PTHREAD_EXPLICIT_SCHED);
    m_nNewState = PTHREAD_CREATE_JOINABLE;
    m_nNewScope = PTHREAD_SCOPE_PROCESS;
    m_nNewPolicy = SCHED_OTHER;
}

thread_object::~thread_object()
{

}

void thread_object::join(void)
{
    if (m_nNewState == PTHREAD_CREATE_JOINABLE)
    {
        pthread_join(m_Tid, NULL);
    }
}

void thread_object::setPriority(int nPriority)
{
    int nPolicy;
    struct sched_param stParam;

    stParam.sched_priority = nPriority;
    pthread_attr_setschedparam(&m_attr_SchedAttr, &stParam);
}

void thread_object::setSchedPolicy(int nPolicy)
{
    if (nPolicy == 1)
    {
        pthread_attr_setschedpolicy(&m_attr_SchedAttr, SCHED_RR);
        pthread_attr_getschedpolicy(&m_attr_SchedAttr, &m_nNewPolicy);
    }
    if (nPolicy == 2)
    {
        pthread_attr_setschedpolicy(&m_attr_SchedAttr, SCHED_FIFO);
        pthread_attr_getschedpolicy(&m_attr_SchedAttr, &m_nNewPolicy);
    }
}

void thread_object::setContentionScope(int nScope)
{
    if (nScope == 1)
    {
        pthread_attr_setscope(&m_attr_SchedAttr, PTHREAD_SCOPE_SYSTEM);
        pthread_attr_getscope(&m_attr_SchedAttr, &m_nNewScope);
    }
    if (nScope == 2)
    {
        pthread_attr_setscope(&m_attr_SchedAttr, PTHREAD_SCOPE_PROCESS);
        pthread_attr_getscope(&m_attr_SchedAttr, &m_nNewScope);
    }
}

void thread_object::setDetached(void)
{
    pthread_attr_setdetachstate(&m_attr_SchedAttr, PTHREAD_CREATE_DETACHED);
    pthread_attr_getdetachstate(&m_attr_SchedAttr, &m_nNewState);
}

void thread_object::setJoinable(void)
{
    pthread_attr_setdetachstate(&m_attr_SchedAttr, PTHREAD_CREATE_JOINABLE);
    pthread_attr_getdetachstate(&m_attr_SchedAttr, &m_nNewState);
}

void thread_object::run(void)
{
    pthread_create(&m_Tid, &m_attr_SchedAttr, thread, this);
}

void thread_object::name(string strName)
{
    m_Name = strName;
}

void* thread(void* buff)
{
    thread_object* pThread;

    pThread = static_cast<thread_object *> (buff);
    pThread->do_something();
    return NULL;
}

thread_filter.cpp
<pre name="code" class="cpp">#include "thread_object.h"


filter_thread::filter_thread(void)
{
    pthread_attr_init(&m_attr_SchedAttr);
}

filter_thread::~filter_thread(void)
{}

void filter_thread::do_something(void)
{
    struct sched_param stParam;

    int nPolicy;
    pthread_t thread_id = pthread_self();
    string strSchedule;
    string strState;
    string strScope;

    pthread_getschedparam(thread_id, &nPolicy, &stParam);
    switch (m_nNewPolicy)
    {
    case SCHED_RR:
        strSchedule.assign("RR");
        break;
    case SCHED_FIFO:
        strSchedule.assign("FIFO");
        break;
    case SCHED_OTHER:
        strSchedule.assign("OTHER");
        break;
    default:
        strSchedule.assign("unknown");
        break;
    }

    switch (m_nNewState)
    {
    case PTHREAD_CREATE_DETACHED:
        strState.assign("DETACHED");
        break;
    case PTHREAD_CREATE_JOINABLE:
        strState.assign("JOINABLE");
        break;
    default:
        strState.assign("unknown");
        break;
    }

    switch (m_nNewScope)
    {
    case PTHREAD_SCOPE_PROCESS:
        strScope.assign("PROCESS");
        break;
    case PTHREAD_SCOPE_SYSTEM:
        strScope.assign("SYSTEM");
        break;
    default:
        strScope.assign("unknown");
    }

    cout << m_Name << " : " << thread_id << endl
         << "--------------------------------------"<< endl
         << " priority : " << stParam.sched_priority << endl
         << " policy   : " << strSchedule            << endl
         << " state    : " << strState               << endl
         << " scope    : " << strScope               << endl << endl;
}

main.cpp
#include "thread_object.h"
#include <unistd.h>

int main(int argc, char * argv[])
{
    filter_thread MyThread[4];

    MyThread[0].name("Proteus");
    MyThread[0].setSchedPolicy(2);
    MyThread[0].setPriority(7);
    //MyThread[0].setDetached(); // 若設定為Detached則在列印時會出現亂序現象


    MyThread[1].name("Stand Along Complex");
    MyThread[1].setContentionScope(1);
    MyThread[1].setPriority(5);
    MyThread[1].setSchedPolicy(2);


    MyThread[2].name("Krell Space");
    MyThread[2].setPriority(3);


    MyThread[3].name("Cylon Space");
    MyThread[3].setSchedPolicy(2);
    MyThread[3].setPriority(2);

    for (int i = 0; i < 4; i++)
    {
        MyThread[i].run();
        MyThread[i].join();
    }
    return 0;
}