1. 程式人生 > >boost 單io_serverce 非同步多執行緒資源保護程式碼

boost 單io_serverce 非同步多執行緒資源保護程式碼

伺服器:

// ConsoleApplication12.cpp : 定義控制檯應用程式的入口點。
//

#include "stdafx.h"

#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
#include <boost/asio.hpp>
#include <thread>
#include <boost/bind.hpp>
using boost::asio::ip::tcp;
#include<mutex>
std::mutex lock;
class session
	: public std::enable_shared_from_this < session >
{
public:
	session(tcp::socket socket)
		: socket_(std::move(socket))
	{
	}
	~session()
	{
		//std::cout << "~senssion" << std::endl;
	}
	void start()
	{
		do_read();
	}

private:
	void do_read()
	{
		//
		
		static long cnt = 0;
		auto self(shared_from_this());
		socket_.async_read_some(boost::asio::buffer(data_, max_length),
			[this,self](boost::system::error_code ec, std::size_t length)
		{
			if (!ec)
			{
				do_write(length);
			}
			//lock.lock();
			{
				std::unique_lock<std::mutex> l(lock);
				cnt++;
				std::this_thread::sleep_for(std::chrono::milliseconds(50));
				std::cout << "id=" << std::this_thread::get_id() <<"cnt="<<cnt<< std::endl;
				std::this_thread::sleep_for(std::chrono::milliseconds(100));
				cnt--;
			}
			
			//lock.unlock();
			
		});
		
	}

	void do_write(std::size_t length)
	{
		auto self(shared_from_this());
		boost::asio::async_write(socket_, boost::asio::buffer(data_, length),
			[this,self](boost::system::error_code ec, std::size_t /*length*/)
		{
			if (!ec)
			{
				do_read();
			}
		});
	}

	tcp::socket socket_;
	enum { max_length = 1024 };
	char data_[max_length];
};

class server
{
public:
	server(boost::asio::io_service& io_service, short port)
		: acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
		socket_(io_service)
	{
		do_accept();
	}

private:
	void do_accept()
	{
		acceptor_.async_accept(socket_,
			[this](boost::system::error_code ec)
		{
			if (!ec)
			{
				std::make_shared<session>(std::move(socket_))->start();
			}

			do_accept();
		});
	}

	tcp::acceptor acceptor_;
	tcp::socket socket_;
};
boost::asio::io_service io_service;
int main1()
{
	try
	{
		
		

		server s(io_service, 8001);
		std::thread work1(boost::bind(&boost::asio::io_service::run, &io_service));
		std::thread work2(boost::bind(&boost::asio::io_service::run, &io_service));
		std::thread work3(boost::bind(&boost::asio::io_service::run, &io_service));
		std::thread work4(boost::bind(&boost::asio::io_service::run, &io_service));
		work1.join();
		work2.join();
		work3.join();
		work4.join();
	}
	catch (std::exception& e)
	{
		std::cerr << "Exception: " << e.what() << "\n";
	}

	return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
	main1();
	return 0;
}


測試客戶端:

// async_Client.cpp : 定義控制檯應用程式的入口點。
//

#include "stdafx.h"
#include <iostream>
#include <boost/asio.hpp>
#include <thread>
#include <boost/bind.hpp>
#include <boost\enable_shared_from_this.hpp>
#include <boost\noncopyable.hpp>
#include <boost\format.hpp>
#include <vector>
using namespace boost::asio;
#define MEM_FN(x)       boost::bind(&self_type::x, shared_from_this())
#define MEM_FN1(x,y)    boost::bind(&self_type::x, shared_from_this(),y)
#define MEM_FN2(x,y,z)  boost::bind(&self_type::x, shared_from_this(),y,z)
io_service service;
class talk_to_svr : public boost::enable_shared_from_this<talk_to_svr>, boost::noncopyable {
	typedef talk_to_svr self_type;
	talk_to_svr(const std::string & message) : sock_(service), started_(true), message_(message) {}
	void start(ip::tcp::endpoint ep) {
		sock_.async_connect(ep, MEM_FN1(on_connect, _1));
	}
public:
	typedef boost::system::error_code error_code;
	typedef boost::shared_ptr<talk_to_svr> ptr;
	static ptr start(ip::tcp::endpoint ep, const std::string &message) {
		ptr new_(new talk_to_svr(message));
		new_->start(ep);
		return new_;
	}
	void stop() {
		if (!started_) return;
		started_ = false;
		sock_.close();
	}
	bool started() { return started_; }
	void do_read() {
		async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete, _1, _2), MEM_FN2(on_read, _1, _2));
	}
	void do_write(const std::string & msg) {
		if (!started()) return;
		std::copy(msg.begin(), msg.end(), write_buffer_);
		sock_.async_write_some(buffer(write_buffer_, msg.size()), MEM_FN2(on_write, _1, _2));
	}
	size_t read_complete(const boost::system::error_code & err, size_t bytes) {
		if (err) return 0;
		bool found = std::find(read_buffer_, read_buffer_ + bytes, '.') < read_buffer_ + bytes;
		// 我們一個一個讀取直到讀到回車,不快取
		return found ? 0 : 1;
	}
	void on_connect(const error_code & err) {
		if (!err)      do_write(message_ + "\n");
		else            stop();
	}
	void on_read(const error_code & err, size_t bytes) {
		if (!err) {
			std::string copy(read_buffer_, bytes - 1);
			std::cout << "server echoed our " << message_ << ": " << (copy == message_ ? "OK" : "FAIL") << std::endl;
		}
		do_write(message_);
	}
	void on_write(const error_code & err, size_t bytes) {
		do_read();
	}
private:
	ip::tcp::socket sock_;
	enum { max_msg = 1024 };
	char read_buffer_[max_msg];
	char write_buffer_[max_msg];
	bool started_;
	std::string message_;
};
void test(ip::tcp::endpoint ep, const std::string &message)
{
	talk_to_svr::start(ep, message);
	service.run();
}
void sum() {
	std::cout << 3 << std::endl;
}


int _tmain(int argc, _TCHAR* argv[])
{
#define THREADNUM 100
	try{
		ip::tcp::endpoint e(ip::address::from_string("127.0.0.1"), 8001);
		boost::format fat("id= %1%.");
		std::vector<std::shared_ptr<std::thread>> ar;
		std::shared_ptr<std::thread> thread_ptr;
		for (int i = 0; i < THREADNUM; i++)
		{
			fat%i;
			thread_ptr = std::make_shared<std::thread>(boost::bind(test, e, fat.str()));
			ar.push_back(thread_ptr);
		}
		for (int i = 0; i < THREADNUM; i++)
		{
			ar[i]->join();
		}
	}
	catch (std::exception &e)
	{
	
	std::cout << e.what() << std::endl;
	}
	

	return 0;
}


相關推薦

boost io_serverce 非同步執行資源保護程式碼

伺服器:// ConsoleApplication12.cpp : 定義控制檯應用程式的入口點。 // #include "stdafx.h" #include <cstdlib> #include <iostream> #include <

【Python】執行非同步執行程序例項

上一篇文章主要介紹了多工場景下單執行緒非同步、多執行緒、多程序如何選擇,連結:多工場景下單執行緒非同步多執行緒多程序 這裡主要通過三個例項去驗證一下簡單的多工場景下,三種方式的耗時情況,假設有10個互不關聯的10個任務 ''''''''' 多程序版本: 使用多程序,時間比多執行緒更慢,為什麼

【精】【執行】ListenableFuture非同步執行查詢實現

  業務場景:為優化查詢效率,將原有查詢的條件做成單獨的索引表,每次產生記錄就會同步到索引表中,每次查詢索引表,根據索引便利的條件欄位再分別查詢每張子表的內容,最後封裝成前臺要的實體類。這裡面涉及到非同步查詢,如何保證一條記錄下的子表全部都查出來後才執行下面的操作。 下面Demo簡

例模式執行

單例模式的多執行緒         比如使用者點選   程式正在操作這條資料的過程中  然後又有一個人點選  又把這條資料修改了  多執行緒會發生的問題 這時候 當第一個使用者點選後&nbs

Boost(六)——執行

結合Boost官網 多執行緒的難點在於同步執行,需要“鎖”控制所有權。 鎖有分:互斥鎖,條件變數... 互斥鎖:boost::mutex 獲取和釋放成對存在,也可以用boost::lock_guard<boost::mutex> lock(mutex); boost::l

C#非同步執行總結(delegate、Thread、Task、ThreadPool、Parallel、async、cancel)

同步與非同步多執行緒的區別: 1、同步方法卡介面(UI執行緒忙於計算);非同步多執行緒不卡介面(主執行緒閒置,子執行緒在計算) 2、同步方法慢(CPU利用率低、資源耗費少);非同步多執行緒快(CPU利用率高、資源耗費多) 3、同步方法是有序的;非同步方法是無序的(啟動無序、執行時間不確定、結

非同步執行效能小結

非同步委託效能小結 非同步多執行緒的三大特點。 同步卡介面,UI執行緒被佔用;非同步多執行緒不卡介面,UI執行緒空閒,計算任務交給了執行緒 同步方法慢,因為只有一個執行緒幹活,非同步多執行緒方法快,因為多個執行緒併發計算。這裡也會消耗更多的資源,不是執行緒的線性關係,不

【07】例VS執行

 還是套路問題,一種思想而已,兩種方式   1 dubble check instance   2 static inner class   兩次檢測加類鎖   靜態內部類,其實就是餓漢模式,直接給你就好了   package Concurre

12 非同步執行(二)Thread,ThreadPool,Task

一.Thread 1.Thread 是framework1.0時候就存在的,可以用TreadStart來啟動多執行緒。 Stopwatch watch = new Stopwatch();//計時器 watch.Start(); Console.WriteLine($"*

11 非同步執行(一)

任何的非同步多執行緒,都是和委託相關,沒有委託,啥也沒有。 BeginInvoke在 C#裡面,就是啟動一個執行緒完成任務。 用設定斷點的方法來除錯的非同步多執行緒,是行不通的,只有多寫一些日誌或者輸出文字資訊到控制檯程式上。 如果要想看到控制檯程式一樣的介面輸出結果,

併發 並行 同步 非同步 執行的區別

1. 併發:在作業系統中,是指一個時間段中有幾個程式都處於已啟動執行到執行完畢之間,且這幾個程式都是在同一個處理機上執行。其中兩種併發關係分別是同步和互斥 2. 互斥:程序間相互排斥的使用臨界資源的現象,就叫互斥。 3. 同步:程序之間的關係不是相互排斥臨界資源的關係,而是相

C# 執行資源克隆解決方式及其應用

背景:多執行緒中的共享資源處理常用的方法是加鎖,但是加鎖是的任務處理由並行處理程式設計了序列處理大大降低了多執行緒的效率,這裡介紹另一種處理多執行緒共享資源的處理方式克隆,介紹在C#中克隆解決多執行緒問題的示例和其他開源框架中使用這種思想實現的一些功能,歡迎大家留言交流。

工場景下單執行非同步執行程序

多工的場景:1.爬取不同url的內容,爬取同一個url分頁內容。比如:豆瓣圖書 Top 250 https://book.douban.com/top250?start=0 實現豆瓣圖書Top250的抓取工作,並存入excel中,如果採用的序列爬取方式,每次爬完250頁都需要花費7到8分鐘,顯然讓人

例和執行

public class ConnThreadLocal { public static ThreadLocal<String> th = new ThreadLocal<String>(); public void setTh(String value){

Boostboost庫中thread執行詳解5——談談執行中斷

執行緒不是在任意時刻都可以被中斷的。如果將執行緒中函式中的sleep()睡眠等待去掉,那麼即使在主執行緒中呼叫interrupt()執行緒也不會被中斷。 thread庫預定義了若干個執行緒的中斷點,只有當執行緒執行到中斷點的時候才能被中斷,一個執行緒可以擁有任意多箇中斷點。

Boostboost庫中thread執行詳解3——細說lock_guard

boost::lock_guard可以說是一種比boost::unique_lock輕量級的lock, 簡單一些場景可以用它就行了。 看看它的原始碼也很簡單:template<typename Mutex> class lock_guard { private:

Boostboost庫中thread執行詳解1

1. 概述 執行緒就是,在同一程式同一時間內允許執行不同函式的離散處理佇列。 這使得一個長時間去進行某種特殊運算的函式在執行時不阻礙其他的函式變得十分重要。 執行緒實際上允許同時執行兩種函式,而這兩個函式不必相互等待。 一旦一個應用程式啟動,它僅包含一個預設執行緒。 此執行

NSThread鎖的使用(執行資源共享的問題)

之前已經瞭解了NSThread如何建立執行緒,以及執行緒當中的2個屬性。 現在我們用一個購票案例,來模擬一下執行緒當中資源共享的問題。 1.建立一個售票類 // // TicketManager.h // TestThread #import

[email protected]非同步+執行

   博主在寫專案是需要新增非同步操作來提高效率,在網上有很多關於非同步操作的例子 有的是整合訊息佇列mq(kafka等分散式訊息佇列 )有的是整合redis的訊息佇列等操作,關於訊息佇列的好處可以自行百度一下,在下認為整合訊息佇列是需要安裝對應的程式對於小型專案沒有必要,有

C++ BOOST庫 條件變數[執行通訊]機制 [大三四八九月實習]

1相關理念 (1)類名 條件變數和互斥變數都是boost庫中被封裝的類。 (2)條件變數 條件變數是thread庫提供的一種等待執行緒同步的機制,可實現執行緒間的通訊,它必須與互斥量配合使用,等待另一個執行緒中某個事件發生後本執行緒才能繼續執行。 (3)互斥變數 互斥量