1. 程式人生 > 其它 >基於 Spring 實現管道模式的最佳實踐

基於 Spring 實現管道模式的最佳實踐

本篇為設計模式第二篇,第一篇可見設計模式最佳套路 —— 愉快地使用策略模式

管道模式(Pipeline Pattern) 是責任鏈模式(Chain of Responsibility Pattern)的常用變體之一。在管道模式中,管道扮演著流水線的角色,將資料傳遞到一個加工處理序列中,資料在每個步驟中被加工處理後,傳遞到下一個步驟進行加工處理,直到全部步驟處理完畢。

PS:純的責任鏈模式在鏈上只會有一個處理器用於處理資料,而管道模式上多個處理器都會處理資料。

何時使用管道模式
任務程式碼較為複雜,需要拆分為多個子步驟時,尤其是後續可能在任意位置新增新的子步驟、刪除舊的子步驟、交換子步驟順序,可以考慮使用管道模式。

你說的都對

愉快地使用管道模式

▐ 背景回放
最開始做模型平臺的時候,建立模型例項的功能,包括:“輸入資料校驗 -> 根據輸入建立模型例項 -> 儲存模型例項到相關 DB 表”總共三個步驟,也不算複雜,所以當時的程式碼大概是這樣的:

public class ModelServiceImpl implements ModelService {

/**
 * 提交模型(構建模型例項)
 */
public CommonReponse<Long> buildModelInstance(InstanceBuildRequest request) {
    // 輸入資料校驗
    validateInput(request);
    // 根據輸入建立模型例項
    ModelInstance instance = createModelInstance(request);
    // 儲存例項到相關 DB 表
    saveInstance(instance);
}

}
然而沒有過多久,我們發現表單輸入資料的格式並不完全符合模型的輸入要求,於是我們要加入 “表單資料的預處理”。這功能還沒動手呢,又有業務方提出自己也存在需要對資料進行處理的情況(比如根據商家的表單輸入,生成一些其他業務資料作為模型輸入)。

所以在 “輸入資料校驗” 之後,還需要加入 “表單輸入輸出預處理” 和 “業務方自定義資料處理(可選)”。這個時候我就面臨一個選擇:是否繼續通過在 buildModelInstance 中加入新的方法來實現這些新的處理步驟?好處就是可以當下偷懶,但是壞處呢:

ModelService 應該只用來接收 HSF 請求,而不應該承載業務邏輯,如果將 提交模型 的邏輯都寫在這個類當中,違反了 單一職責,而且後面會導致 類程式碼爆炸

將來每加入一個新的處理步驟或者刪除某個步驟,我就要修改 buildModelInstance 這個本應該非常內聚的方法,違反了 開閉原則

所以,為了不給以後的自己挖坑,我覺得要思考一個萬全的方案。這個時候,我小腦袋花開始飛轉,突然閃過了 Netty 中的 ChannelPipeline —— 對哦,管道模式,不就正是我需要的嘛!

管道模式的實現方式也是多種多樣,接下來基於前面的背景,我分享一下我目前基於 Spring 實現管道模式的 “最佳套路”(如果你有更好的套路,歡迎賜教,一起討論哦)。

沒時間解釋了快上車.jpg

▐ 定義管道處理的上下文
/**

  • 傳遞到管道的上下文
    */
    @Getter
    @Setter
    public class PipelineContext {

    /**

    • 處理開始時間
      */
      private LocalDateTime startTime;

    /**

    • 處理結束時間
      */
      private LocalDateTime endTime;

    /**

    • 獲取資料名稱
      */
      public String getName() {
      return this.getClass().getSimpleName();
      }
      }
      ▐ 定義上下文處理器

/**

  • 管道中的上下文處理器
    */
    public interface ContextHandler {

    /**

    • 處理輸入的上下文資料
    • @param context 處理時的上下文資料
    • @return 返回 true 則表示由下一個 ContextHandler 繼續處理,返回 false 則表示處理結束
      */
      boolean handle(T context);
      }
      為了方便說明,我們現在先定義出最早版 【提交模型邏輯】 的上下文和相關處理器:

/**

  • 模型例項構建的上下文
    */
    @Getter
    @Setter
    public class InstanceBuildContext extends PipelineContext {

    /**

    • 模型 id
      */
      private Long modelId;

    /**

    • 使用者 id
      */
      private long userId;

    /**

    • 表單輸入
      */
      private Map<String, Object> formInput;

    /**

    • 儲存模型例項完成後,記錄下 id
      */
      private Long instanceId;

    /**

    • 模型創建出錯時的錯誤資訊
      */
      private String errorMsg;

    // 其他引數

    @Override
    public String getName() {
    return “模型例項構建上下文”;
    }
    }
    處理器 - 輸入資料校驗:

@Component
public class InputDataPreChecker implements ContextHandler {

private final Logger logger = LoggerFactory.getLogger(this.getClass());


@Override
public boolean handle(InstanceBuildContext context) {
    logger.info("--輸入資料校驗--");


    Map<String, Object> formInput = context.getFormInput();


    if (MapUtils.isEmpty(formInput)) {
        context.setErrorMsg("表單輸入資料不能為空");
        return false;
    }


    String instanceName = (String) formInput.get("instanceName");


    if (StringUtils.isBlank(instanceName)) {
        context.setErrorMsg("表單輸入資料必須包含例項名稱");
        return false;
    }


    return true;
}

}
處理器 - 根據輸入建立模型例項:

@Component
public class ModelInstanceCreator implements ContextHandler {

private final Logger logger = LoggerFactory.getLogger(this.getClass());


@Override
public boolean handle(InstanceBuildContext context) {
    logger.info("--根據輸入資料建立模型例項--");


    // 假裝建立模型例項


    return true;
}

}
處理器 - 儲存模型例項到相關DB表:

@Component
public class ModelInstanceSaver implements ContextHandler {

private final Logger logger = LoggerFactory.getLogger(this.getClass());


@Override
public boolean handle(InstanceBuildContext context) {
    logger.info("--儲存模型例項到相關DB表--");


    // 假裝儲存模型例項


    return true;
}

}
到這裡,有個問題就出現了:應該使用什麼樣的方式,將同一種 Context 的 ContextHandler 串聯為管道呢?思考一下:

給 ContextHandler 加一個 setNext 方法,每個實現類必須指定其下一個處理器。缺點也很明顯,如果在當前管道中間加入一個新的 ContextHandler,那麼要勢必要修改前一個 ContextHandler 的 setNext 方法;另外,程式碼是寫給人閱讀的,這樣做沒法一眼就直觀的知道整個管道的處理鏈路,還要進入到每個相關的 ContextHandler 中去檢視才知道。

給 ContextHandler 加上 @Order 註解,根據 @Order 中給定的數字來確定每個 ContextHandler 的序列,一開始時每個數字間隔的可以大些(比如 10、20、30),後續加入新的 ContextHandler 時,可以指定數字為 (11、21、31)這種,那麼可以避免上面方案中要修改程式碼的問題,但是仍然無法避免要進入每個相關的 ContextHandler 中去檢視才能知道管道處理鏈路的問題。

提前寫好一份路由表,指定好 ”Context -> 管道“ 的對映(管道用 List 來表示),以及管道中處理器的順序 。Spring 來根據這份路由表,在啟動時就構建好一個 Map,Map 的鍵為 Context 的型別,值為 管道(即 List)。這樣的話,如果想知道每個管道的處理鏈路,直接看這份路由表就行,一目瞭然。缺點嘛,就是每次加入新的 ContextHandler 時,這份路由表也需要在對應管道上進行小改動 —— 但是如果能讓閱讀程式碼更清晰,我覺得這樣的修改是值得的、可接受的~

▐ 構建管道路由表
基於 Spring 的 Java Bean 配置,我們可以很方便的構建管道的路由表:

/**

  • 管道路由的配置
    */
    @Configuration
    public class PipelineRouteConfig implements ApplicationContextAware {

    /**

    • 資料型別->管道中處理器型別列表 的路由
      */
      private static final
      Map<Class<? extends PipelineContext>,
      List<Class<? extends ContextHandler<? extends PipelineContext>>>> PIPELINE_ROUTE_MAP = new HashMap<>(4);

    /*

    • 在這裡配置各種上下文型別對應的處理管道:鍵為上下文型別,值為處理器型別的列表
      */
      static {
      PIPELINE_ROUTE_MAP.put(InstanceBuildContext.class,
      Arrays.asList(
      InputDataPreChecker.class,
      ModelInstanceCreator.class,
      ModelInstanceSaver.class
      ));

      // 將來其他 Context 的管道配置
      }

    /**

    • 在 Spring 啟動時,根據路由表生成對應的管道對映關係
      */
      @Bean(“pipelineRouteMap”)
      public Map<Class<? extends PipelineContext>, List<? extends ContextHandler<? extends PipelineContext>>> getHandlerPipelineMap() {
      return PIPELINE_ROUTE_MAP.entrySet()
      .stream()
      .collect(Collectors.toMap(Map.Entry::getKey, this::toPipeline));
      }

    /**

    • 根據給定的管道中 ContextHandler 的型別的列表,構建管道
      */
      private List<? extends ContextHandler<? extends PipelineContext>> toPipeline(
      Map.Entry<Class<? extends PipelineContext>, List<Class<? extends ContextHandler<? extends PipelineContext>>>> entry) {
      return entry.getValue()
      .stream()
      .map(appContext::getBean)
      .collect(Collectors.toList());
      }

    private ApplicationContext appContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    appContext = applicationContext;
    }
    }
    ▐ 定義管道執行器

最後一步,定義管道執行器。管道執行器 根據傳入的上下文資料的型別,找到其對應的管道,然後將上下文資料放入管道中去進行處理。

/**

  • 管道執行器
    */
    @Component
    public class PipelineExecutor {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**

    • 引用 PipelineRouteConfig 中的 pipelineRouteMap
      */
      @Resource
      private Map<Class<? extends PipelineContext>,
      List<? extends ContextHandler<? super PipelineContext>>> pipelineRouteMap;

    /**

    • 同步處理輸入的上下文資料

    • 如果處理時上下文資料流通到最後一個處理器且最後一個處理器返回 true,則返回 true,否則返回 false

    • @param context 輸入的上下文資料

    • @return 處理過程中管道是否暢通,暢通返回 true,不暢通返回 false
      */
      public boolean acceptSync(PipelineContext context) {
      Objects.requireNonNull(context, “上下文資料不能為 null”);
      // 拿到資料型別
      Class<? extends PipelineContext> dataType = context.getClass();
      // 獲取資料處理管道
      List<? extends ContextHandler<? super PipelineContext>> pipeline = pipelineRouteMap.get(dataType);

      if (CollectionUtils.isEmpty(pipeline)) {
      logger.error("{} 的管道為空", dataType.getSimpleName());
      return false;
      }

      // 管道是否暢通
      boolean lastSuccess = true;

      for (ContextHandler<? super PipelineContext> handler : pipeline) {
      try {
      // 當前處理器處理資料,並返回是否繼續向下處理
      lastSuccess = handler.handle(context);
      } catch (Throwable ex) {
      lastSuccess = false;
      logger.error("[{}] 處理異常,handler={}", context.getName(), handler.getClass().getSimpleName(), ex);
      }

       // 不再向下處理
       if (!lastSuccess) { break; }
      

      }

      return lastSuccess;
      }
      }

▐ 使用管道模式

此時,我們可以將最開始的 buildModelInstance 修改為:

public CommonResponse buildModelInstance(InstanceBuildRequest request) {
InstanceBuildContext data = createPipelineData(request);
boolean success = pipelineExecutor.acceptSync(data);

// 建立模型例項成功
if (success) {
    return CommonResponse.success(data.getInstanceId());
}


logger.error("建立模式例項失敗:{}", data.getErrorMsg());
return CommonResponse.failed(data.getErrorMsg());

}
我們模擬一下模型例項的建立過程:

引數正常時:

引數正常時

引數出錯時:

引數出錯時

這個時候我們再為 InstanceBuildContext 加入新的兩個 ContextHandler:FormInputPreprocessor(表單輸入資料預處理) 和 BizSideCustomProcessor(業務方自定義資料處理)。

@Component
public class FormInputPreprocessor implements ContextHandler {

private final Logger logger = LoggerFactory.getLogger(this.getClass());


@Override
public boolean handle(InstanceBuildContext context) {
    logger.info("--表單輸入資料預處理--");


    // 假裝進行表單輸入資料預處理


    return true;
}

}
@Component
public class BizSideCustomProcessor implements ContextHandler {

private final Logger logger = LoggerFactory.getLogger(this.getClass());


@Override
public boolean handle(InstanceBuildContext context) {
    logger.info("--業務方自定義資料處理--");


    // 先判斷是否存在自定義資料處理,如果沒有,直接返回 true


    // 呼叫業務方的自定義的表單資料處理


    return true;
}

}
此時 buildModelInstance 不需要做任何修改,我們只需要在 “路由表” 裡面,將這兩個 ContextHandler 加入到 InstanceBuildContext 關聯的管道中,Spring 啟動的時候,會自動幫我們構建好每種 Context 對應的管道:

加入新的處理器

再模擬一下模型例項的建立過程:

加入新的處理器後模型例項的建立過程

真香.gif

▐ 非同步處理

管道執行器 PipelineExecutor 中,acceptSync 是個同步的方法。

小蜜:看名字你就知道你悄悄埋伏筆了。

被你發現了.gif

對於步驟繁多的任務,很多時候我們更需要的是非同步處理,比如某些耗時長的定時任務。管道處理非同步化非常的簡單,我們先定義一個執行緒池,比如:











然後在 PipelineExecutor 中加入非同步處理的方法:

/**

  • 管道執行緒池
    */
    @Resource
    private ThreadPoolTaskExecutor pipelineThreadPool;

/**

  • 非同步處理輸入的上下文資料

  • @param context 上下文資料

  • @param callback 處理完成的回撥
    */
    public void acceptAsync(PipelineContext context, BiConsumer<PipelineContext, Boolean> callback) {
    pipelineThreadPool.execute(() -> {
    boolean success = acceptSync(context);

     if (callback != null) {
         callback.accept(context, success);
     }
    

    });
    }

▐ 通用處理

比如我們想記錄下每次管道處理的時間,以及在處理前和處理後都列印相關的日誌。那麼我們可以提供兩個通用的 ContextHandler,分別放在每個管道的頭和尾:

@Component
public class CommonHeadHandler implements ContextHandler {

private final Logger logger = LoggerFactory.getLogger(this.getClass());


@Override
public boolean handle(PipelineContext context) {
    logger.info("管道開始執行:context={}", JSON.toJSONString(context));


    // 設定開始時間
    context.setStartTime(LocalDateTime.now());


    return true;
}

}
@Component
public class CommonTailHandler implements ContextHandler {

private final Logger logger = LoggerFactory.getLogger(this.getClass());


@Override
public boolean handle(PipelineContext context) {
    // 設定處理結束時間
    context.setEndTime(LocalDateTime.now());


    logger.info("管道執行完畢:context={}", JSON.toJSONString(context));


    return true;
}

}
通用頭、尾處理器可以在路由表裡面放置,但是每次新加一種 PipelineContext 都要加一次,好像沒有必要 —— 我們直接修改下 管道執行器 PipelineExecutor 中的 acceptSync 方法:

@Component
public class PipelineExecutor {

......


@Autowired
private CommonHeadHandler commonHeadHandler;


@Autowired
private CommonTailHandler commonTailHandler;


public boolean acceptSync(PipelineContext context) {
    ......


    // 【通用頭處理器】處理
    commonHeadHandler.handle(context);


    // 管道是否暢通
    boolean lastSuccess = true;


    for (ContextHandler<? super PipelineContext> handler : pipeline) {
        try {
            // 當前處理器處理資料,並返回是否繼續向下處理
            lastSuccess = handler.handle(context);
        } catch (Throwable ex) {
            lastSuccess = false;
            logger.error("[{}] 處理異常,handler={}", context.getName(), handler.getClass().getSimpleName(), ex);
        }


        // 不再向下處理
        if (!lastSuccess) { break; }
    }


    // 【通用尾處理器】處理
    commonTailHandler.handle(context);


    return lastSuccess;
}

}

總結
通過管道模式,我們大幅降低了系統的耦合度和提升了內聚程度與擴充套件性:

ModelService 只負責處理 HSF 請求,不用關心具體的業務邏輯

PipelineExecutor 只做執行工作,不用關心具體的管道細節

每個 ContextHandler 只負責自己那部分的業務邏輯,不需要知道管道的結構,與其他ContextHandler 的業務邏輯解耦

新增、刪除 或者 交換子步驟時,都只需要操作路由表的配置,而不要修改原來的呼叫程式碼

淘系技術部-全域營銷團隊-誠招英才

戰鬥在阿里電商的核心地帶,負責連線供需兩端,支援電商營銷領域的各類產品、平臺和解決方案,其中包括聚划算、百億補貼、天貓U先、天貓小黑盒、天貓新品孵化、品牌號等重量級業務。我們深度參與雙11、618、99划算節等年度大促,不斷挑戰技術的極限! 團隊成員背景多樣,有深耕電商精研技術的老司機,也有朝氣蓬勃的小萌新,更有可顏可甜的小姐姐,期待具有好奇心和思考力的你的加入!

【招聘崗位】Java 技術專家 、資料工程師

如果您有興趣可將簡歷發至 [email protected] 或者新增作者微信 wx_zhou_mi 進行詳細諮詢,歡迎來撩~

✿ 拓展閱讀

作者|周密(之葉)

編輯|橙子君

出品|阿里巴巴新零售淘系技術