muduo_net程式碼剖析之聯結器Connector
阿新 • • 發佈:2018-12-05
Connector用於client向server主動發起連線,並有自動重連的功能
Connector只負責建立socket連線,不負責建立TcpCOnnection(下文中的TcpClient類實現TcpCOnnection的建立)
Connector
- 在非阻塞網路程式設計中,發起連線的基本方式是呼叫connect(2),當socket變得可寫時表明連線建立完畢,其中要處理各種型別的錯誤,我們把它封裝為Connector class.
- 還有一點就是錯誤處理,socket可寫不一定就是連線建立好了 , 當連線建立出錯時,套介面描述符變成既可讀又可寫,這時我們可以通過呼叫getsockopt來得到套介面上待處理的錯誤(SO_ERROR).
- 其次非阻塞網路程式設計中connect(2)的sockfd是一次性的,一旦出錯(比如對方拒絕連線),就無法恢復,只能關閉重來。 但Connector是可以反覆使用的, 因此每次嘗試連線都要使用新的socket檔案描述符和新的Channel物件。要注意的就是Channel的生命期管理了.
程式碼剖析
只有5個共有介面
void setNewConnectionCallback(const NewConnectionCallback& cb);
void start(); // can be called in any thread
void restart(); // must be called in loop thread
void stop(); // can be called in any thread
const InetAddress& serverAddress() const;
- 成員函式
EventLoop* loop_;
InetAddress serverAddr_; //要連線的伺服器端地址
bool connect_; // atomic 表示是否要去建立連線
States state_; // FIXME: use atomic variable
std::unique_ptr<Channel> channel_;//客戶端用於通訊的sockfd建立的channel_
NewConnectionCallback newConnectionCallback_; //連線建立成功的回撥函式
int retryDelayMs_; //重連延遲事件(單位:ms)
- 建構函式
Connector::Connector(EventLoop* loop, const InetAddress& serverAddr)
: loop_(loop),
serverAddr_(serverAddr),
connect_(false), // 現在不去建立連線
state_(kDisconnected),
retryDelayMs_(kInitRetryDelayMs)
{
LOG_DEBUG << "ctor[" << this << "]";
}
- start()
第一步:建立非阻塞socket,返回套介面描述符;
第二步:connect(2)開始建立連線;
第三步:根據connect的返回值和errno,判斷連線是否成功建立,見下面3中情況 ==>
① 如果connect返回0,表示連線建立成功;如果錯誤為EINPROGRESS表示連線正在進行,可以等待select()變的可寫,再通過getsockopt(___,SO_ERROR)來確定連線是否真正的建立成功。
② EAGAIN、EADDRINUSE、EADDRNOTAVAIL、ECONNREFUSED、ENETUNREACH 像EAGAIN 這類表明本機臨時埠暫時用完的錯誤、可以嘗試重連。
③ EACCES、EPERM、EAFNOSUPPORT、EALREADY、EBADF、EFAULT、ENOTSOCK 其他錯誤像無許可權,協議錯誤等,就直接關閉套接字。
當連線成功建立後,會呼叫newConnectionCallback_函式,該回調函式可以使用setNewConnectionCallback進行設定
void setNewConnectionCallback(const NewConnectionCallback& cb)
{ newConnectionCallback_ = cb; }
重連策略retry:採用bake-off退避策略重連,即重連時間逐漸延長,0.5s,1s,2s,…,直至30s
void Connector::retry(int sockfd)
{
sockets::close(sockfd);
setState(kDisconnected);
if (connect_)
{
LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort()
<< " in " << retryDelayMs_ << " milliseconds. ";
//註冊一個定時操作,重連
loop_->runAfter(retryDelayMs_/1000.0,
std::bind(&Connector::startInLoop, shared_from_this()));
retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
}
else
{
LOG_DEBUG << "do not connect";
}
}
start()程式碼實現:
void Connector::start() //public介面
{
connect_ = true; //表示開始向server發起連線請求
loop_->runInLoop(std::bind(&Connector::startInLoop, this)); //FIXME: unsafe
}
void Connector::startInLoop() //private介面
{
loop_->assertInLoopThread(); //斷言處於IO執行緒中
assert(state_ == kDisconnected); //沒連線狀態
if (connect_)
{
connect(); //向server發起連線的步驟
}
else
{
LOG_DEBUG << "do not connect";
}
}
void Connector::connect()//private介面
{
//建立非阻塞套接字sockfd
int sockfd = sockets::createNonblockingOrDie(serverAddr_.family());
//
int ret = sockets::connect(sockfd, serverAddr_.getSockAddr());
int savedErrno = (ret == 0) ? 0 : errno;
switch (savedErrno)
{
case 0: //connect返回0,表示連線建立成功
case EINPROGRESS: //表示連線正在進行,可以等待select()變的可寫
case EINTR:
case EISCONN: //連線成功
connecting(sockfd);
break;
case EAGAIN: //表明本機臨時埠暫時用完的錯誤,要關閉socket再延期重試
case EADDRINUSE:
case EADDRNOTAVAIL:
case ECONNREFUSED:
case ENETUNREACH:
retry(sockfd); //重連
break;
case EACCES: //其他錯誤像無許可權,協議錯誤等,就直接關閉套接字
case EPERM:
case EAFNOSUPPORT:
case EALREADY:
case EBADF:
case EFAULT:
case ENOTSOCK:
LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
sockets::close(sockfd);
break;
default:
LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
sockets::close(sockfd);
// connectErrorCallback_();
break;
}
}
void Connector::connecting(int sockfd) //private介面
{
setState(kConnecting);
assert(!channel_);
channel_.reset(new Channel(loop_, sockfd));
//一旦select()可寫,就會呼叫handleWrite
channel_->setWriteCallback(std::bind(&Connector::handleWrite, this)); // FIXME: unsafe
channel_->setErrorCallback(std::bind(&Connector::handleError, this)); // FIXME: unsafe
channel_->enableWriting(); //關注可寫事件
}
void Connector::handleWrite() //private介面
{
LOG_TRACE << "Connector::handleWrite " << state_;
if (state_ == kConnecting)
{
//從poller中移除關注,並將channel置空,防止可寫造成busy loop
int sockfd = removeAndResetChannel();
//sockfd可寫並不意味著連線一定建立成功
//還需要用getsockopt(sockfd,SOL_SOCKET,SO_ERROR,...)再次確認一下
int err = sockets::getSocketError(sockfd);
if (err)//有錯誤
{
LOG_WARN << "Connector::handleWrite - SO_ERROR = "
<< err << " " << strerror_tl(err);
retry(sockfd); //重連
}
else if (sockets::isSelfConnect(sockfd))//自連線
{
LOG_WARN << "Connector::handleWrite - Self connect";
retry(sockfd);//重連
}
else //連線成功
{
setState(kConnected);
if (connect_)
{
newConnectionCallback_(sockfd);
}
else
{
sockets::close(sockfd);
}
}
}
else
{
// what happened?
assert(state_ == kDisconnected);
}
}
- restart() // 重啟連線
void Connector::restart()
{
loop_->assertInLoopThread();
setState(kDisconnected); //設定連線為斷開標誌
retryDelayMs_ = kInitRetryDelayMs; //超時時間為0.5s
connect_ = true; //設定去建立連線的標誌
startInLoop(); //發起連線
}
- stop()關閉套接字,刪除註冊的通道,停止連線
void Connector::stop()//停止、關閉連線的sockfd
{
connect_ = false;
loop_->queueInLoop(std::bind(&Connector::stopInLoop, this)); // FIXME: unsafe
// FIXME: cancel timer
}
void Connector::stopInLoop()
{
loop_->assertInLoopThread();
if (state_ == kConnecting)
{
setState(kDisconnected);
int sockfd = removeAndResetChannel();
retry(sockfd); //這裡並非重連,只是呼叫socket::close(sockfd)
}
}