muduo庫使用示例之聊天伺服器(上)
阿新 • • 發佈:2018-12-05
本文程式碼存放在muduo\examples\asio\chat目錄下
聊天伺服器示意圖
實現的功能:任何一個Client給Server傳送訊息後,Server都會將該訊息回射給連線上來的所有Client
muduo實現一個聊天室伺服器,客戶傳送的訊息將廣播到連入的所有客戶(包括自己)。
程式的執行流程以及時序圖:
- 當Server接收到Client傳送的訊息後,將回調註冊的LengthHeaderCodec::onMessage函式,onMessage函式將對接收到的資料報進行解析[包頭+包體],解析完成後再回調註冊的StringMessageCallback messageCallback_函式(註冊的是ChatServer::onStringMessage)。
- 在ChatServer::onStringMessage呼叫了LengthHeaderCodec::send函式,send封裝成[包頭+包體],傳送。
1. 訊息編碼類:LengthHeaderCodec
訊息的位元組流定義成這種形式 0xXX 0xXX 0xXX 0xXX XXXXXX,前面4個位元組表示訊息的長度,後面是訊息實體。
muduo作者選擇自己編寫一個工具類:編解碼器LengthHeaderCodec,該類只有兩個public成員函式,分別為:onMessage、send,這兩個函式都是通過回撥呼叫的。
#ifndef MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H
#define MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H
#include <muduo/base/Logging.h>
#include <muduo/net/Buffer.h>
#include <muduo/net/Endian.h>
#include <muduo/net/TcpConnection.h>
class LengthHeaderCodec : muduo::noncopyable
{
public:
typedef std::function<void (const muduo::net::TcpConnectionPtr& ,
const muduo::string& message,
muduo::Timestamp)> StringMessageCallback;
//建構函式
explicit LengthHeaderCodec(const StringMessageCallback& cb)
: messageCallback_(cb) //註冊callback_:回覆所有客戶端的
{
}
//解析[包頭+包體]
void onMessage(const muduo::net::TcpConnectionPtr& conn,
muduo::net::Buffer* buf,
muduo::Timestamp receiveTime)
{
//判斷接收到的資料是否超過了4個位元組(包頭長度)
while (buf->readableBytes() >= kHeaderLen)
{
//取出(前4個位元組),得到包體的有效字串的長度(len)
//FIXME:use Buffer::peekInt32()
const void* data = buf->peek(); //peek,偷看資料,並沒有將緩衝區中的資料讀走
int32_t be32 = *static_cast<const int32_t*>(data); // SIGBUS
const int32_t len = muduo::net::sockets::networkToHost32(be32);
if (len > 65536 || len < 0)
{
LOG_ERROR << "Invalid length " << len;
conn->shutdown(); // FIXME: disable reading
break;
}
else if (buf->readableBytes() >= len + kHeaderLen) //一條完整的訊息
{
buf->retrieve(kHeaderLen);//先移走4個位元組的len
muduo::string message(buf->peek(), len);//拷貝len長度的有效字串到message
//呼叫回撥函式,向所有的客戶端回覆訊息
messageCallback_(conn, message, receiveTime);
buf->retrieve(len);
}
else //未達到一條完整的訊息
{
break;
}
}
}
//封裝成[包頭+包體],傳送
void send(muduo::net::TcpConnection* conn,
const muduo::StringPiece& message)
{
muduo::net::Buffer buf;
buf.append(message.data(), message.size());
int32_t len = static_cast<int32_t>(message.size());
int32_t be32 = muduo::net::sockets::hostToNetwork32(len);
buf.prepend(&be32, sizeof be32);
conn->send(&buf);
}
private:
StringMessageCallback messageCallback_;
const static size_t kHeaderLen = sizeof(int32_t); //包頭長度,4位元組
};
#endif // MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H
2. 伺服器的實現
主執行緒有一個main Reactor負責accept連線,然後把已連線套接字掛在某個sub Reactor中(I/O Thread),至於怎麼選擇,以達到每個工作執行緒的“負載均衡”,muduo採用round-robin的方式。
結論:由於mutex的存在,多執行緒並不能併發執行,而是序列的,分析原因:
- C1向伺服器傳送一條訊息hello,伺服器通過一個IO執行緒轉發給所有的客戶端
- 與此同時(假設此時伺服器還沒有將hello全部轉發給所有的客戶端),C2又向伺服器傳送一條訊息world,那麼伺服器將通過另一個IO執行緒轉發給所有的客戶端,但是由於鎖的存在,(必須等到第一個IO執行緒將hello全部發送給客戶端之後,另一個IO執行緒才能進入臨界區將world再發送給客戶端),這兩個執行緒並不能併發執行,而是序列的。
- 假設客戶端傳送的資料包很大,第一個IO執行緒將資料傳送給客戶端的事件很長,將導致第二個IO執行緒一直阻塞 ==> 這樣設計的效率非常低下。
#include "codec.h"
#include <muduo/base/Logging.h>
#include <muduo/base/Mutex.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpServer.h>
#include <set>
#include <stdio.h>
#include <unistd.h>
using namespace muduo;
using namespace muduo::net;
class ChatServer : noncopyable
{
private:
typedef std::set<TcpConnectionPtr> ConnectionList;
TcpServer server_;
LengthHeaderCodec codec_; //包含工具類成員變數:訊息編解碼
MutexLock mutex_;
ConnectionList connections_ GUARDED_BY(mutex_); //連線列表,存放著所有連線上來的client
public:
ChatServer(EventLoop* loop,
const InetAddress& listenAddr)
: server_(loop, listenAddr, "ChatServer"),
//給codec_繫結ChatServer::onStringMessage
codec_(std::bind(&ChatServer::onStringMessage, this, _1, _2, _3))
{
server_.setConnectionCallback(
std::bind(&ChatServer::onConnection, this, _1));
server_.setMessageCallback( //註冊訊息到來時的回撥函式LengthHeaderCodec::onMessage
std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
}
//numThreads sub reactors
void setThreadNum(int numThreads)
{
server_.setThreadNum(numThreads);
}
void start()
{
server_.start();
}
private:
//新的client連線到來,導致可寫事件發生,則建立連線
void onConnection(const TcpConnectionPtr& conn)
{
LOG_INFO << conn->localAddress().toIpPort() << " -> "
<< conn->peerAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
//不只有一個IO執行緒,因而這裡的connections_需要mutex保護
MutexLockGuard lock(mutex_);
if (conn->connected())
{
connections_.insert(conn);
}
else
{
connections_.erase(conn);
}
}
//當client給伺服器傳送的資料到來後,伺服器將採用RR選擇一個IO執行緒去處理該資料
//處理過程:
//1.在LengthHeaderCodec::onMessage中又會呼叫onStringMessage
//2.在onStringMessage中又會呼叫LengthHeaderCodec::send
void onStringMessage(const TcpConnectionPtr&,
const string& message,
Timestamp)
{
//有多個IO執行緒,因而這裡的connections_需要用mutex保護
MutexLockGuard lock(mutex_);
//轉發訊息給所有的客戶端
for (ConnectionList::iterator it = connections_.begin();
it != connections_.end();
++it)
{
codec_.send(get_pointer(*it), message);
}
}
};
int main(int argc, char* argv[])
{
LOG_INFO << "pid = " << getpid();
if (argc > 1)
{
EventLoop loop;
uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
InetAddress serverAddr(port);
ChatServer server(&loop, serverAddr);
if (argc > 2)
{
server.setThreadNum(atoi(argv[2]));
}
server.start();
loop.loop();
}
else
{
printf("Usage: %s port [thread_num]\n", argv[0]);
}
}
3.客戶端程式碼
兩個執行緒,一個執行緒用來從標準輸入讀入傳送的訊息,另外一個執行緒用Reactor處理網路I/O,這裡用兩個執行緒的原因是因為作者沒有把標準輸入輸出加入到Reactor的想法,在UNP的單執行緒Reactor中有管理0,1,2(標準輸入、標準輸出、標準錯誤)監聽讀入鍵盤資料的示例。
#include "codec.h"
#include <muduo/base/Logging.h>
#include <muduo/base/Mutex.h>
#include <muduo/net/EventLoopThread.h>
#include <muduo/net/TcpClient.h>
#include <iostream>
#include <stdio.h>
#include <unistd.h>
using namespace muduo;
using namespace muduo::net;
class ChatClient : noncopyable
{
public:
ChatClient(EventLoop* loop, const InetAddress& serverAddr)
: client_(loop, serverAddr, "ChatClient"),
codec_(std::bind(&ChatClient::onStringMessage, this, _1, _2, _3))
{
client_.setConnectionCallback(
std::bind(&ChatClient::onConnection, this, _1));
client_.setMessageCallback(
std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
client_.enableRetry();
}
void connect()
{
client_.connect();
}
void disconnect()
{
client_.disconnect();
}
//在主執行緒中傳送資料
void write(const StringPiece& message)
{
MutexLockGuard lock(mutex_);
if (connection_)
{
codec_.send(get_pointer(connection_), message);
}
}
private:
//該函式在IO執行緒中執行,IO執行緒與主執行緒不在同一個執行緒
void onConnection(const TcpConnectionPtr& conn)
{
LOG_INFO << conn->localAddress().toIpPort() << " -> "
<< conn->peerAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
//mutex用來保護connection_這個shared_ptr
MutexLockGuard lock(mutex_);
if (conn->connected())
{
connection_ = conn;
}
else
{
connection_.reset();
}
}
//IO執行緒:接收道資料
void onStringMessage(const TcpConnectionPtr&,
const string& message,
Timestamp)
{
printf("<<< %s\n", message.c_str());
}
TcpClient client_;
LengthHeaderCodec codec_;
MutexLock mutex_;
TcpConnectionPtr connection_ GUARDED_BY(mutex_);
};
int main(int argc, char* argv[])
{
LOG_INFO << "pid = " << getpid();
if (argc > 2)
{
EventLoopThread loopThread; //建立一個IO執行緒,用於與Server通訊
uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
InetAddress serverAddr(argv[1], port);
ChatClient client(loopThread.startLoop(), serverAddr);
client.connect();
//主執行緒
std::string line;
while (std::getline(std::cin, line)) //從鍵盤獲取資料
{
client.write(line); //傳送資料
}
client.disconnect();
CurrentThread::sleepUsec(1000*1000); // wait for disconnect, see ace/logging/client.cc
}
else
{
printf("Usage: %s host_ip port\n", argv[0]);
}
}