1. 程式人生 > >muduo庫使用示例之聊天伺服器(上)

muduo庫使用示例之聊天伺服器(上)

本文程式碼存放在muduo\examples\asio\chat目錄下

聊天伺服器示意圖

實現的功能:任何一個Client給Server傳送訊息後,Server都會將該訊息回射給連線上來的所有Client
在這裡插入圖片描述

muduo實現一個聊天室伺服器,客戶傳送的訊息將廣播到連入的所有客戶(包括自己)。

程式的執行流程以及時序圖:
在這裡插入圖片描述

  1. 當Server接收到Client傳送的訊息後,將回調註冊的LengthHeaderCodec::onMessage函式,onMessage函式將對接收到的資料報進行解析[包頭+包體],解析完成後再回調註冊的StringMessageCallback messageCallback_函式(註冊的是ChatServer::onStringMessage)。
  2. 在ChatServer::onStringMessage呼叫了LengthHeaderCodec::send函式,send封裝成[包頭+包體],傳送。

1. 訊息編碼類:LengthHeaderCodec

訊息的位元組流定義成這種形式 0xXX 0xXX 0xXX 0xXX XXXXXX,前面4個位元組表示訊息的長度,後面是訊息實體。
muduo作者選擇自己編寫一個工具類:編解碼器LengthHeaderCodec,該類只有兩個public成員函式,分別為:onMessage、send,這兩個函式都是通過回撥呼叫的。

#ifndef MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H
#define MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H #include <muduo/base/Logging.h> #include <muduo/net/Buffer.h> #include <muduo/net/Endian.h> #include <muduo/net/TcpConnection.h> class LengthHeaderCodec : muduo::noncopyable { public: typedef std::function<void (const muduo::net::TcpConnectionPtr&
, const muduo::string& message, muduo::Timestamp)> StringMessageCallback; //建構函式 explicit LengthHeaderCodec(const StringMessageCallback& cb) : messageCallback_(cb) //註冊callback_:回覆所有客戶端的 { } //解析[包頭+包體] void onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buf, muduo::Timestamp receiveTime) { //判斷接收到的資料是否超過了4個位元組(包頭長度) while (buf->readableBytes() >= kHeaderLen) { //取出(前4個位元組),得到包體的有效字串的長度(len) //FIXME:use Buffer::peekInt32() const void* data = buf->peek(); //peek,偷看資料,並沒有將緩衝區中的資料讀走 int32_t be32 = *static_cast<const int32_t*>(data); // SIGBUS const int32_t len = muduo::net::sockets::networkToHost32(be32); if (len > 65536 || len < 0) { LOG_ERROR << "Invalid length " << len; conn->shutdown(); // FIXME: disable reading break; } else if (buf->readableBytes() >= len + kHeaderLen) //一條完整的訊息 { buf->retrieve(kHeaderLen);//先移走4個位元組的len muduo::string message(buf->peek(), len);//拷貝len長度的有效字串到message //呼叫回撥函式,向所有的客戶端回覆訊息 messageCallback_(conn, message, receiveTime); buf->retrieve(len); } else //未達到一條完整的訊息 { break; } } } //封裝成[包頭+包體],傳送 void send(muduo::net::TcpConnection* conn, const muduo::StringPiece& message) { muduo::net::Buffer buf; buf.append(message.data(), message.size()); int32_t len = static_cast<int32_t>(message.size()); int32_t be32 = muduo::net::sockets::hostToNetwork32(len); buf.prepend(&be32, sizeof be32); conn->send(&buf); } private: StringMessageCallback messageCallback_; const static size_t kHeaderLen = sizeof(int32_t); //包頭長度,4位元組 }; #endif // MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H

2. 伺服器的實現

主執行緒有一個main Reactor負責accept連線,然後把已連線套接字掛在某個sub Reactor中(I/O Thread),至於怎麼選擇,以達到每個工作執行緒的“負載均衡”,muduo採用round-robin的方式。
在這裡插入圖片描述

結論:由於mutex的存在,多執行緒並不能併發執行,而是序列的,分析原因:

  1. C1向伺服器傳送一條訊息hello,伺服器通過一個IO執行緒轉發給所有的客戶端
  2. 與此同時(假設此時伺服器還沒有將hello全部轉發給所有的客戶端),C2又向伺服器傳送一條訊息world,那麼伺服器將通過另一個IO執行緒轉發給所有的客戶端,但是由於鎖的存在,(必須等到第一個IO執行緒將hello全部發送給客戶端之後,另一個IO執行緒才能進入臨界區將world再發送給客戶端),這兩個執行緒並不能併發執行,而是序列的。
  3. 假設客戶端傳送的資料包很大,第一個IO執行緒將資料傳送給客戶端的事件很長,將導致第二個IO執行緒一直阻塞 ==> 這樣設計的效率非常低下。
#include "codec.h"

#include <muduo/base/Logging.h>
#include <muduo/base/Mutex.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpServer.h>

#include <set>
#include <stdio.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

class ChatServer : noncopyable
{
private:
  typedef std::set<TcpConnectionPtr> ConnectionList;
  TcpServer server_;
  LengthHeaderCodec codec_;  //包含工具類成員變數:訊息編解碼
  MutexLock mutex_;
  ConnectionList connections_ GUARDED_BY(mutex_); //連線列表,存放著所有連線上來的client

public:
  ChatServer(EventLoop* loop,
             const InetAddress& listenAddr)
  : server_(loop, listenAddr, "ChatServer"),
    //給codec_繫結ChatServer::onStringMessage
    codec_(std::bind(&ChatServer::onStringMessage, this, _1, _2, _3))
  {
    server_.setConnectionCallback(
        std::bind(&ChatServer::onConnection, this, _1));
    server_.setMessageCallback( //註冊訊息到來時的回撥函式LengthHeaderCodec::onMessage
        std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
  }

  //numThreads sub reactors
  void setThreadNum(int numThreads)
  {
    server_.setThreadNum(numThreads); 
  }

  void start()
  {
    server_.start();
  }

private:
//新的client連線到來,導致可寫事件發生,則建立連線
  void onConnection(const TcpConnectionPtr& conn)
  {
    LOG_INFO << conn->localAddress().toIpPort() << " -> "
        << conn->peerAddress().toIpPort() << " is "
        << (conn->connected() ? "UP" : "DOWN");

    //不只有一個IO執行緒,因而這裡的connections_需要mutex保護
    MutexLockGuard lock(mutex_);
    if (conn->connected())
    {
      connections_.insert(conn);
    }
    else
    {
      connections_.erase(conn);
    }
  }

//當client給伺服器傳送的資料到來後,伺服器將採用RR選擇一個IO執行緒去處理該資料
//處理過程:
  //1.在LengthHeaderCodec::onMessage中又會呼叫onStringMessage
  //2.在onStringMessage中又會呼叫LengthHeaderCodec::send
  void onStringMessage(const TcpConnectionPtr&,
                       const string& message,
                       Timestamp)
  {
    //有多個IO執行緒,因而這裡的connections_需要用mutex保護
    MutexLockGuard lock(mutex_);
    //轉發訊息給所有的客戶端
    for (ConnectionList::iterator it = connections_.begin();
        it != connections_.end();
        ++it)
        
    {
      codec_.send(get_pointer(*it), message);
    }
  }
};

int main(int argc, char* argv[])
{
  LOG_INFO << "pid = " << getpid();
  if (argc > 1)
  {
    EventLoop loop;
    uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
    InetAddress serverAddr(port);
    ChatServer server(&loop, serverAddr);
    if (argc > 2)
    {
      server.setThreadNum(atoi(argv[2]));
    }
    server.start();
    loop.loop();
  }
  else
  {
    printf("Usage: %s port [thread_num]\n", argv[0]);
  }
}

3.客戶端程式碼

兩個執行緒,一個執行緒用來從標準輸入讀入傳送的訊息,另外一個執行緒用Reactor處理網路I/O,這裡用兩個執行緒的原因是因為作者沒有把標準輸入輸出加入到Reactor的想法,在UNP的單執行緒Reactor中有管理0,1,2(標準輸入、標準輸出、標準錯誤)監聽讀入鍵盤資料的示例。

#include "codec.h"

#include <muduo/base/Logging.h>
#include <muduo/base/Mutex.h>
#include <muduo/net/EventLoopThread.h>
#include <muduo/net/TcpClient.h>

#include <iostream>
#include <stdio.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

class ChatClient : noncopyable
{
 public:
  ChatClient(EventLoop* loop, const InetAddress& serverAddr)
    : client_(loop, serverAddr, "ChatClient"),
      codec_(std::bind(&ChatClient::onStringMessage, this, _1, _2, _3))
  {
    client_.setConnectionCallback(
        std::bind(&ChatClient::onConnection, this, _1));
    client_.setMessageCallback(
        std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
    client_.enableRetry();
  }

  void connect()
  {
    client_.connect();
  }

  void disconnect()
  {
    client_.disconnect();
  }

  //在主執行緒中傳送資料
  void write(const StringPiece& message)
  {
    MutexLockGuard lock(mutex_);
    if (connection_)
    {
      codec_.send(get_pointer(connection_), message);
    }
  }

 private:
  //該函式在IO執行緒中執行,IO執行緒與主執行緒不在同一個執行緒
  void onConnection(const TcpConnectionPtr& conn)
  {
    LOG_INFO << conn->localAddress().toIpPort() << " -> "
             << conn->peerAddress().toIpPort() << " is "
             << (conn->connected() ? "UP" : "DOWN");

	//mutex用來保護connection_這個shared_ptr
    MutexLockGuard lock(mutex_);
    if (conn->connected())
    {
      connection_ = conn;
    }
    else
    {
      connection_.reset();
    }
  }

  //IO執行緒:接收道資料
  void onStringMessage(const TcpConnectionPtr&,
                       const string& message,
                       Timestamp)
  {
    printf("<<< %s\n", message.c_str());
  }

  TcpClient client_;
  LengthHeaderCodec codec_;
  MutexLock mutex_;
  TcpConnectionPtr connection_ GUARDED_BY(mutex_);
};

int main(int argc, char* argv[])
{
  LOG_INFO << "pid = " << getpid();
  if (argc > 2)
  {
    EventLoopThread loopThread; //建立一個IO執行緒,用於與Server通訊
    uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
    InetAddress serverAddr(argv[1], port);

	
    ChatClient client(loopThread.startLoop(), serverAddr);
    client.connect();
	
	//主執行緒
    std::string line;
    while (std::getline(std::cin, line)) //從鍵盤獲取資料
    {
      client.write(line); //傳送資料
    }
    client.disconnect();
    CurrentThread::sleepUsec(1000*1000);  // wait for disconnect, see ace/logging/client.cc
  }
  else
  {
    printf("Usage: %s host_ip port\n", argv[0]);
  }
}