1. 程式人生 > >服務編排--Conductor 文件翻譯 (介紹與基本概念)

服務編排--Conductor 文件翻譯 (介紹與基本概念)

本文是對 Conductor 文件的簡單翻譯,建議你認真閱讀,如果閱讀後你仍然不知道如何使用,可以繼續關注本部落格,我會在後續的部落格中更新 Conductor 實戰

介紹

Conductor是一個微服務的編排引擎

Conductor 優點

Conductor,幫助我們協調基於微服務的流程,具有以下功能:

  • 允許建立複雜的流程/業務流,其中由微服務實現單個任務。
  • 基於JSON DSL的定義執行流程。
  • 為這些流程提供可見性和可追溯性。
  • 在暫停,恢復,重啟等周圍公開控制語義,以獲得更好的devops體驗。
  • 允許更多地重用現有的微服務,為管理提供更容易的途徑。
  • 使用者介面視覺化流程。
  • 能夠在需要時同步處理所有任務。
  • 能夠擴充套件數百萬個併發執行的流程。
  • 由客戶端提取的排隊服務支援。
  • 能夠在HTTP或其他傳輸上執行,例如gRPC。

為什麼不進行點對點編排?

通過點對點任務編排,我們發現隨著業務需求和複雜性的增長難以擴充套件。釋出/訂閱模型適用於最簡單的流程,
但很快就突出了與該方法相關的一些問題:

  • 流程“嵌入”在多個應用程式的程式碼中。
  • 通常,圍繞輸入/輸出,SLA等存在緊密耦合和假設,使得更難以適應不斷變化的需求。
  • 幾乎沒有辦法系統地回答“我們用過程X做了多少”?

基本概念

工作流定義

工作流是使用基於JSON的DSL定義的,包括一組作為工作流的一部分執行的任務。任務是在遠端機器上執行的控制任務(fork,條件等)或應用程式任務(例如編碼檔案)。

任務定義

  • 所有任務都需要在活動工作流程使用之前進行註冊。
  • 任務可以在多個工作流程中重複使用。工人任務分為兩類:
    • 系統任務
    • 工人任務

系統任務

系統任務在Conductor伺服器的JVM內執行,並由Conductor管理,以實現其可執行性和可擴充套件性。

名稱 目的
DYNAMIC 基於任務的輸入表示式派生的工作任務,而不是靜態定義為計劃的一部分
DECIDE 決策任務 - 實現案例……開關樣式分叉
FORK 分叉一組並行的任務。計劃每個集合並行執行
FORK_JOIN_DYNAMIC 與FORK類似,但FORK_JOIN_DYNAMIC不是在並行執行計劃中定義的任務集,而是根據此任務的輸入表示式生成並行任務
JOIN 補充FORK和FORK_JOIN_DYNAMIC。用於合併一個或多個並行分支*
SUB_WORKFLOW 將另一個工作流巢狀為子工作流任務。在執行時,它例項化子工作流並等待它完成
EVENT 在支援的事件系統中生成事件(例如,Conductor,SQS)

Conductor提供了一個API來建立在與引擎相同的JVM中執行的使用者定義任務。有關詳細資訊,請參閱WorkflowSystemTask介面。

工人任務

工作人員任務由應用程式實現,並在與Conductor不同的環境中執行。工作人員任務可以用任何語言實現。這些任務通過REST API端點與Conductor伺服器通訊,以輪詢任務發現並執行,並在執行後更新其狀態。

工作人員任務由計劃中的任務型別SIMPLE標識。

工作流任務的生命週期

這裡寫圖片描述

元資料定義

任務定義

Conductor維護著一個工作人員任務型別的登錄檔。在工作流程中使用之前必須註冊任務型別。

{
  "name": "encode_task",
  "retryCount": 3,
  "timeoutSeconds": 1200,
  "inputKeys": [
    "sourceRequestId",
    "qcElementType"
  ],
  "outputKeys": [
    "state",
    "skipped",
    "result"
  ],
  "timeoutPolicy": "TIME_OUT_WF",
  "retryLogic": "FIXED",
  "retryDelaySeconds": 600,
  "responseTimeoutSeconds": 3600
}
領域 描述 筆記
name 任務型別 唯一
retryCount 任務標記為失敗時嘗試重試的次數
retryLogic 重試機制 看下面的可能值
timeoutSeconds 以毫秒為單位的時間,在此之後,如果在轉換到IN_PROGRESS狀態後未完成任務,則將任務標記為TIMED_OUT 如果設定為0,則不會超時
timeoutPolicy 任務的超時策略 看下面的可能值
responseTimeoutSeconds 如果大於0,則在此時間之後未更新狀態時,將重新安排任務。當工作人員輪詢任務但由於錯誤/網路故障而無法完成時很有用。
outputKeys 任務輸出的鍵集。用於記錄任務的輸出

重試邏輯

  • FIXED :重新安排任務後的任務 retryDelaySeconds
  • EXPONENTIAL_BACKOFF:重新安排之後 retryDelaySeconds * attempNo

超時政策

  • RETRY :再次重試該任務
  • TIME_OUT_WF:工作流程標記為TIMED_OUT並終止
  • ALERT_ONLY:註冊計數器(task_timeout)

工作流定義

使用基於JSON的DSL定義工作流。

{
  "name": "encode_and_deploy",
  "description": "Encodes a file and deploys to CDN",
  "version": 1,
  "tasks": [
    {
      "name": "encode",
      "taskReferenceName": "encode",
      "type": "SIMPLE",
      "inputParameters": {
        "fileLocation": "${workflow.input.fileLocation}"
      }
    },
    {
      "name": "deploy",
      "taskReferenceName": "d1",
      "type": "SIMPLE",
      "inputParameters": {
        "fileLocation": "${encode.output.encodeLocation}"
      }

    }
  ],
  "outputParameters": {
    "cdn_url": "${d1.output.location}"
  },
  "schemaVersion": 2
}
領域 描述 筆記
name 工作流程的名稱
description 工作流程的描述性名稱
version 用於標識架構版本的數字欄位。使用遞增數字 啟動工作流程執行時,如果未指定,則使用具有最高版本的定義
tasks 一系列任務定義,如下所述。
outputParameters 用於生成工作流輸出的JSON模板 如果未指定,則將輸出定義為上次執行的任務的輸出
inputParameters 輸入引數列表。用於記錄工作流程所需的輸入 可選的

工作流程中的任務

tasks工作流中的屬性定義要按該順序執行的任務陣列。以下是每項任務所需的強制性最低引數:

領域 描述 筆記
name 任務名稱。在開始工作流程之前,必須使用Conductor註冊為任務型別
taskReferenceName 別名用於在工作流程中引用任務。必須是獨一無二的。
type 任務型別。SIMPLE用於遠端工作人員或其中一個系統任務型別執行的任務
description 任務描述 可選的
optional 對或錯。設定為true時 - 即使任務失敗,工作流也會繼續。任務的狀態反映為COMPLETED_WITH_ERRORS 預設為 false
inputParameters JSON模板,用於定義給予任務的輸入 有關詳細資訊,請參見“接線輸入和輸出”

除了這些引數,需要進行具體的任務型別附加引數記錄在這裡

連線輸入和輸出

當觸發新的執行時,客戶端會為工作流提供輸入。工作流輸入是通過${workflow.input…}表示式提供的JSON有效負載。

基於inputParameters工作流定義中配置的模板,為工作流中的每個任務提供輸入。 inputParameters是一個JSON片段,其值包含用於在執行期間對映工作流的輸入或輸出或其他任務的值的引數。

對映值的語法遵循以下模式:

$ {SOURCE.input / output.JSONPath}

- -
SOURCE 可以是任何任務的“工作流程”或引用名稱
input/output 指源的輸入或輸出
JSONPath JSON路徑表示式從源的輸入/輸出中提取JSON片段

JSON路徑支援

Conductor支援JSONPath規範並從此處使用Java實現。

考慮一個任務,其輸入配置為使用來自工作流的輸入/輸出引數和名為loc_task的任務。

{
  "inputParameters": {
    "movieId": "${workflow.input.movieId}",
    "url": "${workflow.input.fileLocation}",
    "lang": "${loc_task.output.languages[0]}",
    "http_request": {
      "method": "POST",
      "url": "http://example.com/${loc_task.output.fileId}/encode",
      "body": {
        "recipe": "${workflow.input.recipe}",
        "params": {
          "width": 100,
          "height": 100
        }
      },
      "headers": {
        "Accept": "application/json",
        "Content-Type": "application/json"
      }
    }
  }
}

請將以下內容視為工作流輸入

{
  "movieId": "movie_123",
  "fileLocation":"s3://moviebucket/file123",
  "recipe":"png"
}

並且loc_task的輸出如下;

{
  "fileId": "file_xxx_yyy_zzz",
  "languages": ["en","ja","es"]
}

在安排任務時,Conductor將合併工作流輸入和loc_task輸出中的值,並按如下方式建立任務輸入:

{
  "movieId": "movie_123",
  "url": "s3://moviebucket/file123",
  "lang": "en",
  "http_request": {
    "method": "POST",
    "url": "http://example.com/file_xxx_yyy_zzz/encode",
    "body": {
      "recipe": "png",
      "params": {
        "width": 100,
        "height": 100
      }
    },
    "headers": {
        "Accept": "application/json",
        "Content-Type": "application/json"
    }
  }
}

系統任務的建立

(DYNAMIC) 動態任務定義

引數:

名稱 描述
dynamicTaskNameParam 任務輸入中用於計劃任務的值的引數名稱。例如,如果引數的值是ABC,則排程的下一個任務是“ABC”型別。

{
  "name": "user_task",
  "taskReferenceName": "t1",
  "inputParameters": {
    "files": "${workflow.input.files}",
    "taskToExecute": "${workflow.input.user_supplied_task}"
  },
  "type": "DYNAMIC",
  "dynamicTaskNameParam": "taskToExecute"
}

如果使用輸入引數user_supplied_task的值作為user_task_2啟動工作流,則Conductor將在計劃此動態任務時排程user_task_2。

(DECIDE)決策任務定義

決策任務類似於case…switch程式語言中的語句。該任務需要3個引數:

引數:

名稱 描述
caseValueParam 任務輸入中引數的名稱,其值將用作開關。
decisionCases 可以鍵入值的對映值的caseValueParam值是要執行的任務列表。
defaultCase 在判定案例中找不到匹配值時要執行的任務列表(預設條件)

{
  "name": "decide_task",
  "taskReferenceName": "decide1",
  "inputParameters": {
    "case_value_param": "${workflow.input.movieType}"
  },
  "type": "DECISION",
  "caseValueParam": "case_value_param",
  "decisionCases": {
    "Show": [
      {
        "name": "setup_episodes",
        "taskReferenceName": "se1",
        "inputParameters": {
          "movieId": "${workflow.input.movieId}"
        },
        "type": "SIMPLE"
      },
      {
        "name": "generate_episode_artwork",
        "taskReferenceName": "ga",
        "inputParameters": {
          "movieId": "${workflow.input.movieId}"
        },
        "type": "SIMPLE"
      }
    ],
    "Movie": [
      {
        "name": "setup_movie",
        "taskReferenceName": "sm",
        "inputParameters": {
          "movieId": "${workflow.input.movieId}"
        },
        "type": "SIMPLE"
      },
      {
        "name": "generate_movie_artwork",
        "taskReferenceName": "gma",
        "inputParameters": {
          "movieId": "${workflow.input.movieId}"
        },
        "type": "SIMPLE"
      }
    ]
  }
}

Fork 並行任務定義

Fork用於排程並行任務集。

引數:

名稱 描述
forkTasks 任務列表列表。每個子列表計劃並行執行。但是,子列表中的任務是以序列方式安排的。

{
  "forkTasks": [
    [
      {
        "name": "task11",
        "taskReferenceName": "t11"
      },
      {
        "name": "task12",
        "taskReferenceName": "t12"
      }
    ],
    [
      {
        "name": "task21",
        "taskReferenceName": "t21"
      },
      {
        "name": "task22",
        "taskReferenceName": "t22"
      }
    ]
  ]
}

執行時,task11和task21被安排在同一時間執行。

Dynamic Fork (動態分支)

Dynamic fork與FORK_JOIN任務相同。除了在執行時使用任務的輸入提供要並行的任務列表。當並行的任務數量不固定並根據輸入而變化時很有用。

名稱 描述
dynamicForkTasksParam 包含要並行執行的工作流任務配置列表的引數的名稱
dynamicForkTasksInputParamName 引數的名稱,其值應為帶有鍵的對映,作為分叉任務的引用名稱和值作為分叉任務的輸入

{
  "inputParameters": {
     "dynamicTasks": "${taskA.output.dynamicTasksJSON}",
     "dynamicTasksInput": "${taskA.output.dynamicTasksInputJSON}"
  }
  "type": "FORK_JOIN_DYNAMIC",
  "dynamicForkTasksParam": "dynamicTasks",
  "dynamicForkTasksInputParamName": "dynamicTasksInput"
}

將taskA的輸出視為:

{
  "dynamicTasksInputJSON": {
    "forkedTask1": {
      "width": 100,
      "height": 100,
      "params": {
        "recipe": "jpg"
      }
    },
    "forkedTask2": {
      "width": 200,
      "height": 200,
      "params": {
        "recipe": "jpg"
      }
    }
  },
  "dynamicTasksJSON": [
    {
      "name": "encode_task",
      "taskReferenceName": "forkedTask1",
      "type": "SIMPLE"
    },
    {
      "name": "encode_task",
      "taskReferenceName": "forkedTask2",
      "type": "SIMPLE"
    }
  ]
}

執行時,Dynamic fork 任務將排程兩個型別為“encode_task”的並行任務,引用名稱為“forkedTask1”和“forkedTask2”,輸入由_ dynamicTasksInputJSON_指定

Dynamic Fork and Join

Join任務必須遵循FORK_JOIN_DYNAMIC

工作流定義必須包含一個Join任務定義,後跟FORK_JOIN_DYNAMIC任務。但是,考慮到任務的動態特性,此Join不需要joinOn引數。在完成之前,連線將等待所有並行分支任務完成。

與FORK不同,FORK可以執行並行流,每個fork按順序執行一系列任務,FORK_JOIN_DYNAMIC僅限於每個fork一個任務。但是,並行任務可以是子工作流,允許更復雜的執行流。

Join

Join任務用於等待fork任務生成的一個或多個任務的完成。

引數

名稱 描述
joinOn 任務引用名稱列表,JOIN將等待完成。

{
    "joinOn": ["taskRef1", "taskRef3"]
}

Join 任務輸出

Fork任務的輸出將是一個JSON物件,其中key是任務引用名稱,value是fork任務的輸出。

子工作流程

子工作流任務允許在另一個工作流中巢狀工作流。

引數

名稱 描述
subWorkflowParam 任務引用名稱列表,JOIN將等待完成。

{
  "name": "sub_workflow_task",
  "taskReferenceName": "sub1",
  "inputParameters": {
    "requestId": "${workflow.input.requestId}",
    "file": "${encode.output.location}"
  },
  "type": "SUB_WORKFLOW",
  "subWorkflowParam": {
    "name": "deployment_workflow",
    "version": 1
  }
}

執行時,deployment_workflow使用兩個輸入引數requestId和file執行a 。生成的工作流程完成後,任務標記為已完成。如果子工作流終止或失敗,則任務被標記為失敗並在配置時重試。

Wait

Wait 任務被實現為保持在IN_PROGRESS狀態的門,除非標記為外部觸發器COMPLETED或FAILED由外部觸發器標記。要使用Wait任務,請將任務型別設定為WAIT

引數
沒有要求

Wait 任務的外部觸發器

任務資源端點可用於將任務的狀態更新為終止狀態。

Contrib模組提供SQS整合,外部系統可以將訊息放入伺服器偵聽的預配置佇列中。當訊息到達時,它們被標記為COMPLETED或FAILED。

SQS佇列

  • 可以使用以下API檢索伺服器用於更新任務狀態的SQS佇列:
GET /queue
  • 更新任務狀態時,訊息需要符合以下規範:
    • 訊息必須是有效的JSON字串。
    • 訊息JSON應包含一個名為key的鍵externalId,該值是一個包含以下鍵的JSONified字串:
    • workflowId:工作流程的ID
    • taskRefName:應更新的任務引用名稱。
    • 每個佇列代表一個特定的任務狀態,並相應地標記任務。例如,傳送到COMPLETED佇列的訊息將任務狀態標記為COMPLETED。
    • 任務的輸出隨訊息更新。

示例SQS有效負載:

{
  "some_key": "valuex",
  "externalId": "{\"taskRefName\":\"TASK_REFERENCE_NAME\",\"workflowId\":\"WORKFLOW_ID\"}"
}

HTTP

HTTP任務用於通過HTTP呼叫另一個微服務。

引數

該任務需要一個輸入引數http_request,該引數作為任務輸入的一部分,具有以下詳細資訊:

名稱 描述
URI 服務的URI。使用vipAddress或包含伺服器地址時可以是部分的。
method HTTP方法。其中一個GET,PUT,POST,DELETE,OPTIONS,HEAD
accept 根據伺服器的要求接受標頭。
contentType 內容型別 - 支援的型別是text / plain,text / html和application / json
headers 要與請求一起傳送的其他http標頭的對映。
body 請求正文
vipAddress 使用基於發現的服務URL時。

HTTP任務輸出

名稱 描述
response JSON主體包含響應(如果存在)
headers 響應標題
statusCode 整數狀態程式碼

任務使用vipAddress輸入有效負載

{
  "http_request": {
    "vipAddress": "examplevip-prod",
    "uri": "/",
    "method": "GET",
    "accept": "text/plain"
  }
}

任務使用絕對URL輸入

{
  "http_request": {
    "uri": "http://example.com/",
    "method": "GET",
    "accept": "text/plain"
  }
}

該任務被標記為FAILED無法完成請求或遠端伺服器返回非成功的狀態程式碼。

注意

HTTP任務當前僅支援Content-Type作為application / json,並且能夠解析文字以及JSON響應。目前不支援XML輸入/輸出。但是,如果無法將響應解析為JSON或Text,則將字串表示形式儲存為文字值。

Event (事件)

事件任務提供將事件(訊息)釋出到Conductor或外部事件系統(如SQS)的功能。事件任務對於為工作流和任務建立基於事件的依賴項非常有用。

引數

名稱 描述
sink 生成的事件的合格名稱。例如,導體或sqs:sqs_queue_name

{
    "sink": 'sqs:example_sqs_queue_name'
}

使用Conductor作為接收器生成事件時,事件名稱遵循以下結構: conductor::

對於SQS,請使用佇列的名稱而不是URI。Conductor根據名稱查詢URI。

警告

使用SQS時,將ContribsModule新增到部署中。需要使用AWSCredentialsProvider為Conductor配置模組,以便能夠使用AWS API。

支援的接收器

  • Conductor
  • SQS

事件任務輸入

給予事件任務的輸入可作為有效負載用於已釋出的訊息。例如,如果訊息被放入SQS佇列(接收器是sqs),則訊息有效負載將是任務的輸入。

事件任務輸出

event_produced 生成的事件的名稱。