1. 程式人生 > >muduo庫的Connector以及TcpClient的使用

muduo庫的Connector以及TcpClient的使用


一:Connector

    Connector可以說是muduo庫的聯結器,負責客戶端向伺服器發起連線。實際上說白了就是封裝了socket的connect操作。

Connector類的成員如下:

class Connector : boost::noncopyable,
                  public boost::enable_shared_from_this<Connector>
{
 public:
  typedef boost::function<void (int sockfd)> NewConnectionCallback;

  Connector(EventLoop* loop, const InetAddress& serverAddr);
  ~Connector();

  void setNewConnectionCallback(const NewConnectionCallback& cb)
  { newConnectionCallback_ = cb; }

  void start();  // can be called in any thread
  void restart();  // must be called in loop thread
  void stop();  // can be called in any thread

  const InetAddress& serverAddress() const { return serverAddr_; }

 private:
  enum States { kDisconnected, kConnecting, kConnected };
  static const int kMaxRetryDelayMs = 30*1000;  //預設最大重連時間30000ms
  static const int kInitRetryDelayMs = 500;    //預設重連延遲時間500ms

  void setState(States s) { state_ = s; }
  void startInLoop();
  void stopInLoop();
  void connect();
  void connecting(int sockfd);
  void handleWrite();
  void handleError();
  void retry(int sockfd);
  int removeAndResetChannel();
  void resetChannel();

  EventLoop* loop_;               //所屬的EventLoop
  InetAddress serverAddr_;    //伺服器端的地址
  bool connect_; // atomic          
  States state_;  // FIXME: use atomic variable
  boost::scoped_ptr<Channel> channel_;    //Connector所對應的Channel
  NewConnectionCallback newConnectionCallback_;   //連線成功回撥函式
  int retryDelayMs_;    //重連延遲時間(單位ms)
};
建構函式是這樣的:
Connector::Connector(EventLoop* loop, const InetAddress& serverAddr)
  : loop_(loop),
    serverAddr_(serverAddr),
    connect_(false),
    state_(kDisconnected),
    retryDelayMs_(kInitRetryDelayMs)   //初始化延時
{
  LOG_DEBUG << "ctor[" << this << "]";
}

建構函式初始化了I/O執行緒,伺服器地址,並設定為未連線狀態以及初始化了重連延時時間。注意,這裡的重連是指發起連線失敗重連。

Connector啟動流程是這樣的:

//發起連線
void Connector::start()
{
  connect_ = true;
  loop_->runInLoop(boost::bind(&Connector::startInLoop, this)); // FIXME: unsafe
}

實際上呼叫了:

void Connector::startInLoop()
{
  loop_->assertInLoopThread();
  assert(state_ == kDisconnected);
  if (connect_)   //呼叫前必須connect_為true,start()函式中會這麼做
  {
    connect();  //連線具體實現
  }
  else
  {
    LOG_DEBUG << "do not connect";
  }
}

如果connect_為true,才真正開始連線,呼叫connect():

//連線實現
void Connector::connect()
{
  //設定非阻塞,否則退出
  int sockfd = sockets::createNonblockingOrDie(serverAddr_.family());
  int ret = sockets::connect(sockfd, serverAddr_.getSockAddr());
  int savedErrno = (ret == 0) ? 0 : errno;
  switch (savedErrno)  //檢查錯誤碼
  {
    case 0:
    case EINPROGRESS:  //非阻塞套接字,未連線成功返回碼是EINPROGRESS表示正在連線
    case EINTR:
    case EISCONN:   //連線成功
      connecting(sockfd);
      break;

    case EAGAIN:
    case EADDRINUSE:
    case EADDRNOTAVAIL:
    case ECONNREFUSED:
    case ENETUNREACH:
      retry(sockfd);   //重連
      break;

    case EACCES:
    case EPERM:
    case EAFNOSUPPORT:
    case EALREADY:
    case EBADF:
    case EFAULT:
    case ENOTSOCK:
      LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
      sockets::close(sockfd);   //這幾種情況不能重連,
      break;

    default:
      LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
      sockets::close(sockfd);
      break;
  }
}

這個函式雖然很長,但實際上就是設定套接字為非阻塞,然後底層呼叫socket的conenct()函式,進而判斷errno採取相應的操作。

操作主要有三種情況,下面分三點討論:

1.errno為EINPROGRESS、EINTR、EISCONN

    說明連線成功,呼叫connecting()函式,對成功的連線進行處理:

//如果連線成功
void Connector::connecting(int sockfd)
{
  setState(kConnecting);
  assert(!channel_);
  //Channel與sockfd關聯
  channel_.reset(new Channel(loop_, sockfd));
  //設定可寫回調函式,這時候如果socket沒有錯誤,sockfd就處於可寫狀態
  channel_->setWriteCallback(
      boost::bind(&Connector::handleWrite, this)); // FIXME: unsafe
      //設定錯誤回撥函式
  channel_->setErrorCallback(
      boost::bind(&Connector::handleError, this)); // FIXME: unsafe

  // channel_->tie(shared_from_this()); is not working,
  // as channel_ is not managed by shared_ptr
  //關注可寫事件
  channel_->enableWriting();
}

總的來說,連線成功就是更改連線狀態+設定各種回撥函式+加入poller關注可寫事件。

2.errno為 EAGAIN等錯誤碼

    說明連線暫時失敗,但是仍可能成功,需要重連。呼叫retry()函式:

//重連函式,採用back-off策略重連,也就是退避策略
//也就是重連時間逐漸延長,0.5s,1s,2s,...一直到30s
void Connector::retry(int sockfd)
{
  sockets::close(sockfd);   //先關閉連線
  setState(kDisconnected);  
  if (connect_)
  {
    LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort()
             << " in " << retryDelayMs_ << " milliseconds. ";
    //隔一段時間後重連,重新啟用startInLoop
    loop_->runAfter(retryDelayMs_/1000.0,
                    boost::bind(&Connector::startInLoop, shared_from_this()));
    //間隔時間2倍增長
    retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
  }
  else   //超出最大重連時間後,輸出連線失敗
  {
    LOG_DEBUG << "do not connect";
  }
}
3.徹底失敗,返回errno為EACCES等錯誤碼

    這種情況只能關掉sockfd,因為再怎麼試也成功不了的。

 sockets::close(sockfd);

斷開連線函式就不剖析了,相對比較簡單,接下來看TcpClient類。

二:TcpClient

Tcpclient主要有下列成員:
 EventLoop* loop_;
  ConnectorPtr connector_; // avoid revealing Connector
  const string name_;
  ConnectionCallback connectionCallback_;
  MessageCallback messageCallback_;
  WriteCompleteCallback writeCompleteCallback_;
  bool retry_;   // atomic   //是否重連,是指建立的連線成功後又斷開是否重連。而Connector的重連是一直不成功是否重試的意思
  bool connect_; // atomic
  // always in loop thread
  int nextConnId_;  //name_+nextConnid_用於標識一個連線
  mutable MutexLock mutex_;
  TcpConnectionPtr connection_; // @GuardedBy mutex_ //Connector連線成功後,得到一個TcpConnection
需要注意的是,這裡的重連是連線成功後又斷開的重連,而不是Connector類的連不上一直嘗試的重連。

建構函式:

TcpClient::TcpClient(EventLoop* loop,
                     const InetAddress& serverAddr,
                     const string& nameArg)
  : loop_(CHECK_NOTNULL(loop)),
    connector_(new Connector(loop, serverAddr)),
    name_(nameArg),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    retry_(false),
    connect_(true),
    nextConnId_(1)
{
  //一旦連線建立連線,回撥newConnection
  connector_->setNewConnectionCallback(
      boost::bind(&TcpClient::newConnection, this, _1));
}
建構函式建立了一個Connector,畢竟是TcpClient嘛,需要一個東西來發起連線。並且註冊連線成功的回撥函式。

它的使用方法,先connect:

void TcpClient::connect()
{
  connect_ = true;
  connector_->start();
}

底層就是呼叫我們上文中分析的Connector的一系列連線機制,向服務端發起連線。

連線成功後,就會呼叫自己的成員函式newConnection()函式:
void TcpClient::newConnection(int sockfd)
{
  loop_->assertInLoopThread();
  InetAddress peerAddr(sockets::getPeerAddr(sockfd));
  char buf[32];
  snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toIpPort().c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;

  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  //建立一個TcpConnection物件,智慧指標。
  //根據Connector中的handleWrite()函式,連線建立後會把sockfd從poller中移除,以後不會再關注可寫事件了
  //否則會出現busy loop,因為已連線套接字一直處於可寫狀態
  TcpConnectionPtr conn(new TcpConnection(loop_,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
  
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(
      boost::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe
  {
    MutexLockGuard lock(mutex_);
    connection_ = conn;   //儲存TcpConnection
  }
  conn->connectEstablished();  //這裡會關注可讀事件,並且回撥connectionCallback_,
}
該函式建立一個堆上區域性TcpConnection物件,並用TcpClient的智慧指標connection_儲存起來,這樣本函式中conn即便析構掉,connection_依然維護該連線。

然後設定各種回撥函式。由於為了避免busy loop,在Connector中一旦連線成功,我們取消關注sockfd的可寫事件。並且本函式使用conn->connectEstablished()內部會關注可讀事件:

void TcpConnection::connectEstablished()
{
  loop_->assertInLoopThread();   //斷言處於loop執行緒
  assert(state_ == kConnecting);   //斷言處於未連線狀態
  setState(kConnected);   //將狀態設定為已連線

  channel_->tie(shared_from_this());   //將自身這個TcpConnection物件提升,由於是智慧指標,所以不能直接用this
  //shared_from_this()之後引用計數+1,為3,但是shared_from_this()是臨時物件,析構後又會減一,
  //而tie是weak_ptr並不會改變引用計數,所以該函式執行完之後引用計數不會更改
  
  channel_->enableReading();   //一旦連線成功就關注它的可讀事件,加入到Poller中關注

  connectionCallback_(shared_from_this());
}

下面是連線斷開的函式:

//連線斷開
void TcpClient::removeConnection(const TcpConnectionPtr& conn)
{
  loop_->assertInLoopThread();
  assert(loop_ == conn->getLoop());

  {
    MutexLockGuard lock(mutex_);
    assert(connection_ == conn);
    connection_.reset();   //重置
  }

  //I/O執行緒中銷燬
  loop_->queueInLoop(boost::bind(&TcpConnection::connectDestroyed, conn));
  if (retry_ && connect_)  //是否發起重連
  {
    LOG_INFO << "TcpClient::connect[" << name_ << "] - Reconnecting to "
             << connector_->serverAddress().toIpPort();
    //這裡的重連是連線成功後斷開的重連,所以實際上是重啟
    connector_->restart();
  }
}

基本就這些了。

相關推薦

muduoConnector以及TcpClient的使用

一:Connector     Connector可以說是muduo庫的聯結器,負責客戶端向伺服器發起連線。實際上說白了就是封裝了socket的connect操作。 Connector類的成員如下: class Connector : boost::noncopyabl

Oracle數據簡介以及windows安裝過程

oracle數據庫Oracle數據庫簡介也許很多人熟悉SQL server,並不是太了解Oracle數據庫,這裏進行一下簡單的介紹Oracle數據庫的創始人是勞倫斯.埃裏斯Oracle數據庫能被多個操作系統使用eg:windows,linux,Solaris,AIX等現在我們把Oracle和SQL serve

數據優化以及SQL優化小結

需求 char 解決 通配符 () 表結構 date omsa 系列 優化數據庫的方法 1、選取最適用的字段屬性 MySQL可以很好的支持大數據量的存取,但是一般說來,數據庫中的表越小,在它上面執行的查詢也就會越快。因此,在創建表的時候,為了獲得更好的性能,我們可以將表中字

Django models數據配置以及多數據聯用設置

業務邏輯 family imp 框架 路由器 數據庫 ros del not 今天來說說web框架Django怎麽配置使用數據庫,也就是傳說中MVC(Model View Controller)中的M,Model(模型)。 簡單介紹一下Django中的MVC: 模型(mod

Muduo】【base】一、Timestamp類

second 一個 macro fin ftime mac cat gap base 一、Timestamp類 1、類圖如下: 2、 知識點 (1) 這個類繼承了 muduo::copyable, 以及 boost::less_than_comparable.

在控制臺中操作MYSQL數據步驟以及一些小問題

以及 sele 打開 tac 當前 nav char 操作 成功 一直用Navicat來對MySQL數據庫進行操作,今天突然想試試用DOS控制臺來操作,特記錄自己第一次使用經歷,若有錯誤之處,還望大佬們指點。 首先打開控制臺,win+R鍵,輸入cmd,確定 輸入mysql

淺析muduo中的定時器設施

val read eid 就是 RR 重新 cal using dss 一個設計良好的定時器在服務端的應用程序上至關重要,muduo定時器的實現陳碩大牛在書中已經詳細的談過,筆者嘗試從源碼的角度解讀定時器的實現,如果理解不對,歡迎指正。 在muduo的定時器系統中,一共由四

muduo中TcpServer一次完整的工作流程

函數 sep sock accep mes pin map all some 模擬單線程情況下muduo庫的工作情況 muduo的源代碼對於一個初學者來說還是有一些復雜的,其中有很多的回調函數以及交叉的組件,下面我將追蹤一次TCP連接過程中發生的事情,不會出現用戶態的源碼,

muduo中的核心:std::bind和std::function

muduo main ons 源碼 綁定 func 靜態成員 con 函數 最近在讀完陳碩大牛的《Linux多線程服務端編程》以及muduo源碼後,對其中的一些實現細節有著十分深刻的印象,尤其是使用std::bind和std::function的回調技術。可以說,這兩個大殺

淺析muduo中的線程設施

%d except a* empty fde returns exceptio HR 參數傳遞 muduo是目前我在學習過程中遇到的最具有學習意義的網絡庫,下文將分析muduo庫中的基礎設施--Thread和ThreadPool. 首先,介紹在多線程編程中不可缺少的同步措施

linux共享以及/etc/ld.so.conf檔案的應用

靜態連結的可執行程式。靜態可執行程式包含執行所需的所有函式 — 換句話說,它們是“完整的”。因為這一原因,靜態可執行程式不依賴任何外部庫就可以執行。檔案大 動態連結的可執行程式。動態可執行程式是不完整的程式,它依靠外部共享庫來提供執行所需的許多函式。檔案小 我們可以用 ldd 命令來確定某一

數據 : 事物以及隔離性導致的問題

read mit eat 幻讀 另一個 tab 隔離級別 zab date 事務的特性: 原子性: 事務不可分割一致性: 事務執行前後數據完整性保持一致隔離性: 一個事務的執行不能受到其他事務的幹擾持久性: 一旦事務結束, 數據就持久化到數據庫---------------

[git]新建以及和本地的資料夾關聯

命令列指令 Git 全域性設定 git config --global user.name "xxx " git config --global user.email " [email protected]" 建立新版本庫 git clone https://co

muduo介紹與安裝

全部程式碼5000行 (不含測試) 執行緒安全,原生支援多核多執行緒 不考慮移植性,不跨平臺,只支援Linux,不支援Windows 主要支援x86-64,兼顧IA32 不支援UDP,只支援TCP 不支援IPv6,只支援IPv4 不考慮廣域網應用,只考慮區域網(實際上muduo也可以用在

muduo使用示例之聊天伺服器(下)

借用shared_ptr實現copy on write shared_ptr是引用計數智慧指標,如果當前只有一個觀察者,那麼引用計數為1,可以用shared_ptr::unique()來判斷 對於共享資源有兩個端,分別是read端、write端 對於write端,

muduo使用示例之聊天伺服器(上)

本文程式碼存放在muduo\examples\asio\chat目錄下 聊天伺服器示意圖 實現的功能:任何一個Client給Server傳送訊息後,Server都會將該訊息回射給連線上來的所有Client muduo實現一個聊天室伺服器,客戶傳送的訊息將廣播到連入的所有客戶

muduo使用示例之檔案傳輸filetransfer

檔案傳輸(傳送)有下面3個版本,程式碼實現在muduo\examples\filetransfer 目錄中。 版本1:一次性將檔案讀入記憶體併發送 一次性將檔案讀入記憶體中的string fileContent,一次性呼叫send(const string&)傳送完畢。=

muduo使用示例之3種網路模型(Sudoku求解伺服器)

10種大併發伺服器設計方案與muduo庫網路模型使用https://blog.csdn.net/weixin_36750623/article/details/84329044 muduo庫網路模型使用 程式碼存放在muduo-master\examples\sudoku下

元件化開發之私有製作以及常見問題

前言:這篇文章主要描述私有庫的製作過程以及本人在使用過程中的一些問題和解決方案,提到元件化就不得不想到pods私有庫相關的東西(當然元件化不侷限於結合私有庫使用,還可以做成靜態庫或者多target開發等方式,這裡只講解私有庫相關的東西,稍後會出一篇元件化結合私有庫實現元件化開發的方式) 私有

【轉】系統呼叫和函式以及API

         在寫程式的過程中,像MFC,VC++這些程式設計,都會涉及到函式的呼叫,有庫函式也有系統函式,下面看一看它們的區別!!            系統呼叫(system