1. 程式人生 > >boost中asio網路庫多執行緒併發處理實現,以及asio在多執行緒模型中執行緒的排程情況和執行緒安全。

boost中asio網路庫多執行緒併發處理實現,以及asio在多執行緒模型中執行緒的排程情況和執行緒安全。

1、實現多執行緒方法:

其實就是多個執行緒同時呼叫io_service::run

        for (int i = 0; i != m_nThreads; ++i)
        {
            boost::shared_ptr<boost::thread> pTh(new boost::thread(
                boost::bind(&boost::asio::io_service::run,&m_ioService)));
            m_listThread.push_back(pTh);
        }

2、多執行緒排程情況:

asio規定:只能在呼叫io_service::run的執行緒中才能呼叫事件完成處理器。

注:事件完成處理器就是你async_accept、async_write等註冊的控制代碼,類似於回撥的東西。

單執行緒:

如果只有一個執行緒呼叫io_service::run,根據asio的規定,事件完成處理器也只能在這個執行緒中執行。也就是說,你所有程式碼都在同一個執行緒中執行,因此變數的訪問是安全的。

多執行緒:

如果有多個執行緒同時呼叫io_service::run以實現多執行緒併發處理。對於asio來說,這些執行緒都是平等的,沒有主次之分。如果你投遞的一個請求比如async_write完成時,asio將隨機的啟用呼叫io_service::run的執行緒。並在這個執行緒中呼叫事件完成處理器(async_write當時註冊的控制代碼)。如果你的程式碼耗時較長,這個時候你投遞的另一個async_write請求完成時,asio將不等待你的程式碼處理完成,它將在另外的一個呼叫io_service::run執行緒中,呼叫async_write當時註冊的控制代碼。也就是說,你註冊的事件完成處理器有可能同時在多個執行緒中呼叫。

當然你可以使用 boost::asio::io_service::strand讓完成事件處理器的呼叫,在同一時間只有一個, 比如下面的的程式碼:

  socket_.async_read_some(boost::asio::buffer(buffer_),
      strand_.wrap(
        boost::bind(&connection::handle_read, shared_from_this(),
          boost::asio::placeholders::error,
          boost::asio::placeholders::bytes_transferred)));

...

boost::asio::io_service::strand strand_;

此時async_read_som完成後掉用handle_read時,必須等待其它handle_read呼叫完成時才能被執行(async_read_som引起的handle_read呼叫)。

      多執行緒呼叫時,還有一個重要的問題,那就是無序化。比如說,你短時間內投遞多個async_write,那麼完成處理器的呼叫並不是按照你投遞async_write的順序呼叫的。asio第一次呼叫完成事件處理器,有可能是第二次async_write返回的結果,也有可能是第3次的。使用strand也是這樣的。strand只是保證同一時間只執行一個完成處理器,但它並不保證順序。

程式碼測試:

伺服器:

將下面的程式碼編譯以後,使用cmd命令提示符下傳人蔘數<IP> <port> <threads>呼叫

比如:test.exe 0.0.0.0 3005 10   

客服端 使用windows自帶的telnet

cmd命令提示符:

telnet 127.0.0.1 3005

原理:客戶端連線成功後,同一時間呼叫100次boost::asio::async_write給客戶端傳送資料,並且在完成事件處理器中列印呼叫序號,和執行緒ID。

核心程式碼:

    void start()
    {
        for (int i = 0; i != 100; ++i)
        {
            boost::shared_ptr<string> pStr(new string);
            *pStr = boost::lexical_cast<string>(boost::this_thread::get_id());
            *pStr += "\r\n";
            boost::asio::async_write(m_nSocket,boost::asio::buffer(*pStr),
                boost::bind(&CMyTcpConnection::HandleWrite,shared_from_this(),
                 boost::asio::placeholders::error,
                 boost::asio::placeholders::bytes_transferred,
                 pStr,i)
                );
        }
    }

//去掉 boost::mutex::scoped_lock lk(m_ioMutex); 效果更明顯。

    void HandleWrite(const boost::system::error_code& error
        ,std::size_t bytes_transferred
        ,boost::shared_ptr<string> pStr,int nIndex)
    {
        if (!error)
        {
            boost::mutex::scoped_lock lk(m_ioMutex);
            cout << "傳送序號=" << nIndex << ",執行緒id=" << boost::this_thread::get_id() << endl;
        }
        else
        {
            cout << "連線斷開" << endl;
        }
    }

完整程式碼:

#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <string>
#include <iostream>


using std::cout;
using std::endl;
using std::string;
using boost::asio::ip::tcp;


class CMyTcpConnection
    : public boost::enable_shared_from_this<CMyTcpConnection>
{
public:
    CMyTcpConnection(boost::asio::io_service &ser)
        :m_nSocket(ser)
    {
    }
    typedef boost::shared_ptr<CMyTcpConnection> CPMyTcpCon;


    static CPMyTcpCon CreateNew(boost::asio::io_service& io_service)
    {
        return CPMyTcpCon(new CMyTcpConnection(io_service));
    }


   
public:
    void start()
    {
        for (int i = 0; i != 100; ++i)
        {
            boost::shared_ptr<string> pStr(new string);
            *pStr = boost::lexical_cast<string>(boost::this_thread::get_id());
            *pStr += "\r\n";
            boost::asio::async_write(m_nSocket,boost::asio::buffer(*pStr),
                boost::bind(&CMyTcpConnection::HandleWrite,shared_from_this(),
                 boost::asio::placeholders::error,
                 boost::asio::placeholders::bytes_transferred,
                 pStr,i)
                );
        }
    }
    tcp::socket& socket()
    {
        return m_nSocket;
    }
private:
    void HandleWrite(const boost::system::error_code& error
        ,std::size_t bytes_transferred
        ,boost::shared_ptr<string> pStr,int nIndex)
    {
        if (!error)
        {
            boost::mutex::scoped_lock lk(m_ioMutex);
            cout << "傳送序號=" << nIndex << ",執行緒id=" << boost::this_thread::get_id() << endl;
        }
        else
        {
            cout << "連線斷開" << endl;
        }
    }
private:
    tcp::socket m_nSocket;
    boost::mutex m_ioMutex;
};


class CMyService
    : private boost::noncopyable
{
public:
    CMyService(string const &strIP,string const &strPort,int nThreads)
        :m_tcpAcceptor(m_ioService)
        ,m_nThreads(nThreads)
    {
        tcp::resolver resolver(m_ioService);
        tcp::resolver::query query(strIP,strPort);
        tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
        boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
        m_tcpAcceptor.open(endpoint.protocol());
        m_tcpAcceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
        m_tcpAcceptor.bind(endpoint);
        m_tcpAcceptor.listen();


        StartAccept();
    }
    ~CMyService(){Stop();}
public:
    void Stop() 
    { 
        m_ioService.stop();
        for (std::vector<boost::shared_ptr<boost::thread>>::const_iterator it = m_listThread.cbegin();
            it != m_listThread.cend(); ++ it)
        {
            (*it)->join();
        }
    }
    void Start()
    {
        for (int i = 0; i != m_nThreads; ++i)
        {
            boost::shared_ptr<boost::thread> pTh(new boost::thread(
                boost::bind(&boost::asio::io_service::run,&m_ioService)));
            m_listThread.push_back(pTh);
        }
    }
private:
    void HandleAccept(const boost::system::error_code& error
        ,boost::shared_ptr<CMyTcpConnection> newConnect)
    {
        if (!error)
        {
            newConnect->start();
        }
        StartAccept();
    }


    void StartAccept()
    {
        CMyTcpConnection::CPMyTcpCon newConnect = CMyTcpConnection::CreateNew(m_tcpAcceptor.get_io_service());
        m_tcpAcceptor.async_accept(newConnect->socket(),
            boost::bind(&CMyService::HandleAccept, this,
            boost::asio::placeholders::error,newConnect));
    }
private:
    boost::asio::io_service m_ioService;
    boost::asio::ip::tcp::acceptor m_tcpAcceptor;
    std::vector<boost::shared_ptr<boost::thread>> m_listThread;
    std::size_t m_nThreads;
};


int main(int argc, char* argv[])
{
    try
    {
        if (argc != 4)
        {
            std::cerr << "<IP> <port> <threads>\n";
            return 1;
        }
        int nThreads = boost::lexical_cast<int>(argv[3]);
        CMyService mySer(argv[1],argv[2],nThreads);
        mySer.Start();
        getchar();
        mySer.Stop();
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << "\n";
    }
    return 0;
}

測試發現和上面的理論是一致的,傳送序號是亂的,執行緒ID也不是同一個。

asio多執行緒中執行緒的合理個數:

作為伺服器,在不考慮省電的情況下,應該儘可能的使用cpu。也就是說,為了讓cpu都忙起來,你的執行緒個數應該大於等於你電腦的cpu核心數(一個核心執行一個執行緒)。具體的值沒有最優方案,大多數人使用cpu核心數*2 + 2的這種方案,但它不一定適合你的情況。

asio在windows xp等系統中的實現:

asio在windows下使用完成埠,如果你投遞的請求沒有完成,那麼這些執行緒都在等待GetQueuedCompletionStatus的返回,也就是等待核心物件,此時執行緒是不佔用cpu時間的。