如何設計一個實時流計算系統
阿新 • • 發佈:2019-01-30
實時流計算的場景歸納起來多半是:
業務系統根據實時的操作,不斷生成事件(訊息/呼叫),然後引起一系列的處理分析,這個過程是分散在多臺計算機上並行完成的,看上去就像事件連續不斷的流經多個計算節點處理,形成一個實時流計算系統。
市場上流計算產品有很多,主要是通過訊息中樞結合工人模式實現,大致過程如下:
1、開發者實現好流程輸入輸出節點邏輯,上傳job到任務生產者
2、任務生產者將任務傳送到zookeeper,然後監控任務狀態
3、任務消費者從zookeeper上獲取任務
4、任務消費者啟動多個工人程序,每個程序又啟動多個執行緒執行任務
5、工人之間通過zeroMQ互動
我們看看如何做一個簡單的流計算系統,做法跟上面有些不同:
1、首先不過多依賴zookeerper,任務的分配最好直接給到工人,並能直接監控工人完成狀態,這樣效率會更高。
2、工人之間直接通訊,不依賴zeroMQ轉發。
3、並行管理扁平化,多程序下再分多執行緒意義不大,增加管理成本,實際上一臺機器8個程序,每個程序再開8個執行緒,總體跟8-10個程序或者執行緒的效果差不多(數量視機器效能不同)。
4、做成一個流計算系統,而不是平臺。
這裡我們藉助fourinone提供的api和框架去實現,第一次使用可以參考 分散式計算上手demo指南 ,開發包下載地址 http://code.google.com/p/fourinone/
大致思路:用工頭去做任務生產和分配,用工人去做任務執行,為了達到流的效果,需要在工人裡面呼叫工頭的方式,將多個工人節點串起來。
下面程式演示了連續多個訊息先發到一個工人節點A處理,然後再發到兩個工人節點B並行處理的流計算過程,並且獲取到最後處理結果列印輸出(如果不需要獲取結果可以直接返回)。
StreamCtorA:工頭A實現,它獲取到線上工人A,然後將訊息發給它處理,並輪循等待結果。工頭A的main函式模擬了多個訊息的連續呼叫。
StreamWorkerA:工人A實現,它接收到工頭A的訊息進行處理,然後建立一個工頭B,通過工頭B將結果同時發給兩個工人B處理,然後將結果返回工頭A。
StreamCtorB:工頭B實現,它獲取到線上兩個工人B,呼叫doTaskBatch等待兩個工人處理完成,然後返回結果給工人A。
StreamWorkerB:工人B實現,它接收到任務訊息後模擬處理後返回結果。
執行步驟(在本地模擬):
1、啟動ParkServerDemo(它的IP埠已經在配置檔案指定)
java -cp fourinone.jar; ParkServerDemo
2、啟動工人A
java -cp fourinone.jar; StreamWorkerA localhost 2008
3、啟動兩個工人B
java -cp fourinone.jar; StreamWorkerB localhost 2009
java -cp fourinone.jar; StreamWorkerB localhost 2010
4、啟動工頭A
java -cp fourinone.jar; StreamCtorA
多機部署說明:StreamCtorA可以單獨部署一臺機器,StreamWorkerA和StreamCtorB部署一臺機器,兩個StreamWorkerB可以部署兩臺機器。
總結:計算平臺和計算系統的區別
如果我們只有幾臺機器,但是每天有人開發不同的流處理應用要在這幾臺機器上執行,我們需要一個計算平臺來管理好job,讓開發者按照規範配置好流程和執行時節點申請,打包成job上傳,然後平臺根據每個job配置動態分配資源依次執行每個job內容。
如果我們的幾臺機器只為一個流處理業務服務,比如實時營銷,我們需要一個流計算系統,按照業務流程部署好計算節點即可,不需要執行多個job和動態分配資源,按照計算平臺的方式做只會增加複雜性,開發者也不清楚每臺機器上到底運行了什麼邏輯。
如果你想實現一個計算平臺,可以參考 動態部署 和程序管理功能(開發包內有指南)
//完整原始碼
// ParkServerDemo
import com.fourinone.BeanContext;
public class ParkServerDemo
{
public static void main(String[] args)
{
BeanContext.startPark();
}
}
//StreamCtorA
import com.fourinone.Contractor;
import com.fourinone.WareHouse;
import com.fourinone.WorkerLocal;
import java.util.ArrayList;
public class StreamCtorA extends Contractor
{
public WareHouse giveTask(WareHouse inhouse)
{
WorkerLocal[] wks = getWaitingWorkers("StreamWorkerA");
System.out.println("wks.length:"+wks.length);
WareHouse result = wks[0].doTask(inhouse);
while(true){
if(result.getStatus()!=WareHouse.NOTREADY)
{
break;
}
}
return result;
}
public static void main(String[] args)
{
StreamCtorA sc = new StreamCtorA();
for(int i=0;i<10;i++){
WareHouse msg = new WareHouse();
msg.put("msg","hello"+i);
WareHouse wh = sc.giveTask(msg);
System.out.println(wh);
}
sc.exit();
}
}
//StreamWorkerA
import com.fourinone.MigrantWorker;
import com.fourinone.WareHouse;
public class StreamWorkerA extends MigrantWorker
{
public WareHouse doTask(WareHouse inhouse)
{
System.out.println(inhouse);
//do something
StreamCtorB sc = new StreamCtorB();
WareHouse msg = new WareHouse();
msg.put("msg",inhouse.getString("msg")+",from StreamWorkerA");
WareHouse wh = sc.giveTask(msg);
sc.exit();
return wh;
}
public static void main(String[] args)
{
StreamWorkerA wd = new StreamWorkerA();
wd.waitWorking(args[0],Integer.parseInt(args[1]),"StreamWorkerA");
}
}
//StreamCtorB
import com.fourinone.Contractor;
import com.fourinone.WareHouse;
import com.fourinone.WorkerLocal;
import java.util.ArrayList;
public class StreamCtorB extends Contractor
{
public WareHouse giveTask(WareHouse inhouse)
{
WorkerLocal[] wks = getWaitingWorkers("StreamWorkerB");
System.out.println("wks.length:"+wks.length);
WareHouse[] hmarr = doTaskBatch(wks, inhouse);
WareHouse result = new WareHouse();
result.put("B1",hmarr[0]);
result.put("B2",hmarr[1]);
return result;
}
}
//StreamWorkerB
import com.fourinone.MigrantWorker;
import com.fourinone.WareHouse;
public class StreamWorkerB extends MigrantWorker
{
public WareHouse doTask(WareHouse inhouse)
{
System.out.println(inhouse);
//do something
inhouse.put("msg",inhouse.getString("msg")+",from StreamWorkerB");
return inhouse;
}
public static void main(String[] args)
{
StreamWorkerB wd = new StreamWorkerB();
wd.waitWorking(args[0],Integer.parseInt(args[1]),"StreamWorkerB");
}
}