SRS原始碼分析-協程相關類
阿新 • • 發佈:2018-12-18
SRS中使用協程庫state-thread(ST), 在使用時對其進行了封裝,保證使用方便。這種封裝方法和使用thread庫比較類似。
SrsEndlessThread
用於建立一個永不退出的協程,生命週期和整個程式一樣。使用時需要繼承ISrsEndlessThreadHandler類,在建構函式中建立SrsEndlessThread,並重寫cycle方法。
使用時執行流程:
//永不退出協程的處理類 class ISrsEndlessThreadHandler { public: ISrsEndlessThreadHandler(); virtual ~ISrsEndlessThreadHandler(); public: virtual int cycle() = 0; public: virtual void on_thread_start(); virtual int on_before_cycle(); virtual int on_end_cycle(); virtual void on_thread_stop(); }; //永不退出的協程 class SrsEndlessThread : public internal::ISrsThreadHandler { private: internal::SrsThread* pthread; //包含一個協程封裝類 ISrsEndlessThreadHandler* handler; //協程處理類 public: SrsEndlessThread(const char* n, ISrsEndlessThreadHandler* h); virtual ~SrsEndlessThread(); public: virtual int start(); //啟動 public: virtual int cycle(); //執行的迴圈 virtual void on_thread_start(); virtual int on_before_cycle(); virtual int on_end_cycle(); virtual void on_thread_stop(); };
SrsOneCycleThread
用於建立一次迴圈的thread,在cycle函式執行完後,呼叫stop_loop退出。執行流程和SrsEndlessThread類似。
// Run the handler's cycle() once, then call stop_loop() on the underlying
// thread so that it exits; the handler's result is returned unchanged.
int SrsOneCycleThread::cycle()
{
    const int result = handler->cycle();

    pthread->stop_loop();

    return result;
}
SrsReusableThread
可以重複使用的thread。其他類繼承ISrsReusableThreadHandler,並包含SrsReusableThread型別的成員變數;呼叫start函式啟動執行緒,呼叫stop函式停止執行緒,並重寫cycle函式。
class ISrsReusableThreadHandler { public: ISrsReusableThreadHandler(); virtual ~ISrsReusableThreadHandler(); public: virtual int cycle() = 0; public: virtual void on_thread_start(); virtual int on_before_cycle(); virtual int on_end_cycle(); virtual void on_thread_stop(); }; class SrsReusableThread : public internal::ISrsThreadHandler { private: internal::SrsThread* pthread; ISrsReusableThreadHandler* handler; public: SrsReusableThread(const char* n, ISrsReusableThreadHandler* h, int64_t interval_us = 0); virtual ~SrsReusableThread(); public: virtual int start(); virtual void stop(); public: virtual int cid(); // interface internal::ISrsThreadHandler public: virtual int cycle(); virtual void on_thread_start(); virtual int on_before_cycle(); virtual int on_end_cycle(); virtual void on_thread_stop(); };
// Stop the thread by forwarding to the wrapped thread object's stop().
void SrsReusableThread::stop()
{
    pthread->stop();
}
SrsReusableThread2
和SrsReusableThread的區別是:在執行緒的cycle裡有內部迴圈,需要判斷interrupt狀態;如果內部loop想要退出執行緒,應該interrupt該執行緒。
// The addition over SrsReusableThread: interrupt() calls stop_loop() on the
// wrapped thread object.
void SrsReusableThread2::interrupt()
{
    pthread->stop_loop();
}
// Report whether the thread has been interrupted, i.e. the wrapped thread
// object says it can no longer loop.
bool SrsReusableThread2::interrupted()
{
    const bool still_looping = pthread->can_loop();
    return !still_looping;
}
SrsThread
/**
 * Thread handler interface: the set of callbacks used to customize what a
 * thread does when it starts, on each cycle, and when it stops.
 */
class ISrsThreadHandler
{
public:
    ISrsThreadHandler();
    virtual ~ISrsThreadHandler();

    // Called when the thread starts.
    virtual void on_thread_start();

    // Called before cycle().
    virtual int on_before_cycle();

    // The cycle itself; implementers must provide it.
    virtual int cycle() = 0;

    // Called after cycle().
    virtual int on_end_cycle();

    // Called when the thread stops.
    virtual void on_thread_stop();
};
/**
 * Wrapper around a coroutine; meant to be used as an internal class.
 */
class SrsThread
{
private:
    // Handle returned by st_thread_create(); NULL while not started.
    st_thread_t tid;
    // Context id; -1 until the coroutine has published one.
    int _cid;
    // Whether the cycle loop should keep running.
    bool loop;
    // Set by start() once the cid is ready; the coroutine waits for it.
    bool can_run;
    // True while the coroutine body is not running.
    bool really_terminated;
    // Whether the coroutine is created joinable.
    bool _joinable;
    // Coroutine name, used in log messages.
    const char* _name;
    // Whether dispose() has already run.
    bool disposed;

    // Receiver of the lifecycle callbacks.
    ISrsThreadHandler* handler;
    // Sleep between two cycles, in microseconds (0 means no sleep).
    int64_t cycle_interval_us;

public:
    // Initialize the coroutine wrapper.
    SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable);
    virtual ~SrsThread();

    // Get the cid.
    virtual int cid();
    // Start the thread.
    virtual int start();
    // Stop the thread.
    virtual void stop();

    // Whether the thread can loop.
    virtual bool can_loop();
    // Stop the loop.
    virtual void stop_loop();

private:
    // Release the coroutine.
    virtual void dispose();
    // The thread loop.
    virtual void thread_cycle();
    // Entry function handed to the coroutine library; runs the thread loop.
    static void* thread_fun(void* arg);
};
/**
 * Construct the thread wrapper; the coroutine itself is not created here.
 * @param name            thread name (the pointer is stored, not copied).
 * @param thread_handler  handler that receives the thread callbacks.
 * @param interval_us     sleep duration between cycles, in microseconds.
 * @param joinable        whether the thread can be joined.
 */
SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable)
    : tid(NULL),
      _cid(-1),
      loop(false),
      can_run(false),
      really_terminated(true),
      _joinable(joinable),
      _name(name),
      disposed(false),
      handler(thread_handler),
      cycle_interval_us(interval_us)
{
}
// Destructor: stops the thread. Inside a destructor the call resolves to
// SrsThread's own stop(), which is spelled out explicitly here.
SrsThread::~SrsThread()
{
    SrsThread::stop();
}
// Return the context id; it is -1 while no coroutine is running.
int SrsThread::cid()
{
    const int current_cid = _cid;
    return current_cid;
}
// Start the coroutine.
// Returns ERROR_SUCCESS when the coroutine is running (including when it was
// already started), or ERROR_ST_CREATE_CYCLE_THREAD when creation fails.
int SrsThread::start()
{
    int ret = ERROR_SUCCESS;

    // Already started: nothing to do.
    if (tid != NULL) {
        srs_info("thread %s already running.", _name);
        return ret;
    }

    // Create the coroutine; its entry point is thread_fun.
    tid = st_thread_create(thread_fun, this, (_joinable ? 1 : 0), 0);
    if (tid == NULL) {
        ret = ERROR_ST_CREATE_CYCLE_THREAD;
        srs_error("st_thread_create failed. ret=%d", ret);
        return ret;
    }

    // A freshly created coroutine has not been disposed yet.
    disposed = false;

    // we set to loop to true for thread to run.
    loop = true;

    // wait for cid to ready, for parent thread to get the cid.
    while (_cid < 0) {
        st_usleep(10 * 1000);
    }

    // now, cycle thread can run.
    can_run = true;

    return ret;
}
// Stop the coroutine; does nothing when it was never started.
void SrsThread::stop()
{
    if (tid != NULL) {
        // With loop cleared the coroutine will not run cycle() again.
        loop = false;

        // Release the coroutine; this still needs the handle in tid.
        dispose();

        // Reset the state so that start() can be called again.
        tid = NULL;
        can_run = false;
        _cid = -1;
    }
}
// Clean up: interrupt the coroutine, join it when it is joinable, and wait
// until its loop has really terminated. Runs at most once per start().
void SrsThread::dispose()
{
    if (!disposed) {
        st_thread_interrupt(tid);

        if (_joinable) {
            // wait the thread to exit.
            const int join_result = st_thread_join(tid, NULL);
            if (join_result != 0) {
                srs_warn("core: ignore join thread failed.");
            }
        }

        // Poll every 10ms until the coroutine reports termination, warning
        // after each sleep that did not see it terminate.
        while (!really_terminated) {
            st_usleep(10 * 1000);

            if (!really_terminated) {
                srs_warn("core: wait thread to actually terminated");
            }
        }

        disposed = true;
    }
}
// The coroutine body: publishes the cid, notifies the handler, then runs
// on_before_cycle -> cycle -> on_end_cycle repeatedly while `loop` is set.
// A failing step skips the remaining steps of that iteration only.
void SrsThread::thread_cycle()
{
    int ret = ERROR_SUCCESS;

    // Generate the cid and publish it; start() is waiting for it.
    _srs_context->generate_id();
    srs_info("thread %s cycle start", _name);
    _cid = _srs_context->get_id();

    srs_assert(handler);
    handler->on_thread_start();

    // thread is running now.
    really_terminated = false;

    // wait for cid to ready, for parent thread to get the cid.
    while (!can_run && loop) {
        st_usleep(10 * 1000);
    }

    // The real loop.
    while (loop) {
        ret = handler->on_before_cycle();
        if (ret != ERROR_SUCCESS) {
            srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret);
        } else {
            srs_info("thread %s on before cycle success", _name);

            ret = handler->cycle();
            if (ret != ERROR_SUCCESS) {
                // These two error classes are not logged.
                if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {
                    srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret);
                }
            } else {
                srs_info("thread %s cycle success", _name);

                ret = handler->on_end_cycle();
                if (ret != ERROR_SUCCESS) {
                    srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", _name, ret);
                } else {
                    srs_info("thread %s on end cycle success", _name);
                }
            }
        }

        // Success and failure both end the iteration here.
        if (!loop) {
            break;
        }

        if (cycle_interval_us != 0) {
            st_usleep(cycle_interval_us);
        }
    }

    // really terminated now.
    // The flag is set before the stop callback, exactly as in the original.
    really_terminated = true;

    // Callback on stop.
    handler->on_thread_stop();
    srs_info("thread %s cycle finished", _name);
}
//協程執行的函式
void* SrsThread::thread_fun(void* arg)
{
SrsThread* obj = (SrsThread*)arg;
srs_assert(obj);
obj->thread_cycle(); //呼叫cycle函式
// for valgrind to detect.
SrsThreadContext* ctx = dynamic_cast<SrsThreadContext*>(_srs_context);
if (ctx) {
ctx->clear_cid();
}
st_thread_exit(NULL); //退出協程
return NULL;
}