Phxpaxos Networking (3): The TCP Event Loop
By 阿新, published 2019-02-19
The previous part covered the rough class makeup of the TCP module. While reading TcpRead/TcpWrite/TcpAcceptor, we saw that each of them contains an EventLoop, the class that runs the main event-processing loop. Before covering the other components, let's analyze this class first; the code lives in src/communicate/tcp/event_loop.h/cpp. Along the way we will also look at the event class, Event.
First, the Event class. It contains pure virtual functions, so it is clearly meant to be used as a base class; it is declared in event_base.h. It has two subclasses: Notify (wraps a pipe, used mainly for event notification) and MessageEvent (wraps a socket, where the main processing logic lives).
class Event
{
public:
    // Takes an EventLoop pointer and ties this Event to the loop; timers,
    // events, and data are all dispatched from inside the EventLoop.
    Event(EventLoop * poEventLoop);
    virtual ~Event();

    // The EventLoop looks the Event up by fd, then runs the matching
    // handler (read / write / error / timeout).
    virtual int GetSocketFd() const = 0;
    virtual const std::string & GetSocketHost() = 0;
    virtual int OnRead();
    virtual int OnWrite();
    virtual void OnError(bool & bNeedDelete) = 0;
    virtual void OnTimeout(const uint32_t iTimerID, const int iType);

public:
    // Add/remove this event in the EventLoop; iEvents is an OR of EPOLLIN etc.
    void AddEvent(const int iEvents);
    void RemoveEvent(const int iEvents);
    void JumpoutEpollWait();
    const bool IsDestroy() const;
    void Destroy();

public:
    // Add/remove timer events in the EventLoop.
    void AddTimer(const int iTimeoutMs, const int iType, uint32_t & iTimerID);
    void RemoveTimer(const uint32_t iTimerID);

protected:
    int m_iEvents;
    EventLoop * m_poEventLoop;
    bool m_bIsDestroy;
};
Below is a hand-drawn class diagram of Event and its subclasses Notify and MessageEvent; bear with the drawing.
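To make the contract concrete, here is a minimal, hypothetical subclass (EchoEventSketch is invented for illustration, not part of phxpaxos). It shows the pieces every subclass supplies: the fd, the host string, the handlers, and registration with the loop via AddEvent():

#include <string>
#include <sys/epoll.h>
#include "event_base.h"

class EchoEventSketch : public Event
{
public:
    EchoEventSketch(int iFd, EventLoop * poEventLoop)
        : Event(poEventLoop), m_iFd(iFd)
    {
        AddEvent(EPOLLIN);    // ask the loop to watch this fd for readability
    }

    int GetSocketFd() const override { return m_iFd; }
    const std::string & GetSocketHost() override { return m_sHost; }

    int OnRead() override
    {
        // read from m_iFd here; returning non-zero makes the loop call OnError()
        return 0;
    }

    void OnError(bool & bNeedDelete) override
    {
        bNeedDelete = true;   // ask the loop to Destroy() this event
    }

private:
    int m_iFd;
    std::string m_sHost;
};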
Now the EventLoop code. A first look at the header file already suggests, from the names alone, several responsibilities:
1. Adding, modifying, and removing events
2. Running the main event loop
3. Adding and removing timer events
class EventLoop
{
public:
    EventLoop(NetWork * poNetWork);
    virtual ~EventLoop();

    int Init(const int iEpollLength);

    // Event maintenance (add/remove/modify; modify also covers add).
    void ModEvent(const Event * poEvent, const int iEvents);
    void RemoveEvent(const Event * poEvent);

    // Start the main event loop.
    void StartLoop();
    void Stop();

    void OnError(const int iEvents, Event * poEvent);
    virtual void OneLoop(const int iTimeoutMs);

public:
    void SetTcpClient(TcpClient * poTcpClient);
    // Break out of the main event loop; implemented via Notify, which
    // internally uses a pipe.
    void JumpoutEpollWait();

public:
    // Add/remove/handle timer events. The private data below shows that
    // timers are managed through timer IDs:
    // timerID --> fd --> Event --> OnTimeout (the chain of associations)
    bool AddTimer(const Event * poEvent, const int iTimeout, const int iType, uint32_t & iTimerID);
    void RemoveTimer(const uint32_t iTimerID);
    void DealwithTimeout(int & iNextTimeout);
    void DealwithTimeoutOne(const uint32_t iTimerID, const int iType);

public:
    void AddEvent(int iFD, SocketAddress oAddr);
    void CreateEvent();
    void ClearEvent();
    int GetActiveEventCount();

public:
    // Event context used as the lookup value: the event plus the epoll
    // events currently being watched.
    typedef struct EventCtx
    {
        Event * m_poEvent;
        int m_iEvents;   // watched events: EPOLLIN, EPOLLOUT
    } EventCtx_t;

private:
    bool m_bIsEnd;

protected:
    int m_iEpollFd;
    epoll_event m_EpollEvents[MAX_EVENTS];
    std::map<int, EventCtx_t> m_mapEvent;    // fd-to-EventCtx map
    NetWork * m_poNetWork;
    TcpClient * m_poTcpClient;
    Notify * m_poNotify;                     // for internal event notification

protected:
    Timer m_oTimer;
    std::map<uint32_t, int> m_mapTimerID2FD; // timerID-to-fd map
    // Queue of newly accepted connections, each carrying its fd and socket
    // info (IP, port, ...); a MessageEvent is created for each one later to
    // handle its events. This is another multi-producer, single-consumer model.
    std::queue<std::pair<int, SocketAddress> > m_oFDQueue;
    std::mutex m_oMutex;
    std::vector<MessageEvent *> m_vecCreatedEvent; // MessageEvent derives from Event; added to this EventLoop
};
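Before reading the implementation, here is a minimal usage sketch inferred from the interface above. The std::thread arrangement and the 10240 epoll size hint are assumptions for illustration, not phxpaxos code:

#include <thread>

void RunLoopSketch(NetWork * poNetWork)
{
    EventLoop oLoop(poNetWork);
    if (oLoop.Init(10240) != 0)     // epoll size hint; value is arbitrary here
    {
        return;
    }

    std::thread oThread([&oLoop]() { oLoop.StartLoop(); });

    // ... run until shutdown ...

    oLoop.Stop();                   // set the end flag checked in StartLoop
    oLoop.JumpoutEpollWait();       // wake epoll_wait so the flag is noticed
    oThread.join();
}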
event_loop.cpp
#include "event_loop.h"
#include "event_base.h"
#include "tcp_acceptor.h"
#include "tcp_client.h"
#include "comm_include.h"
#include "message_event.h"
#include "phxpaxos/network.h"
using namespace std;
namespace phxpaxos
{
EventLoop :: EventLoop(NetWork * poNetWork)
{
    m_iEpollFd = -1;
    m_bIsEnd = false;
    m_poNetWork = poNetWork;
    m_poTcpClient = nullptr;
    m_poNotify = nullptr;
    memset(m_EpollEvents, 0, sizeof(m_EpollEvents));
}

EventLoop :: ~EventLoop()
{
    ClearEvent();
}
// Send a message through the pipe to break out of epoll_wait().
void EventLoop :: JumpoutEpollWait()
{
    m_poNotify->SendNotify();
}
void EventLoop :: SetTcpClient(TcpClient * poTcpClient)
{
    m_poTcpClient = poTcpClient;
}
int EventLoop :: Init(const int iEpollLength)
{
    // Create the epoll instance.
    m_iEpollFd = epoll_create(iEpollLength);
    if (m_iEpollFd == -1)
    {
        PLErr("epoll_create fail, ret %d", m_iEpollFd);
        return -1;
    }

    m_poNotify = new Notify(this);
    assert(m_poNotify != nullptr);

    // Register the pipe-notification class Notify with this EventLoop (EPOLLIN).
    int ret = m_poNotify->Init();
    if (ret != 0)
    {
        return ret;
    }

    return 0;
}
void EventLoop :: ModEvent(const Event * poEvent, const int iEvents)
{
    // Look the fd up in the fd-to-event map to decide between add and modify,
    // then use epoll_ctl to change the set of watched events,
    // and finally update the ctx stored for this fd in the map.
    auto it = m_mapEvent.find(poEvent->GetSocketFd());
    int iEpollOpertion = 0;
    if (it == end(m_mapEvent))
    {
        iEpollOpertion = EPOLL_CTL_ADD;
    }
    else
    {
        iEpollOpertion = it->second.m_iEvents ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
    }

    epoll_event tEpollEvent;
    tEpollEvent.events = iEvents;
    tEpollEvent.data.fd = poEvent->GetSocketFd();

    int ret = epoll_ctl(m_iEpollFd, iEpollOpertion, poEvent->GetSocketFd(), &tEpollEvent);
    if (ret == -1)
    {
        PLErr("epoll_ctl fail, EpollFd %d EpollOpertion %d SocketFd %d EpollEvent %d",
                m_iEpollFd, iEpollOpertion, poEvent->GetSocketFd(), iEvents);
        //to do
        return;
    }

    EventCtx tCtx;
    tCtx.m_poEvent = (Event *)poEvent;
    tCtx.m_iEvents = iEvents;

    m_mapEvent[poEvent->GetSocketFd()] = tCtx;
}
void EventLoop :: RemoveEvent(const Event * poEvent)
{
    // Same shape as ModEvent, except the operation is a delete.
    auto it = m_mapEvent.find(poEvent->GetSocketFd());
    if (it == end(m_mapEvent))
    {
        return;
    }

    int iEpollOpertion = EPOLL_CTL_DEL;

    epoll_event tEpollEvent;
    tEpollEvent.events = 0;
    tEpollEvent.data.fd = poEvent->GetSocketFd();

    int ret = epoll_ctl(m_iEpollFd, iEpollOpertion, poEvent->GetSocketFd(), &tEpollEvent);
    if (ret == -1)
    {
        PLErr("epoll_ctl fail, EpollFd %d EpollOpertion %d SocketFd %d",
                m_iEpollFd, iEpollOpertion, poEvent->GetSocketFd());
        //to do
        //when error
        return;
    }

    m_mapEvent.erase(poEvent->GetSocketFd());
}
void EventLoop :: StartLoop()
{
    m_bIsEnd = false;
    while (true)
    {
        BP->GetNetworkBP()->TcpEpollLoop();

        int iNextTimeout = 1000;

        // Process expired timers in a loop; the out-parameter returns the
        // smallest remaining interval, which becomes the epoll_wait timeout.
        // Timer stores its entries in a vector and uses heap algorithms to
        // keep the earliest expiry on top.
        DealwithTimeout(iNextTimeout);

        //PLHead("nexttimeout %d", iNextTimeout);

        // Wrapper around epoll_wait: look up the ctx for each triggered fd
        // and dispatch by event type.
        OneLoop(iNextTimeout);

        // Turn queued fds into MessageEvents and register them with this loop.
        CreateEvent();

        if (m_poTcpClient != nullptr)
        {
            // Add EPOLLOUT for the client's events.
            m_poTcpClient->DealWithWrite();
        }

        if (m_bIsEnd)
        {
            PLHead("TCP.EventLoop [END]");
            break;
        }
    }
}
void EventLoop :: Stop()
{
    m_bIsEnd = true;
}
void EventLoop :: OneLoop(const int iTimeoutMs)
{
    // Note: this waits at most 1 ms per call; the computed iTimeoutMs is not
    // actually used here.
    int n = epoll_wait(m_iEpollFd, m_EpollEvents, MAX_EVENTS, 1);
    if (n == -1)
    {
        if (errno != EINTR)
        {
            PLErr("epoll_wait fail, errno %d", errno);
            return;
        }
    }

    for (int i = 0; i < n; i++)
    {
        // Look up the ctx for the triggered fd.
        int iFd = m_EpollEvents[i].data.fd;
        auto it = m_mapEvent.find(iFd);
        if (it == end(m_mapEvent))
        {
            continue;
        }

        int iEvents = m_EpollEvents[i].events;
        Event * poEvent = it->second.m_poEvent;

        // Dispatch to the owning event's handler for each triggered type.
        int ret = 0;
        if (iEvents & EPOLLERR)
        {
            OnError(iEvents, poEvent);
            continue;
        }

        try
        {
            if (iEvents & EPOLLIN)
            {
                ret = poEvent->OnRead();
            }

            if (iEvents & EPOLLOUT)
            {
                ret = poEvent->OnWrite();
            }
        }
        catch (...)
        {
            ret = -1;
        }

        if (ret != 0)
        {
            OnError(iEvents, poEvent);
        }
    }
}
void EventLoop :: OnError(const int iEvents, Event * poEvent)
{
    BP->GetNetworkBP()->TcpOnError();

    PLErr("event error, events %d socketfd %d socket ip %s errno %d",
            iEvents, poEvent->GetSocketFd(), poEvent->GetSocketHost().c_str(), errno);

    // Remove the watched events from epoll and drop the entry from the map.
    RemoveEvent(poEvent);

    bool bNeedDelete = false;
    // Call the event's own error handler, which decides whether this event's
    // resources may be released. If it isn't released here, it looks like a
    // memory leak, since the destructor does not release it. MessageEvent's
    // OnError does set bNeedDelete to true; Notify does not, because Notify's
    // cleanup happens elsewhere.
    poEvent->OnError(bNeedDelete);

    if (bNeedDelete)
    {
        poEvent->Destroy();
    }
}
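Piecing the comments above together, the release protocol can be sketched like this (a paraphrase of the described behavior, not the verbatim MessageEvent/Event source, which also does socket cleanup):

// MessageEvent opts into deletion when an error occurs ...
void MessageEvent :: OnError(bool & bNeedDelete)
{
    bNeedDelete = true;    // EventLoop::OnError will then call Destroy()
}

// ... and Destroy() only marks a flag; the actual delete happens later, in
// EventLoop::ClearEvent(), which scans m_vecCreatedEvent for marked events.
void Event :: Destroy()
{
    m_bIsDestroy = true;
}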
bool EventLoop :: AddTimer(const Event * poEvent, const int iTimeout, const int iType, uint32_t & iTimerID)
{
    if (poEvent->GetSocketFd() == 0)
    {
        return false;
    }

    // If this event isn't in the event map yet, add it.
    if (m_mapEvent.find(poEvent->GetSocketFd()) == end(m_mapEvent))
    {
        EventCtx tCtx;
        tCtx.m_poEvent = (Event *)poEvent;
        tCtx.m_iEvents = 0;

        m_mapEvent[poEvent->GetSocketFd()] = tCtx;
    }

    // Create the timer.
    uint64_t llAbsTime = Time::GetSteadyClockMS() + iTimeout;
    m_oTimer.AddTimerWithType(llAbsTime, iType, iTimerID);

    // Associate the timer ID with the fd.
    m_mapTimerID2FD[iTimerID] = poEvent->GetSocketFd();

    return true;
}
void EventLoop :: RemoveTimer(const uint32_t iTimerID)
{
    // Remove the timerID-to-fd association. Timer has no removal interface,
    // so cutting the link in the middle is enough: when the timer fires,
    // no handler will run.
    auto it = m_mapTimerID2FD.find(iTimerID);
    if (it != end(m_mapTimerID2FD))
    {
        m_mapTimerID2FD.erase(it);
    }
}
void EventLoop :: DealwithTimeoutOne(const uint32_t iTimerID, const int iType)
{
    // Find the fd via the timer ID, the Event via the fd, then run that
    // event's OnTimeout handler.
    auto it = m_mapTimerID2FD.find(iTimerID);
    if (it == end(m_mapTimerID2FD))
    {
        //PLErr("Timeout aready remove!, timerid %u iType %d", iTimerID, iType);
        return;
    }

    int iSocketFd = it->second;

    m_mapTimerID2FD.erase(it);

    auto eventIt = m_mapEvent.find(iSocketFd);
    if (eventIt == end(m_mapEvent))
    {
        return;
    }

    eventIt->second.m_poEvent->OnTimeout(iTimerID, iType);
}
void EventLoop :: DealwithTimeout(int & iNextTimeout)
{
    bool bHasTimeout = true;

    while (bHasTimeout)
    {
        // Keep handling timeouts as long as there are expired timers.
        uint32_t iTimerID = 0;
        int iType = 0;
        bHasTimeout = m_oTimer.PopTimeout(iTimerID, iType);

        if (bHasTimeout)
        {
            DealwithTimeoutOne(iTimerID, iType);

            iNextTimeout = m_oTimer.GetNextTimeout();
            if (iNextTimeout != 0)
            {
                break;
            }
        }
    }
}
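Timer itself is outside the scope of this post. The sketch below shows the interface the loop relies on, implemented with a std::priority_queue min-heap keyed on absolute expiry time; the signatures are simplified (the real Timer reads the clock itself rather than taking llNowMs), so treat it as an illustration, not phxpaxos source:

#include <cstdint>
#include <functional>
#include <queue>
#include <vector>

class TimerSketch
{
    struct Item
    {
        uint64_t llAbsTime;    // absolute expiry time in ms
        uint32_t iTimerID;
        int iType;
        bool operator>(const Item & oOther) const { return llAbsTime > oOther.llAbsTime; }
    };

public:
    void AddTimerWithType(const uint64_t llAbsTime, const int iType, uint32_t & iTimerID)
    {
        iTimerID = ++m_iNextTimerID;
        m_oQueue.push(Item{llAbsTime, iTimerID, iType});
    }

    // Pop one expired timer; false means the earliest timer hasn't fired yet.
    bool PopTimeout(const uint64_t llNowMs, uint32_t & iTimerID, int & iType)
    {
        if (m_oQueue.empty() || m_oQueue.top().llAbsTime > llNowMs)
        {
            return false;
        }
        iTimerID = m_oQueue.top().iTimerID;
        iType = m_oQueue.top().iType;
        m_oQueue.pop();
        return true;
    }

    // Milliseconds until the next expiry, used as the epoll_wait timeout.
    int GetNextTimeout(const uint64_t llNowMs) const
    {
        if (m_oQueue.empty())
        {
            return 1000;
        }
        return m_oQueue.top().llAbsTime > llNowMs
                ? (int)(m_oQueue.top().llAbsTime - llNowMs) : 0;
    }

private:
    std::priority_queue<Item, std::vector<Item>, std::greater<Item> > m_oQueue;
    uint32_t m_iNextTimerID = 0;
};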
void EventLoop :: AddEvent(int iFD, SocketAddress oAddr)
{
    // Producer side: push a new fd into the queue.
    std::lock_guard<std::mutex> oLockGuard(m_oMutex);
    m_oFDQueue.push(make_pair(iFD, oAddr));
}
void EventLoop :: CreateEvent()
{
    // Consumer side: drain the queue.
    std::lock_guard<std::mutex> oLockGuard(m_oMutex);

    if (m_oFDQueue.empty())
    {
        return;
    }

    // Each pass first clears the MessageEvents that need releasing. Whether
    // one is released is decided by the user's error handling (bNeedDelete in
    // OnError); if marked, it is freed here, otherwise it is kept around.
    ClearEvent();

    int iCreatePerTime = 200;
    while ((!m_oFDQueue.empty()) && iCreatePerTime--)
    {
        auto oData = m_oFDQueue.front();
        m_oFDQueue.pop();

        // Create a MessageEvent for this fd and register it with the loop.
        MessageEvent * poMessageEvent = new MessageEvent(MessageEventType_RECV, oData.first,
                oData.second, this, m_poNetWork);
        poMessageEvent->AddEvent(EPOLLIN);

        m_vecCreatedEvent.push_back(poMessageEvent);
    }
}
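For completeness, the producer side of this queue (per the previous post, the acceptor hands accepted connections over) would look roughly like the sketch below; AcceptLoopSketch is a hypothetical function, and the SocketAddress construction is glossed over since its API isn't shown here:

#include <netinet/in.h>
#include <sys/socket.h>

// Illustrative producer: an acceptor thread pushes each accepted connection
// into the loop's queue; the loop thread turns it into a MessageEvent inside
// CreateEvent().
void AcceptLoopSketch(int iListenFd, EventLoop * poEventLoop)
{
    while (true)
    {
        sockaddr_in tAddr;
        socklen_t iLen = sizeof(tAddr);
        int iFd = accept(iListenFd, (sockaddr *)&tAddr, &iLen);
        if (iFd < 0)
        {
            continue;
        }
        SocketAddress oAddr;    // in the real code, filled in from tAddr
        poEventLoop->AddEvent(iFd, oAddr);
    }
}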
void EventLoop :: ClearEvent()
{
    for (auto it = m_vecCreatedEvent.begin(); it != end(m_vecCreatedEvent);)
    {
        // The destroy flag inside each MessageEvent decides whether its
        // resources are released.
        if ((*it)->IsDestroy())
        {
            delete (*it);
            it = m_vecCreatedEvent.erase(it);
        }
        else
        {
            it++;
        }
    }
}
int EventLoop :: GetActiveEventCount()
{
    // Return the number of live MessageEvents.
    std::lock_guard<std::mutex> oLockGuard(m_oMutex);

    ClearEvent();

    return (int)m_vecCreatedEvent.size();
}
}
Another hand-drawn diagram of the overall structure; again, bear with it.
The logic of the EventLoop code should now be fairly clear. The next post will walk through TCP's workflow in detail.