1. 程式人生 > >SRS原始碼分析-協程相關類

SRS原始碼分析-協程相關類

SRS中使用協程庫state-thread(ST), 在使用時對其進行了封裝,保證使用方便。這種封裝方法和使用thread庫比較類似。

SrsEndlessThread

用於建立一個永不退出的協程,生命週期和整個程式一樣。使用時需要繼承ISrsEndlessThreadHandler方法,並在建構函式中建立SrsEndlessThread,重寫cycle方法。

使用時執行流程:
SRSEndlessThread使用時執行流程圖

 //永不退出協程的處理類
class ISrsEndlessThreadHandler
{
public:
    ISrsEndlessThreadHandler();
    virtual ~ISrsEndlessThreadHandler();
public:
    virtual int cycle() = 0;
public:
    virtual void on_thread_start();
    virtual int on_before_cycle();
    virtual int on_end_cycle();
    virtual void on_thread_stop();
};

//永不退出的協程
class SrsEndlessThread : public internal::ISrsThreadHandler
{
private:
    internal::SrsThread* pthread; //包含一個協程封裝類
    ISrsEndlessThreadHandler* handler; //協程處理類
public:
    SrsEndlessThread(const char* n, ISrsEndlessThreadHandler* h);
    virtual ~SrsEndlessThread();
public:
    virtual int start(); //啟動
public:
    virtual int cycle(); //執行的迴圈
    virtual void on_thread_start();
    virtual int on_before_cycle();
    virtual int on_end_cycle();
    virtual void on_thread_stop();
};

SrsOneCycleThread

用於建立一次迴圈的thread,在cycle函式執行完後,呼叫stop_loop退出。執行流程和SrsEndlessThread類似。

//執行完handler->cycle()後,呼叫stop_loop,退出
int SrsOneCycleThread::cycle()
{
    int ret = handler->cycle();
    pthread->stop_loop();
    return ret;
}

SrsReusableThread

可以重複使用的thread, 其他類繼承ISrsReusableThreadHandler,幷包含SrsReusableThread的變數,start函式啟動執行緒,stop函式停止執行緒。重寫cycle函式

class ISrsReusableThreadHandler
{
public:
    ISrsReusableThreadHandler();
    virtual ~ISrsReusableThreadHandler();
public:
    virtual int cycle() = 0;
public:
    virtual void on_thread_start();
    virtual int on_before_cycle();
    virtual int on_end_cycle();
    virtual void on_thread_stop();
};
class SrsReusableThread : public internal::ISrsThreadHandler
{
private:
    internal::SrsThread* pthread;
    ISrsReusableThreadHandler* handler;
public:
    SrsReusableThread(const char* n, ISrsReusableThreadHandler* h, int64_t interval_us = 0);
    virtual ~SrsReusableThread();
public:
    virtual int start();
    virtual void stop();
public:
    virtual int cid();
// interface internal::ISrsThreadHandler
public:
    virtual int cycle();
    virtual void on_thread_start();
    virtual int on_before_cycle();
    virtual int on_end_cycle();
    virtual void on_thread_stop();
};
//可以呼叫stop
void SrsReusableThread::stop()
{
    pthread->stop();
}

SrsReusableThread2

和SrsReusableThread區別是:線上程cycle裡有內部迴圈,需要判斷interrupt狀態,如果內部loop想要退出執行緒,應該interrupt該執行緒。

//多了interrupt
void SrsReusableThread2::interrupt()
{
    pthread->stop_loop();
}

bool SrsReusableThread2::interrupted()
{
    return !pthread->can_loop();
}

SrsThread

/*
 * 執行緒處理類,定製執行緒啟動的回撥函式
 * */
class ISrsThreadHandler
{
public:
    ISrsThreadHandler();
    virtual ~ISrsThreadHandler();
public:
    virtual void on_thread_start(); //執行緒啟動
    virtual int on_before_cycle(); //cycle前
    virtual int cycle() = 0; //cycle
    virtual int on_end_cycle(); //cycle後
    virtual void on_thread_stop(); //stop時
};
 /*
  * 協程的封裝,作為內部使用的類
  * */
class SrsThread
{
private:
    st_thread_t tid; //tid
    int _cid; //cid
    bool loop; //是否支援loop
    bool can_run; //是否能run
    bool really_terminated; //是否terminate
    bool _joinable; //是否joinable
    const char* _name; //協程名字
    bool disposed; //是否dispose
private:
    ISrsThreadHandler* handler; //回撥處理
    int64_t cycle_interval_us; //迴圈時間us
public:
     //初始化協程
    SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable);
    virtual ~SrsThread();
public:
    virtual int cid(); //獲取cid
    virtual int start(); //啟動執行緒
    virtual void stop(); //暫停執行緒
public:
    virtual bool can_loop(); //是否能loop
    virtual void stop_loop(); //停止loop
private:
    virtual void dispose(); //釋放
    virtual void thread_cycle(); //執行緒迴圈
    static void* thread_fun(void* arg); //執行緒迴圈呼叫的函式
};
/*
     * 執行緒的建構函式
     * name:函式名
     * thread_handle:執行緒處理函式
     * interval_us: 休眠時長
     * joinalbe: 是否能join
     * */
    SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable)
    {
        _name = name;
        handler = thread_handler;
        cycle_interval_us = interval_us;
        
        tid = NULL;
        loop = false;
        really_terminated = true;
        _cid = -1;
        _joinable = joinable;
        disposed = false;
        can_run = false;
    }
    //解構函式,呼叫stop
    SrsThread::~SrsThread()
    {
        stop();
    }
    
    int SrsThread::cid()
    {
        return _cid;
    }
    //啟動一個協程
    int SrsThread::start()
    {
        int ret = ERROR_SUCCESS;
        
        if(tid) {
            srs_info("thread %s already running.", _name);
            return ret;
        }
        //建立協程,呼叫thread_fun
        if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){
            ret = ERROR_ST_CREATE_CYCLE_THREAD;
            srs_error("st_thread_create failed. ret=%d", ret);
            return ret;
        }
        //是否dispose
        disposed = false;
        // we set to loop to true for thread to run.
        loop = true;
        
        // wait for cid to ready, for parent thread to get the cid.
        while (_cid < 0) {
            st_usleep(10 * 1000);
        }
        
        // now, cycle thread can run.
        can_run = true;
        
        return ret;
    }

    //停止一個協程
    void SrsThread::stop()
    {
        if (!tid) {
            return;
        }
        
        loop = false; //loop為false, 那麼不會繼續執行cycle()
        
        dispose(); //釋放協程
        
        _cid = -1;
        can_run = false;
        tid = NULL;        
    }

    //清理
    void SrsThread::dispose()
    {
        if (disposed) {
            return;
        }
        st_thread_interrupt(tid);
        if (_joinable) {
            // wait the thread to exit.
            int ret = st_thread_join(tid, NULL);
            if (ret) {
                srs_warn("core: ignore join thread failed.");
            }
        }
        while (!really_terminated) {
            st_usleep(10 * 1000);
            
            if (really_terminated) {
                break;
            }
            srs_warn("core: wait thread to actually terminated");
        }
        
        disposed = true;
    }

    //協程的迴圈
    void SrsThread::thread_cycle()
    {
        int ret = ERROR_SUCCESS;
        
        _srs_context->generate_id(); //生成cid
        srs_info("thread %s cycle start", _name);
        
        _cid = _srs_context->get_id();
        
        srs_assert(handler);
        handler->on_thread_start(); //呼叫handle的on_thread_start
        
        // thread is running now.
        really_terminated = false;
        
        // wait for cid to ready, for parent thread to get the cid.
        while (!can_run && loop) {
            st_usleep(10 * 1000);
        }
        //正在的loop,loop裡執行函式為:on_before_cycle->cycle->on_end_cycle
        while (loop) {
            if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
                srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret);
                goto failed;
            }
            srs_info("thread %s on before cycle success", _name);
            
            if ((ret = handler->cycle()) != ERROR_SUCCESS) {
                if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {
                    srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret);
                }
                goto failed;
            }
            srs_info("thread %s cycle success", _name);
            
            if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {
                srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", _name, ret);
                goto failed;
            }
            srs_info("thread %s on end cycle success", _name);
            
        failed:
            if (!loop) {
                break;
            }
            if (cycle_interval_us != 0) {
                st_usleep(cycle_interval_us);
            }
        }
        
        // really terminated now.
        really_terminated = true;
        
        handler->on_thread_stop();//停止時的回撥
        srs_info("thread %s cycle finished", _name);
    }

    //協程執行的函式
    void* SrsThread::thread_fun(void* arg)
    {
        SrsThread* obj = (SrsThread*)arg;
        srs_assert(obj);
        
        obj->thread_cycle(); //呼叫cycle函式
        
        // for valgrind to detect.
        SrsThreadContext* ctx = dynamic_cast<SrsThreadContext*>(_srs_context);
        if (ctx) {
            ctx->clear_cid();
        }
        
        st_thread_exit(NULL); //退出協程
        
        return NULL;
    }