muduo_net程式碼剖析之TcpConnection
TcpConnection類可謂是muduo最核心也是最複雜的類,它的標頭檔案和原始檔一共有450多行,是muduo最大的類。
TcpConnection是muduo裡唯一預設使用shared_ptr來管理的class,也是唯一繼承enable_shared_from_this的類,這源於其模糊的生命期。
1、TcpServer在接收並建立連線的過程中,TcpConnection做了什麼工作?
經過前文對TcpServer類的講解以及上圖的展示,我們知道:
- TcpConnection是TcpServer的一個成員變數
- 當TcpServer接收到client的連線請求後,將會回撥newConnection()函式:先建立TcpConnection物件用來與client連線,再執行connectEstablished()函式
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
... ...
//2.在ioLoop上,建立一個TcpConnection物件
//建構函式的定義請向下看
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
//3.將這個物件加入到ConnectionMap雜湊表中
connections_[connName] = conn;
//4.使用TcpServer類的成員變數,給TcpConnection的成員變數賦值
conn->setConnectionCallback(connectionCallback_); //defaultConnectionCallback
conn->setMessageCallback(messageCallback_) ; //defaultMessageCallback
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1));
//5.呼叫conn->connectEstablished()
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
//TcpConnection建構函式
TcpConnection::TcpConnection(EventLoop* loop,
const string& nameArg,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr)
: loop_(CHECK_NOTNULL(loop)),
name_(nameArg),
state_(kConnecting),
reading_(true),
socket_(new Socket(sockfd)), //sockfd連線的對方是[核心中的傳送/接收緩衝區]
channel_(new Channel(loop, sockfd)), //channel_
localAddr_(localAddr),
peerAddr_(peerAddr),
highWaterMark_(64*1024*1024)
{
//當通道POLLIN事件到來的時候
channel_->setReadCallback(
std::bind(&TcpConnection::handleRead, this, _1));
//當通道POLLOUT事件到來的時候
channel_->setWriteCallback(
std::bind(&TcpConnection::handleWrite, this));
channel_->setCloseCallback(
std::bind(&TcpConnection::handleClose, this));
channel_->setErrorCallback(
std::bind(&TcpConnection::handleError, this));
LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this
<< " fd=" << sockfd;
socket_->setKeepAlive(true);
}
- 在connectEstablished()函式中,實現了:關注sockfd/channel_的讀事件;回撥connectionCallback_(即defaultConnectionCallback)
void TcpConnection::connectEstablished()
{
loop_->assertInLoopThread();
assert(state_ == kConnecting);
setState(kConnected);
channel_->tie(shared_from_this());
channel_->enableReading(); //關注sockfd的讀事件
//回撥connectionCallback_
// connectionCallback_的預設值為defaultConnectionCallback
connectionCallback_(shared_from_this());
}
上面就是新的client連線到TcpServer後,所做的一些工作,主要是建立TcpConnection物件用於連線,並對TcpConnection進行初始化。
2、TcpConnection成員函式詳解
EventLoop* loop_; //需要一個唯一eventloop
const string name_; //name 為了標識一個連結,用於log
StateE state_; // FIXME: use atomic variable
bool reading_;
//we don't expose those classes to client.
std::unique_ptr<Socket> socket_; //sockfd 是該連線的檔案描述符
std::unique_ptr<Channel> channel_; //用sockfe建立的channel_
const InetAddress localAddr_;
const InetAddress peerAddr_;
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
WriteCompleteCallback writeCompleteCallback_; //
HighWaterMarkCallback highWaterMarkCallback_; //高水位線
CloseCallback closeCallback_;
size_t highWaterMark_;//當outputBuffer_的資料超過時後執行對應的操作
Buffer inputBuffer_; //應用層接收緩衝區
Buffer outputBuffer_; //應用層傳送緩衝區
boost::any context_; //表示連線物件可以繫結一個未知型別的上下文物件
- 有應用層的傳送和接收緩衝區:inputBuffer_、outputBuffer_
- highWaterMark_與highWaterMarkCallback_:分別表示高水位線和高水位線回撥函式
Q:什麼情況下會呼叫highWaterMarkCallback_?
A:當Buffer中的readIndex_到達highWaterMark_時,將會觸發highWaterMarkCallback_ - 非常重要的引數:writeCompleteCallback_
Q1:什麼時候會觸發writeCompleteCallback_回撥函式?
A1:兩種情況,分別是 ==> ①當用戶直接呼叫write(channel_->fd(), data, len),並且將所有的資料全部都發送給了核心緩衝區 ②POLLOUT事件觸發(回調了TcpConnection::handleWrite),導致outputBuffer_中的資料全部轉移到核心緩衝區
Q2:什麼情況下需要關注writeCompleteCallback_回撥函式?
A2:編寫大流量的應用程式,才需要關心writeCompleteCallback_;而編寫低流量的應用程式,不需要關心writeCompleteCallback_
Q3:為什麼大流量的應用程式,才需要關心writeCompleteCallback_?
A3:
大流量的應用程式將會導致記憶體被撐爆,具體分析 ==> 假設應用程式不斷的呼叫TcpConnection::send()函式想傳送資料給核心中的緩衝區,但是由於傳送的資料量非常大,將會導致核心緩衝區被填滿; 當核心緩衝區被填滿後,將會把資料暫時傳送到outputBuffer_中,但是隨著outputBuffer_不斷自增,將會撐爆記憶體。
解決方案:使用writeCompleteCallback_控制TcpConnection::send()的傳送頻率,具體步驟 ==> ①關注writeCompleteCallback_ ②當用戶呼叫TcpConnection::send()向核心緩衝區傳送資料時,通過關注writeCompleteCallback_通知使用者,使用者才能傳送下一條資料給核心緩衝區(顯然,這樣就避免了資料不斷的堆積到應用層的outputBuffer_中,不會造成outputBuffer_被撐爆。)
3、使用TcpConnection::send傳送訊息
實際上,即便使用者將資料傳送給核心中的接收緩衝區過程十分複雜,但是muduo庫已經幫我們完成了該操作
那麼,使用者只需要呼叫conn->send(___)函式就相當於將使用者程式碼中的buf傳送給了核心中的接收緩衝區!
注:send函式是執行緒安全的,可以跨執行緒呼叫
send介面根據傳送的資料型別過載了3種版本,見下:
void TcpConnection::send(const void* data, int len);
void TcpConnection::send(const StringPiece& message);
void TcpConnection::send(Buffer* buf);
send程式碼的實現最終都呼叫了sendInLoop函式,sendInLoop的整體的思想:①將使用者空間的資料data,傳送給傳送緩衝區TcpConnection::outputBuffer_ ②outputBuffer_中有資料了,就關注POLLOUT事件channel_->enableWriting();
一旦核心緩衝區有空閒位置,就將觸發POLLOUT事件進而回調TcpConnection::handleWrite函式將outputBuffer_中的資料拷貝到核心緩衝區
sendInLoop虛擬碼
void TcpConnection::sendInLoop(const void* data, size_t len)
{
size_t remaining = len; //待發送的資料、剩餘待發送的資料
if(通道沒有關注POLLOUT事件 &&outputBuffer_中沒資料)
就將資料直接傳送給核心緩衝區
remaining = len - 已經發送給核心緩衝區中的位元組數nwrote
if(資料全部發送給了核心中的緩衝區,即remaining == 0)
回撥writeCompleteCallback_
else{
if (!faultError && remaining > 0)
{
將資料寫入outputBuffer_之前,需要判斷寫入remaining之後,是否高於高水位線higWaterMark_
如果超過higWaterMark_,回撥highWaterMarkCallback_
}
將remaining個位元組的資料寫入到outputBuffer_
此時outputBuffer_中已經有資料了,就關注POLLOUT事件
/*如果核心接收緩衝區有空閒空間,就會觸發POLLOUT事件,進而
回撥TcpConnection::handleWrite()*/
}
}
sendInLoop程式碼
//先將data指向的資料,傳送len個位元組給應用層的傳送緩衝區outputBuffer_
//再啟用channel_的POLLOUT事件
void TcpConnection::sendInLoop(const void* data, size_t len)
{
loop_->assertInLoopThread();
ssize_t nwrote = 0;
size_t remaining = len;
bool faultError = false;
if (state_ == kDisconnected) //狀態是沒連線
{
LOG_WARN << "disconnected, give up writing";
return;
}
//if(通道沒有關注POLLOUT事件 && 傳送緩衝區沒有資料)
// ==> 直接write到核心中的緩衝區
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
//直接write到核心中的緩衝區,寫入的位元組數為nwrote
nwrote = sockets::write(channel_->fd(), data, len);
if (nwrote >= 0)
{
remaining = len - nwrote; //remaining表示剩餘要寫的資料
//remaining == 0,即資料全部寫完了 ==> 回撥writeCompleteCallback_
if (remaining == 0 && writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
}
else // nwrote < 0 寫入失敗
{
nwrote = 0;
if (errno != EWOULDBLOCK)
{
LOG_SYSERR << "TcpConnection::sendInLoop";
if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
{
faultError = true;
}
}
}
}
assert(remaining <= len);
//綜合:向outputBuffer_中再寫remaining個位元組的資料
//case 1:因為核心緩衝區滿了,導致向核心緩衝區只發送了nwrote個位元組,還剩下remaining個數據沒傳送
// 將剩下的資料存放入outputBuffer_
//case 2:if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)不成立,表示已經關注了POLLOUT事件 || outputBuffer_中有資料
// 則應當將新來的資料新增到outputBuffer_中
if (!faultError && remaining > 0)
{
size_t oldLen = outputBuffer_.readableBytes(); //獲得當前outputBuffer_中的資料大小
//將資料寫入outputBuffer_之前,需要判斷寫入remaining之後,是否高於高水位線
//如果超過higWaterMark_,回撥highWaterMarkCallback_
if (oldLen + remaining >= highWaterMark_
&& oldLen < highWaterMark_
&& highWaterMarkCallback_)
{
loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
}
//將remaining個位元組的資料寫入到outputBuffer_
outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
if (!channel_->isWriting()) //如果沒有關注POLLOUT事件
{
//1.就關注POLLOUT事件,即說明outputBuffer_中已經有資料了
channel_->enableWriting();
//2.一旦核心緩衝區有空閒位置,就將觸發POLLOUT事件進而回
//調TcpConnection::handleWrite函式將outputBuffer_中的
//資料拷貝到核心緩衝區
}
}
}
TcpConnection::handleWrite實現細節分析:①儘可能的將outputBuffer_中的資料全部發送到核心緩衝區;②如果全部發送,則取消關注POLLOUT事件,如果連線狀態是kDisconnecting,就關閉連線;③如果沒全部發送,即使連線狀態是kDisconnecting,也不能立即關閉連線,要等待下一次POLLOUT事件觸發
//outputBuffer_有資料了&&核心接收緩衝區有空閒,才觸發POLLOUT事件
//觸發POLLOUT事件將回調handleWrite()函式
void TcpConnection::handleWrite()
{
loop_->assertInLoopThread();
if (channel_->isWriting()) //如果關注了POLLOUT事件
{
//就將outputBuffer_中的所有的資料寫入核心緩衝區中
ssize_t n = sockets::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
if (n > 0)
{
outputBuffer_.retrieve(n);
//if outputBuffer_中的資料全部寫完
if (outputBuffer_.readableBytes() == 0)
{
channel_->disableWriting(); //就取消關注POLLOUT事件,以免出現busy loop
//回撥writeCompleteCallback_函式
if (writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
//傳送緩衝區已經清空&&連線狀態是kDisconnecting,就關閉連線
if (state_ == kDisconnecting)
{
//因為上面已經取消關注POLLOUT事件,所以,shutdownWrite()會執行成功
shutdownInLoop();
}
}
}
else //如果outputBuffer_中的資料沒有全部寫完
{
LOG_SYSERR << "TcpConnection::handleWrite";
// 即使連線狀態是kDisconnecting,因為剩下的資料還要繼續向對方寫
// 所以也不能立即關閉寫,即不能立即呼叫shutdownInLoop()
// if (state_ == kDisconnecting)
// {
// shutdownInLoop();
// }
}
}
else
{
LOG_TRACE << "Connection fd = " << channel_->fd()
<< " is down, no more writing";
}
}
4、TcpConnection自動接收訊息
一旦POLLIN事件到來,就會回撥handleRead函式,如果資料接收成功將回調註冊的messageCallback_函式,詳細請看虛擬碼:
虛擬碼
void TcpConnection::handleRead(Timestamp receiveTime)
{
將核心傳送緩衝區的資料讀取資料,存放到inputBuffer_的writeable中
if(讀取資料成功)
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); //回撥messageCallback_
else if(對方關閉)
handleClose(); //與client斷開連線
else
handleError(); //LOG_ERROR列印錯誤日誌
}
實現程式碼
void TcpConnection::handleRead(Timestamp receiveTime)
{
loop_->assertInLoopThread();
int savedErrno = 0;
//從核心中讀取資料,存放到inputBuffer_中
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0) //readFd讀取到資料,則回撥messageCallback_
{
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0) //readFd==0,表示對方已經關閉,則關閉連線
{
handleClose();
}
else
{
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
}
預設註冊的回撥函式messageCallback_,見下:
void muduo::net::defaultMessageCallback(const TcpConnectionPtr&,Buffer* buf,Timestamp)
{
buf->retrieveAll(); //清空Buffer
}
通過修改messageCallback_,實現回射伺服器,程式碼見下:
void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp time)
{
//取出readable中所有的資料,並轉換成string型別,再返回msg
string msg(buf->retrieveAllAsString());
LOG_TRACE << conn->name() << " recv " << msg.size() << " bytes at " << time.toString(); //列印
//根據msg的內容,執行相應的動作
if (msg == "exit\n")
{
conn->send("bye\n");
conn->shutdown();
}
if (msg == "quit\n")
{
loop_->quit();
}
conn->send(msg); //將msg回射回去
}
5、關閉連線
//半關閉,關閉本端的寫
void shutdown(); // NOT thread safe, no simultaneous calling
//強制關閉連結,其實就是呼叫close而已。就是個close
void forceClose();
//使用計時器,定時關閉
void forceCloseWithDelay(double seconds);
6、不是很重要的函式
void startRead(); //開始監聽channel_的讀事件
void stopRead(); //停止監聽channel_的讀事件
//獲取物件內部的成員
EventLoop* getLoop() const { return loop_; }
const string& name() const { return name_; }
const InetAddress& localAddress() const { return localAddr_; }
const InetAddress& peerAddress() const { return peerAddr_; }
bool connected() const { return state_ == kConnected; } //判斷是否處於連線的狀態
bool disconnected() const { return state_ == kDisconnected; } //判斷是否處於斷開連線的狀態
bool getTcpInfo(struct tcp_info*) const;
string getTcpInfoString() const;
7、完善TcpConnection
7.1 SIGPIPE
muduo庫幫我們巧妙地實現了忽略SIGPIPE訊號,即:在程式中定義了全域性的IgnoreSigPipe物件(那麼會在main函式之前呼叫建構函式,忽略SIGPIPE訊號)
class IgnoreSigPipe
{
public:
IgnoreSigPipe()
{
::signal(SIGPIPE, SIG_IGN);
// LOG_TRACE << "Ignore SIGPIPE";
}
};
#pragma GCC diagnostic error "-Wold-style-cast"
IgnoreSigPipe initObj; //全域性物件
} // namespace
7.2 TCP No Delay 和 TCP keepalive
禁用Nagle演算法,避免連續發包出現延遲,這對編寫低延遲網路服務很重要。
void TcpConnection::setTcpNoDelay(bool on)
{
socket_->setTcpNoDelay(on);
}
void Socket::setTcpNoDelay(bool on)
{
int optval = on ? 1 : 0;
::setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY,
&optval, static_cast<socklen_t>(sizeof optval));
// FIXME CHECK
}
keepalive:定期檢視TCP連線是否存在,一般來說,如果應用層心跳有心跳檢測的話,TCP keepalive不是必須的。
8、示例程式碼
實現了回射伺服器EchoServer
#include <muduo/net/TcpServer.h>
#include <muduo/base/Logging.h>
#include <muduo/base/Thread.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <utility>
#include <stdio.h>
#include <unistd.h>
using namespace muduo;
using namespace muduo::net;
int numThreads = 0;
class EchoServer
{
public:
EchoServer(EventLoop* loop, const InetAddress& listenAddr)
: loop_(loop),
server_(loop, listenAddr, "EchoServer")
{
server_.setConnectionCallback(
std::bind(&EchoServer::onConnection, this, _1));
server_.
相關推薦
muduo_net程式碼剖析之TcpConnection
TcpConnection類可謂是muduo最核心也是最複雜的類,它的標頭檔案和原始檔一共有450多行,是muduo最大的類。 TcpConnection是muduo裡唯一預設使用shared_ptr來管理的class,也是唯一繼承enable_shared_from_this的類,這源
muduo_net程式碼剖析之EventLoop::runInLoop()函式
runInLoop()函式的有用之處
“EventLoop有一個非常有用的功能:在它的IO執行緒內執行某個使用者任務回撥,即EventLoop::runInLoop(const Functor& cb),其中Functor是boost::function<voi
muduo_net程式碼剖析之EventLoop
TCP網路程式設計本質
TCP網路程式設計最本質的是處理三個半事件
連線建立:伺服器accept(被動)接收連線,客戶端connect(主動)發起連線
連線斷開:主動斷開(close、shutdown),被動斷開(read返回0)
訊息到達:檔案描述符可讀
muduo_net程式碼剖析之Channel
Channel簡介
Channel是Reactor結構中的“事件”,它自始至終都屬於一個EventLoop,相當於libevent中的event_base,可以將event_base安插到EventLoop上,用EventLoop監視該Channel
Channel負責
muduo_net程式碼剖析之定時器
1、Timer*類簡介
這裡涉及了3個類TimerId、Timer、TimerQueue,反映到實際使用,主要是EventLoop中的三個函式:runAt()、runAfter()、runEvery()。
簡單來說,TimerQueue是用來進行管理排程的; 而Time
muduo_net程式碼剖析之Http相關類介紹
[1] 請求類HttpRequest
注:該類僅僅是對HttpRequest中的成員屬性(即Http請求包內容)賦值,客戶端並沒有真正的向伺服器傳送請求動作。
提供了設定、獲取以下屬性變數的介面:method_(請求方法)、version_(協議版本1.0/1.1)、 st
muduo_net程式碼剖析之TcpClient
有了Connector,TcpClient的實現就不難了,它的程式碼與TcpServer甚至有幾分相似,只不過TcpClient只管理一個TcpConnection。先談幾個要點:
TcpClient具備TcpConnection斷開之後重新連線的功能,加上Connector
muduo_net程式碼剖析之聯結器Connector
Connector用於client向server主動發起連線,並有自動重連的功能 Connector只負責建立socket連線,不負責建立TcpCOnnection(下文中的TcpClient類實現TcpCOnnection的建立)
Connector
在非阻塞網路程式設
muduo_net程式碼剖析之Buffer類的設計
一、備用知識
1、為什麼TcpConnection必須要有output buffer
考慮一個常見場景:程式想通過TCP連線相對方傳送100K位元組的資料,但是write()呼叫中,作業系統只接收了80K位元組(受TCP advertised window的控制,細節見TCPv
muduo_base程式碼剖析之日誌類封裝
日誌作用
Linux C下的程式設計師,很少使用gdb除錯程式,一般都使用日誌除錯程式 程式錯誤分為:編譯時錯誤、執行時錯誤(將errno儲存到日誌中)、邏輯錯誤(最難除錯,將程式的執行狀態都存到日誌)
開發過程中 除錯錯誤 更好的理解程式:執行流程
執行過程中 診
muduo_base程式碼剖析之執行緒本地單例物件ThreadLocalSingleton
執行緒本地單例物件的封裝ThreadLocalSingleton<T>,也就是說,每一個執行緒都有一個T型別的單例物件
直接看測試程式碼
#include <muduo/base/ThreadLocalSingleton.h>
#include &l
muduo_base程式碼剖析之執行緒特定資料的封裝ThreadLocal
執行緒特定資料 (執行緒內私有的全域性變數)來源
在單執行緒程式中,我們經常要用到“全域性變數”以實現多個函式間能共享資料
在多執行緒環境下,由於資料空間是共享的,因此全域性變數也為所有執行緒共享
但有時應用程式設計中有必要提供執行緒私有的全域性變數,僅在某個執行
muduo_base程式碼剖析之執行緒安全的Sigleton模式
預備知識
什麼是單例模式? 一個類有且僅有一個例項,並且對外提供統一的訪問該例項的介面
單例模式分為餓漢式、懶漢式,對於餓漢式要保證執行緒安全需要使用double-check lock機制
muduo中實現的執行緒安全的Sigleton類,沒使用鎖,而使用效率更高
muduo_base程式碼剖析之ThreadPool執行緒池
1. 執行緒池
執行緒池的問題本質上也是生產者消費者模型問題
生產者生產產品的過程,實際上就是由程式設計師向任務佇列中新增任務的過程(需要程式設計師控制),實現程式碼見下:
1. print函式是程式設計師自己手動定義的任務函式
2. run(Task
muduo_base程式碼剖析之BlockingQueue、BoundedBlockingQueue
BlockingQueue.h 無界佇列
程式碼功能:使用條件變數+互斥鎖,實現無界佇列(即生產者消費者模型),保證對臨界資源(佇列)的訪問是安全的
BlockingQueue類的原始碼
template<typename T>
class BlockingQueu
muduo_base程式碼剖析之Thread/Mutex/MutexLockGuard /Condition/CountDownLatch
(1) 執行緒:Thread、CurrentThread
Thread.h、Thread.cc程式碼解讀
功能介紹:
建立執行緒時,把執行緒函式當作形參傳遞進去:muduo::Thread t(std::bind(threadFunc2, 42), “thread
muduo_base程式碼剖析之Timestamp、AtomicIntegerT、Exception
Timestamp類封裝
時間戳一般用來唯一地標識某一刻的時間,通常是指格林威治時間1970年01月01日00時00分00秒(北京時間1970年01月01日08時00分00秒)起至現在的總毫秒數
Timestamp類繼承自boost::less_than_compara
STL源碼剖析之allocator(1)
bsp 初始 參數 使用 分配 最大 bin 初始化 pre 空間配置器(allocator)這個概念在閱讀源碼之前我根本沒有聽過,原以為內存分配都是使用new和delete運算符(註意和operator new、placement new、operator delete以
STL源碼剖析之allocator(2)
內部 擁有 struct trait 多個 接受 rst stl -m SGI雖然定義了名為allocator的配置器,但從未使用過。SGI的allocator只是包裝了C++的::operatpor new和::operator delete,效率不高。STL中內存配置操
JDK_API剖析之java.net包
決定 字節 file 順序 close docs ddr packet 設置 為實現網絡應用程序提供類。(按照字母順序排序)
1、Authenticator
抽象類
自1.2開始有
無父類和接口
Authenticator 類表示懂得如何獲得網絡連接驗證的對象。通常,它