1. 程式人生 > >srs程式碼學習(6)--如何實現edge

srs程式碼學習(6)--如何實現edge

sre叢集的方式有兩種一種是forword型別的。一種是edge-origin型別的。後者比前者要複雜的多。就從比較難的開始分析。

有實現edge,首先得在配置檔案中做配置。配置方式如下


這個配置裡配置了實時模式,邊緣模式(remote)

那麼這個配置到了程式碼中是如何生效的呢?

這裡面核心的類還是SrsSource,如果服務設定為邊緣模式,這個類的角色,經歷了從服務到客戶端在到服務的轉變,具體來看。

首先,如果一個客戶端連線上了一個邊緣節點,在走到釋出了play命令後,伺服器會有進入playing()函式,具體如下

int SrsRtmpConn::playing(SrsSource* source)
{
    int ret = ERROR_SUCCESS;
    
    // create consumer of souce.
    SrsConsumer* consumer = NULL;
    if ((ret = source->create_consumer(this, consumer)) != ERROR_SUCCESS) {
        srs_error("create consumer failed. ret=%d", ret);
        return ret;
    }

在create_consumer函式裡,除了建立一個consumer外,服務還會做一個判斷,判斷服務的模式是否是邊緣節點模式,如果是,那麼呼叫邊緣節點的對應函式啟動邊緣節點。
int SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg)
{
.....
 // for edge, when play edge stream, check the state
    if (_srs_config->get_vhost_is_edge(_req->vhost)) {
        // notice edge to start for the first client.
        if ((ret = play_edge->on_client_play()) != ERROR_SUCCESS) {
            srs_error("notice edge start play stream failed. ret=%d", ret);
            return ret;
        }
    }

程式碼如上,控制權轉給了play_edge。那麼這個play_edge是個什麼東西?看看其類圖


很簡單的一個類,看看原始碼

int SrsPlayEdge::initialize(SrsSource* source, SrsRequest* req)
{
    int ret = ERROR_SUCCESS;
    
    if ((ret = ingester->initialize(source, this, req)) != ERROR_SUCCESS) {
        return ret;
    }
    
    return ret;
}

int SrsPlayEdge::on_client_play()
{
    int ret = ERROR_SUCCESS;
    
    // start ingest when init state.
    if (state == SrsEdgeStateInit) {
        state = SrsEdgeStatePlay;
        return ingester->start();
    }

    return ret;
}

其他程式碼基本都是這樣,它只是一個封裝類,真正幹活的是一個叫SrsEdgeIngester* ingester的類。看看這個類的結構圖


裡面有很多變數。有一部分是其上級設定的,比如 _source _edge,這些基本都是有資料給回撥上去的。但有一個關鍵的變數

 SrsReusableThread2* pthread

有了這個變數。說明這個類是可以自持的,就是可以自己執行。那麼它的迴圈函式是

int SrsEdgeIngester::cycle()
{
    int ret = ERROR_SUCCESS;

    _source->on_source_id_changed(_srs_context->get_id());
        
    std::string ep_server, ep_port;
    if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) {
        return ret;
    }
    srs_assert(client);

    client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
    client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);

    SrsRequest* req = _req;
    
    if ((ret = client->handshake()) != ERROR_SUCCESS) {
        srs_error("handshake with server failed. ret=%d", ret);
        return ret;
    }
    if ((ret = connect_app(ep_server, ep_port)) != ERROR_SUCCESS) {
        return ret;
    }
    if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
        srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
        return ret;
    }
    
    if ((ret = client->play(req->stream, stream_id)) != ERROR_SUCCESS) {
        srs_error("connect with server failed, stream=%s, stream_id=%d. ret=%d", 
            req->stream.c_str(), stream_id, ret);
        return ret;
    }
    
    if ((ret = _edge->on_ingest_play()) != ERROR_SUCCESS) {
        return ret;
    }
    
    ret = ingest();
    if (srs_is_client_gracefully_close(ret)) {
        srs_warn("origin disconnected, retry. ret=%d", ret);
        ret = ERROR_SUCCESS;
    }
    
    return ret;
}

真相逐漸明白了,看看真正的迴圈函式ingest()
int SrsEdgeIngester::ingest()
{
    int ret = ERROR_SUCCESS;
    
    client->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT_US);
    
    SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
    SrsAutoFree(SrsPithyPrint, pprint);

    while (!pthread->interrupted()) {
        pprint->elapse();
        
        // pithy print
        if (pprint->can_print()) {
            kbps->sample();
            srs_trace("<- "SRS_CONSTS_LOG_EDGE_PLAY
                " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", 
                pprint->age(),
                kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
                kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
        }

        // read from client.
        SrsCommonMessage* msg = NULL;
        if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret)) {
                srs_error("pull origin server message failed. ret=%d", ret);
            }
            return ret;
        }
        srs_verbose("edge loop recv message. ret=%d", ret);
        
        srs_assert(msg);
        SrsAutoFree(SrsCommonMessage, msg);
        
        if ((ret = process_publish_message(msg)) != ERROR_SUCCESS) {
            return ret;
        }
    }
    
    return ret;
}

看到沒,但client收到資料後,交給了process_publish_message()函式。在看看這個函式
int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg)
{
    int ret = ERROR_SUCCESS;
    
    SrsSource* source = _source;
        
    // process audio packet
    if (msg->header.is_audio()) {
        if ((ret = source->on_audio(msg)) != ERROR_SUCCESS) {
            srs_error("source process audio message failed. ret=%d", ret);
            return ret;
        }
    }
    
    // process video packet
    if (msg->header.is_video()) {
        if ((ret = source->on_video(msg)) != ERROR_SUCCESS) {
            srs_error("source process video message failed. ret=%d", ret);
            return ret;
        }
    }
    
    // process aggregate packet
    if (msg->header.is_aggregate()) {
        if ((ret = source->on_aggregate(msg)) != ERROR_SUCCESS) {
            srs_error("source process aggregate message failed. ret=%d", ret);
            return ret;
        }
        return ret;
    }

    // process onMetaData
    if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
        SrsPacket* pkt = NULL;
        if ((ret = client->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
            srs_error("decode onMetaData message failed. ret=%d", ret);
            return ret;
        }
        SrsAutoFree(SrsPacket, pkt);
    
        if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
            SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
            if ((ret = source->on_meta_data(msg, metadata)) != ERROR_SUCCESS) {
                srs_error("source process onMetaData message failed. ret=%d", ret);
                return ret;
            }
            srs_info("process onMetaData message success.");
            return ret;
        }
        
        srs_info("ignore AMF0/AMF3 data message.");
        return ret;
    }
    
    return ret;
}

看到沒,這裡把資料這就交個了source。我們上一次已經分析過,在souru拿到資料以後,會直接複製給consumer,而consumer是和客戶端相連的,這樣就可以把資料的轉發就做完了。

從上面的流程可以看出,如果多個客戶請求同一個數據源,只有一個source會被建立,也就意味這一個play_edeg.,這樣就把流量服務轉移到了邊緣伺服器這邊。

相關推薦

srs程式碼學習6--如何實現edge

sre叢集的方式有兩種一種是forword型別的。一種是edge-origin型別的。後者比前者要複雜的多。就從比較難的開始分析。 有實現edge,首先得在配置檔案中做配置。配置方式如下 這個配置裡配置了實時模式,邊緣模式(remote) 那麼這個配置到了程式碼中是如何

srs程式碼學習5--一些與運營相關的技術點

和大規模運營相關的技術點。我想到的有下面四個 1)vhost 2) edge  3) forword 4) log trace  5)reload 6)http-api 7)  statis 下面來一個一個具體分析 vhost ,意思就是virtual host .虛擬主

srs程式碼學習1--listen建立過程

srs的服務偵聽的建立過程。 以rtmp服務為例 srs服務偵聽的建立依靠從上到下的三個類。分別是 SrsServer   SrsStreamListener   SrsTcpListener 埠偵聽過程為 1)main函式中呼叫全域性變數_srs_server的  li

srs程式碼學習9----http_api處理流程

srs提供http相關功能。包括http_server http_api查詢功能和http_heartbeat ,http_callback等。 首先分析http_api,這個提供一系列的http的介面,可以用來查詢伺服器的狀態,但它的重要性並不只限於http查詢,更重要的

LeNet5的一個MATLAB實現程式碼解析6

MNIST資料集下載:http://yann.lecun.com/exdb/mnist/進網站後點擊紅色的連線就可以下載了。 function [I,labels,I_test,labels_test] = readMNIST(num)%讀取MNIST資料集,這

linux命令學習6:ps命令

bytes 釋放 ice cti width kthread hellip 名稱 pts Linux中的ps命令是Process Status的縮寫。ps命令用來列出系統中當前運行的那些進程。ps命令列出的是當前那些進程的快照,就是執行ps命令的那個時刻的那些進程,如果想要

構建之法學習6

客戶 需求 現在 保持 變化 經理 論證 規格 沒有 本周學習的是第六章——敏捷流程 在軟件工程的語境裏,“敏捷流程”是一系列價值觀和方法論的集合。從2001年開始,一些軟件界的專家開始倡導“敏捷”的價值觀和流程,他們肯定了流行做法的價值,但是強調敏捷的做法更能帶來價值。

maven--學習6--MVN命令

arc rgs 測試報告 額外 class sna osi tro 反向 Maven庫: http://repo2.maven.org/maven2/ Maven依賴查詢: http://mvnrepository.com/ 一,Maven常用命令: 1. 創建Mave

java===java基礎學習6---流程控制,for,if,switch,continue,break

nbsp int exception pub ase nio 內部 註意點 多重循環 註意點: for循環的用法和python截然不同,註意格式 switch~,switch對應的case每當執行完畢都要break,由於基本不怎麽用switch,所以作為了解。 中斷流程

perl學習6控制語句

OS body 條件 continue 其他 reac 控制 log int 1:  unless 條件為假時,執行指定的語句   unless…(條件為假執行)…else…(條件為真執行)… 2:  until    循環體一直執行,直到條件為真結束   until ($

區塊鏈學習6區塊鏈

有序 打包成 info 運算 區塊鏈 ash 互連 包含 hash 寫了幾篇區塊鏈的學習筆記,今天來寫寫比特幣中的區塊鏈。比特幣中區塊鏈是由包含交易信息的區塊從後向前有序鏈接起來的數據結構。每個區塊從後向前有序地鏈接在這個鏈條裏,每個區塊都指向前一個區塊。 區塊結構 區塊是

MyBatis學習6

throws 垃圾回收器 安全 cep 正整數 sin ret 關系 行刷新 本視頻觀看地址:https://edu.51cto.com/sd/3ec2c 1、緩存 1.1、緩存的意義 將用戶經常查詢的數據放在緩存(內存)中,用戶去查詢數據就不用從磁盤上(關系型數據庫數據

Arduino學習6

本文介紹Arduino連線並控制步進電機。 連線方式: 程式碼: #define ROTATE(x) {PORTD|=x; PORTD&=(x|0x0F);} //四相單四拍 const char SinBeat[4]={0x80,0x40,0x20,0x10}; /

《Frustum PointNets for 3D Object Detection from RGB-D Data》論文及程式碼學習程式碼部分

《Frustum PointNets for 3D Object Detection from RGB-D Data》論文及程式碼學習(二)程式碼部分 文章目錄 《Frustum PointNets for 3D Object Detection from RG

HTML的學習6

表頭的樣式 雖然表格已經初具雛形,但是樣式單一,我們已經添加了一些樣式表,以使它有點更容易閱讀。這個就是收尾工作。可以隨意的去加入任何的style屬性,你會在CSS課程期間學到更多關於這些東西。如果你想新增多個樣式,你只需要用;分號分隔開就行。 <th style="font-si

spring學習6

mob 常用 scope rda 之間 出現異常 類對象 介紹 資料 1 spring概念 (1)spring核心兩部分 (2)spring一站式框架 (3)spring版本 可以使用基本的javaBean代替EJB,EJB是重量級框架。 1 spring是一個開源的輕量級

基於Visual C++之Windows核心程式設計程式碼分析1實現裝置管理器列舉裝置

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

Python學習6——面向物件編輯

1、類和例項 (1)通過定義一個特殊的__init__方法,在建立例項的時候,就把相關屬性綁上去 (2)普通的函式相比,在類中定義的函式第一個引數永遠是例項變數self,並且,呼叫時,不用傳遞該引數 (3)和靜態語言不同,Python允許對例項變數繫結任何資料,也就是說,對於兩個例項

mybatis學習6:快取原理詳解

一、快取原理圖   二、快取原理  一級快取(本地快取) sqlSession級別的快取,一級快取是一直開啟的; SqlSession級別的一個Map              &nb

機器學習6K近鄰演算法

k-近鄰,通過離你最近的來判斷你的類別 例子: 定義:如果一個樣本在特徵空間中的k個最相似(即特徵空間中最鄰近的樣本中大多數屬於某一類別),則該樣本屬於這個類別 K近鄰需要做標準化處理 例如: import numpy as npimport pandas as pdfrom mat