1. 程式人生 > >Phxpaxos網路部分(3) —— TCP事件迴圈

Phxpaxos網路部分(3) —— TCP事件迴圈

上一部份介紹了TCP模組大略的類構成。在檢視相關的TcpRead/TcpWrite/TcpAcceptor的過程中發現,都包含以個EventLoop類。這是事件處理主迴圈類。在介紹其他組成部分前,我們先分析一下此類。程式碼在/src/communicate/tcp/ecent_loop.h/cpp 檔案中。順帶手介紹一下里面的事件類Event。

先看一下Event類(含有純虛擬函式)肯定要做為父類進行使用,在event_base.h 中。其子類兩個:Notify(封裝管道,主要用來進行事件通知),MessageEvent(封裝socket,主要的處理邏輯)

class Event
{
public:
    //傳入eventloop指標,將Event與eventloop關聯,時間、事件、資料的處理都是在eventloop中觸發進行的
    Event
(EventLoop * poEventLoop); virtual ~Event(); //在EventLoop中通過fd關聯Event。然後執行相應的邏輯(read\write\error\timeout) virtual int GetSocketFd() const = 0; virtual const std::string & GetSocketHost() = 0; virtual int OnRead(); virtual int OnWrite(); virtual void OnError(bool & bNeedDelete) = 0
; virtual void OnTimeout(const uint32_t iTimerID, const int iType); public: //向EventLoop中新增、刪除事件,iEvents是EPOLLIN等邏輯結果 void AddEvent(const int iEvents); void RemoveEvent(const int iEvents); void JumpoutEpollWait(); const bool IsDestroy() const; void Destroy(); public
: //向EventLoop中新增、刪除定時事件 void AddTimer(const int iTimeoutMs, const int iType, uint32_t & iTimerID); void RemoveTimer(const uint32_t iTimerID); protected: int m_iEvents; EventLoop * m_poEventLoop; bool m_bIsDestroy; };

下面是Event及其子類Notify和MessageEvent的手畫類圖,湊活著看吧
這裡寫圖片描述
EventLoop程式碼部分:
在第一眼看其標頭檔案根據其名稱分析出其某些功能:
1. 事件的新增修改刪除
2. 主事件迴圈處理
3. 定時事件新增刪除

class EventLoop
{
public:
    EventLoop(NetWork * poNetWork);
    virtual ~EventLoop();

    int Init(const int iEpollLength);
    //事件維護(增刪改,改中封裝了增)
    void ModEvent(const Event * poEvent, const int iEvents);

    void RemoveEvent(const Event * poEvent);
    //開始主事件迴圈
    void StartLoop();

    void Stop();

    void OnError(const int iEvents, Event * poEvent);

    virtual void OneLoop(const int iTimeoutMs);

public:
    void SetTcpClient(TcpClient * poTcpClient);
    //跳出主時間迴圈,通過Notify實現,其內部通過pipe實現
    void JumpoutEpollWait();

public:
    //時間事件的新增刪除處理。通過觀察下面私有的資料結果發現,內部通過管理TimeID進行時間事件的管理
    //timeID-->fd--->Event--->onTimeout(關聯關係)
    bool AddTimer(const Event * poEvent, const int iTimeout, const int iType, uint32_t & iTimerID);

    void RemoveTimer(const uint32_t iTimerID);

    void DealwithTimeout(int & iNextTimeout);

    void DealwithTimeoutOne(const uint32_t iTimerID, const int iType);

public:
    void AddEvent(int iFD, SocketAddress oAddr);

    void CreateEvent();

    void ClearEvent();

    int GetActiveEventCount();

public:
    //事件上下文,用來進行索引,包括event和相關監聽事件
    typedef struct EventCtx
    {
        Event * m_poEvent;
        int m_iEvents; //監聽內容 EPOLLIN,EPOLLOUT
    } EventCtx_t;

private:
    bool m_bIsEnd;

protected:
    int m_iEpollFd;
    epoll_event m_EpollEvents[MAX_EVENTS];
    std::map<int, EventCtx_t> m_mapEvent;  //fd與event對應關係map
    NetWork * m_poNetWork;
    TcpClient * m_poTcpClient;
    Notify * m_poNotify; //內部事件通知使用

protected:
    Timer m_oTimer;
    std::map<uint32_t, int> m_mapTimerID2FD; //timeID 與 fd對應關係map
    //新增事件的queue,每一個包含其fd及網路socket資訊(IP,Port等),之後建立MessageEvent進行相關事件的處理
    //此處又是一個多生產者單消費者模型。
    std::queue<std::pair<int, SocketAddress> > m_oFDQueue;   
    std::mutex m_oMutex;
    std::vector<MessageEvent *> m_vecCreatedEvent; //MessageEvent 繼承自Event,新增到此EventLoop
};

event_loop.cpp


#include "event_loop.h"
#include "event_base.h"
#include "tcp_acceptor.h"
#include "tcp_client.h"
#include "comm_include.h"
#include "message_event.h"
#include "phxpaxos/network.h"

using namespace std;

namespace phxpaxos
{

EventLoop :: EventLoop(NetWork * poNetWork)
{
    m_iEpollFd = -1;
    m_bIsEnd = false;
    m_poNetWork = poNetWork;
    m_poTcpClient = nullptr;
    m_poNotify = nullptr;
    memset(m_EpollEvents, 0, sizeof(m_EpollEvents));
}

EventLoop :: ~EventLoop()
{
    ClearEvent();
}
// 通過管道傳送訊息,跳出epoll_wait()
void EventLoop :: JumpoutEpollWait()
{
    m_poNotify->SendNotify();
}
void EventLoop :: SetTcpClient(TcpClient * poTcpClient)
{
    m_poTcpClient = poTcpClient;
}

int EventLoop :: Init(const int iEpollLength)
{
    //epoll建立
    m_iEpollFd = epoll_create(iEpollLength);
    if (m_iEpollFd == -1)
    {
        PLErr("epoll_create fail, ret %d", m_iEpollFd);
        return -1;
    }

    m_poNotify = new Notify(this);
    assert(m_poNotify != nullptr);
    //將管道通知類Notify新增到EventLoop(EPOLLIN事件)
    int ret = m_poNotify->Init();
    if (ret != 0)
    {
        return ret;
    }

    return 0;
}

void EventLoop :: ModEvent(const Event * poEvent, const int iEvents)
{
    //從所有的fd、event對應關係中尋找,確定新增還是修改
    //之後epoll_ctl改變感興趣內容
    //最後更新map中fd對應的ctx
    auto it = m_mapEvent.find(poEvent->GetSocketFd());
    int iEpollOpertion = 0;
    if (it == end(m_mapEvent))
    {
        iEpollOpertion = EPOLL_CTL_ADD;
    }
    else
    {
        iEpollOpertion = it->second.m_iEvents ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
    }

    epoll_event tEpollEvent;
    tEpollEvent.events = iEvents;
    tEpollEvent.data.fd = poEvent->GetSocketFd();

    int ret = epoll_ctl(m_iEpollFd, iEpollOpertion, poEvent->GetSocketFd(), &tEpollEvent);
    if (ret == -1)
    {
        PLErr("epoll_ctl fail, EpollFd %d EpollOpertion %d SocketFd %d EpollEvent %d",
                m_iEpollFd, iEpollOpertion, poEvent->GetSocketFd(), iEvents);

        //to do 
        return;
    }

    EventCtx tCtx;
    tCtx.m_poEvent = (Event *)poEvent;
    tCtx.m_iEvents = iEvents;

    m_mapEvent[poEvent->GetSocketFd()] = tCtx;
}

void EventLoop :: RemoveEvent(const Event * poEvent)
{
//與新增類似只不過將新增改為刪除,其他內容一樣
    auto it = m_mapEvent.find(poEvent->GetSocketFd());
    if (it == end(m_mapEvent))
    {
        return;
    }

    int iEpollOpertion = EPOLL_CTL_DEL;

    epoll_event tEpollEvent;
    tEpollEvent.events = 0;
    tEpollEvent.data.fd = poEvent->GetSocketFd();

    int ret = epoll_ctl(m_iEpollFd, iEpollOpertion, poEvent->GetSocketFd(), &tEpollEvent);
    if (ret == -1)
    {
        PLErr("epoll_ctl fail, EpollFd %d EpollOpertion %d SocketFd %d",
                m_iEpollFd, iEpollOpertion, poEvent->GetSocketFd());

        //to do 
        //when error
        return;
    }

    m_mapEvent.erase(poEvent->GetSocketFd());
}

void EventLoop :: StartLoop()
{
    m_bIsEnd = false;
    while(true)
    {
        BP->GetNetworkBP()->TcpEpollLoop();

        int iNextTimeout = 1000;
        //迴圈處理超時事件,並通過引數返回未超時的最小時間間隔,用於epoll_wait的超時
        //Timer內部通過vector進行儲存,通過演算法排序取最小的東西
        DealwithTimeout(iNextTimeout);

        //PLHead("nexttimeout %d", iNextTimeout);
    //epoll_wait的封裝,根據觸發的fd查詢相應的ctx,在根據事件型別執行相應的操作
        OneLoop(iNextTimeout);

    //將感興趣的內容(Event)新增到EventLoop中,通過MessageEvent進行新增
        CreateEvent();

        if (m_poTcpClient != nullptr)
        {
        //為client中的Event新增EPOLLOUT
            m_poTcpClient->DealWithWrite();
        }

        if (m_bIsEnd)
        {
            PLHead("TCP.EventLoop [END]");
            break;
        }
    }
}

void EventLoop :: Stop()
{
    m_bIsEnd = true;
}

void EventLoop :: OneLoop(const int iTimeoutMs)
{
    int n = epoll_wait(m_iEpollFd, m_EpollEvents, MAX_EVENTS, 1);
    if (n == -1)
    {
        if (errno != EINTR)
        {
            PLErr("epoll_wait fail, errno %d", errno);
            return;
        }
    }

    for (int i = 0; i < n; i++)
    {
    //查詢ctx
        int iFd = m_EpollEvents[i].data.fd;
        auto it = m_mapEvent.find(iFd);
        if (it == end(m_mapEvent))
        {
            continue;
        }

        int iEvents = m_EpollEvents[i].events;
        Event * poEvent = it->second.m_poEvent;
    //找到所屬event然後執行相應的event的相應事件處理函式
        int ret = 0;
        if (iEvents & EPOLLERR)
        {
            OnError(iEvents, poEvent);
            continue;
        }

        try
        {
            if (iEvents & EPOLLIN)
            {
                ret = poEvent->OnRead();
            }

            if (iEvents & EPOLLOUT)
            {
                ret = poEvent->OnWrite();
            }
        }
        catch (...)
        {
            ret = -1;
        }

        if (ret != 0)
        {
            OnError(iEvents, poEvent);
        }
    }
}

void EventLoop :: OnError(const int iEvents, Event * poEvent)
{
    BP->GetNetworkBP()->TcpOnError();

    PLErr("event error, events %d socketfd %d socket ip %s errno %d", 
            iEvents, poEvent->GetSocketFd(), poEvent->GetSocketHost().c_str(), errno);
    //從epoll中刪除相關的監聽事件,並從map中刪除相關內容
    RemoveEvent(poEvent);

    bool bNeedDelete = false;
    //呼叫使用者的error處理函式並決定此event是否可以釋放資源。若此處不釋放好像會發生記憶體洩漏,因為在解構函式中,沒有進行釋放。
    //不過我看MessageEvent的OnError函式處理中對bNeedDelete 置true處理。
    //Notify中沒有做相關的處理,因為Notify相關的處理邏輯不再此處
    poEvent->OnError(bNeedDelete);

    if (bNeedDelete)
    {
        poEvent->Destroy();
    }
}

bool EventLoop :: AddTimer(const Event * poEvent, const int iTimeout, const int iType, uint32_t & iTimerID)
{
    if (poEvent->GetSocketFd() == 0)
    {
        return false;
    }    
    //此event沒有新增到event Map中,新增
    if (m_mapEvent.find(poEvent->GetSocketFd()) == end(m_mapEvent))
    {
        EventCtx tCtx;
        tCtx.m_poEvent = (Event *)poEvent;
        tCtx.m_iEvents = 0;

        m_mapEvent[poEvent->GetSocketFd()] = tCtx;
    }

   //建立定時器
    uint64_t llAbsTime = Time::GetSteadyClockMS() + iTimeout;
    m_oTimer.AddTimerWithType(llAbsTime, iType, iTimerID);
  //建立時間事件和fd的關聯關係
    m_mapTimerID2FD[iTimerID] = poEvent->GetSocketFd();

    return true;
}

void EventLoop :: RemoveTimer(const uint32_t iTimerID)
{
//移除此timeID和fd對應關係。因為Timer沒有移除介面,所以通過切斷中間的關聯關係,使其觸發時不執行相應的處理即可
    auto it = m_mapTimerID2FD.find(iTimerID);
    if (it != end(m_mapTimerID2FD))
    {
        m_mapTimerID2FD.erase(it);
    }
}

void EventLoop :: DealwithTimeoutOne(const uint32_t iTimerID, const int iType)
{
    //通過timeID查詢fd,再通過fd找event,找到event執行相應的OnTimeout函式即可
    auto it = m_mapTimerID2FD.find(iTimerID);
    if (it == end(m_mapTimerID2FD))
    {
        //PLErr("Timeout aready remove!, timerid %u iType %d", iTimerID, iType);
        return;
    }

    int iSocketFd = it->second;

    m_mapTimerID2FD.erase(it);

    auto eventIt = m_mapEvent.find(iSocketFd);
    if (eventIt == end(m_mapEvent))
    {
        return;
    }

    eventIt->second.m_poEvent->OnTimeout(iTimerID, iType);
}

void EventLoop :: DealwithTimeout(int & iNextTimeout)
{
    bool bHasTimeout = true;

    while(bHasTimeout)
    {
       //只要有超時,就會一直處理超時事件
        uint32_t iTimerID = 0;
        int iType = 0;
        bHasTimeout = m_oTimer.PopTimeout(iTimerID, iType);

        if (bHasTimeout)
        {
            DealwithTimeoutOne(iTimerID, iType);

            iNextTimeout = m_oTimer.GetNextTimeout();
            if (iNextTimeout != 0)
            {
                break;
            }
        }
    }
}

void EventLoop :: AddEvent(int iFD, SocketAddress oAddr)
{
    //生產者新增資料
    std::lock_guard<std::mutex> oLockGuard(m_oMutex);
    m_oFDQueue.push(make_pair(iFD, oAddr));
}

void EventLoop :: CreateEvent()
{
//消費者,消費資料
    std::lock_guard<std::mutex> oLockGuard(m_oMutex);

    if (m_oFDQueue.empty())
    {
        return;
    }
    //每次都清楚需要釋放的MessageEvent.決定權在每一次錯誤發生時使用者的處理中是否需要釋放onError函式中bNeedDelete 。若需要釋放則在此處進行釋放。不需要釋放則會一直保留
    ClearEvent();

    int iCreatePerTime = 200;
    while ((!m_oFDQueue.empty()) && iCreatePerTime--)
    {
        auto oData = m_oFDQueue.front();
        m_oFDQueue.pop();

    //建立event並新增到EventLoop中
        //create event for this fd
        MessageEvent * poMessageEvent = new MessageEvent(MessageEventType_RECV, oData.first, 
                oData.second, this, m_poNetWork);
        poMessageEvent->AddEvent(EPOLLIN);

        m_vecCreatedEvent.push_back(poMessageEvent);
    }
}

void EventLoop :: ClearEvent()
{
    for (auto it = m_vecCreatedEvent.begin(); it != end(m_vecCreatedEvent);)
    {
    //根據MessageEvent中的bool變數值,決定是否釋放其資源
        if ((*it)->IsDestroy())
        {
            delete (*it);
            it = m_vecCreatedEvent.erase(it);
        }
        else
        {
            it++;
        }
    }
}

int EventLoop :: GetActiveEventCount()
{
//返回總共的MessageEvent的數量
    std::lock_guard<std::mutex> oLockGuard(m_oMutex);
    ClearEvent();
    return (int)m_vecCreatedEvent.size();
}

}

手畫結構湊活著看吧

這裡寫圖片描述

EventLoop相關的程式碼邏輯就想當的清晰了。下一篇文章將詳細介紹TCP的工作過程。