1. 程式人生 > 程式設計 >Spring StateMachine 狀態機引擎在專案中的應用(二)--持久化

Spring StateMachine 狀態機引擎在專案中的應用(二)--持久化

背景

每次用到的時候新建立一個狀態機,太奢侈了,官方檔案裡面也提到過這點。

而且創建出來的例項,其狀態也跟當前訂單的不符;spring statemachine暫時不支援每次建立時指定當前狀態,所以對狀態機引擎例項的持久化,就成了必須要考慮的問題。(不過在後續版本有直接指定狀態的方式,這個後面會寫)

擴充套件一下

這裡擴充套件說明一下,狀態機引擎的持久化一直是比較容易引起討論的,因為很多場景並不希望再多儲存一些中間非業務資料,之前在淘寶工作時,淘寶的訂單系統tradeplatform自己實現了一套workflowEngine,其實說白了也就是一套狀態機引擎,所有的配置都放在xml中,每次每個環節的請求過來,都會重新建立一個狀態機引擎例項,並根據當前的訂單狀態來設定引擎例項的狀態。

workflowEngine沒有做持久化,私下裡猜測下這樣實現的原因:1、淘係資料量太大,一天幾千萬筆訂單,額外的資訊儲存就要耗費很多儲存資源;2、完全自主開發的狀態機引擎,可定製化比較強,根據自己的業務需要可以按自己的需要處理。

而反過來,spring statemachine並不支援隨意指定初始狀態,每次建立都是固定的初始化狀態,其實也只是有好處的,標準版流程,而且可以保證安全,每個節點都是按照事先定義好的流程跑下來,而不是隨意指定。所以,狀態機引擎例項的持久化,我們這次的主題,那就繼續聊下去吧。

持久化

spring statemachine 本身支援了記憶體、redis及db的持久化,記憶體持久化就不說了,看原始碼實現就是放在了hashmap裡,平時也沒誰專案中可以這麼奢侈,啥啥都放在記憶體中,而且一旦重啟…..?。下面詳細說下利用redis進行的持久化操作。

依賴引入

spring statemachine 本身是提供了一個redis儲存的元件的,在1.2.10.RELEASE版本中,這個元件需要通過依賴引入,同時需要引入的還有序列化的元件kyro、data-common:

gradle引入依賴 (build.gradle 或者 libraries.gradle,由自己專案的gradle組織方式來定):

compile 'org.springframework.statemachine:spring-statemachine-core:1.2.10.RELEASE'
compile 'org.springframework.statemachine:spring-statemachine-data-common:1.2.10.RELEASE'
compile 'org.springframework.statemachine:spring-statemachine-kyro:1.2.10.RELEASE'
compile 'org.springframework.statemachine:spring-statemachine-redis:1.2.10.RELEASE'複製程式碼

當然如果是maven的話,一樣的,pom.xml如下:

<dependencies>
    <dependency>
        <groupId>org.springframework.statemachine</groupId>
        <artifactId>spring-statemachine-core</artifactId>
        <version>1.2.10.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.statemachine</groupId>
        <artifactId>spring-statemachine-data-common</artifactId>
        <version>1.2.10.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.statemachine</groupId>
        <artifactId>spring-statemachine-kyro</artifactId>
        <version>1.2.10.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.statemachine</groupId>
        <artifactId>spring-statemachine-redis</artifactId>
        <version>1.2.10.RELEASE</version>
    </dependency>
</dependencies>複製程式碼

先把持久化的呼叫軌跡說明下

spring-statemachine-持久化.png

說明:

spring statemachine持久化時,採用了三層結構設計,persister —>persist —>repository。

  • 其中persister中封裝了write和restore兩個方法,分別用於持久化寫及反序列化讀出。
  • persist只是一層皮,主要還是呼叫repository中的實際實現;但是在這裡,由於redis儲存不保證百分百資料安全,所以我實現了一個自定義的persist,其中封裝了資料寫入db、從db中讀取的邏輯。
  • repository中做了兩件事兒
    • 序列化/反序列化資料,將引擎例項與二進位制陣列互相轉換
    • 讀、寫redis
詳細的實現

Persister

import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.statemachine.StateMachinePersist;
import org.springframework.statemachine.persist.StateMachinePersister;
import org.springframework.statemachine.redis.RedisStateMachinePersister;

@Configuration
public class BizOrderRedisStateMachinePersisterConfig {

    @Autowired
    private StateMachinePersist bizOrderRedisStateMachineContextPersist;

    @Bean(name = "bizOrderRedisStateMachinePersister",autowire = Autowire.BY_TYPE)
    public StateMachinePersister<BizOrderStatusEnum,BizOrderStatusChangeEventEnum,String> bizOrderRedisStateMachinePersister() {
        return new RedisStateMachinePersister<>(bizOrderRedisStateMachineContextPersist);
    }

}複製程式碼

這裡採用官方samples中初始化的方式,通過@Bean註解來建立一個RedisStateMachinePersister例項,注意其中傳遞進去的Persist為自定義的bizOrderRedisStateMachineContextPersist

Persist

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.MessageHeaders;
import org.springframework.statemachine.StateMachineContext;
import org.springframework.statemachine.StateMachinePersist;
import org.springframework.statemachine.kryo.MessageHeadersSerializer;
import org.springframework.statemachine.kryo.StateMachineContextSerializer;
import org.springframework.statemachine.kryo.UUIDSerializer;
import org.springframework.statemachine.redis.RedisStateMachineContextRepository;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.util.Base64;
import java.util.UUID;

@Component("bizOrderRedisStateMachineContextPersist")
public class BizOrderRedisStateMachineContextPersist implements StateMachinePersist<BizOrderStatusEnum,String> {

    @Autowired
    @Qualifier("redisStateMachineContextRepository")
    private RedisStateMachineContextRepository<BizOrderStatusEnum,BizOrderStatusChangeEventEnum> redisStateMachineContextRepository;

    @Autowired
    private BizOrderStateMachineContextRepository bizOrderStateMachineContextRepository;

    //  加入儲存到DB的資料repository,biz_order_state_machine_context表結構:
    //  bizOrderId
    //  contextStr
    //  curStatus
    //  updateTime

    /**
     * Write a {@link StateMachineContext} into a persistent store
     * with a context object {@code T}.
     *
     * @param context    the context
     * @param contextObj the context ojb
     * @throws Exception the exception
     */
    @Override
    @Transactional
    public void write(StateMachineContext<BizOrderStatusEnum,BizOrderStatusChangeEventEnum> context,String contextObj) throws Exception {

        redisStateMachineContextRepository.save(context,contextObj);
        //  save to db
        BizOrderStateMachineContext queryResult = bizOrderStateMachineContextRepository.selectByOrderId(contextObj);

        if (null == queryResult) {
            BizOrderStateMachineContext bosmContext = new BizOrderStateMachineContext(contextObj,context.getState().getStatus(),serialize(context));
            bizOrderStateMachineContextRepository.insertSelective(bosmContext);
        } else {
            queryResult.setCurOrderStatus(context.getState().getStatus());
            queryResult.setContext(serialize(context));
            bizOrderStateMachineContextRepository.updateByPrimaryKeySelective(queryResult);
        }
    }

    /**
     * Read a {@link StateMachineContext} from a persistent store
     * with a context object {@code T}.
     *
     * @param contextObj the context ojb
     * @return the state machine context
     * @throws Exception the exception
     */
    @Override
    public StateMachineContext<BizOrderStatusEnum,BizOrderStatusChangeEventEnum> read(String contextObj) throws Exception {

        StateMachineContext<BizOrderStatusEnum,BizOrderStatusChangeEventEnum> context = redisStateMachineContextRepository.getContext(contextObj);
        //redis 訪快取擊穿
        if (null != context && BizOrderConstants.STATE_MACHINE_CONTEXT_ISNULL.equalsIgnoreCase(context.getId())) {
            return null;
        }
        //redis 為空走db
        if (null == context) {
            BizOrderStateMachineContext boSMContext = bizOrderStateMachineContextRepository.selectByOrderId(contextObj);
            if (null != boSMContext) {
                context = deserialize(boSMContext.getContext());
                redisStateMachineContextRepository.save(context,contextObj);
            } else {
                context = new StateMachineContextIsNull();
                redisStateMachineContextRepository.save(context,contextObj);
            }
        }
        return context;
    }

    private String serialize(StateMachineContext<BizOrderStatusEnum,BizOrderStatusChangeEventEnum> context) throws UnsupportedEncodingException {
        Kryo kryo = kryoThreadLocal.get();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Output output = new Output(out);
        kryo.writeObject(output,context);
        output.close();
        return Base64.getEncoder().encodeToString(out.toByteArray());
    }

    @SuppressWarnings("unchecked")
    private StateMachineContext<BizOrderStatusEnum,BizOrderStatusChangeEventEnum> deserialize(String data) throws UnsupportedEncodingException {
        if (StringUtils.isEmpty(data)) {
            return null;
        }
        Kryo kryo = kryoThreadLocal.get();
        ByteArrayInputStream in = new ByteArrayInputStream(Base64.getDecoder().decode(data));
        Input input = new Input(in);
        return kryo.readObject(input,StateMachineContext.class);
    }

    private static final ThreadLocal<Kryo> kryoThreadLocal = new ThreadLocal<Kryo>() {

        @SuppressWarnings("rawtypes")
        @Override
        protected Kryo initialValue() {
            Kryo kryo = new Kryo();
            kryo.addDefaultSerializer(StateMachineContext.class,new StateMachineContextSerializer());
            kryo.addDefaultSerializer(MessageHeaders.class,new MessageHeadersSerializer());
            kryo.addDefaultSerializer(UUID.class,new UUIDSerializer());
            return kryo;
        }
    };
}複製程式碼

說明:

  1. 如果只是持久化到redis中,那麼BizOrderStateMachineContextRepository相關的所有內容均可刪除。不過由於redis無法承諾百分百的資料安全,所以我這裡做了兩層持久化,redis+db
  2. 存入redis中的資料預設採用kryo來序列化及反序列化,RedisStateMachineContextRepository中實現了對應程式碼。但是spring statemachine預設的db儲存比較複雜,需要建立多張表,參加下圖:

jpa-table.png

這裡需要額外建立5張表,分別儲存ActionGuardStateStateMachineTransition,比較複雜。

  1. 所以這裡建立了一張表bizorderstatemachinecontext,結構很簡單:bizOrderId,contextStr,curStatus,updateTime,其中關鍵是contextStr,用於儲存與redis中相同的內容
Repository

有兩個repository,一個是spring statemachine提供的redisRepo,另一個則是專案中基於mybatis的repo,先是db-repo:

   import org.apache.ibatis.annotations.Param;
   import org.springframework.data.domain.Pageable;
   import org.springframework.stereotype.Repository;
   
   import java.util.List;
   
   @Repository
   public interface BizOrderStateMachineContextRepository {
   
         int deleteByPrimaryKey(Long id);
       
       BizOrderStateMachineContext selectByOrderId(String bizOrderId);
       
       int updateByPrimaryKey(BizOrderStateMachineContext BizOrderStateMachineContext);
   
       int updateByPrimaryKeySelective(BizOrderStateMachineContext BizOrderStateMachineContext);
   
       int insertSelective(BizOrderStateMachineContext BizOrderStateMachineContext);
   
       int selectCount(BizOrderStateMachineContext BizOrderStateMachineContext);
   
       List<BizOrderStateMachineContext> selectPage(@Param("BizOrderStateMachineContext") BizOrderStateMachineContext BizOrderStateMachineContext,@Param("pageable") Pageable pageable);
       
   }複製程式碼

然後是redisRepo

   import org.springframework.beans.factory.annotation.Autowire;
   import org.springframework.beans.factory.annotation.Autowired;
   import org.springframework.context.annotation.Bean;
   import org.springframework.context.annotation.Configuration;
   import org.springframework.data.redis.connection.RedisConnectionFactory;
   import org.springframework.statemachine.redis.RedisStateMachineContextRepository;
   
   @Configuration
   public class BizOrderRedisStateMachineRepositoryConfig {
   
       /**
        * 接入asgard後,redis的connectionFactory可以通過serviceName + InnerConnectionFactory來注入
        */
       @Autowired
       private RedisConnectionFactory finOrderRedisInnerConnectionFactory;
   
       @Bean(name = "redisStateMachineContextRepository",autowire = Autowire.BY_TYPE)
       public RedisStateMachineContextRepository<BizOrderStatusEnum,BizOrderStatusChangeEventEnum> redisStateMachineContextRepository() {
   
           return new RedisStateMachineContextRepository<>(finOrderRedisInnerConnectionFactory);
       }
   }
   複製程式碼

使用方式

    @Autowired
    @Qualifier("bizOrderRedisStateMachinePersister")
    private StateMachinePersister<BizOrderStatusEnum,String> bizOrderRedisStateMachinePersister;
   
   ......
     bizOrderRedisStateMachinePersister.persist(stateMachine,request.getBizCode());
   ......
     StateMachine<BizOrderStatusEnum,BizOrderStatusChangeEventEnum> stateMachine
                   =      bizOrderRedisStateMachinePersister.restore(srcStateMachine,statusRequest.getBizCode());
   ......複製程式碼

支援,關於spring statemachine的持久化就交代完了,下面就是最關鍵的,怎麼利用狀態機來串聯業務,下一節將會詳細描述。