資料匯流排模式【其他模式】
阿新 • • 發佈:2019-01-05
資料匯流排模式
@SuppressWarnings("boxing") public class DataBus { /** * 資料匯流排模式: * Allows send of messages/events between components of an application * without them needing to know about each other. * They only need to know about the type of the message/event being sent. * 允許在應用程式的元件之間傳送訊息,元件之間不需要相互瞭解。 * 他們只需要知道傳送訊息或事件的型別。 */ @Test public void all() { final IDataBus dataBus = DataBusImpl.of(Lists.newArrayList()); dataBus.attach(new MessageMember()); dataBus.attach(new AggregateMemeber()); dataBus.publish(StartEvent.of(System.currentTimeMillis())); dataBus.publish(MessageEvent.of("hello")); dataBus.publish(MessageEvent.of("world")); dataBus.publish(EndEvent.of(System.currentTimeMillis())); } } /** * 1)資料匯流排抽象 */ interface IDataBus { void attach(Member member); void detach(Member member); void publish(DataEvent dataEvent); } /** * 2)參與者抽象 */ interface Member extends Consumer<DataEvent> { IDataBus getDataBus(); void inject(IDataBus dataBus); } /** * 3)事件抽象 */ interface DataEvent { } /** * 4)資料匯流排實現 */ @Value(staticConstructor = "of") class DataBusImpl implements IDataBus { private List<Member> members; @Override public void attach(Member member) { members.add(member); member.inject(this); } @Override public void detach(Member member) { members.remove(member); } @Override public void publish(DataEvent dataEvent) { members.forEach(mem -> mem.accept(dataEvent)); } } /** * 5)具體的事件 */ @Value(staticConstructor = "of") class StartEvent implements DataEvent { private final Long start; } @Value(staticConstructor = "of") class EndEvent implements DataEvent { private final Long end; } @Value(staticConstructor = "of") class MessageEvent implements DataEvent { private final String message; } /** * 6)具體的參與者 */ @Value(staticConstructor = "of") class AggregateEvent implements DataEvent { private final String aggregatedmessage; } abstract class BaseMember implements Member { private IDataBus dataBus; @Override public void inject(IDataBus dataBus) { this.dataBus = dataBus; } @Override public IDataBus getDataBus() { return dataBus; } } @Slf4j class MessageMember extends BaseMember { private Long start; private Long end; private final List<String> messages = Lists.newArrayList();; public void handle(StartEvent t) { start = t.getStart(); messages.clear(); } public void handle(EndEvent t) { end = t.getEnd(); final String join = String.join("-", messages); log.info("start:{} end:{} messages:{}", start, end, join); getDataBus().publish(AggregateEvent.of(join)); } public void handle(MessageEvent t) { messages.add(t.getMessage()); } @Override public void accept(DataEvent t) { if (StartEvent.class.isInstance(t)) { handle((StartEvent) t); } else if (EndEvent.class.isInstance(t)) { handle((EndEvent) t); } else if (MessageEvent.class.isInstance(t)) { handle((MessageEvent) t); } } } @Slf4j class AggregateMemeber extends BaseMember { @Override public void accept(DataEvent t) { if (AggregateEvent.class.isInstance(t)) { final AggregateEvent ae = (AggregateEvent) t; log.info("received:{} ", ae.getAggregatedmessage()); } } }