1. 程式人生 > >boost之無鎖佇列使用例項

boost之無鎖佇列使用例項

#ifndef	SEARCH_ENGINE_H
#define SEARCH_ENGINE_H

#include "boost_comm.h"
#include "message_header.h"

namespace tspace
{
	class search_engine
	{
	public:
		search_engine();
		~search_engine();
	public:
		static search_engine *instance()
		{
			static search_engine ins_;
			return &ins_;
		}
	public:

		static void print()
		{
			cout << "search_engine:" << instance()->m_add_feature_counts_ << " "
				<< instance()->m_start_task_counts_ << " "
				<< instance()->m_get_result_counts_ << " ";
		}
	public:
		static bool mt_push_add_featrue(add_feature *src);
		static bool mt_push_start_task(start_task *src);
		static bool mt_push_stop_task(stop_task *src);
		static bool mt_push_get_result(get_result *src);

		boost::atomic<int> m_add_feature_counts_;
		boost::atomic<int> m_total_feature_counts_ ;
		boost::atomic<int> m_start_task_counts_;
		boost::atomic<int> m_stop_task_counts_;
		boost::atomic<int> m_get_result_counts_;

		boost::mutex mtx1_;
		boost::mutex mtx2_;
		boost::mutex mtx3_;
		boost::mutex mtx4_;
		static bool mt_pop_add_feature(ptr_add_feature &ctx);
		static bool mt_pop_start_task(ptr_start_task &ctx);
		boost::mutex map_lock_;
		std::map<std::string, start_task *> m_curr_task_list;
		static bool mt_pop_stop_task(ptr_stop_task &ctx);
		static bool mt_pop_get_result(ptr_get_result &ctx);
	protected:
		//std=c++11,spsc_queue
#ifdef STD_CPP11
		boost::lockfree::spsc_queue<ptr_add_feature> add_feature_;
		boost::lockfree::spsc_queue<ptr_start_task> start_task_;
		boost::lockfree::spsc_queue<ptr_stop_task> stop_task_;
		boost::lockfree::spsc_queue<ptr_get_result> get_result_;
#else
		boost::lockfree::queue<ptr_add_feature> add_feature_;
		boost::lockfree::queue<ptr_start_task> start_task_;
		boost::lockfree::queue<ptr_start_task> stop_task_;
		boost::lockfree::queue<ptr_get_result> get_result_;
#endif
	public:
		boost::mutex mtx_;
		static int mt_push_task_result(const string taskid, result_data *data, int max = 100);
		static int mt_commit_task_result(const string &taskid);
		static int mt_pop_task_result(const std::string taskid, std::string &json, const int size);
	protected:
		std::map<std::string, ptr_ret_result> taskid_result_;
		typedef std::map<std::string, ptr_ret_result>::iterator taskid_result_itr;
		//boost::circular_buffer<flow_in_window_t> flows_in_windows_;
		friend class file_system;
		friend class picture_search;
	};
}

#endif // SEARCH_ENGINE_H
#include "search_engine.h"

#include "thread_pool.h"
#include "picture_search.h"

#include "file_system.h"
#include "api_database.h"
#include "tcp_client.h"

#include "object_pool.h"
#include "session_manager.h"

#include "config.h"
#include "base64.h"

namespace tspace
{
	search_engine::search_engine()
		:add_feature_(2048),
		start_task_(1024),
		get_result_(1024),
                stop_task_(1024),
		m_add_feature_counts_(0),
		m_start_task_counts_(0),
		m_get_result_counts_(0),
		m_total_feature_counts_(0)
	{
	}

	search_engine::~search_engine()
	{
	}

	boost::mutex mtx_1_;
	void add_feature_run()
	{
		try
		{
			//boost::unique_lock<boost::mutex> wlock(mtx_1_);
			add_feature *p = NULL;
			bool ret = search_engine::mt_pop_add_feature(p);
			if (ret && p && p->container_bin_.size() > 0)
			{
				bin_value *ptr_bin = p->container_bin_.front();
				//file_system::fstore_file(p->car_id_, p->json_basic_, (char *)p->buffer_ctx_, p->buffer_size_);
				file_system::fstore_file(p->car_id_, p->json_basic_, (char *)ptr_bin->bin_value_, ptr_bin->bin_value_len_);
				buffer_pool::bin_value_free(p->buffer_ctx_);
				delete ptr_bin;
				delete p;
			}
			else
			{
				std::ostringstream oslog;
				oslog << "add_feature_run failed ret:" << ret;
				LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
			}
		}
		catch (...)
		{
			std::ostringstream oslog;
			oslog << "add_feature_run throw exception" ;
			LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
		}
	}

	boost::mutex mtx_2_;
	void start_task_run()
	{
		try
		{
			//boost::unique_lock<boost::mutex> wlock(mtx_2_);
			start_task *p = NULL;
			bool ret = search_engine::mt_pop_start_task(p);
			if (ret && p && p->container_bin_.size() > 0)
			{
				//task start
				bin_value *ptr_bin = p->container_bin_.front();
				search_engine::instance()->map_lock_.lock();
				if(search_engine::instance()->m_curr_task_list.find(p->task_id_)==search_engine::instance()->m_curr_task_list.end())
				{
					auto itr = search_engine::instance()->m_curr_task_list.insert(make_pair<>(p->task_id_, p));
					search_engine::instance()->map_lock_.unlock();
					file_system::search_file(p->task_id_, p->json_condition_, p->json_detail_, 
						(char *)ptr_bin->bin_value_, ptr_bin->bin_value_len_);
					search_engine::instance()->map_lock_.lock();
					if(search_engine::instance()->m_curr_task_list.find(p->task_id_) !=search_engine::instance()->m_curr_task_list.end())
						search_engine::instance()->m_curr_task_list.erase(itr.first);
					search_engine::instance()->map_lock_.unlock();
				}
				else 
				{
					std::ostringstream oslog;
					oslog <<"taskID "<<p->task_id_<<" is exist ,start task is failed";
					LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
					search_engine::instance()->map_lock_.unlock();
				}
				delete ptr_bin;
				delete p;
				//task end
			}
		}
		catch (...)
		{
			std::ostringstream oslog;
			oslog << "start_task_run throw exception" ;
			LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
		}
		
	}

	boost::mutex mtx_3_;
	void stop_task_run()
	{
		try
		{
			//boost::unique_lock<boost::mutex> wlock(mtx_3_);
			stop_task *p = NULL;
			bool ret = search_engine::mt_pop_stop_task(p);
			if (ret && p)
			{
				search_engine::instance()->map_lock_.lock();
				auto itr = search_engine::instance()->m_curr_task_list.find(p->task_id_);
				if (itr != search_engine::instance()->m_curr_task_list.end())
				{
					itr->second->task_state_ = task_sigal_interrupt;
					search_engine::instance()->m_curr_task_list.erase(itr);
				}
				search_engine::instance()->map_lock_.unlock();

				delete p;
			}
		}
		catch (...)
		{
			std::ostringstream oslog;
			oslog << "stop_task_run throw exception" ;
			LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
		}

	}

	boost::mutex mtx_4_;
	void get_result_run()
	{
		try
		{
			//boost::unique_lock<boost::mutex> wlock(mtx_4_);
			get_result *p = NULL;
			bool ret = search_engine::mt_pop_get_result(p);
			if (ret && p)
			{
				std::string str_json;
				search_engine::mt_pop_task_result(p->task_id_, str_json, 100);
				if (p->user_ctx_)
				{
					unsigned char *buff = buffer_pool::bin_value_alloc();
					int len = BUFFER_POOL_BIN_CHUNK_SIZE;
					bool ret = protocol_functions::construct_response((char *)buff, len, str_json.c_str());
					(*((tcp_client *)p->user_ctx_))((char *)buff, len);
					buffer_pool::bin_value_free(buff);
				}
				delete p;
			}
		}
		catch (...)
		{
			std::ostringstream oslog;
			oslog << "get_result_run throw exception" ;
			LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
		}
	}
#ifdef ENABLE_BOOST_THREAD
	bool search_engine::mt_push_add_featrue(add_feature *src)
	{
		boost::unique_lock<boost::mutex> wlock(instance()->mtx1_);
		bool ret = instance()->add_feature_.push(src);
		if (ret)
		{
			instance()->m_add_feature_counts_++;
			thread_pool::post_task(add_feature_run);
			if (instance()->m_total_feature_counts_++ % 400 == 1)
			{
				std::ostringstream oslog;
				oslog << "add_feature:" << instance()->m_add_feature_counts_ << " total_feature_counts_:" << instance()->m_total_feature_counts_ << LOG_END;
				LOG4CXX_INFO(log4cxx::Logger::getLogger("logger0"), oslog.str());
			}
		}
		else
		{
			std::ostringstream oslog;
			oslog << "search_engine add_feature max:" << instance()->m_add_feature_counts_ ;
			LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
		}
		
		return ret;
	}

	bool search_engine::mt_pop_add_feature(ptr_add_feature &ctx)
	{
		boost::unique_lock<boost::mutex> wlock(instance()->mtx1_);
		bool ret = instance()->add_feature_.pop(ctx);
		if (ret)
		{
			instance()->m_add_feature_counts_--;
		}
		return ret;
	}

	bool search_engine::mt_push_start_task(start_task *src)
	{
		boost::unique_lock<boost::mutex> wlock(instance()->mtx2_);
		bool ret = instance()->start_task_.push(src);
		if (ret)
		{
			instance()->m_start_task_counts_++;
			thread_pool::post_task(start_task_run);
		}
		else
		{
			std::ostringstream oslog;
			oslog << "search_engine start_task max:" << instance()->m_start_task_counts_ << LOG_END;
			LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
		}
		
		return ret;
	}
	bool search_engine::mt_push_stop_task(stop_task * src)
	{
		boost::unique_lock<boost::mutex> wlock(instance()->mtx4_);
		bool ret = instance()->stop_task_.push(src);
		if (ret)
		{
			instance()->m_stop_task_counts_++;
			thread_pool::post_task(stop_task_run);
		}
		else
		{
			std::ostringstream oslog;
			oslog << "search_engine stop_task max:" << instance()->m_stop_task_counts_ ;
			LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
		}

		return ret;
	}
	bool search_engine::mt_pop_start_task(ptr_start_task &ctx)
	{
		boost::unique_lock<boost::mutex> wlock(instance()->mtx2_);
		bool ret = instance()->start_task_.pop(ctx);
		if (ret)
		{
			instance()->m_start_task_counts_--;
		}
		return ret;
	}
	bool search_engine::mt_pop_stop_task(ptr_stop_task & ctx)
	{
		boost::unique_lock<boost::mutex> wlock(instance()->mtx4_);
		bool ret = instance()->stop_task_.pop(ctx);
		if (ret)
		{
			instance()->m_stop_task_counts_++;
		}
		return ret;
	}
	bool search_engine::mt_push_get_result(get_result *src)
	{
		boost::unique_lock<boost::mutex> wlock(instance()->mtx3_);
		bool ret = instance()->get_result_.push(src);
		if (ret)
		{
			instance()->m_get_result_counts_++;
			thread_pool::post_task(get_result_run);
		}
		else
		{
			std::ostringstream oslog;
			oslog << "search_engine get_result max:" << instance()->m_get_result_counts_ ;
			LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
		}
		
		return ret;
	}
	bool search_engine::mt_pop_get_result(ptr_get_result &ctx)
	{
		boost::unique_lock<boost::mutex> wlock(instance()->mtx3_);
		bool ret = instance()->get_result_.pop(ctx);
		if (ret)
		{
			instance()->m_get_result_counts_++;
		}
		return ret;
	}

	int search_engine::mt_push_task_result(const string taskid, result_data *data, int max/*=100*/)
	{
		bool ret = false;
		try
		{
			boost::unique_lock<boost::mutex> wlock(instance()->mtx_);
			//LOG(INFO)<< "push task result taskid:" << taskid <<LOG_END;
			taskid_result_itr itr = instance()->taskid_result_.find(taskid);
			if (itr != instance()->taskid_result_.end())
			{
				if (itr->second->ret_result_.size() < 100)
				{
					itr->second->ret_result_.insert(data);
					//LOG(INF) <<"data:"<<data<<" size:" << itr->second->ret_result_.size() << endl;
				}
				else
				{
						result_data *b_data=*(itr->second->ret_result_.begin());
					if(data->matchingScore_>b_data->matchingScore_){
						itr->second->ret_result_.insert(data);
						object_pool::free_data(b_data);
						itr->second->ret_result_.erase(itr->second->ret_result_.begin());
					}
					else
					{
						object_pool::free_data(data);
					}
				}
			}
			else
			{
				ret_result *ptr_res = object_pool::alloc_result("0", "success");
				ptr_res->ret_result_.insert(data);
				ret = instance()->taskid_result_.insert(make_pair<>(taskid, ptr_res)).second;
			}
		}
		catch (...)
		{
			std::ostringstream oslog;
			oslog << "mt_push_task_result excp" ;
			LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
		}
		
		return ret;
	}

	int search_engine::mt_commit_task_result(const string &taskid)
	{
		boost::unique_lock<boost::mutex> wlock(instance()->mtx_);
		taskid_result_itr itr_result = instance()->taskid_result_.find(taskid);
		std::string strSql = "";
		std::ostringstream ostrstream;
		if (itr_result != instance()->taskid_result_.end())
		{
			//char db_buff[1024];
			ostrstream<<"insert into picsearch_fstore_detail values";
			int count =0;
			for (std::multiset<ptr_result_data, result_data_cmp>::iterator itr = itr_result->second->ret_result_.begin();
				itr != itr_result->second->ret_result_.end(); )
			{
				string carPlateNumber = (*itr)->carPlateNumber_;
				//char outbuf[1024];
			//	int outlen = 1024;
#ifdef _WIN32
#else
				//extern int code_convert(char *from_charset, char *to_charset, char *inbuf, int inlen, char *outbuf, int outlen);
				//code_convert("gb2312", "utf-8", (char *)carPlateNumber.c_str(), carPlateNumber.size(), outbuf, outlen);
#endif
				//taskId,carId,matchingScore,snapShotTime,carPlateNumber,searchType
				if(count++>0)
					ostrstream<<",";
				ostrstream<<"("<<taskid<<","<<(*itr)->objectId_<<","<<(*itr)->numType_ << "," <<(*itr)->matchingScore_<<","<<(*itr)->snapshotTime_<<",0)";
				/*snprintf(db_buff, 1024, "insert into picsearch_fstore_detail values(%s,%s,%3.3f,%s,'%s',%d)",
					taskid.c_str(), (*itr)->carId_.c_str(),(*itr)->matchingScore_, (*itr)->snapshotTime_.c_str(), 
					carPlateNumber.c_str(), 0);*/
				object_pool::free_data(*itr);
				itr_result->second->ret_result_.erase(itr++);
			}
			strSql = ostrstream.str();
				file_system::instance()->task_detail_db_->excute_insert(strSql.c_str());
				
			object_pool::free_result(itr_result->second);
			instance()->taskid_result_.erase(itr_result);
		}
	}

	int search_engine::mt_pop_task_result(const std::string taskid, std::string &str_json, const int size)
	{
		boost::unique_lock<boost::mutex> wlock(instance()->mtx_);
		std::ostringstream oslog;
		oslog << "pop task result taskid:" << taskid ;
		LOG4CXX_INFO(log4cxx::Logger::getLogger("logger0"), oslog.str());
		taskid_result_itr itr = instance()->taskid_result_.find(taskid);
		if (itr != instance()->taskid_result_.end())
		{
			ptr_ret_result ret_res = itr->second;
			//assert(ptr_res != NULL);
			static Json::Value root;
			root.clear();
			static Json::Value arrayObj;
			arrayObj.clear();
			Json::FastWriter writer;
			//construct json string acc size
			std::multiset<ptr_result_data, result_data_cmp>::iterator itr = ret_res->ret_result_.begin();
					
			for (; itr !=ret_res->ret_result_.end();itr++)
			{
				result_data *p = NULL;
				p = *itr;
				
				if (!p)
				{
					break;
				}
				Json::FastWriter writer;

				static Json::Value subdetail;
				subdetail.clear();
				subdetail["objectId"] = p->objectId_.c_str();
				subdetail["type"] = p->numType_.c_str();
				subdetail["matchingScore"] = p->matchingScore_;
				subdetail["snapshotTime"] = p->snapshotTime_;

				string detailString = writer.write(subdetail);

				arrayObj.append(subdetail);

			}
			root["returnCode"] = 0;
			root["msg"] = "success";
			root["data"] = arrayObj;
			str_json = writer.write(root);
			std::ostringstream oslog;
			oslog << "task result:" << str_json << LOG_END;
			LOG4CXX_INFO(log4cxx::Logger::getLogger("logger0"), oslog.str());

		}
		else
		{
			std::ostringstream oslog;
			oslog << "can't find taskid:" << taskid ;
			LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
			//<!--如果任務已經結束,返回returnCode = 2,data : [] -->
			str_json = "{\"returnCode\":2,\"msg\":\"can't find taskid\",\"data\":[]}";
		}
		return 0;
	}
#endif
}

需要說明的是spsc_queue支援一個生產者一個消費者的情況,lockfree::queue支援多個生產者,多個消費者的情況,定義的無鎖佇列在類的建構函式中初始化指定固定大小,否則程式會崩潰,無鎖佇列中沒有統計元素個數的方法,可以在用一個外部變數在push和pop的地方做統計。