一個模擬訊息訂閱和釋出的簡單程式
阿新 • • 發佈:2022-04-01
- ros_sim.h
#ifndef __ROS_SIM_H__ #define __ROS_SIM_H__ #include <iostream> #include <string> #include <functional> #include <queue> #include <unordered_map> #include <thread> #include <mutex> #include <condition_variable> #include <atomic> enum RosSimMsgType { ROS_SIM_MSG_UNKNOW, ROS_SIM_MSG_TEST }; class RosSimMsg { public: RosSimMsg() : _type(ROS_SIM_MSG_UNKNOW) {} RosSimMsg(RosSimMsgType type) : _type(type) {} RosSimMsgType get_msg_type() const { return _type; } virtual std::string msg_str() const = 0; private: RosSimMsgType _type; }; using RosSimCallback = std::function<void(RosSimMsg *)>; class RosSimMsgTest : public RosSimMsg { public: RosSimMsgTest(int data) : RosSimMsg(ROS_SIM_MSG_TEST), _data(data) {} std::string msg_str() const { return std::string("This message is just for testing.") + std::to_string(_data); } private: int _data; }; class RosSim { public: RosSim() : _has_new_msg(false), _force_shutdown(false), _shutdown(false) {} int init(); int exit(bool force = false); void register_callback(RosSimMsgType type, RosSimCallback &callback); int publish_msg(RosSimMsg *msg); private: void run(); bool queue_is_empty(); bool queue_push(RosSimMsg *msg); bool queue_pop(RosSimMsg **msg); void wait_new_msg(unsigned int timeout_us); void wakeup_msg_hander(); void invoke_callback(RosSimMsg *msg); bool should_exit(); std::queue<RosSimMsg *> _msg_queue; std::mutex _msg_queue_mtx; std::unordered_map<unsigned int, RosSimCallback> _msg_handler_map; std::mutex _msg_handler_map_mtx; std::atomic<bool> _force_shutdown; std::atomic<bool> _shutdown; bool _has_new_msg; std::mutex _msg_cv_mtx; std::condition_variable _msg_cv; std::thread _msg_handler; }; #endif
- ros_sim.cpp
#include <chrono> #include "ros_sim.h" bool RosSim::queue_is_empty() { std::unique_lock<std::mutex> queue_lk(_msg_queue_mtx); return _msg_queue.empty(); } bool RosSim::queue_push(RosSimMsg *msg) { if (msg == nullptr) { return false; } std::unique_lock<std::mutex> queue_lk(_msg_queue_mtx); _msg_queue.push(msg); return true; } bool RosSim::queue_pop(RosSimMsg **msg) { bool ret = false; std::unique_lock<std::mutex> queue_lk(_msg_queue_mtx); if (!_msg_queue.empty()) { *msg = _msg_queue.front(); _msg_queue.pop(); ret = true; } return ret; } void RosSim::wait_new_msg(unsigned int timeout_us) { std::unique_lock<std::mutex> cv_lk(_msg_cv_mtx); _msg_cv.wait_for(cv_lk, std::chrono::microseconds(timeout_us), [this]() { if (_has_new_msg) { _has_new_msg = false; return true; } else { return false; } }); } void RosSim::invoke_callback(RosSimMsg *msg) { if (msg == nullptr) { return; } RosSimMsgType type = msg->get_msg_type(); std::unique_lock<std::mutex> map_lk(_msg_handler_map_mtx); if (_msg_handler_map.count(type)) { _msg_handler_map[type](msg); } } void RosSim::register_callback(RosSimMsgType type, RosSimCallback &callback) { std::unique_lock<std::mutex> map_lk(_msg_handler_map_mtx); if (!_msg_handler_map.count(type)) { _msg_handler_map.emplace(type, callback); } } int RosSim::publish_msg(RosSimMsg *msg) { if (msg == nullptr) { return -1; } queue_push(msg); wakeup_msg_hander(); } void RosSim::wakeup_msg_hander() { std::unique_lock<std::mutex> cv_lk(_msg_cv_mtx); _has_new_msg = true; _msg_cv.notify_all(); } bool RosSim::should_exit() { return _force_shutdown || (_shutdown && queue_is_empty()); } void RosSim::run() { while (!should_exit()) { RosSimMsg *msg = nullptr; while (queue_pop(&msg)) { invoke_callback(msg); msg = nullptr; } if (queue_is_empty()) { wait_new_msg(100); } } } int RosSim::init() { _msg_handler = std::thread(&RosSim::run, this); return 0; } int RosSim::exit(bool force) { if (force) { _force_shutdown.store(true); } else { _shutdown.store(true); } if (_msg_handler.joinable()) { _msg_handler.join(); } _force_shutdown.store(false); _shutdown.store(false); return 0; }
- ros_sim_test.cpp
#include <iostream> #include "ros_sim.h" int main() { // init RosSim RosSim ros_sim; RosSimCallback msg1_callback = [](RosSimMsg *msg) -> void { std::cout << "Msg1 Callback. " << msg->msg_str() << std::endl; }; // test1: single thread ros_sim.init(); ros_sim.register_callback(ROS_SIM_MSG_TEST, msg1_callback); RosSimMsgTest msg_test1(100); ros_sim.publish_msg(&msg_test1); ros_sim.exit(); // test2: multi thread ros_sim.init(); RosSimMsgTest msg_test2(200); auto test_thd = [&ros_sim, &msg_test2]() -> void { ros_sim.publish_msg(&msg_test2); }; std::thread t(test_thd); t.join(); ros_sim.exit(); return 0; }
- how to build
g++ -std=c++11 -pthread -g3 -o ros_sim_test ros_sim_test.cpp ros_sim.cpp
- run results
Msg1 Callback. This message is just for testing.100
Msg1 Callback. This message is just for testing.200