1. 程式人生 > >在storm中使用非Java語言

在storm中使用非Java語言

storm由Java實現,但通過multilang protocl(多語言協議),能夠使用php,python,ruby或者javascript來寫spout和bolt。 多語言協議是storm中實現的一種特殊協議,它使用標準輸入和標準輸出作為與執行spout和bolt任務的程序之間通訊的通道。訊息以json格式或者普通的文字行通過通道傳輸。

多語言協議的實現細節

該協議依賴於作為程序之間通訊通道的標準輸入和標準輸出。一個指令碼想要生效需要採取下列步驟

  • 初始化握手
  • 開始迴圈
  • 讀、寫tuple

初始化握手

要控制程序(啟動或者停止),storm需要知道的正在執行指令碼的程序PID。根據多語言協議,當處理過程開始的第一件事情就是storm會發射一個帶有配置,拓撲上下文和PID目錄的Json物件到標準輸入。它看上去跟下面的程式碼塊差不多:

{
    "conf": {
        "topology.message.timeout.secs": 3,
},
"context": {
    "task->component": {
        "1": "example-spout",
        "2": "__acker",
        "3": "example-bolt"
    },
    "taskid": 3
},
"pidDir": "..."
}

程序必須在pidDir指示的目錄建立一個空檔案,檔名為程序ID,然後將PID作為JSON物件寫到標準輸出。

{"pid" : 1234}

例如,如果接收到/tmp/example\n,指令碼的執行PID為123,那麼建立空檔案/tmp/example/123,並列印行{"pid":123}n和end\n到標準輸出。storm採用這種方式來跟蹤PID,以及在關閉的時候殺死程序。

$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];
$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();

這裡已經實現了函式read_msg,用於從標準輸入讀取訊息。多語言協議中訊息是json格式的單行或多行文字。當storm傳送單行內容為end\n訊息時說明訊息結束。

function read_msg() {
    $msg = "";
    while(true) {
        $l = fgets(STDIN);
        $line = substr($l,0,-1);
        if($line=="end") {
            break;
        }
        $msg = "$msg$line\n";
    }
    return substr($msg, 0, -1);
}
function storm_send($json) {
    write_line(json_encode($json));
    write_line("end");
}
function write_line($line) {
    echo("$line\n");
}

使用flush()是非常關鍵的,有可能由於指定字元數量未滿足導致緩衝區的內容不會flush。這意味指令碼會掛起等待storm輸入,由於storm同樣在等待指令碼輸出,所以指令碼是不會接收到輸入的。所以一定要保證當指令碼輸出內容後立即flush。

開啟迴圈與訊息讀寫

這是最關鍵的一步,因為所有的工作都在這裡完成。這一步的實現取決於要實現spout還是bolt。在spout的情形中,應該傳送訊息。在bolt的情形中,迴圈,讀取訊息,處理它們,然後發射,確認或者失敗。
傳送數字的spout的實現如下。

$from = intval($argv[1]);
$to = intval($argv[2]);
while(true) {
    $msg = read_msg();
    $cmd = json_decode($msg, true);
    if ($cmd['command']=='next') {
        if ($from<$to) {
            storm_emit(array("$from"));
            $task_ids = read_msg();
            $from++;
        } else {
            sleep(1);
        }
    }
    storm_sync();
}

從命令列引數中獲得from和to,開始迴圈。每當從storm取得訊息next,就說明可以發射新tuple。一旦所有的tuple傳送完成,沒有更多的tuple傳送,掛起。

要保證指令碼準備好下一個tuple,storm在發射下一個之前會等待文字行sync\n。要讀取命令,直接呼叫read_msg()來解碼。在bolts中有些小區別。

while(true) {
    $msg = read_msg();
    $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
    if (!empty($tuple["id"])) {
        if (isPrime($tuple["tuple"][0])) {
            storm_emit(array($tuple["tuple"][0]));
        }
        storm_ack($tuple["id"]);
    }
}

迴圈,從標準輸入讀取訊息。一旦接收到訊息,json解析。如果是一個tuple,對它做處理,即判斷是否是素數。在任何情形中,都要訊息確認。
在函式json_decode中使用JSON_BIGINT_AS_STRING來規避Java和PHP之間的格式問題。Java中傳送的大數,在PHP中獲取時會丟失精度,而這可能會導致問題。要解決這個問題,告訴PHP將大數當作字串處理,在json訊息中列印時不使用雙引號。PHP 5.4.0 或更高版本必須使用這個引數。

相關資料

《Getting Started with Storm》