1. 程式人生 > >muduo_net程式碼剖析之聯結器Connector

muduo_net程式碼剖析之聯結器Connector

Connector用於client向server主動發起連線,並有自動重連的功能
Connector只負責建立socket連線,不負責建立TcpCOnnection(下文中的TcpClient類實現TcpCOnnection的建立)

Connector

  1. 在非阻塞網路程式設計中,發起連線的基本方式是呼叫connect(2),當socket變得可寫時表明連線建立完畢,其中要處理各種型別的錯誤,我們把它封裝為Connector class.
  2. 還有一點就是錯誤處理,socket可寫不一定就是連線建立好了 , 當連線建立出錯時,套介面描述符變成既可讀又可寫,這時我們可以通過呼叫getsockopt來得到套介面上待處理的錯誤(SO_ERROR).
  3. 其次非阻塞網路程式設計中connect(2)的sockfd是一次性的,一旦出錯(比如對方拒絕連線),就無法恢復,只能關閉重來。 但Connector是可以反覆使用的, 因此每次嘗試連線都要使用新的socket檔案描述符和新的Channel物件。要注意的就是Channel的生命期管理了.

程式碼剖析

只有5個共有介面

void setNewConnectionCallback(const NewConnectionCallback& cb);

void start();  // can be called in any thread
void restart();
// must be called in loop thread void stop(); // can be called in any thread const InetAddress& serverAddress() const;
  1. 成員函式
EventLoop* loop_;
InetAddress serverAddr_; //要連線的伺服器端地址
bool connect_; // atomic 表示是否要去建立連線
States state_;  // FIXME: use atomic variable
std::unique_ptr<Channel>
channel_;//客戶端用於通訊的sockfd建立的channel_ NewConnectionCallback newConnectionCallback_; //連線建立成功的回撥函式 int retryDelayMs_; //重連延遲事件(單位:ms)
  1. 建構函式
Connector::Connector(EventLoop* loop, const InetAddress& serverAddr)
  : loop_(loop),
    serverAddr_(serverAddr),
    connect_(false), // 現在不去建立連線
    state_(kDisconnected),
    retryDelayMs_(kInitRetryDelayMs)
{
  LOG_DEBUG << "ctor[" << this << "]";
}
  1. start()
    第一步:建立非阻塞socket,返回套介面描述符;
    第二步:connect(2)開始建立連線;
    第三步:根據connect的返回值和errno,判斷連線是否成功建立,見下面3中情況 ==>
    ① 如果connect返回0,表示連線建立成功;如果錯誤為EINPROGRESS表示連線正在進行,可以等待select()變的可寫,再通過getsockopt(___,SO_ERROR)來確定連線是否真正的建立成功。
    ② EAGAIN、EADDRINUSE、EADDRNOTAVAIL、ECONNREFUSED、ENETUNREACH 像EAGAIN 這類表明本機臨時埠暫時用完的錯誤、可以嘗試重連。
    ③ EACCES、EPERM、EAFNOSUPPORT、EALREADY、EBADF、EFAULT、ENOTSOCK 其他錯誤像無許可權,協議錯誤等,就直接關閉套接字。
    在這裡插入圖片描述

當連線成功建立後,會呼叫newConnectionCallback_函式,該回調函式可以使用setNewConnectionCallback進行設定

void setNewConnectionCallback(const NewConnectionCallback& cb)
{ newConnectionCallback_ = cb; }

重連策略retry:採用bake-off退避策略重連,即重連時間逐漸延長,0.5s,1s,2s,…,直至30s

void Connector::retry(int sockfd)
{
  sockets::close(sockfd);
  setState(kDisconnected);
  if (connect_)
  {
    LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort()
             << " in " << retryDelayMs_ << " milliseconds. ";
    //註冊一個定時操作,重連
	loop_->runAfter(retryDelayMs_/1000.0,
                    std::bind(&Connector::startInLoop, shared_from_this()));
    retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
  }
  else
  {
    LOG_DEBUG << "do not connect";
  }
}

start()程式碼實現:

void Connector::start() //public介面
{
  connect_ = true; //表示開始向server發起連線請求
  loop_->runInLoop(std::bind(&Connector::startInLoop, this)); //FIXME: unsafe
}

void Connector::startInLoop() //private介面
{
  loop_->assertInLoopThread(); //斷言處於IO執行緒中
  assert(state_ == kDisconnected); //沒連線狀態
  if (connect_) 
  {
    connect(); //向server發起連線的步驟
  }
  else
  {
    LOG_DEBUG << "do not connect";
  }
}
void Connector::connect()//private介面
{
  //建立非阻塞套接字sockfd
  int sockfd = sockets::createNonblockingOrDie(serverAddr_.family());
  //
  int ret = sockets::connect(sockfd, serverAddr_.getSockAddr());
  int savedErrno = (ret == 0) ? 0 : errno;
  switch (savedErrno)
  {
    case 0: //connect返回0,表示連線建立成功
    case EINPROGRESS: //表示連線正在進行,可以等待select()變的可寫
    case EINTR:
    case EISCONN: //連線成功
      connecting(sockfd);
      break;

    case EAGAIN:  //表明本機臨時埠暫時用完的錯誤,要關閉socket再延期重試
    case EADDRINUSE:
    case EADDRNOTAVAIL:
    case ECONNREFUSED:
    case ENETUNREACH:
      retry(sockfd); //重連 
      break;

    case EACCES: //其他錯誤像無許可權,協議錯誤等,就直接關閉套接字
    case EPERM:
    case EAFNOSUPPORT:
    case EALREADY:
    case EBADF:
    case EFAULT:
    case ENOTSOCK:
      LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
      sockets::close(sockfd);
      break;

    default:
      LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
      sockets::close(sockfd);
      // connectErrorCallback_();
      break;
  }
}
void Connector::connecting(int sockfd) //private介面
{
  setState(kConnecting);
  assert(!channel_);
  channel_.reset(new Channel(loop_, sockfd));
  
  //一旦select()可寫,就會呼叫handleWrite
  channel_->setWriteCallback(std::bind(&Connector::handleWrite, this)); // FIXME: unsafe
  channel_->setErrorCallback(std::bind(&Connector::handleError, this)); // FIXME: unsafe

  channel_->enableWriting(); //關注可寫事件
}
void Connector::handleWrite() //private介面
{
  LOG_TRACE << "Connector::handleWrite " << state_;

  if (state_ == kConnecting) 
  {
    //從poller中移除關注,並將channel置空,防止可寫造成busy loop
	int sockfd = removeAndResetChannel();
	
	//sockfd可寫並不意味著連線一定建立成功
	//還需要用getsockopt(sockfd,SOL_SOCKET,SO_ERROR,...)再次確認一下
    int err = sockets::getSocketError(sockfd);
    if (err)//有錯誤
    {
      LOG_WARN << "Connector::handleWrite - SO_ERROR = "
               << err << " " << strerror_tl(err);
      retry(sockfd); //重連
    }
    else if (sockets::isSelfConnect(sockfd))//自連線
    {
      LOG_WARN << "Connector::handleWrite - Self connect";
      retry(sockfd);//重連
    }
    else //連線成功
    {
      setState(kConnected);
      if (connect_)
      {
        newConnectionCallback_(sockfd);
      }
      else
      {
        sockets::close(sockfd);
      }
    }
  }
  else
  {
    // what happened?
    assert(state_ == kDisconnected);
  }
}
  1. restart() // 重啟連線
void Connector::restart()
{
  loop_->assertInLoopThread();
  setState(kDisconnected); //設定連線為斷開標誌
  retryDelayMs_ = kInitRetryDelayMs; //超時時間為0.5s
  connect_ = true; //設定去建立連線的標誌
  startInLoop(); //發起連線
}
  1. stop()關閉套接字,刪除註冊的通道,停止連線
void Connector::stop()//停止、關閉連線的sockfd
{
  connect_ = false;
  loop_->queueInLoop(std::bind(&Connector::stopInLoop, this)); // FIXME: unsafe
  // FIXME: cancel timer
}

void Connector::stopInLoop()
{
  loop_->assertInLoopThread();
  if (state_ == kConnecting)
  {
    setState(kDisconnected);
    int sockfd = removeAndResetChannel();
    retry(sockfd); //這裡並非重連,只是呼叫socket::close(sockfd)
  }
}