S4系統模型分析和關鍵原始碼讀解
S4(Simple Scalable Stream System) 流資料處理系統是Yahoo!公司提出的,在2011年的時候成為Apache軟體基金下的一個孵化專案,可惜的是在2014年的時候該孵化專案“退休”了,具體原因未知!!從這裡可以瞭解它當前的狀態資訊:link. 閱讀了所發表的論文S4:Distributed Stream Computing Platform之後,發現該系統模型有其獨特之處,總結起來就是
- 靈活
- 系統架構模型和程式設計介面簡單
- 可拓展性強
雖然該專案已經退休,但是因為其”系統架構模型和程式設計介面簡單“,其論文通俗易懂,並且原始碼開放,因此決定從系統模型架構和原始碼兩個方面較為深入地學習它。
文章目錄
接下來先講系統模型架構,然後針對幾個問題通過原始碼來尋找答案,以此來了解S4的實現過程。
系統模型架構
S4系統把流處理的邏輯過程表示成一個或者多個有向無環圖,如上圖所示,在圖中有兩個關鍵組建,Stream和ProcessingElement(PE)。在上圖中Stream表示成帶箭頭的線,負責資料的傳遞;PE表示一個節點,代表一個處理單元,也是使用者業務邏輯所在地。Stream中傳遞的主要為以Event為基類的物件,也就是事件物件。什麼是事件物件呢?從程式設計實現的角度來講,一個事件物件有其時間和其他屬性,這裡的時間通常表示事件所發現的時間。每個Event物件能夠通過兩個欄位資訊一起確定其所屬的Stream,分別是1. Event的型別,在Java中表示為繼承了Event類的子類的Class型別,2. Event的Key值,這個可以為null。這裡的Key和Value跟Hadoop的Key和Value是一個設計思想。舉一個例子
PE作為使用者的邏輯處理單元,需要使用者程式設計實現或者重寫下面幾個主要方法。系統會為使用者隱藏其他細節,比如PE如何接收資料,如何傳送處理完的中間資料等等。
@Override protected void onCreate() {} @Override protected void onRemove() {} public void onEvent(Event event) {} public void onTrigger(Event event) {}
這樣的系統模型,看起來非常簡單且靈活,但是需要考慮並解決一些問題。
一個Stream只會傳輸一種型別的Event物件或者同種型別的Key相同的Event物件。然後可能向一個或者多個PE傳送Event資料。一臺機器中可能有一個或者多個PE,整個系統使用Zookeeper來協調整合。
下面這張圖片是系統的整體架構圖,Processing Node表示一臺機器,在機器接收到一個事件之後,需要根據事件的型別和key值來判斷該事件應該由哪一個PE來處理。在PE處理之後,由Emiter傳送。瞭解了系統模型架構,下面通過原始碼來學習具體的實現。
原始碼下載地址:download, 建議使用Intelij IDEA匯入。
主要包含三個小模組,分別為s4-base, s4-comm和s4-core,第二個模組用於資料傳輸,我們的原始碼基本指涉及到s4-base和s4-core.
programer的程式設計模型
在看原始碼之前,非常有必要了解程式設計人員如何程式設計構建和使用PE。使用者需要繼承ProcessingElement類,然後覆寫和實現onEvent和onTrigger等方法。其中onEvent是事件到來時的處理邏輯,onTrigger是要觸發輸出處理結果的邏輯。觸發條件有兩個,一個是事件數量觸發,另一個是時間觸發。在下面的程式碼onEvent方法中,每來一個事件就計數加一,在onTrigger中,每來一個事件就將結果放到輸出流中。
public class CounterPE extends ProcessingElement {
private Stream<CountEvent>[] countStream;
public CounterPE(App app) {
super(app);
}
public Stream<CountEvent>[] getCountStream() {
return countStream;
}
public void setCountStream(Stream<CountEvent>... countStream) {
this.countStream = countStream;
}
private long counter = 0;
public void onEvent(Event event) {
counter += 1;
}
public void onTrigger(Event event) {
CountEvent countEvent = new CountEvent(getId(), counter);
emit(countEvent, countStream);
}
@Override
protected void onCreate() {
}
@Override
protected void onRemove() {
}
}
系統實現邏輯
- Event類是所有事件類的基類,是該系統的基礎類。一個事件主要包含事件和其他儲存在Map中的屬性。
public class Event {
final private long time;
private String streamName;
private int appId;
private Map<String, Data<?>> map;
在Stream中,Event物件進一步被封裝成EventMessage物件,顧名思義,EventMessage物件類似於網際網路的一個訊息報文,相比於Event物件,其主要是增加了資料在不同PE、不同機器上面傳輸所需要的定位屬性。從下面原始碼的定義看出,一個EventMessage能夠根據appName和streamName來定位資料的接收者。
public class EventMessage {
private String appName;
private String streamName;
private byte[] serializedEvent;
一個Stream物件只容納一種型別的事件資料,使用的資料結構為ArrayBlockingQueue阻塞佇列,
protected final BlockingQueue<EventMessage> queue = new ArrayBlockingQueue<EventMessage>(CAPACITY);
其特點是,當佇列為空的時候,如果要獲取,阻塞take的執行緒,直到有新的資料;如果佇列滿了,阻塞put的執行緒,直到佇列不滿。
- Stream類主要欄位為如下所示,具體請看註釋。一個stream只儲存一種型別的事件資料,儲存在一個阻塞佇列中,專門使用一個執行緒不斷地讀取佇列中地資料並呼叫PE的方法處理。下面我們來具體看Stream類如何將事件資料儲存到佇列中,以及如何讀取和處理佇列中的資料。
public class Stream<T extends Event> implements Runnable, Streamable {
protected Key<T> key; //該物件定義了獲取Event物件的key的方法
private ProcessingElement[] targetPEs;//一個Stream中的資料可能流向多個PE
protected final BlockingQueue<EventMessage> queue = new ArrayBlockingQueue<EventMessage>(CAPACITY);
private Thread thread;//新開一個執行緒不斷地從阻塞佇列中讀取事件
private Class<T> eventType = null;//儲存該Stream的事件型別
Stream作為連線不同PE或者說不同節點的組建,所以它具有兩個重要的功能,1是接收資料,2是傳送由PE處理後的資料。
- 接收資料
Stream中的接收event的方法很簡單,只是簡單地把event物件放入阻塞佇列中。因為會有很多個Stream物件,因此使用了Reciever類來統一管理所有的Stream類的receiveEvent方法。也就是下面的receiveEvent方法在Reciever類中被呼叫。
public void receiveEvent(EventMessage event) {
try {
queue.put(event);
} catch (InterruptedException e) {
logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
Thread.currentThread().interrupt();
}
}
Reciever類實現了Runnable介面,下面的方法是Reciever類的run方法。在該方法中,首先通過appName和streamName來找到對應的Stream物件,然後呼叫其receiveEvent方法。
public void run() {
// TODO: this thread never seems to get interrupted. SHould we catch an interrupted exception from listener
// here?
while (!Thread.interrupted()) {
byte[] message = listener.recv();
EventMessage event = (EventMessage) serDeser.deserialize(message);
int appId = Integer.valueOf(event.getAppName());
String streamId = event.getStreamName();
/*
* Match appId and streamId in event to the target stream and pass the event to the target stream. TODO:
* make this more efficient for the case in which we send the same event to multiple PEs.
*/
try {
streams.get(appId).get(streamId).receiveEvent(event);
} catch (NullPointerException e) {
logger.error("Could not find target stream for event with appId={} and streamId={}", appId, streamId);
}
}
}
小結:統一使用Reciever來接收event物件,通過appName和streamName找到對應的Stream物件,然後呼叫Stream物件的recieveEvent方法,將event物件放入queue中。
- 傳送資料
下面的函式用於傳送一個Event物件給拓撲圖中下一層節點,這裡的下一層節點有可能包含當前Stream類的阻塞佇列queue。Stream中的put方法表示了該傳送的邏輯。在該方法中,首先設定事件的streamName和App Name。然後檢測該Stream是否跟遠端的節點關聯,如果沒有,直接把event放入當前的queue中,如果有,使用sender.sendToRemotePartitions(event)
來將event傳送到遠端佇列中,同時put到本地的當前的queue中。下面將會講sender.sendToRemotePartitions(event)
方法。
public void put(Event event) {
try {
event.setStreamId(getName());
event.setAppId(app.getId());
/*
* Events may be sent to local or remote partitions or both. The following code implements the logic.
*/
if (key != null) {
/*
* We send to a specific PE instance using the key but we don't know if the target partition is remote
* or local. We need to ask the sender.
*/
if (!sender.checkAndSendIfNotLocal(key.get((T) event), event)) {
/*
* Sender checked and decided that the target is local so we simply put the event in the queue and
* we save the trip over the network.
*/
queue.put(new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), app
.getSerDeser().serialize(event)));
}
} else {
/*
* We are broadcasting this event to all PE instance. In a cluster, we need to send the event to every
* node. The sender method takes care of the remote partitions an we take care of putting the event into
* the queue.
*/
sender.sendToRemotePartitions(event);
queue.put(new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), app.getSerDeser()
.serialize(event)));
}
} catch (InterruptedException e) {
logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
Thread.currentThread().interrupt();
}
}
該方法用於講一個event物件傳送到拓撲圖中的下一層的節點。如何定位下一層的節點呢?這裡只用到了AppID (跟App Name相同)和stream Name,這兩個資訊便可以定位了。每一個Event物件都包含這兩個屬性。
emitter.send
方法將event物件傳送到下一個遠端節點,目前有TCP和UDP兩種實現。
public void sendToRemotePartitions(Event event) {
for (int i = 0; i < emitter.getPartitionCount(); i++) {
/* Don't use the comm layer when we send to the same partition. */
if (localPartitionId != i)
emitter.send(i, new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), serDeser .serialize(event)));
}
}
講到這裡,有一個問題,Stream類的傳送evnet物件的方法put方法最後由誰來呼叫呢?答案是先由PE來呼叫,最終由使用者在PE繼承類中呼叫。下面是ProcessingElement類emit方法的程式碼。該方法沒有使用到PE類的成員變數,這個需要注意,一個PE類沒有將Stream作為成員變數。emit方法只是簡單地遍歷所傳入的Stream物件陣列,然後呼叫它們的put方法,該方法在上面講過。而最終這個emit方法將由使用者呼叫,在其所繼承的ProcessingElement類中,具體請看上一節程式碼例子。
//Helper method to be used by PE implementation classes. Sends an event to all the target streams.
protected <T extends Event> void emit(T event, Stream<T>[] streamArray) {
for (int i = 0; i < streamArray.length; i++) {
streamArray[i].put(event);
}
}
現在,我們知道了一個流如何獲取event資料以及如何傳送event資料,現在資料已經儲存在一個阻塞佇列中了,那麼它是怎麼從佇列中獲取資料然後處理的呢?
Event資料處理
Stream類中有一個成員變數Thread thread, 用於不斷地從queue中獲取和處理資料。因此,處理邏輯放在run方法中,下面是run方法原始碼,其主要是採用阻塞的方式不斷從queue中獲取資料,然後呼叫相應的PE的pe.handleInputEvent(event)
方法。那麼pe.handleInputEvent(event);
方法又做了什麼呢?
@Override
public void run() {
while (true) {
try {
/* Get oldest event in queue. */
EventMessage eventMessage = queue.take();
@SuppressWarnings("unchecked")
T event = (T) app.getSerDeser().deserialize(eventMessage.getSerializedEvent());
/* Send event to each target PE. */
for (int i = 0; i < targetPEs.length; i++) {
if (key == null) {
/* Broadcast to all PE instances! */
/* STEP 1: find all PE instances. */
Collection<ProcessingElement> pes = targetPEs[i].getInstances();
/* STEP 2: iterate and pass event to PE instance. */
for (ProcessingElement pe : pes) {
pe.handleInputEvent(event);
}
} else {
/* We have a key, send to target PE. */
/* STEP 1: find the PE instance for key. */
ProcessingElement pe = targetPEs[i].getInstanceForKey(key.get(event));
/* STEP 2: pass event to PE instance. */
pe.handleInputEvent(event);
}
}
} catch (InterruptedException e) {
logger.info("Closing stream {}.", name);
receiver.removeStream(this);
Thread.currentThread().interrupt();
return;
}
}
}
PE的handleInputEvent
方法主要做了兩件事,1是在第一次呼叫該方法的時候,判斷是否有該PE的持久化歷史紀錄,如果有的話就恢復;2是呼叫OverloadDispatcher類的dispatchEvent和dispatchTrigger兩個方法。這兩個方法將呼叫由使用者實現的onEvent和onTrigger方法。onEvent和onTrigger方法在上一節講過,它們是使用者的邏輯處理單元。那麼,dispatchEvent和dispatchTrigger兩個方法是什麼樣子的呢?
protected void handleInputEvent(Event event) {
Object object;
if (isThreadSafe) {
object = new Object(); // a dummy object TODO improve this.
} else {
object = this;
}
synchronized (object) {
if (!recoveryAttempted) {
recover();
recoveryAttempted = true;
}
/* Dispatch onEvent() method. */
overloadDispatcher.dispatchEvent(this, event);
/* Dispatch onTrigger() method. */
if (haveTriggers && isTrigger(event)) {
overloadDispatcher.dispatchTrigger(this, event);
}
eventCount++;
dirty = true;
if (isCheckpointable()) {
checkpoint();
}
}
}
進入OverloadDispatcher 的定義,發現它是一個介面,而且沒有找到實現類。有這麼一行註釋Implementations of this interface are typically generated at runtime.
。原來該介面的實現類是動態實現的。使用的第三方工具包為org.objectweb.asm
。我們現在已經明確,下面的兩個方法將會呼叫由使用者定義的onEvent和onTrigger方法,呼叫的方法體是動態實現的。那麼問題有:為什麼使用動態的方式生成它們的實現方法呢?
public interface OverloadDispatcher {
public void dispatchEvent(ProcessingElement pe, Event event);
public void dispatchTrigger(ProcessingElement pe, Event event);
}
在回答這個問題之前,我們需要考慮另外一個問題,程式設計人員定義了一個繼承了ProcessingElement的類MyPE,實現了onEvent和onTrigger方法,S4系統程式設計人員在釋出系統之前是不知道使用者的PE繼承類的,那麼系統如何知道所繼承的類是MyPE,並且如何找到並載入MyPE類並呼叫onEvent和onTrigger方法呢?為了解決這個問題,S4使用了動態生成程式碼的方法。動態生成程式碼的類為org.apache.s4.core.gen.OverloadDispatcherGenerator
。這樣動態生成實現類的方式到底優雅不優雅呢?效能如何?有待考量。
多執行緒的使用
系統中使用到的多執行緒主要有:
- 讀取Stream類中的阻塞佇列中的資料的時候,使用了專門的執行緒
- Reciever類實現了Runnable介面,用於接收資料,並把資料儲存到對應的Stream中。
- 傳送資料的時候,使用到了NIO之Socket NIO,使用到了Netty框架。
關鍵組建的關係
一個Stream物件包含多個目標PE,對於每一個到來的event物件,根據事件型別或者事件型別+事件的key來確定相應的PE。然後呼叫PE的處理邏輯方法。
Reciever物件儲存了單個node下的所有的Stream物件引用,它統一接收event資料,然後根據事件型別或者事件型別+事件的key來確定相應的Stream物件。
問題
-
問題: 一個PE能否處理多中Event?
回答:一個PE只能處理一種型別的Event,但是可以處理同種型別的不同key的event。 -
問題:如果按照上面第一張圖片所示的例子,從上往下第二次拓撲中,一個單詞生成對應的一個PE,那麼最終會生成大量的PE,所佔用的記憶體也是很可觀的。有一些PE在生成之後很可能只用到了幾次,一直佔用這記憶體。
-
Event類是該系統最基礎的類,其使用到了Map作為成員變數,而map在put一個數據之後就會有一個預設初始化大小,如果map中只有一個數據,那麼意味著存在多個空的記憶體佔用。這導致一個Event的記憶體使用率很低。這樣的設計是不是有失優雅呢?
public class Event {
final private long time;
private String streamName;
private int appId;
private Map<String, Data<?>> map;
謝謝!