【bird-java】分散式服務間的事件匯流排EventBus
阿新 • • 發佈:2018-12-29
什麼是EventBus
EventBus是對釋出-訂閱模式的一種實現。其以一種非常優雅的方式實現了元件間的解耦與通訊,在Android開發、DDD等領域都有非常廣泛的應用。
事件流大致如下:
- Producer向EventBus傳送事件。
- EventBus向所有監聽了該事件的Consumer推送事件。
- 監聽了該事件的Consumer消費事件。
注:一個元件即可以是Producer,也可以是Consumer。
分散式服務間的EventBus
在分散式系統中,事件在服務之間的傳遞要比單機EventBus複雜很多。有沒有一種適用於分散式服務之間的,並且事件傳遞就像單機一樣簡單的EventBus呢?在GitHub上搜索了JAVA實現的EventBus,排名前十的幾乎都是用於Android或JAVA的單機事件匯流排。良久之後...還是自己動手吧。叢集環境下的EventBus比單機版需要多考慮一些問題,比如:
- 服務叢集部署的情況下,如何保證每個叢集均可訂閱該事件,且每個叢集只能消費一次該事件。
- 如何實現一個服務內部多個`xxxService`訂閱同一事件。
解決方案:
- 使用`kafka`實現叢集間的釋出訂閱(基於`topic`),同一叢集處於同一個kafka的consumer-group來保證每個叢集只會消費一次該事件。
- 服務在啟動時可反射獲得所有實現了`IEventHandler<TEventArg>`的類並快取,服務消費訊息時獲取所有註冊了該訊息的handler並呼叫其`HandleEvent`方法。
部分關鍵原始碼
1、事件訊息的定義
publicabstract class EventArg implements IEventArg{ private Date eventTime; public EventArg(){ eventTime = new Date(); } public Date getEventTime() { return eventTime; } public void setEventTime(Date eventTime) { this.eventTime = eventTime; } }
事件訊息預設記錄建立時間,可擴充套件其他欄位,比如傳送時間、標識等。
2、使用spring-kafka傳送訊息
/** * kafka事件註冊器,向kafka佇列中push訊息 */ @Component public class KafkaRegister implements IEventRegister { @Autowired(required = false) private KafkaTemplate<String,IEventArg> kafkaTemplate; /** * 事件註冊 * * @param eventArg 事件引數 */ @Override public void regist(IEventArg eventArg) { kafkaTemplate.send(getTopic(eventArg),eventArg); } /** * 獲取kafka的topic * * * @param eventArg * @return topic */ private String getTopic(IEventArg eventArg){ return eventArg.getClass().getName(); } }
3、消費kafka訊息並執行當前服務中所有訂閱了該訊息的事件
/** * kafka事件監聽器 */ public class KafkaEventArgListener implements MessageListener<String,EventArg> { @Autowired private IEventHandlerFactory eventHandlerFactory; @Override public void onMessage(ConsumerRecord<String, EventArg> consumerRecord) { if (consumerRecord == null) return; EventArg value = consumerRecord.value(); Set<IEventHandler> handlers = eventHandlerFactory.getHandlers(value); if (handlers == null) return; for (IEventHandler handler : handlers) { handler.HandleEvent(value); } } }
EventBus的使用
1、事件的定義。所有事件均繼承於上文EventArg抽象類,示例如下:
public class TestEventArg extends EventArg{ private String value; public String getValue() { return value; } public void setValue(String value) { this.value = value; } }
2、事件釋出。示例程式碼:
eventBus.push(new TestEventArg());
3、事件訂閱。一個服務釋出事件之後,任何服務中的任何`xxxServiceImpl`類均可訂閱該事件,實現`IEventHandler<TEventArg>`介面即可完成事件的訂閱,示例如下:
public class FormServiceImpl extends AbstractServiceImpl<Form> implements FormService,IEventHandler<TestEventArg> { @Override public void HandleEvent(TestEventArg eventArg) { System.out.println("notify zero======"); } }