一個跨平臺的多程序合作框架(一)基本原理
在上一篇文章中,我們探討了一種通過程序交換的合作框架。經過同學們半年多的討論、實驗,目前實現了這個想法。本篇首先介紹基本原理,後續將詳細講解開發過程。
本文的GitHub連結為 這裡
1. 什麼是Taskbus
Taskbus 是一種面向非專業開發者的跨平臺多程序合作框架,具有程序切割、語言無關、編譯器無關、架構無關四個特點。
非專業開發者是一個泛泛的概念,可以理解為沒有受過專業化的軟體工程化訓練的開發者。諸如需要頻繁自行開發小工具進行演算法驗證的高校教研團隊,以及深入某一領域(化工、機械、通訊、電子等)進行資料分析,需要長期從事非消費類工具軟體開發的工程師團隊。
Taskbus 從感官上提供一種類似Simulink或GNU-Radio的模組化拖拽介面,可用於在通用計算機上實現準實時的處理邏輯。但是,從結構上,其與二者完全不同。Taskbus 對編譯器、執行平臺、開發語言不做要求。它通過定義一種功能釋出與資料交換標準,提供一套程序管理平臺,以便把不同語言開發的程序組合起來。在例子中,您可以看到Python2/3,NodeJS,C#,Matlab、Qt、C++、MFC等工具鏈生成的模組,它們在平臺統一排程下完成功能。
2. 關鍵特性
taskBus 的核心理念是 “定IO標準不定具體工具,定連線結構不定架構演算法”
- taskBus 僅定義資料交換的方法與格式,對具體實現語言、執行環境沒有要求。這帶來了非常高的靈活性。
- taskBus 僅定義程序間的邏輯連線結構,並不定義使用者搭建的具體功能所採用的架構、所需要的方法。
它的關鍵特性:
- 簡單: 程序通過標準輸入輸出(stdin,stdout,stderr)吞吐資料、命令列引數接受初始化引數。沒有對資料庫、COM、CORBA、動態連結、SOAP或網路協議的知識要求。基於平臺提供的除錯功能,經過錄制、回放操作,可以脫離平臺獨立除錯各個模組的邏輯。
- 靈活: 通過專題(Subject)、通路(Path),通過標準輸入輸出管道可以分時處理多個邏輯流。它們之間的關係完全由模組設計者確定。基於網路模組、資料庫模組提供的功能,可以在不同的作業系統上構建分散式的處理系統。您甚至可以用封裝器把Matlab、Octave、Python程式包裝進來。
- 穩定: 錯誤被控制在一個模組程序內部,易於發現問題。 可為各個模組設定獨立的優先順序(nice)。
- 高效: 多程序並行、分散式計算與優先順序控制,使得通用PC計算環境也可達到實時處理/準實時處理的能力。當您的模組加入GPU加速等特性後,可以使整個系統性能得到大幅度提升。
- 推送而非請求:與GNU-Radio不同,taskBus的前序模組主動向後端推送資料,而負荷控制由模組通過簡單的方法實現(參考下文“負荷控制”部分)。
3. 基本原理
很多經典的程式語言課本都是從控制檯開始的. 控制檯程式通過鍵盤接收使用者輸入,向螢幕列印運算結果。事實上,無論在Linux還是Windows中,程序啟動時便有三個特殊的檔案控制代碼可用了。他們是標準輸出(stdout),標準輸入(stdin),標準錯誤(stderr)。預設情況下,stdin與鍵盤關聯,stdout、stderr與螢幕關聯。
大多數現代語言支援建立子程序, 並且可以通過 “管道重定向” 技術接管子程序的標準管道。Taskbus 技術即基於此特性,從各個子程序的stdout讀取資料,並轉發給需要的子程序(stdin)。
3.1 輸入輸出
一個實現XOR操作的教科書C程式,一般類似這樣:
#include <stdio.h>
int main(int argc, char *argv[])
{
unsigned int a[4];
scanf("%u,%u,%u,%u",a,a+1,a+2,a+3);
for(int i = 0; i < 4; ++i)
a[i] = 0xFFFFFFFF ^ a[i];
printf("%u,%u,%u,%u\n",a[0],a[1],a[2],a[3]);
return 0;
}
上面的程式裡,從鍵盤輸入四個數字,取反後輸出。由於鍵盤與stdin關聯,螢幕與stdout關聯,上述程式實質上與下面的程式等效:
fscanf(stdin,"%u,%u,%u,%u",a,a+1,a+2,a+3);
fprintf(stdout,"%u,%u,%u,%u\n",a[0],a[1],a[2],a[3]);
Taskbus 模組的輸入輸出與上述程式非常類似,唯一的區別是使用了二進位制讀寫函式:
unsigned int a[4];
fread(a,sizeof(int),4,stdin);
for(int i = 0; i < 4; ++i)
a[i] = 0xFFFFFFFF ^ a[i];
fwrite(a,sizeof(int),4,stdout);
3.2 專題與通路
一個子程序只有一對輸入輸出管道。通過設定專題與通路,可以僅用一路管道分享多路內容。
3.2.1 專題
專題(Subject)指示一類資料。如音效卡採集的波形,以及從檔案中讀取的位元組流。
在圖形介面上,專題顯示為管腳。每個專題有一個便於記憶的名字;在執行時,taskBus平臺會根據連線關係,為每個專題設定一個整數ID。ID相同的輸入、輸出管腳會被連線起來。
一個專題的生產者生成資料,交給平臺。平臺把資料交給所有連線了此專題的消費者。
注意: 不同的管腳可以生產同一ID的專題,也可以監聽一個相同的專題。對生產者,這樣做的行為是一致的。對消費者,如何處理專題ID相同的情況,取決於模組實現者。
3.2.2 通路
通路(PATH)在一類專題中,區分一條獨立的自然時序。下面的例子中,兩路音效卡採集的資料匯入同一個FFT變換器。對於變換器來說,需要區分出兩條自然時序,才能不導致混淆。
上圖中的音效卡模組採用自身的程序ID(2、6)作為通路號,這樣非常便捷地標定了資料的來源。
3.3 帶有專題和通路的IO
鑑於上面的介紹,我們在前文程式碼中稍加修改,即可實現基於stdio的通訊。
void record( char a[], int data_len, int path_id)
{
int out_subject_id, out_path_id;
int out_data_len;
char b[MAX_LEN];
deal(a, datalen,path_id,
&out_subject_id,
&out_path_id,
&out_data_len,
b
);
fwrite (&out_subject_id,sizeof(int),1,stdin);
fwrite (&out_path_id,sizeof(int),1,stdin);
fwrite (&out_data_len,sizeof(int),1,stdin);
fwrite (b,sizeof(char),out_data_len,stdin);
}
int main(int argc, char *argv[])
{
int subject_id, path_id;
int data_len;
char a[MAX_LEN];
while (!finished())
{
fread (&subject_id,sizeof(int),1,stdin);
fread (&path_id,sizeof(int),1,stdin);
fread (&data_len,sizeof(int),1,stdin);
fread (a,sizeof(char),data_len,stdin);
switch (subject_id)
{
case ID_WAV:
record(a,data_len,path_id);
break;
case ID_DAT:
deal(a,data_len,path_id);
break;
default:
break;
}
}
return 0;
}
上述程式碼缺少上下文,但清晰的示意了taskBus最基本的通訊原理。不同模組之間,正是通過這樣的方法進行溝通的。
3.4 模組功能釋出
由開發者獨立開發的模組,需要使用JSON檔案釋出自己的功能。這樣,平臺就知道模組支援的專題型別、引數選項。
一個典型的功能描述檔案必須包括三部分,分別是:
- 引數表
- 輸入專題表
- 輸出專題表
其他使用者自定義部分依舊可以讀入並顯示在平臺,但沒有實際的意義。 平臺提供的helloworld例子包含兩個功能,一個是位元抑或,一個是順序取反。JSON檔案的結構如下:
{
"example_bitxor":{
"name":"bitxor",
"parameters":{
"mask":{
"type":"unsigned char",
"tooltip":"bytemask",
"default":255,
"range":{
"min":0,
"max":255
}
}
},
"input_subjects":
{
"data_in":{
"type":"byte",
"tooltip":"input"
}
},
"output_subjects":{
"data_out":{
"type":"byte",
"tooltip":"output"
}
},
"info":{
"auther":"kelly",
"version":[1,0,0],
"mail":"[email protected]"
}
},
"example_reverse":{
"name":"reverse",
"parameters":{
},
"input_subjects":
{
"data_in":{
"type":"byte",
"tooltip":"input"
}
},
"output_subjects":{
"data_out":{
"type":"byte",
"tooltip":"output"
}
},
"info":{
"auther":"kelly",
"version":[1,1,0],
"mail":"[email protected]"
}
}
}
可以看到,檔案由兩大塊組成。第一塊為 example_bitxor部分,第二塊為example_reverse,對應了兩個功能。
在各個功能內部,又分name、parameters、input_subject、output_subject四個子專案,分別對應友好名稱、靜態屬性、輸入專題、輸出專題。
**注意:**儘管各個屬性含有“type”型別指示以及range取值範圍指示,但平臺把所有輸入輸出看做位元組流,這些取值僅僅為了提醒使用者。一個成熟的模組應該有詳細的介面文件描述輸入輸出型別、位元組序、大小端。對文字型別,要明確或者可以設施字符集。上面的模組,在平臺上顯示為:
3.5 命令列引數
taskBus 平臺根據JSON檔案啟動程序。在啟動各個功能模組時,taskBus 通過命令列引數送入所有的資訊。命令列引數有如下幾類。
類別 | 引數 | 意義 | 解釋 |
---|---|---|---|
程序 | ----instance= | 向模組送入程序ID值 | 整形。用於區分獨立的程序,這個措施避免了模組自己來生成唯一ID |
程序 | ----function= | 向模組指定當前例項開啟的功能。 | 一個模組可以支援很多功能。 |
程序 | ----information | 平臺請求模組輸出JSON描述並退出。 | 模組既可以附帶JSON檔案,也可以在本引數中printf JSON。 |
專題 | ----<sub_name>= | 向模組指定專題名對應的ID | 專題名由各個模組確定,可以出現多條。 |
使用者屬性 | ----= | 使用者自定的初始化屬性 | 可在平臺“屬性”欄設定。 |
以上述模組JSON為例子,下圖中的模組,在啟動時,命令列如下:
[email protected]$ example_helloworld.exe --instance=6 --function=example_bitxor --mask=255 --data_in=6 --data_out=1
3.6 第一個Hello-world 模組的程式碼
我們把HelloWold模組的程式碼貼上到這裡,使用C++,可以非常方便的實現上述功能。
- 該程式碼沒有使用任何標準C++之外的特性
- 事實上,在第四章您可以看到,我們已經為接入平臺提前做好了不少簡化工作。但簡化工作會掩蓋細節,下面的程式碼仍舊是最體現模組執行原理的好例子。
#include <cstdio>
#include <string>
#include <cstring>
#include <cstdlib>
#ifdef WINNT
#include <io.h>
#include <fcntl.h>
#endif
using namespace std;
#define MAX_DTALEN 65536
int main(int argc, char * argv[])
{
//In windows, stdio must be set to BINARY mode, to
//prevent linebreak \\n\\r replace.
#ifdef WINNT
setmode(fileno(stdout),O_BINARY);
setmode(fileno(stdin),O_BINARY);
#endif
bool bInfo = false, finished = false;
int instance = 0;
int sub_input = 0, sub_output = 0;
char mask = 0;
string function;
//1. parse cmdline
for (int i=1;i<argc;++i)
{
string arg_key = argv[i], arg_value = argv[i];
int idx = arg_key.find('=');
if (idx>=0 && idx<arg_key.size())
{
arg_key = arg_key.substr(0,idx);
arg_value = arg_value.substr(idx+1);
}
if (arg_key=="--function")
function = arg_value;
else if (arg_key=="--information")
bInfo = true;
else if (arg_key=="--instance")
instance = atoi(arg_value.c_str());
else if (arg_key=="--data_in")
sub_input = atoi(arg_value.c_str());
else if (arg_key=="--data_out")
sub_output = atoi(arg_value.c_str());
else if (arg_key=="--mask")
mask = atoi(arg_value.c_str());
fprintf(stderr,"%s:%s\n",arg_key.c_str(),arg_value.c_str());
fflush(stderr);
}
//2. function case
if (bInfo)
{
//In this example, json file will be published with exe file.
//We will return directly. Or, you can output json here to stdout,
//If you do not want to publish your json file.
return 0;
}
else if (instance<=0 || function.length()==0)
return -1;
else
{
char header[4], data[MAX_DTALEN+1];
memset(data,0,MAX_DTALEN+1);
int n_sub = 0, n_path = 0, n_len = 0;
while(false==finished)
{
fread(header,1,4,stdin); //2.1 read header
if (header[0]!=0x3C || header[1]!=0x5A || header[2]!=0x7E || header[3]!=0x69)
{
fprintf(stderr,"Bad header\n");
break;
}
fread(&n_sub,sizeof(int),1,stdin);
fread(&n_path,sizeof(int),1,stdin);
fread(&n_len,sizeof(int),1,stdin);
if (n_len<0 || n_len >MAX_DTALEN)
{
fprintf(stderr,"Bad length %d\n",n_len);
break;
}
fread(data,sizeof(char),n_len,stdin);
if (n_sub<=0)
{
if (strstr(data, "quit")!=nullptr)
{
finished = true;
continue;
}
}
else if (n_sub != sub_input)
continue;
if (function=="example_bitxor")
{
for (int i=0;i<n_len;++i)
data[i] ^= mask;
}
else if (function=="example_reverse")
{
for (int i=0;i<n_len/2;++i)
{
char t = data[i];
data[i] = data[n_len-1-i];
data[n_len-1-i] = t;
}
}
else
{
fprintf(stderr,"Unknown function %s\n",function.c_str());
break;
}
fwrite(header,1,4,stdout);
fwrite(&sub_output,sizeof(int),1,stdout);
fwrite(&n_path,sizeof(int),1,stdout);
fwrite(&n_len,sizeof(int),1,stdout);
fwrite(data,sizeof(char),n_len,stdout);
fflush(stdout);
}
}
//3.exit
return 0;
}
下一篇文章,我們通過一個示例來敘述客戶端模組的開發過程。