1. 程式人生 > 其它 >一個模擬訊息訂閱和釋出的簡單程式

一個模擬訊息訂閱和釋出的簡單程式

  • ros_sim.h
#ifndef __ROS_SIM_H__
#define __ROS_SIM_H__

#include <iostream>
#include <string>
#include <functional>
#include <queue>
#include <unordered_map>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>

enum RosSimMsgType
{
    ROS_SIM_MSG_UNKNOW,
    ROS_SIM_MSG_TEST
};

class RosSimMsg
{
public:
    RosSimMsg() : _type(ROS_SIM_MSG_UNKNOW) {}
    RosSimMsg(RosSimMsgType type) : _type(type) {}
    RosSimMsgType get_msg_type() const { return _type; }
    virtual std::string msg_str() const = 0;

private:
    RosSimMsgType _type;
};

using RosSimCallback = std::function<void(RosSimMsg *)>;

class RosSimMsgTest : public RosSimMsg
{
public:
    RosSimMsgTest(int data) : RosSimMsg(ROS_SIM_MSG_TEST), _data(data) {}
    std::string msg_str() const
    {
        return std::string("This message is just for testing.") + std::to_string(_data);
    }

private:
    int _data;
};

class RosSim
{
public:
    RosSim() : _has_new_msg(false), _force_shutdown(false), _shutdown(false) {}
    int init();
    int exit(bool force = false);
    void register_callback(RosSimMsgType type, RosSimCallback &callback);
    int publish_msg(RosSimMsg *msg);

private:
    void run();

    bool queue_is_empty();
    bool queue_push(RosSimMsg *msg);
    bool queue_pop(RosSimMsg **msg);

    void wait_new_msg(unsigned int timeout_us);
    void wakeup_msg_hander();

    void invoke_callback(RosSimMsg *msg);

    bool should_exit();

    std::queue<RosSimMsg *> _msg_queue;
    std::mutex _msg_queue_mtx;

    std::unordered_map<unsigned int, RosSimCallback> _msg_handler_map;
    std::mutex _msg_handler_map_mtx;

    std::atomic<bool> _force_shutdown;
    std::atomic<bool> _shutdown;
    bool _has_new_msg;
    std::mutex _msg_cv_mtx;
    std::condition_variable _msg_cv;
    std::thread _msg_handler;
};

#endif
  • ros_sim.cpp
#include <chrono>
#include "ros_sim.h"

bool RosSim::queue_is_empty()
{
    std::unique_lock<std::mutex> queue_lk(_msg_queue_mtx);
    return _msg_queue.empty();
}

bool RosSim::queue_push(RosSimMsg *msg)
{
    if (msg == nullptr)
    {
        return false;
    }
    std::unique_lock<std::mutex> queue_lk(_msg_queue_mtx);
    _msg_queue.push(msg);
    return true;
}

bool RosSim::queue_pop(RosSimMsg **msg)
{
    bool ret = false;
    std::unique_lock<std::mutex> queue_lk(_msg_queue_mtx);
    if (!_msg_queue.empty())
    {
        *msg = _msg_queue.front();
        _msg_queue.pop();
        ret = true;
    }
    return ret;
}

void RosSim::wait_new_msg(unsigned int timeout_us)
{
    std::unique_lock<std::mutex> cv_lk(_msg_cv_mtx);
    _msg_cv.wait_for(cv_lk, std::chrono::microseconds(timeout_us), [this]()
                     {
        if (_has_new_msg)
        {
            _has_new_msg = false;
            return true;
        }
        else
        {
            return false;
        } });
}

void RosSim::invoke_callback(RosSimMsg *msg)
{
    if (msg == nullptr)
    {
        return;
    }
    RosSimMsgType type = msg->get_msg_type();
    std::unique_lock<std::mutex> map_lk(_msg_handler_map_mtx);
    if (_msg_handler_map.count(type))
    {
        _msg_handler_map[type](msg);
    }
}

void RosSim::register_callback(RosSimMsgType type, RosSimCallback &callback)
{
    std::unique_lock<std::mutex> map_lk(_msg_handler_map_mtx);
    if (!_msg_handler_map.count(type))
    {
        _msg_handler_map.emplace(type, callback);
    }
}

int RosSim::publish_msg(RosSimMsg *msg)
{
    if (msg == nullptr)
    {
        return -1;
    }
    queue_push(msg);
    wakeup_msg_hander();
}

void RosSim::wakeup_msg_hander()
{
    std::unique_lock<std::mutex> cv_lk(_msg_cv_mtx);
    _has_new_msg = true;
    _msg_cv.notify_all();
}

bool RosSim::should_exit()
{
    return _force_shutdown || (_shutdown && queue_is_empty());
}

void RosSim::run()
{
    while (!should_exit())
    {
        RosSimMsg *msg = nullptr;
        while (queue_pop(&msg))
        {
            invoke_callback(msg);
            msg = nullptr;
        }

        if (queue_is_empty())
        {
            wait_new_msg(100);
        }
    }
}

int RosSim::init()
{
    _msg_handler = std::thread(&RosSim::run, this);
    return 0;
}

int RosSim::exit(bool force)
{
    if (force)
    {
        _force_shutdown.store(true);
    }
    else
    {
        _shutdown.store(true);
    }

    if (_msg_handler.joinable())
    {
        _msg_handler.join();
    }

    _force_shutdown.store(false);
    _shutdown.store(false);
    return 0;
}
  • ros_sim_test.cpp
#include <iostream>
#include "ros_sim.h"

int main()
{
    // init RosSim
    RosSim ros_sim;

    RosSimCallback msg1_callback = [](RosSimMsg *msg) -> void
    {
        std::cout << "Msg1 Callback. " << msg->msg_str() << std::endl;
    };

    // test1: single thread
    ros_sim.init();
    ros_sim.register_callback(ROS_SIM_MSG_TEST, msg1_callback);
    RosSimMsgTest msg_test1(100);
    ros_sim.publish_msg(&msg_test1);
    ros_sim.exit();

    // test2: multi thread
    ros_sim.init();
    RosSimMsgTest msg_test2(200);
    auto test_thd = [&ros_sim, &msg_test2]() -> void
    {
        ros_sim.publish_msg(&msg_test2);
    };
    std::thread t(test_thd);
    t.join();
    ros_sim.exit();
    return 0;
}
  • how to build
g++ -std=c++11 -pthread -g3 -o ros_sim_test ros_sim_test.cpp ros_sim.cpp
  • run results
Msg1 Callback. This message is just for testing.100
Msg1 Callback. This message is just for testing.200