Spring StateMachine 狀態機引擎在專案中的應用(二)--持久化
背景
每次用到的時候新建立一個狀態機,太奢侈了,官方檔案裡面也提到過這點。
而且創建出來的例項,其狀態也跟當前訂單的不符;spring statemachine暫時不支援每次建立時指定當前狀態,所以對狀態機引擎例項的持久化,就成了必須要考慮的問題。(不過在後續版本有直接指定狀態的方式,這個後面會寫)
擴充套件一下
這裡擴充套件說明一下,狀態機引擎的持久化一直是比較容易引起討論的,因為很多場景並不希望再多儲存一些中間非業務資料,之前在淘寶工作時,淘寶的訂單系統tradeplatform自己實現了一套workflowEngine,其實說白了也就是一套狀態機引擎,所有的配置都放在xml中,每次每個環節的請求過來,都會重新建立一個狀態機引擎例項,並根據當前的訂單狀態來設定引擎例項的狀態。
workflowEngine沒有做持久化,私下裡猜測下這樣實現的原因:1、淘係資料量太大,一天幾千萬筆訂單,額外的資訊儲存就要耗費很多儲存資源;2、完全自主開發的狀態機引擎,可定製化比較強,根據自己的業務需要可以按自己的需要處理。
而反過來,spring statemachine並不支援隨意指定初始狀態,每次建立都是固定的初始化狀態,其實也只是有好處的,標準版流程,而且可以保證安全,每個節點都是按照事先定義好的流程跑下來,而不是隨意指定。所以,狀態機引擎例項的持久化,我們這次的主題,那就繼續聊下去吧。
持久化
spring statemachine 本身支援了記憶體、redis及db的持久化,記憶體持久化就不說了,看原始碼實現就是放在了hashmap裡,平時也沒誰專案中可以這麼奢侈,啥啥都放在記憶體中,而且一旦重啟…..?。下面詳細說下利用redis進行的持久化操作。
依賴引入
spring statemachine 本身是提供了一個redis儲存的元件的,在1.2.10.RELEASE版本中,這個元件需要通過依賴引入,同時需要引入的還有序列化的元件kyro、data-common:
gradle引入依賴 (build.gradle 或者 libraries.gradle,由自己專案的gradle組織方式來定):
compile 'org.springframework.statemachine:spring-statemachine-core:1.2.10.RELEASE'
compile 'org.springframework.statemachine:spring-statemachine-data-common:1.2.10.RELEASE'
compile 'org.springframework.statemachine:spring-statemachine-kyro:1.2.10.RELEASE'
compile 'org.springframework.statemachine:spring-statemachine-redis:1.2.10.RELEASE'複製程式碼
當然如果是maven的話,一樣的,pom.xml如下:
<dependencies>
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-core</artifactId>
<version>1.2.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-data-common</artifactId>
<version>1.2.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-kyro</artifactId>
<version>1.2.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-redis</artifactId>
<version>1.2.10.RELEASE</version>
</dependency>
</dependencies>複製程式碼
先把持久化的呼叫軌跡說明下
說明:
spring statemachine持久化時,採用了三層結構設計,persister —>persist —>repository。
- 其中persister中封裝了write和restore兩個方法,分別用於持久化寫及反序列化讀出。
- persist只是一層皮,主要還是呼叫repository中的實際實現;但是在這裡,由於redis儲存不保證百分百資料安全,所以我實現了一個自定義的persist,其中封裝了資料寫入db、從db中讀取的邏輯。
- repository中做了兩件事兒
- 序列化/反序列化資料,將引擎例項與二進位制陣列互相轉換
- 讀、寫redis
詳細的實現
Persister
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.statemachine.StateMachinePersist;
import org.springframework.statemachine.persist.StateMachinePersister;
import org.springframework.statemachine.redis.RedisStateMachinePersister;
@Configuration
public class BizOrderRedisStateMachinePersisterConfig {
@Autowired
private StateMachinePersist bizOrderRedisStateMachineContextPersist;
@Bean(name = "bizOrderRedisStateMachinePersister",autowire = Autowire.BY_TYPE)
public StateMachinePersister<BizOrderStatusEnum,BizOrderStatusChangeEventEnum,String> bizOrderRedisStateMachinePersister() {
return new RedisStateMachinePersister<>(bizOrderRedisStateMachineContextPersist);
}
}複製程式碼
這裡採用官方samples中初始化的方式,通過@Bean註解來建立一個RedisStateMachinePersister例項,注意其中傳遞進去的Persist為自定義的bizOrderRedisStateMachineContextPersist
Persist
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.MessageHeaders;
import org.springframework.statemachine.StateMachineContext;
import org.springframework.statemachine.StateMachinePersist;
import org.springframework.statemachine.kryo.MessageHeadersSerializer;
import org.springframework.statemachine.kryo.StateMachineContextSerializer;
import org.springframework.statemachine.kryo.UUIDSerializer;
import org.springframework.statemachine.redis.RedisStateMachineContextRepository;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.util.Base64;
import java.util.UUID;
@Component("bizOrderRedisStateMachineContextPersist")
public class BizOrderRedisStateMachineContextPersist implements StateMachinePersist<BizOrderStatusEnum,String> {
@Autowired
@Qualifier("redisStateMachineContextRepository")
private RedisStateMachineContextRepository<BizOrderStatusEnum,BizOrderStatusChangeEventEnum> redisStateMachineContextRepository;
@Autowired
private BizOrderStateMachineContextRepository bizOrderStateMachineContextRepository;
// 加入儲存到DB的資料repository,biz_order_state_machine_context表結構:
// bizOrderId
// contextStr
// curStatus
// updateTime
/**
* Write a {@link StateMachineContext} into a persistent store
* with a context object {@code T}.
*
* @param context the context
* @param contextObj the context ojb
* @throws Exception the exception
*/
@Override
@Transactional
public void write(StateMachineContext<BizOrderStatusEnum,BizOrderStatusChangeEventEnum> context,String contextObj) throws Exception {
redisStateMachineContextRepository.save(context,contextObj);
// save to db
BizOrderStateMachineContext queryResult = bizOrderStateMachineContextRepository.selectByOrderId(contextObj);
if (null == queryResult) {
BizOrderStateMachineContext bosmContext = new BizOrderStateMachineContext(contextObj,context.getState().getStatus(),serialize(context));
bizOrderStateMachineContextRepository.insertSelective(bosmContext);
} else {
queryResult.setCurOrderStatus(context.getState().getStatus());
queryResult.setContext(serialize(context));
bizOrderStateMachineContextRepository.updateByPrimaryKeySelective(queryResult);
}
}
/**
* Read a {@link StateMachineContext} from a persistent store
* with a context object {@code T}.
*
* @param contextObj the context ojb
* @return the state machine context
* @throws Exception the exception
*/
@Override
public StateMachineContext<BizOrderStatusEnum,BizOrderStatusChangeEventEnum> read(String contextObj) throws Exception {
StateMachineContext<BizOrderStatusEnum,BizOrderStatusChangeEventEnum> context = redisStateMachineContextRepository.getContext(contextObj);
//redis 訪快取擊穿
if (null != context && BizOrderConstants.STATE_MACHINE_CONTEXT_ISNULL.equalsIgnoreCase(context.getId())) {
return null;
}
//redis 為空走db
if (null == context) {
BizOrderStateMachineContext boSMContext = bizOrderStateMachineContextRepository.selectByOrderId(contextObj);
if (null != boSMContext) {
context = deserialize(boSMContext.getContext());
redisStateMachineContextRepository.save(context,contextObj);
} else {
context = new StateMachineContextIsNull();
redisStateMachineContextRepository.save(context,contextObj);
}
}
return context;
}
private String serialize(StateMachineContext<BizOrderStatusEnum,BizOrderStatusChangeEventEnum> context) throws UnsupportedEncodingException {
Kryo kryo = kryoThreadLocal.get();
ByteArrayOutputStream out = new ByteArrayOutputStream();
Output output = new Output(out);
kryo.writeObject(output,context);
output.close();
return Base64.getEncoder().encodeToString(out.toByteArray());
}
@SuppressWarnings("unchecked")
private StateMachineContext<BizOrderStatusEnum,BizOrderStatusChangeEventEnum> deserialize(String data) throws UnsupportedEncodingException {
if (StringUtils.isEmpty(data)) {
return null;
}
Kryo kryo = kryoThreadLocal.get();
ByteArrayInputStream in = new ByteArrayInputStream(Base64.getDecoder().decode(data));
Input input = new Input(in);
return kryo.readObject(input,StateMachineContext.class);
}
private static final ThreadLocal<Kryo> kryoThreadLocal = new ThreadLocal<Kryo>() {
@SuppressWarnings("rawtypes")
@Override
protected Kryo initialValue() {
Kryo kryo = new Kryo();
kryo.addDefaultSerializer(StateMachineContext.class,new StateMachineContextSerializer());
kryo.addDefaultSerializer(MessageHeaders.class,new MessageHeadersSerializer());
kryo.addDefaultSerializer(UUID.class,new UUIDSerializer());
return kryo;
}
};
}複製程式碼
說明:
- 如果只是持久化到redis中,那麼BizOrderStateMachineContextRepository相關的所有內容均可刪除。不過由於redis無法承諾百分百的資料安全,所以我這裡做了兩層持久化,redis+db
- 存入redis中的資料預設採用kryo來序列化及反序列化,RedisStateMachineContextRepository中實現了對應程式碼。但是spring statemachine預設的db儲存比較複雜,需要建立多張表,參加下圖:
這裡需要額外建立5張表,分別儲存ActionGuardStateStateMachineTransition,比較複雜。
- 所以這裡建立了一張表bizorderstatemachinecontext,結構很簡單:bizOrderId,contextStr,curStatus,updateTime,其中關鍵是contextStr,用於儲存與redis中相同的內容
Repository
有兩個repository,一個是spring statemachine提供的redisRepo,另一個則是專案中基於mybatis的repo,先是db-repo:
import org.apache.ibatis.annotations.Param;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Repository;
import java.util.List;
@Repository
public interface BizOrderStateMachineContextRepository {
int deleteByPrimaryKey(Long id);
BizOrderStateMachineContext selectByOrderId(String bizOrderId);
int updateByPrimaryKey(BizOrderStateMachineContext BizOrderStateMachineContext);
int updateByPrimaryKeySelective(BizOrderStateMachineContext BizOrderStateMachineContext);
int insertSelective(BizOrderStateMachineContext BizOrderStateMachineContext);
int selectCount(BizOrderStateMachineContext BizOrderStateMachineContext);
List<BizOrderStateMachineContext> selectPage(@Param("BizOrderStateMachineContext") BizOrderStateMachineContext BizOrderStateMachineContext,@Param("pageable") Pageable pageable);
}複製程式碼
然後是redisRepo
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.statemachine.redis.RedisStateMachineContextRepository;
@Configuration
public class BizOrderRedisStateMachineRepositoryConfig {
/**
* 接入asgard後,redis的connectionFactory可以通過serviceName + InnerConnectionFactory來注入
*/
@Autowired
private RedisConnectionFactory finOrderRedisInnerConnectionFactory;
@Bean(name = "redisStateMachineContextRepository",autowire = Autowire.BY_TYPE)
public RedisStateMachineContextRepository<BizOrderStatusEnum,BizOrderStatusChangeEventEnum> redisStateMachineContextRepository() {
return new RedisStateMachineContextRepository<>(finOrderRedisInnerConnectionFactory);
}
}
複製程式碼
使用方式
@Autowired
@Qualifier("bizOrderRedisStateMachinePersister")
private StateMachinePersister<BizOrderStatusEnum,String> bizOrderRedisStateMachinePersister;
......
bizOrderRedisStateMachinePersister.persist(stateMachine,request.getBizCode());
......
StateMachine<BizOrderStatusEnum,BizOrderStatusChangeEventEnum> stateMachine
= bizOrderRedisStateMachinePersister.restore(srcStateMachine,statusRequest.getBizCode());
......複製程式碼
支援,關於spring statemachine的持久化就交代完了,下面就是最關鍵的,怎麼利用狀態機來串聯業務,下一節將會詳細描述。