1. 程式人生 > >reactor模式基本框架

reactor模式基本框架

一、moduo網路庫的reactor模式(上)

moduo網路庫的reactor模式基本構成為“non-blocking I/O + I/O multiplexing”,程式的基本結構是一個事件迴圈(event loop),以事件驅動(event-driven)和事件回撥(event callback)的方式實現業務邏輯。此文參照陳碩的moduo網路庫稍作簡化,抽出reactor模式的基本框架。 

1、執行緒同步:採用底層同步原語,互斥器+條件變數

執行緒同步儘量使用高階併發程式設計構建。若要使用底層同步原語,則儘量只使用互斥器和條件變數。互斥器和條件變數構成了多執行緒程式設計的全部必備同步原語,用它們即可完成任何多執行緒同步任務。

(1)封裝互斥器

封裝互斥器保護物件共享資源,不同物件間不會構成鎖爭用。其中:

MutexLock封裝臨界區。用RAII手法封裝互斥器的建立與摧毀。

MutexLockGuard封裝臨界區的進入和退出,即加鎖和解鎖。它是一個棧上物件,作用域剛好等於臨界區域。

#ifndef MUTEXLOCKGUARD_H_
#define MUTEXLOCKGUARD_H_

#include <pthread.h>
#include "Thread.hpp"
#include <iostream>

class MutexLock //: boost::noncopyable
{
private:
    pthread_mutex_t mutex_;
    MutexLock(const MutexLock&);
    MutexLock& operator=(const MutexLock&);

public:
    MutexLock(){ pthread_mutex_init(&mutex_,NULL); }
    ~MutexLock(){ pthread_mutex_destroy(&mutex_); }
    void lock() { pthread_mutex_lock(&mutex_); }
    void unlock() { pthread_mutex_unlock(&mutex_); }
    pthread_mutex_t* getPthreadMutex() { return &mutex_; }
};

class MutexLockGuard //: boost::noncopyable
{
private:
    MutexLock mutex_; 
    MutexLockGuard(const MutexLockGuard&);
    MutexLockGuard& operator=(const MutexLockGuard&);

public:
    explicit MutexLockGuard( MutexLock& mutex ) : mutex_(mutex)
    {
        mutex_.lock();
        std::cout<<"tid "<<CurrentThreadtid()<<": MutexLockGuard lock!"<<std::endl;
    }
    ~MutexLockGuard()
    {
        mutex_.unlock();
        std::cout<<"tid "<<CurrentThreadtid()<<": MutexLockGuard unlock!"<<std::endl;
    }
};

#define MutexLockGuard(x) static_assert(false, "missing mutex guard var name")
//C++0x中引入了static_assert這個關鍵字,用來做編譯期間的斷言,因此叫做靜態斷言。
//如果第一個引數常量表達式的值為false,會產生一條編譯錯誤,錯誤位置就是該static_assert語句所在行,第二個引數就是錯誤提示字串。

#endif // MUTEXLOCKGUARD_H_

(2)封裝條件變數

條件變數是非常底層的同步原語,用於執行緒同步。很少直接使用,一般只用於實現高層同步措施,如阻塞佇列(blockingQueue<T>),倒計時(countDownLatch)等。需要注意:

在wait端,要與mutex一起使用以保護while中條件表示式的讀寫。用while而非if來等待條件表示式的成立是防止虛假喚醒(spurious wakeup)。

在signal/broadcast端,修改條件表示式需要用mutex保護。

另外,條件變數析構時要呼叫int pthread_cond_destroy(pthread_cond_t *cond);讓系統自行回收資源,否則會造成記憶體洩漏。

#ifndef CONDITION_H_
#define CONDITION_H_

#include "MutexLockGuard.hpp"
#include <pthread.h>

class Condition
{
  public:
    explicit Condition(MutexLock& mutex) : mutex_(mutex)
    {
      pthread_cond_init(&pcond_, NULL);
    }

   ~Condition()
   {
      pthread_cond_destroy(&pcond_);
   }

   void wait()
   {
      pthread_cond_wait(&pcond_, mutex_.getPthreadMutex());
   }

   void notify()
   {
      pthread_cond_signal(&pcond_);
   }

   void notifyAll()
   {
      pthread_cond_broadcast(&pcond_);
   }

  private:
    MutexLock& mutex_;
    pthread_cond_t pcond_;
};

#endif 

2、封裝執行緒thread

常規下建立一個新執行緒使用

int pthread_create(pthread_t *restrict tidp, const pthread_attr_t *restrict_attr, void*(*start_rtn)(void*), void *restrict arg);

其中第一個引數為指向執行緒識別符號的指標,第二個引數用來設定執行緒屬性,第三個引數是執行緒執行函式的地址,最後一個引數是執行函式的引數。

為進一步分離業務邏輯,將pthread_create中執行緒執行函式固定形式,為detail::startThread()。將實際需要執行的執行緒函式封裝到一個資源類detail::ThreadData中成為一個數據成員std::function<void()> detail::ThreadData::func_。該資源類的一個具體實現detail::ThreadData* data作為執行緒函式的引數傳入,並在此之前已通過介面Thread::Thread(std::function<void()>)將實際需要執行的執行緒函式傳入並儲存在資料成員std::function<void()> detail::ThreadData::func_中。

pthread_create成功建立一個新執行緒後,線上程函式detail::startThread()中通過傳入的具體資源物件detail::ThreadData data*呼叫detail::ThreadData::runInThread(),最終呼叫實際需要執行的執行緒函式detail::ThreadData::func_()。

#ifndef THREAD_H_
#define THREAD_H_

#include <thread>
#include <memory>
#include <functional>
#include <string>
#include <unistd.h>
#include <iostream>
#include <sys/syscall.h>
//#define gettid() syscall(__NR_gettid)

pid_t gettid()
{
    return static_cast<pid_t>(syscall(SYS_gettid));
}
__thread pid_t t_cachedTid = 0;

pid_t CurrentThreadtid()
{
    if (t_cachedTid == 0)
    {
        t_cachedTid = gettid();
    }
    return t_cachedTid;
}

namespace detail
{

struct ThreadData
{
    typedef std::function<void ()> ThreadFunc;
    ThreadFunc func_;
    std::string name_;
    pid_t tid_;

    ThreadData(ThreadFunc func, const std::string& name, pid_t tid)
      : func_(std::move(func)),
        name_(name),
        tid_(tid)
    { }

    void runInThread()
    {
        tid_ = CurrentThreadtid();
        func_();
        name_=std::string("finished");
        std::cout<<"tid "<<CurrentThreadtid()<<": Thread::runInThread() end!"<<std::endl;
    }
};

void* startThread(void* obj)
{
    ThreadData* data = static_cast<ThreadData*>(obj);
    data->runInThread();
    delete data;
    std::cout<<"tid "<<CurrentThreadtid()<<": Thread end!"<<std::endl;
    return NULL;
}
 
}

class Thread
{
public:
    //typedef void* (*ThreadFunc)(void*);
    typedef std::function<void ()> ThreadFunc;
    Thread(ThreadFunc func) : func_(func), tidp_(0), tid_() {}
    void start()
    {
         detail::ThreadData* data = new detail::ThreadData(func_, name_, tid_);
         std::cout<<"tid "<<CurrentThreadtid()<<": create a new thread"<<std::endl;
         if(pthread_create(&tidp_, NULL, &detail::startThread, (void*)data))
         {
             delete data;
             std::cout<<"thread create error"<<std::endl;
         }
    }

private:
    ThreadFunc func_;
    std::string name_;
    pid_t tid_;
    pthread_t tidp_;
};

#endif

3、事件迴圈

Reactor模式基本構成為“non-blocking I/O + I/O multiplexing”,遵循“one loop per thread” 。程式的基本結構是一個事件迴圈(event loop),通過I/O多路複用器select、poll和epoll等實現。再以事件驅動(event-driven)和事件回撥(event callback)的方式實現業務邏輯,此處暫時省略該方式,事件迴圈僅呼叫系統函式poll延時1s。

其中,EventLoop類實現單個執行緒內的事件迴圈主體框架,在此基礎上再進一步封裝為EventLoopThread類,實現多執行緒應用。

EventLoop類中:EventLoop::loop()為事件迴圈主體函式,其中EventLoop::isInLoopThread()檢測該EventLoop物件是否執行在所建立的執行緒中,以遵循“one loop per thread”。

EventLoopThread類中:EventLoop* EventLoopThread::startLoop()為介面函式,其中void EventLoopThread::threadFunc()為執行緒函式,該函式即為傳入上一節Thread類資料成員std::function<void()> detail::ThreadData::func_中的執行緒函式。在該執行緒函式中具現一個棧上物件EventLoop loop;進行事件迴圈loop.loop();。此外,介面函式startLoop()的返回指標即為該執行緒函式中的棧上物件loop,需要進行執行緒同步才能返回給使用者使用,用以實現其他多執行緒業務如定時任務等。

#ifndef EVENT_LOOP_H_
#define EVENT_LOOP_H_

#include <thread>
#include <poll.h>
#include <unistd.h>
#include <iostream>
#include <sys/syscall.h>
#include "Thread.hpp"

class EventLoop{
public:
    EventLoop() : tid_(CurrentThreadtid()) {}
    bool isInLoopThread() const { return tid_==CurrentThreadtid(); }
    void loop();
private:
    pid_t tid_;
};

void EventLoop::loop()
{
    if( !isInLoopThread() ){
        std::cout<<"tid "<<CurrentThreadtid()<<": This EventLoop had been created!"<<std::endl;
    }else{
        std::cout<<"tid "<<CurrentThreadtid()<<": looping..."<<std::endl;
        poll(NULL, 0, 1*1000); //poll() just waiting for a second
    }
}

#endif
#ifndef EVENT_LOOP_THREAD_H_
#define EVENT_LOOP_THREAD_H_

#include "EventLoop.hpp"
#include "Thread.hpp"
#include "MutexLockGuard.hpp"
#include "Condition.hpp"
#include <memory>
#include <iostream>

class EventLoopThread
{
public:
  EventLoopThread() 
    : loop_(NULL), exiting_(false), thread_(std::bind(&EventLoopThread::ThreadFunc, this)), mutex_(), cond_(mutex_) {}
  //~EventLoopThread();
  EventLoop* startLoop();
  
private:
  void ThreadFunc();

  EventLoop* loop_; 
  bool exiting_;
  Thread thread_; 
  MutexLock mutex_;
  Condition cond_;
};

EventLoop* EventLoopThread::startLoop()
{
  //assert(!thread_.started());
  thread_.start();
  
  {
    MutexLockGuard lock(mutex_);
    while (loop_ == NULL)
    {
      std::cout<<"tid "<<CurrentThreadtid()<<": waiting"<<std::endl;
      cond_.wait();
    }
    std::cout<<"tid "<<CurrentThreadtid()<<": received notification"<<std::endl;
  }
  //sleep(3); //just for thread security
  return loop_;
}

void EventLoopThread::ThreadFunc()
{
    EventLoop loop;

    {
      MutexLockGuard lock(mutex_);
      loop_ = &loop;
      cond_.notify();
      std::cout<<"tid "<<CurrentThreadtid()<<": notified"<<std::endl;
    }

    loop.loop();
    //assert(exiting_);
}

#endif

4、測試

#include "EventLoopThread.hpp"
#include "EventLoop.hpp"
#include "Thread.hpp"
#include <iostream>

using namespace std;

int main()
{
  cout<<"Main: pid: "<<getpid()<<" tid: "<<CurrentThreadtid()<<endl;//main thread
  //sleep(1);

  EventLoopThread ELThread1;
  EventLoop* loop = ELThread1.startLoop();//thread 2
  sleep(1);

  loop->loop(); //test "one thread one loop"
  sleep(3);

  return 0;
}
[email protected]:~/Documents/Reactor/s0$ g++ -std=c++11 -pthread -o testEventLoopThreadDemo MutexLockGuard.hpp Condition.hpp Thread.hpp EventLoop.hpp EventLoopThread.hpp testEventLoopThread.cpp 
[email protected]:~/Documents/Reactor/s0$ /usr/bin/valgrind ./testEventLoopThreadDemo 
==16751== Memcheck, a memory error detector
==16751== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==16751== Using Valgrind-3.13.0 and LibVEX; rerun with -h for copyright info
==16751== Command: ./testEventLoopThreadDemo
==16751== 
Main: pid: 16751 tid: 16751
tid 16751: create a new thread
tid 16751: MutexLockGuard lock!
tid 16751: waiting
tid 16752: Thread::func_() started!
tid 16752: MutexLockGuard lock!
tid 16752: notified
tid 16751: received notification
tid 16751: MutexLockGuard unlock!
tid 16752: MutexLockGuard unlock!
tid 16752: looping...
tid 16751: This EventLoop had been created!
tid 16752: Thread end!
==16751== 
==16751== HEAP SUMMARY:
==16751==     in use at exit: 0 bytes in 0 blocks
==16751==   total heap usage: 7 allocs, 7 frees, 74,176 bytes allocated
==16751== 
==16751== All heap blocks were freed -- no leaks are possible
==16751== 
==16751== For counts of detected and suppressed errors, rerun with: -v
==16751== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

其中cout部分列印執行緒ID使用了關鍵字__pthread。主執行緒ID是16751,使用者建立一個EventLoopThread物件後呼叫介面函式EventLoop* EventLoopThread::startLoop(),函式內部建立一個新執行緒,ID為16752,並開始執行執行緒函式,同時主執行緒執行的介面函式返回給使用者的返回值需要與新執行緒進行執行緒同步。

參考資料: