事件溯源【其他模式】
阿新 • • 發佈:2019-01-05
ray material sso line ren put 任務 can ima
事件溯源
@Slf4j public class EventSourcing { /** * 事件溯源模式: * Instead of storing just the current state of the data in a domain, * use an append-only store to record the full series of actions taken on that data. * The store acts as the system of record and can be used to materialize the domain objects. * This can simplify tasks in complex domains, by avoiding the need to synchronize the data model * and the business domain, while improving performance, scalability, and responsiveness. * It can also provide consistency for transactional data, and maintain full audit trails * and history that can enable compensating actions * 除了只存儲領域模型的當前狀態外,使用 append-only 存儲來記錄對該數據的所有修改操作。 * 該存儲充當記錄系統,可用於實現域對象的具體化。 * 這可以通過避免同步數據模型和業務領域模型,來簡化復雜領域模型中的任務,同時提高性能、可伸縮性和響應能力。 * 它還可以為事務數據提供一致性,並保持完整的審計跟蹤和修改歷史來執行補償操作。 */ /** * The constant ACCOUNT OF DAENERYS. */ public static final int ACCOUNT_OF_DAENERYS = 1; /** * The constant ACCOUNT OF JON. */ public static final int ACCOUNT_OF_JON = 2; @Test public void all() { DomainEventProcessor eventProcessor = new DomainEventProcessor(); log.info("Running the system first time............"); eventProcessor.reset(); log.info("Creating th accounts............"); eventProcessor .process(new AccountCreateEvent(0, new Date().getTime(), ACCOUNT_OF_DAENERYS, "Daenerys Targaryen")); eventProcessor.process(new AccountCreateEvent(1, new Date().getTime(), ACCOUNT_OF_JON, "Jon Snow")); log.info("Do some money operations............"); eventProcessor .process(new MoneyDepositEvent(2, new Date().getTime(), ACCOUNT_OF_DAENERYS, new BigDecimal("100000"))); eventProcessor.process(new MoneyDepositEvent(3, new Date().getTime(), ACCOUNT_OF_JON, new BigDecimal("100"))); eventProcessor.process(new MoneyTransferEvent(4, new Date().getTime(), new BigDecimal("10000"), ACCOUNT_OF_DAENERYS, ACCOUNT_OF_JON)); log.info("...............State:............"); log.info(AccountAggregate.getAccount(ACCOUNT_OF_DAENERYS).toString()); log.info(AccountAggregate.getAccount(ACCOUNT_OF_JON).toString()); log.info("At that point system had a shot down, state in memory is cleared............"); AccountAggregate.resetState(); log.info("Recover the system by the events in journal file............"); eventProcessor = new DomainEventProcessor(); eventProcessor.recover(); log.info("...............Recovered State:............"); log.info(AccountAggregate.getAccount(ACCOUNT_OF_DAENERYS).toString()); log.info(AccountAggregate.getAccount(ACCOUNT_OF_JON).toString()); } } /** * 1)需要持久化的事件抽象 */ @Data @RequiredArgsConstructor abstract class DomainEvent implements Serializable { private static final long serialVersionUID = 5922894715338132042L; /** * 唯一序列號 */ private final long sequenceId; /** * 創建時間 */ private final long createdTime; /** * 實際的事件類型 */ private final String eventClassName; private boolean realTime = true; public abstract void process(); } class AccountAggregate { private static Map<Integer, Account> accounts = new ConcurrentHashMap<>(); private AccountAggregate() { } public static void putAccount(Account account) { accounts.put(account.getAccountNo(), account); } public static Account getAccount(int accountNo) { final Account account = accounts.get(accountNo); if (account == null) { return null; } return account.copy(); } public static void resetState() { accounts = new ConcurrentHashMap<>(); } } /** * 2)需要持久化的具體事件 */ @Data class AccountCreateEvent extends DomainEvent { private static final long serialVersionUID = -493304186114851718L; private final int accountNo; private final String owner; public AccountCreateEvent(long sequenceId, long createdTime, int accountNo, String owner) { super(sequenceId, createdTime, "AccountCreateEvent"); this.accountNo = accountNo; this.owner = owner; } @Override public void process() { Account account = AccountAggregate.getAccount(accountNo); if (account != null) { throw new RuntimeException("Account already exists"); } account = new Account(accountNo, owner); account.handleEvent(this); } } @Data class MoneyDepositEvent extends DomainEvent { private final BigDecimal money; private final int accountNo; public MoneyDepositEvent(long sequenceId, long createdTime, int accountNo, BigDecimal money) { super(sequenceId, createdTime, "MoneyDepositEvent"); this.money = money; this.accountNo = accountNo; } @Override public void process() { final Account account = AccountAggregate.getAccount(accountNo); if (account == null) { throw new RuntimeException("Account not found"); } account.handleEvent(this); } } @Data class MoneyTransferEvent extends DomainEvent { private static final long serialVersionUID = -5846383677434713494L; private final BigDecimal money; private final int accountNoFrom; private final int accountNoTo; public MoneyTransferEvent(long sequenceId, long createdTime, BigDecimal money, int accountNoFrom, int accountNoTo) { super(sequenceId, createdTime, "MoneyTransferEvent"); this.money = money; this.accountNoFrom = accountNoFrom; this.accountNoTo = accountNoTo; } @Override public void process() { final Account accountFrom = AccountAggregate.getAccount(accountNoFrom); if (accountFrom == null) { throw new RuntimeException("Account not found " + accountNoFrom); } final Account accountTo = AccountAggregate.getAccount(accountNoTo); if (accountTo == null) { throw new RuntimeException("Account not found" + accountTo); } accountFrom.handleTransferFromEvent(this); accountTo.handleTransferToEvent(this); } } @Data @Slf4j class Account { private final int accountNo; private final String owner; private BigDecimal money; public Account(int accountNo, String owner) { this.accountNo = accountNo; this.owner = owner; money = BigDecimal.ZERO; } public Account copy() { final Account account = new Account(accountNo, owner); account.setMoney(money); return account; } private void depositMoney(BigDecimal money) { this.money = this.money.add(money); } private void withdrawMoney(BigDecimal money) { this.money = this.money.subtract(money); } private void handleDeposit(BigDecimal money, boolean realTime) { depositMoney(money); AccountAggregate.putAccount(this); if (realTime) { log.info("Some external api for only realtime execution could be called here."); } } private void handleWithdrawal(BigDecimal money, boolean realTime) { if (this.money.compareTo(money) == -1) { throw new RuntimeException("Insufficient Account Balance"); } withdrawMoney(money); AccountAggregate.putAccount(this); if (realTime) { log.info("Some external api for only realtime execution could be called here."); } } public void handleEvent(AccountCreateEvent accountCreateEvent) { AccountAggregate.putAccount(this); if (accountCreateEvent.isRealTime()) { log.info("Some external api for only realtime execution could be called here."); } } public void handleEvent(MoneyDepositEvent moneyDepositEvent) { handleDeposit(moneyDepositEvent.getMoney(), moneyDepositEvent.isRealTime()); } public void handleTransferFromEvent(MoneyTransferEvent moneyTransferEvent) { handleWithdrawal(moneyTransferEvent.getMoney(), moneyTransferEvent.isRealTime()); } public void handleTransferToEvent(MoneyTransferEvent moneyTransferEvent) { handleDeposit(moneyTransferEvent.getMoney(), moneyTransferEvent.isRealTime()); } } /** * 3)事件回溯 */ class JsonFileJournal { private final File aFile; private List<String> events; private int index = 0; public JsonFileJournal() { aFile = new File("Journal.json"); try { events = Files.lines(aFile.toPath()).collect(Collectors.toList()); } catch (final IOException e) { events = Lists.newArrayList(); } } public void write(DomainEvent domainEvent) { final Gson gson = new Gson(); JsonElement jsonElement; if (domainEvent instanceof AccountCreateEvent) { jsonElement = gson.toJsonTree(domainEvent, AccountCreateEvent.class); } else if (domainEvent instanceof MoneyDepositEvent) { jsonElement = gson.toJsonTree(domainEvent, MoneyDepositEvent.class); } else if (domainEvent instanceof MoneyTransferEvent) { jsonElement = gson.toJsonTree(domainEvent, MoneyTransferEvent.class); } else { throw new RuntimeException("Journal Event not recegnized"); } try (Writer output = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(aFile, true), "UTF-8"))) { final String eventString = jsonElement.toString(); output.write(eventString + "\r\n"); } catch (final IOException e) { throw new RuntimeException(e); } } public void reset() { aFile.delete(); } public DomainEvent readNext() { if (index >= events.size()) { return null; } final String event = events.get(index); index++; final JsonParser parser = new JsonParser(); final JsonElement jsonElement = parser.parse(event); final String eventClassName = jsonElement.getAsJsonObject().get("eventClassName").getAsString(); final Gson gson = new Gson(); DomainEvent domainEvent; if (eventClassName.equals("AccountCreateEvent")) { domainEvent = gson.fromJson(jsonElement, AccountCreateEvent.class); } else if (eventClassName.equals("MoneyDepositEvent")) { domainEvent = gson.fromJson(jsonElement, MoneyDepositEvent.class); } else if (eventClassName.equals("MoneyTransferEvent")) { domainEvent = gson.fromJson(jsonElement, MoneyTransferEvent.class); } else { throw new RuntimeException("Journal Event not recegnized"); } domainEvent.setRealTime(false); return domainEvent; } } /** * 4)事件持久化和回溯的處理器 */ class DomainEventProcessor { private final JsonFileJournal processorJournal = new JsonFileJournal(); public void process(DomainEvent domainEvent) { domainEvent.process(); processorJournal.write(domainEvent); } public void reset() { processorJournal.reset(); } public void recover() { DomainEvent domainEvent; while (true) { domainEvent = processorJournal.readNext(); if (domainEvent == null) { break; } else { domainEvent.process(); } } } }
事件溯源【其他模式】