在storm中使用非Java語言
storm由Java實現,但通過multilang protocl(多語言協議),能夠使用php,python,ruby或者javascript來寫spout和bolt。 多語言協議是storm中實現的一種特殊協議,它使用標準輸入和標準輸出作為與執行spout和bolt任務的程序之間通訊的通道。訊息以json格式或者普通的文字行通過通道傳輸。
多語言協議的實現細節
該協議依賴於作為程序之間通訊通道的標準輸入和標準輸出。一個指令碼想要生效需要採取下列步驟
- 初始化握手
- 開始迴圈
- 讀、寫tuple
初始化握手
要控制程序(啟動或者停止),storm需要知道的正在執行指令碼的程序PID。根據多語言協議,當處理過程開始的第一件事情就是storm會發射一個帶有配置,拓撲上下文和PID目錄的Json物件到標準輸入。它看上去跟下面的程式碼塊差不多:
{
"conf": {
"topology.message.timeout.secs": 3,
},
"context": {
"task->component": {
"1": "example-spout",
"2": "__acker",
"3": "example-bolt"
},
"taskid": 3
},
"pidDir": "..."
}
程序必須在pidDir指示的目錄建立一個空檔案,檔名為程序ID,然後將PID作為JSON物件寫到標準輸出。
{"pid" : 1234}
例如,如果接收到/tmp/example\n,指令碼的執行PID為123,那麼建立空檔案/tmp/example/123,並列印行{"pid":123}n和end\n到標準輸出。storm採用這種方式來跟蹤PID,以及在關閉的時候殺死程序。
$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];
$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();
這裡已經實現了函式read_msg,用於從標準輸入讀取訊息。多語言協議中訊息是json格式的單行或多行文字。當storm傳送單行內容為end\n訊息時說明訊息結束。
function read_msg() {
$msg = "";
while(true) {
$l = fgets(STDIN);
$line = substr($l,0,-1);
if($line=="end") {
break;
}
$msg = "$msg$line\n";
}
return substr($msg, 0, -1);
}
function storm_send($json) {
write_line(json_encode($json));
write_line("end");
}
function write_line($line) {
echo("$line\n");
}
使用flush()是非常關鍵的,有可能由於指定字元數量未滿足導致緩衝區的內容不會flush。這意味指令碼會掛起等待storm輸入,由於storm同樣在等待指令碼輸出,所以指令碼是不會接收到輸入的。所以一定要保證當指令碼輸出內容後立即flush。 |
開啟迴圈與訊息讀寫
這是最關鍵的一步,因為所有的工作都在這裡完成。這一步的實現取決於要實現spout還是bolt。在spout的情形中,應該傳送訊息。在bolt的情形中,迴圈,讀取訊息,處理它們,然後發射,確認或者失敗。
傳送數字的spout的實現如下。
$from = intval($argv[1]);
$to = intval($argv[2]);
while(true) {
$msg = read_msg();
$cmd = json_decode($msg, true);
if ($cmd['command']=='next') {
if ($from<$to) {
storm_emit(array("$from"));
$task_ids = read_msg();
$from++;
} else {
sleep(1);
}
}
storm_sync();
}
從命令列引數中獲得from和to,開始迴圈。每當從storm取得訊息next,就說明可以發射新tuple。一旦所有的tuple傳送完成,沒有更多的tuple傳送,掛起。
要保證指令碼準備好下一個tuple,storm在發射下一個之前會等待文字行sync\n。要讀取命令,直接呼叫read_msg()來解碼。在bolts中有些小區別。
while(true) {
$msg = read_msg();
$tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
if (!empty($tuple["id"])) {
if (isPrime($tuple["tuple"][0])) {
storm_emit(array($tuple["tuple"][0]));
}
storm_ack($tuple["id"]);
}
}
迴圈,從標準輸入讀取訊息。一旦接收到訊息,json解析。如果是一個tuple,對它做處理,即判斷是否是素數。在任何情形中,都要訊息確認。
在函式json_decode中使用JSON_BIGINT_AS_STRING來規避Java和PHP之間的格式問題。Java中傳送的大數,在PHP中獲取時會丟失精度,而這可能會導致問題。要解決這個問題,告訴PHP將大數當作字串處理,在json訊息中列印時不使用雙引號。PHP 5.4.0 或更高版本必須使用這個引數。
相關資料
《Getting Started with Storm》