boost之無鎖佇列使用例項
阿新 • • 發佈:2019-01-31
#ifndef SEARCH_ENGINE_H #define SEARCH_ENGINE_H #include "boost_comm.h" #include "message_header.h" namespace tspace { class search_engine { public: search_engine(); ~search_engine(); public: static search_engine *instance() { static search_engine ins_; return &ins_; } public: static void print() { cout << "search_engine:" << instance()->m_add_feature_counts_ << " " << instance()->m_start_task_counts_ << " " << instance()->m_get_result_counts_ << " "; } public: static bool mt_push_add_featrue(add_feature *src); static bool mt_push_start_task(start_task *src); static bool mt_push_stop_task(stop_task *src); static bool mt_push_get_result(get_result *src); boost::atomic<int> m_add_feature_counts_; boost::atomic<int> m_total_feature_counts_ ; boost::atomic<int> m_start_task_counts_; boost::atomic<int> m_stop_task_counts_; boost::atomic<int> m_get_result_counts_; boost::mutex mtx1_; boost::mutex mtx2_; boost::mutex mtx3_; boost::mutex mtx4_; static bool mt_pop_add_feature(ptr_add_feature &ctx); static bool mt_pop_start_task(ptr_start_task &ctx); boost::mutex map_lock_; std::map<std::string, start_task *> m_curr_task_list; static bool mt_pop_stop_task(ptr_stop_task &ctx); static bool mt_pop_get_result(ptr_get_result &ctx); protected: //std=c++11,spsc_queue #ifdef STD_CPP11 boost::lockfree::spsc_queue<ptr_add_feature> add_feature_; boost::lockfree::spsc_queue<ptr_start_task> start_task_; boost::lockfree::spsc_queue<ptr_stop_task> stop_task_; boost::lockfree::spsc_queue<ptr_get_result> get_result_; #else boost::lockfree::queue<ptr_add_feature> add_feature_; boost::lockfree::queue<ptr_start_task> start_task_; boost::lockfree::queue<ptr_start_task> stop_task_; boost::lockfree::queue<ptr_get_result> get_result_; #endif public: boost::mutex mtx_; static int mt_push_task_result(const string taskid, result_data *data, int max = 100); static int mt_commit_task_result(const string &taskid); static int mt_pop_task_result(const std::string taskid, std::string &json, const int size); 
protected: std::map<std::string, ptr_ret_result> taskid_result_; typedef std::map<std::string, ptr_ret_result>::iterator taskid_result_itr; //boost::circular_buffer<flow_in_window_t> flows_in_windows_; friend class file_system; friend class picture_search; }; } #endif // SEARCH_ENGINE_H
#include "search_engine.h"

#include <cstddef>
#include <map>
#include <set>
#include <sstream>
#include <string>

#include "thread_pool.h"
#include "picture_search.h"
#include "file_system.h"
#include "api_database.h"
#include "tcp_client.h"
#include "object_pool.h"
#include "session_manager.h"
#include "config.h"
#include "base64.h"

namespace tspace
{

/**
 * Sizes the bounded queues and zeroes every counter.
 *
 * The initialisers follow the member declaration order.  m_stop_task_counts_
 * was previously missing from the list, so it is now set to 0 explicitly
 * like the other counters.
 */
search_engine::search_engine()
    : m_add_feature_counts_(0),
      m_total_feature_counts_(0),
      m_start_task_counts_(0),
      m_stop_task_counts_(0),
      m_get_result_counts_(0),
      add_feature_(2048),
      start_task_(1024),
      stop_task_(1024),
      get_result_(1024)
{
}

search_engine::~search_engine()
{
}

boost::mutex mtx_1_;

/**
 * Worker task: pops one add_feature request and stores its first binary
 * payload through file_system::fstore_file, then releases the request.
 */
void add_feature_run()
{
    try
    {
        //boost::unique_lock<boost::mutex> wlock(mtx_1_);
        add_feature *msg = NULL;
        const bool popped = search_engine::mt_pop_add_feature(msg);
        if (popped && msg && !msg->container_bin_.empty())
        {
            bin_value *ptr_bin = msg->container_bin_.front();
            //file_system::fstore_file(p->car_id_, p->json_basic_, (char *)p->buffer_ctx_, p->buffer_size_);
            file_system::fstore_file(msg->car_id_, msg->json_basic_,
                                     (char *)ptr_bin->bin_value_, ptr_bin->bin_value_len_);
            buffer_pool::bin_value_free(msg->buffer_ctx_);
            delete ptr_bin;
            delete msg;
        }
        else
        {
            std::ostringstream oslog;
            oslog << "add_feature_run failed ret:" << popped;
            LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
            // A request without payload used to be leaked here.
            // NOTE(review): buffer_ctx_ is left untouched on this path because
            // its state for payload-less requests is not visible here - confirm.
            delete msg;  // deleting NULL is a no-op
        }
    }
    catch (...)
    {
        std::ostringstream oslog;
        oslog << "add_feature_run throw exception" ;
        LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
    }
}

boost::mutex mtx_2_;

/**
 * Worker task: pops one start_task request, registers it in
 * m_curr_task_list for the duration of file_system::search_file, and
 * releases it afterwards.  A request whose task id is already registered is
 * rejected with a log entry.
 */
void start_task_run()
{
    try
    {
        //boost::unique_lock<boost::mutex> wlock(mtx_2_);
        start_task *task = NULL;
        const bool popped = search_engine::mt_pop_start_task(task);
        if (popped && task && !task->container_bin_.empty())
        {
            //task start
            search_engine *engine = search_engine::instance();
            bin_value *ptr_bin = task->container_bin_.front();

            // map::insert refuses duplicates, so this is the "find, then
            // insert" of a new task id done in one step under the lock.
            bool registered = false;
            {
                boost::unique_lock<boost::mutex> guard(engine->map_lock_);
                registered = engine->m_curr_task_list
                                 .insert(std::map<std::string, start_task *>::value_type(task->task_id_, task))
                                 .second;
            }

            if (registered)
            {
                // The search runs without the lock so stop_task_run can
                // interrupt it.
                file_system::search_file(task->task_id_, task->json_condition_, task->json_detail_,
                                         (char *)ptr_bin->bin_value_, ptr_bin->bin_value_len_);

                // stop_task_run may already have erased our entry (and the
                // id may have been re-registered by another request), so the
                // entry is looked up again and only removed when it is ours.
                // Erasing through the iterator returned by insert() would be
                // undefined behaviour in that case.
                boost::unique_lock<boost::mutex> guard(engine->map_lock_);
                auto own = engine->m_curr_task_list.find(task->task_id_);
                if (own != engine->m_curr_task_list.end() && own->second == task)
                {
                    engine->m_curr_task_list.erase(own);
                }
            }
            else
            {
                std::ostringstream oslog;
                oslog <<"taskID "<<task->task_id_<<" is exist ,start task is failed";
                LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
            }

            delete ptr_bin;
            delete task;
            //task end
        }
        else
        {
            // A request without payload used to be leaked here.
            delete task;  // deleting NULL is a no-op
        }
    }
    catch (...)
    {
        std::ostringstream oslog;
        oslog << "start_task_run throw exception" ;
        LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
    }
}

boost::mutex mtx_3_;

/**
 * Worker task: pops one stop_task request, flags the matching running task
 * with task_sigal_interrupt and removes it from m_curr_task_list.
 */
void stop_task_run()
{
    try
    {
        //boost::unique_lock<boost::mutex> wlock(mtx_3_);
        stop_task *request = NULL;
        const bool popped = search_engine::mt_pop_stop_task(request);
        if (popped && request)
        {
            search_engine *engine = search_engine::instance();
            {
                // Scoped lock: released even if the lookup throws.
                boost::unique_lock<boost::mutex> guard(engine->map_lock_);
                auto running = engine->m_curr_task_list.find(request->task_id_);
                if (running != engine->m_curr_task_list.end())
                {
                    running->second->task_state_ = task_sigal_interrupt;
                    engine->m_curr_task_list.erase(running);
                }
            }
            delete request;
        }
    }
    catch (...)
    {
        std::ostringstream oslog;
        oslog << "stop_task_run throw exception" ;
        LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
    }
}

boost::mutex mtx_4_;

/**
 * Worker task: pops one get_result request, serialises the task's results
 * to JSON and, when the request carries a user context, sends the framed
 * response through it.
 */
void get_result_run()
{
    try
    {
        //boost::unique_lock<boost::mutex> wlock(mtx_4_);
        get_result *request = NULL;
        const bool popped = search_engine::mt_pop_get_result(request);
        if (popped && request)
        {
            std::string str_json;
            search_engine::mt_pop_task_result(request->task_id_, str_json, 100);
            if (request->user_ctx_)
            {
                unsigned char *buff = buffer_pool::bin_value_alloc();
                if (buff)
                {
                    int len = BUFFER_POOL_BIN_CHUNK_SIZE;
                    // NOTE(review): the result of construct_response is ignored,
                    // as before; its failure semantics are not visible here.
                    protocol_functions::construct_response(reinterpret_cast<char *>(buff), len, str_json.c_str());
                    (*((tcp_client *)request->user_ctx_))(reinterpret_cast<char *>(buff), len);
                    buffer_pool::bin_value_free(buff);
                }
                else
                {
                    std::ostringstream oslog;
                    oslog << "get_result_run alloc buffer failed";
                    LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
                }
            }
            delete request;
        }
    }
    catch (...)
    {
        std::ostringstream oslog;
        oslog << "get_result_run throw exception" ;
        LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
    }
}

#ifdef ENABLE_BOOST_THREAD

/**
 * Queues an add_feature request and schedules add_feature_run.
 * @return false when the bounded queue rejected the request.
 */
bool search_engine::mt_push_add_featrue(add_feature *src)
{
    search_engine *engine = instance();
    boost::unique_lock<boost::mutex> wlock(engine->mtx1_);
    const bool pushed = engine->add_feature_.push(src);
    if (!pushed)
    {
        std::ostringstream oslog;
        oslog << "search_engine add_feature max:" << engine->m_add_feature_counts_ ;
        LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
        return pushed;
    }

    engine->m_add_feature_counts_++;
    thread_pool::post_task(add_feature_run);
    // Periodic progress line, once every 400 accepted requests.
    if (engine->m_total_feature_counts_++ % 400 == 1)
    {
        std::ostringstream oslog;
        oslog << "add_feature:" << engine->m_add_feature_counts_
              << " total_feature_counts_:" << engine->m_total_feature_counts_ << LOG_END;
        LOG4CXX_INFO(log4cxx::Logger::getLogger("logger0"), oslog.str());
    }
    return pushed;
}

/// Dequeues one add_feature request; returns false when the queue is empty.
bool search_engine::mt_pop_add_feature(ptr_add_feature &ctx)
{
    search_engine *engine = instance();
    boost::unique_lock<boost::mutex> wlock(engine->mtx1_);
    const bool popped = engine->add_feature_.pop(ctx);
    if (popped)
    {
        engine->m_add_feature_counts_--;
    }
    return popped;
}

/**
 * Queues a start_task request and schedules start_task_run.
 * @return false when the bounded queue rejected the request.
 */
bool search_engine::mt_push_start_task(start_task *src)
{
    search_engine *engine = instance();
    boost::unique_lock<boost::mutex> wlock(engine->mtx2_);
    const bool pushed = engine->start_task_.push(src);
    if (pushed)
    {
        engine->m_start_task_counts_++;
        thread_pool::post_task(start_task_run);
    }
    else
    {
        std::ostringstream oslog;
        oslog << "search_engine start_task max:" << engine->m_start_task_counts_ << LOG_END;
        LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
    }
    return pushed;
}

/**
 * Queues a stop_task request and schedules stop_task_run.
 * @return false when the bounded queue rejected the request.
 */
bool search_engine::mt_push_stop_task(stop_task *src)
{
    search_engine *engine = instance();
    boost::unique_lock<boost::mutex> wlock(engine->mtx4_);
    const bool pushed = engine->stop_task_.push(src);
    if (pushed)
    {
        engine->m_stop_task_counts_++;
        thread_pool::post_task(stop_task_run);
    }
    else
    {
        std::ostringstream oslog;
        oslog << "search_engine stop_task max:" << engine->m_stop_task_counts_ ;
        LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
    }
    return pushed;
}

/// Dequeues one start_task request; returns false when the queue is empty.
bool search_engine::mt_pop_start_task(ptr_start_task &ctx)
{
    search_engine *engine = instance();
    boost::unique_lock<boost::mutex> wlock(engine->mtx2_);
    const bool popped = engine->start_task_.pop(ctx);
    if (popped)
    {
        engine->m_start_task_counts_--;
    }
    return popped;
}

/// Dequeues one stop_task request; returns false when the queue is empty.
bool search_engine::mt_pop_stop_task(ptr_stop_task &ctx)
{
    search_engine *engine = instance();
    boost::unique_lock<boost::mutex> wlock(engine->mtx4_);
    const bool popped = engine->stop_task_.pop(ctx);
    if (popped)
    {
        // Fixed: a pop must decrement the depth counter (it used to increment).
        engine->m_stop_task_counts_--;
    }
    return popped;
}

/**
 * Queues a get_result request and schedules get_result_run.
 * @return false when the bounded queue rejected the request.
 */
bool search_engine::mt_push_get_result(get_result *src)
{
    search_engine *engine = instance();
    boost::unique_lock<boost::mutex> wlock(engine->mtx3_);
    const bool pushed = engine->get_result_.push(src);
    if (pushed)
    {
        engine->m_get_result_counts_++;
        thread_pool::post_task(get_result_run);
    }
    else
    {
        std::ostringstream oslog;
        oslog << "search_engine get_result max:" << engine->m_get_result_counts_ ;
        LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
    }
    return pushed;
}

/// Dequeues one get_result request; returns false when the queue is empty.
bool search_engine::mt_pop_get_result(ptr_get_result &ctx)
{
    search_engine *engine = instance();
    boost::unique_lock<boost::mutex> wlock(engine->mtx3_);
    const bool popped = engine->get_result_.pop(ctx);
    if (popped)
    {
        // Fixed: a pop must decrement the depth counter (it used to increment).
        engine->m_get_result_counts_--;
    }
    return popped;
}

/**
 * Adds one result row to the result set of a task.
 *
 * While the set holds fewer than @p max rows the row is simply added.  Once
 * it is full, the row replaces the first element of the set when its
 * matchingScore_ is higher; otherwise the row is released through
 * object_pool::free_data.
 *
 * @param taskid task the row belongs to.
 * @param data   row to add; it ends up either stored in the set or released.
 * @param max    maximum number of rows kept per task (default 100).  The
 *               limit used to be hard-coded to 100 regardless of this value.
 * @return 1 when this call created the result entry for @p taskid, else 0
 *         (also 0 when an exception was caught).
 */
int search_engine::mt_push_task_result(const std::string taskid, result_data *data, int max /*=100*/)
{
    bool created = false;
    try
    {
        search_engine *engine = instance();
        boost::unique_lock<boost::mutex> wlock(engine->mtx_);
        //LOG(INFO)<< "push task result taskid:" << taskid <<LOG_END;
        taskid_result_itr entry = engine->taskid_result_.find(taskid);
        if (entry == engine->taskid_result_.end())
        {
            ret_result *fresh = object_pool::alloc_result("0", "success");
            fresh->ret_result_.insert(data);
            created = engine->taskid_result_
                          .insert(std::map<std::string, ptr_ret_result>::value_type(taskid, fresh))
                          .second;
        }
        else
        {
            ptr_ret_result bucket = entry->second;
            const bool has_room =
                max > 0 && bucket->ret_result_.size() < static_cast<std::size_t>(max);
            if (has_room)
            {
                bucket->ret_result_.insert(data);
                //LOG(INF) <<"data:"<<data<<" size:" << itr->second->ret_result_.size() << endl;
            }
            else
            {
                auto first = bucket->ret_result_.begin();
                result_data *evicted = *first;
                if (data->matchingScore_ > evicted->matchingScore_)
                {
                    // Remove the old row before inserting the new one so the
                    // erase can never hit the row that was just added.
                    bucket->ret_result_.erase(first);
                    object_pool::free_data(evicted);
                    bucket->ret_result_.insert(data);
                }
                else
                {
                    object_pool::free_data(data);
                }
            }
        }
    }
    catch (...)
    {
        std::ostringstream oslog;
        oslog << "mt_push_task_result excp" ;
        LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
    }
    return created;
}

/**
 * Writes all collected rows of a task to picsearch_fstore_detail with one
 * multi-row insert, then releases the rows and drops the task's entry.
 *
 * @param taskid task whose results are committed.
 * @return always 0 (the function used to fall off the end without returning
 *         a value, which is undefined behaviour for a non-void function).
 */
int search_engine::mt_commit_task_result(const std::string &taskid)
{
    search_engine *engine = instance();
    boost::unique_lock<boost::mutex> wlock(engine->mtx_);
    taskid_result_itr entry = engine->taskid_result_.find(taskid);
    if (entry == engine->taskid_result_.end())
    {
        return 0;
    }

    ptr_ret_result bucket = entry->second;

    // NOTE(security): the statement is assembled by string concatenation and
    // the values are neither quoted nor escaped.  If task ids or object ids
    // can come from untrusted input this is open to SQL injection; switch to
    // a parameterised statement if the database wrapper offers one.
    std::ostringstream sql;
    sql << "insert into picsearch_fstore_detail values";
    int rows = 0;
    for (std::multiset<ptr_result_data, result_data_cmp>::iterator row = bucket->ret_result_.begin();
         row != bucket->ret_result_.end(); ++row)
    {
        //taskId,carId,matchingScore,snapShotTime,carPlateNumber,searchType
        if (rows++ > 0)
        {
            sql << ",";
        }
        sql << "(" << taskid << "," << (*row)->objectId_ << "," << (*row)->numType_ << ","
            << (*row)->matchingScore_ << "," << (*row)->snapshotTime_ << ",0)";
        object_pool::free_data(*row);
    }
    // Every row has been released above; drop the now-dangling pointers.
    bucket->ret_result_.clear();

    // Without rows the statement would be an incomplete "insert ... values".
    if (rows > 0)
    {
        const std::string statement = sql.str();
        file_system::instance()->task_detail_db_->excute_insert(statement.c_str());
    }

    object_pool::free_result(bucket);
    engine->taskid_result_.erase(entry);
    return 0;
}

/**
 * Serialises the collected rows of a task to a JSON document.
 *
 * Despite the name the rows are not removed; that happens in
 * mt_commit_task_result.
 *
 * @param taskid   task to report.
 * @param str_json receives {"returnCode":0,"msg":"success","data":[...]} or,
 *                 when the task is unknown, a returnCode 2 document.
 * @param size     currently unused.  NOTE(review): presumably meant to cap
 *                 the number of rows reported - confirm before relying on it.
 * @return always 0.
 */
int search_engine::mt_pop_task_result(const std::string taskid, std::string &str_json, const int size)
{
    (void)size;
    search_engine *engine = instance();
    boost::unique_lock<boost::mutex> wlock(engine->mtx_);
    {
        std::ostringstream oslog;
        oslog << "pop task result taskid:" << taskid ;
        LOG4CXX_INFO(log4cxx::Logger::getLogger("logger0"), oslog.str());
    }

    taskid_result_itr entry = engine->taskid_result_.find(taskid);
    if (entry == engine->taskid_result_.end())
    {
        std::ostringstream oslog;
        oslog << "can't find taskid:" << taskid ;
        LOG4CXX_INFO(log4cxx::Logger::getLogger("logger2"), oslog.str());
        // If the task has already finished, return returnCode = 2, data : []
        str_json = "{\"returnCode\":2,\"msg\":\"can't find taskid\",\"data\":[]}";
        return 0;
    }

    ptr_ret_result ret_res = entry->second;
    //assert(ptr_res != NULL);
    Json::Value root;
    // Explicit array so an empty result serialises as [] like the
    // "can't find taskid" document, not as null.
    Json::Value rows(Json::arrayValue);
    Json::FastWriter writer;

    for (std::multiset<ptr_result_data, result_data_cmp>::iterator row = ret_res->ret_result_.begin();
         row != ret_res->ret_result_.end(); ++row)
    {
        result_data *item = *row;
        if (!item)
        {
            break;
        }
        Json::Value detail;
        detail["objectId"] = item->objectId_.c_str();
        detail["type"] = item->numType_.c_str();
        detail["matchingScore"] = item->matchingScore_;
        detail["snapshotTime"] = item->snapshotTime_;
        rows.append(detail);
    }

    root["returnCode"] = 0;
    root["msg"] = "success";
    root["data"] = rows;
    str_json = writer.write(root);

    std::ostringstream oslog;
    oslog << "task result:" << str_json << LOG_END;
    LOG4CXX_INFO(log4cxx::Logger::getLogger("logger0"), oslog.str());
    return 0;
}

#endif
}
需要說明的是:spsc_queue 只支援一個生產者、一個消費者的情況,而 lockfree::queue 支援多個生產者、多個消費者的情況。定義的無鎖佇列必須在類的建構函式中初始化並指定固定大小,否則程式會崩潰。無鎖佇列本身沒有統計元素個數的方法,可以用一個外部變數在 push 和 pop 的地方做統計。