1. 程式人生 > >一個跨平臺的多程序合作框架(一)基本原理

一個跨平臺的多程序合作框架(一)基本原理


上一篇文章中,我們探討了一種通過程序交換的合作框架。經過同學們半年多的討論、實驗,目前實現了這個想法。本篇首先介紹基本原理,後續將詳細講解開發過程。
本文的GitHub連結為 這裡

1. 什麼是Taskbus

Taskbus 是一種面向非專業開發者的跨平臺多程序合作框架,具有程序切割、語言無關、編譯器無關、架構無關四個特點。

非專業開發者是一個泛泛的概念,可以理解為沒有受過專業化的軟體工程化訓練的開發者。諸如需要頻繁自行開發小工具進行演算法驗證的高校教研團隊,以及深入某一領域(化工、機械、通訊、電子等)進行資料分析,需要長期從事非消費類工具軟體開發的工程師團隊。

Taskbus 從感官上提供一種類似Simulink或GNU-Radio的模組化拖拽介面,可用於在通用計算機上實現準實時的處理邏輯。但是,從結構上,其與二者完全不同。Taskbus 對編譯器、執行平臺、開發語言不做要求。它通過定義一種功能釋出與資料交換標準,提供一套程序管理平臺,以便把不同語言開發的程序組合起來。在例子中,您可以看到Python2/3,NodeJS,C#,Matlab、Qt、C++、MFC等工具鏈生成的模組,它們在平臺統一排程下完成功能。

MainUI

2. 關鍵特性

taskBus 的核心理念是 “定IO標準不定具體工具,定連線結構不定架構演算法”

  1. taskBus 僅定義資料交換的方法與格式,對具體實現語言、執行環境沒有要求。這帶來了非常高的靈活性。
  2. taskBus 僅定義程序間的邏輯連線結構,並不定義使用者搭建的具體功能所採用的架構、所需要的方法。
    它的關鍵特性:
  • 簡單: 程序通過標準輸入輸出(stdin,stdout,stderr)吞吐資料、命令列引數接受初始化引數。沒有對資料庫、COM、CORBA、動態連結、SOAP或網路協議的知識要求。基於平臺提供的除錯功能,經過錄制、回放操作,可以脫離平臺獨立除錯各個模組的邏輯。
  • 靈活: 通過專題(Subject)、通路(Path),通過標準輸入輸出管道可以分時處理多個邏輯流。它們之間的關係完全由模組設計者確定。基於網路模組、資料庫模組提供的功能,可以在不同的作業系統上構建分散式的處理系統。您甚至可以用封裝器把Matlab、Octave、Python程式包裝進來。
  • 穩定: 錯誤被控制在一個模組程序內部,易於發現問題。 可為各個模組設定獨立的優先順序(nice)。
  • 高效: 多程序並行、分散式計算與優先順序控制,使得通用PC計算環境也可達到實時處理/準實時處理的能力。當您的模組加入GPU加速等特性後,可以使整個系統性能得到大幅度提升。
  • 推送而非請求:與GNU-Radio不同,taskBus的前序模組主動向後端推送資料,而負荷控制由模組通過簡單的方法實現(參考下文“負荷控制”部分)。
    在這裡插入圖片描述

3. 基本原理

很多經典的程式語言課本都是從控制檯開始的. 控制檯程式通過鍵盤接收使用者輸入,向螢幕列印運算結果。事實上,無論在Linux還是Windows中,程序啟動時便有三個特殊的檔案控制代碼可用了。他們是標準輸出(stdout),標準輸入(stdin),標準錯誤(stderr)。預設情況下,stdin與鍵盤關聯,stdout、stderr與螢幕關聯。

大多數現代語言支援建立子程序, 並且可以通過 “管道重定向” 技術接管子程序的標準管道。Taskbus 技術即基於此特性,從各個子程序的stdout讀取資料,並轉發給需要的子程序(stdin)。

3.1 輸入輸出

一個實現XOR操作的教科書C程式,一般類似這樣:

#include <stdio.h>
int main(int argc, char *argv[])
{
	unsigned int a[4];
	scanf("%u,%u,%u,%u",a,a+1,a+2,a+3);
	for(int i = 0; i < 4; ++i)
		a[i] = 0xFFFFFFFF ^ a[i];
	printf("%u,%u,%u,%u\n",a[0],a[1],a[2],a[3]);
	return 0;
}

上面的程式裡,從鍵盤輸入四個數字,取反後輸出。由於鍵盤與stdin關聯,螢幕與stdout關聯,上述程式實質上與下面的程式等效:

	fscanf(stdin,"%u,%u,%u,%u",a,a+1,a+2,a+3);
	fprintf(stdout,"%u,%u,%u,%u\n",a[0],a[1],a[2],a[3]);

Taskbus 模組的輸入輸出與上述程式非常類似,唯一的區別是使用了二進位制讀寫函式:

	unsigned int a[4];
	fread(a,sizeof(int),4,stdin);
	for(int i = 0; i < 4; ++i)
		a[i] = 0xFFFFFFFF ^ a[i];
	fwrite(a,sizeof(int),4,stdout);

3.2 專題與通路

一個子程序只有一對輸入輸出管道。通過設定專題與通路,可以僅用一路管道分享多路內容。

3.2.1 專題

專題(Subject)指示一類資料。如音效卡採集的波形,以及從檔案中讀取的位元組流。
在圖形介面上,專題顯示為管腳。每個專題有一個便於記憶的名字;在執行時,taskBus平臺會根據連線關係,為每個專題設定一個整數ID。ID相同的輸入、輸出管腳會被連線起來。
在這裡插入圖片描述
一個專題的生產者生成資料,交給平臺。平臺把資料交給所有連線了此專題的消費者。

注意: 不同的管腳可以生產同一ID的專題,也可以監聽一個相同的專題。對生產者,這樣做的行為是一致的。對消費者,如何處理專題ID相同的情況,取決於模組實現者。

3.2.2 通路

通路(PATH)在一類專題中,區分一條獨立的自然時序。下面的例子中,兩路音效卡採集的資料匯入同一個FFT變換器。對於變換器來說,需要區分出兩條自然時序,才能不導致混淆。
在這裡插入圖片描述
上圖中的音效卡模組採用自身的程序ID(2、6)作為通路號,這樣非常便捷地標定了資料的來源。

3.3 帶有專題和通路的IO

鑑於上面的介紹,我們在前文程式碼中稍加修改,即可實現基於stdio的通訊。

void record( char a[], int data_len, int path_id)
{
	int out_subject_id, out_path_id;
	int out_data_len;
	char b[MAX_LEN];
	deal(a, datalen,path_id,
		 &out_subject_id,
		 &out_path_id,
		 &out_data_len,
		 b
		 );
	fwrite (&out_subject_id,sizeof(int),1,stdin);
	fwrite (&out_path_id,sizeof(int),1,stdin);
	fwrite (&out_data_len,sizeof(int),1,stdin);
	fwrite (b,sizeof(char),out_data_len,stdin);
}
int main(int argc, char *argv[])
{
	int subject_id, path_id;
	int data_len;
	char a[MAX_LEN];
	while (!finished())
	{
		fread (&subject_id,sizeof(int),1,stdin);
		fread (&path_id,sizeof(int),1,stdin);
		fread (&data_len,sizeof(int),1,stdin);
		fread (a,sizeof(char),data_len,stdin);
		switch (subject_id)
		{
		case ID_WAV:
			record(a,data_len,path_id);
			break;
		case ID_DAT:
			deal(a,data_len,path_id);
			break;
		default:
			break;
		}
	}
	return 0;
}

上述程式碼缺少上下文,但清晰的示意了taskBus最基本的通訊原理。不同模組之間,正是通過這樣的方法進行溝通的。

3.4 模組功能釋出

由開發者獨立開發的模組,需要使用JSON檔案釋出自己的功能。這樣,平臺就知道模組支援的專題型別、引數選項。
一個典型的功能描述檔案必須包括三部分,分別是:

  1. 引數表
  2. 輸入專題表
  3. 輸出專題表
    其他使用者自定義部分依舊可以讀入並顯示在平臺,但沒有實際的意義。 平臺提供的helloworld例子包含兩個功能,一個是位元抑或,一個是順序取反。JSON檔案的結構如下:
{
    "example_bitxor":{
        "name":"bitxor",
        "parameters":{
             "mask":{
                "type":"unsigned char",
                "tooltip":"bytemask",
                "default":255,
                "range":{
                    "min":0,
                    "max":255
                }
            }
        },
        "input_subjects":
        {
            "data_in":{
                "type":"byte",
                "tooltip":"input"
            }
        },
        "output_subjects":{
            "data_out":{
                "type":"byte",
                "tooltip":"output"
            }
        },
        "info":{
            "auther":"kelly",
            "version":[1,0,0],
            "mail":"[email protected]"
        }
    },
    "example_reverse":{
        "name":"reverse",
        "parameters":{
         },
        "input_subjects":
        {
            "data_in":{
                "type":"byte",
                "tooltip":"input"
            }
        },
        "output_subjects":{
            "data_out":{
                "type":"byte",
                "tooltip":"output"
            }
        },
        "info":{
            "auther":"kelly",
            "version":[1,1,0],
            "mail":"[email protected]"
        }
    }
}

可以看到,檔案由兩大塊組成。第一塊為 example_bitxor部分,第二塊為example_reverse,對應了兩個功能。

在各個功能內部,又分name、parameters、input_subject、output_subject四個子專案,分別對應友好名稱、靜態屬性、輸入專題、輸出專題。

**注意:**儘管各個屬性含有“type”型別指示以及range取值範圍指示,但平臺把所有輸入輸出看做位元組流,這些取值僅僅為了提醒使用者。一個成熟的模組應該有詳細的介面文件描述輸入輸出型別、位元組序、大小端。對文字型別,要明確或者可以設施字符集。上面的模組,在平臺上顯示為:
ExampleModules

3.5 命令列引數

taskBus 平臺根據JSON檔案啟動程序。在啟動各個功能模組時,taskBus 通過命令列引數送入所有的資訊。命令列引數有如下幾類。

類別 引數 意義 解釋
程序 ----instance= 向模組送入程序ID值 整形。用於區分獨立的程序,這個措施避免了模組自己來生成唯一ID
程序 ----function= 向模組指定當前例項開啟的功能。 一個模組可以支援很多功能。
程序 ----information 平臺請求模組輸出JSON描述並退出。 模組既可以附帶JSON檔案,也可以在本引數中printf JSON。
專題 ----<sub_name>= 向模組指定專題名對應的ID 專題名由各個模組確定,可以出現多條。
使用者屬性 ----= 使用者自定的初始化屬性 可在平臺“屬性”欄設定。

以上述模組JSON為例子,下圖中的模組,在啟動時,命令列如下:

[email protected]$ example_helloworld.exe --instance=6 --function=example_bitxor --mask=255 --data_in=6 --data_out=1

Comandline args

3.6 第一個Hello-world 模組的程式碼

我們把HelloWold模組的程式碼貼上到這裡,使用C++,可以非常方便的實現上述功能。

  • 該程式碼沒有使用任何標準C++之外的特性
  • 事實上,在第四章您可以看到,我們已經為接入平臺提前做好了不少簡化工作。但簡化工作會掩蓋細節,下面的程式碼仍舊是最體現模組執行原理的好例子。
#include <cstdio>
#include <string>
#include <cstring>
#include <cstdlib>
#ifdef WINNT
#include <io.h>
#include <fcntl.h>
#endif
using namespace std;
#define MAX_DTALEN 65536
int main(int argc, char * argv[])
{
	//In windows, stdio must be set to BINARY mode, to
	//prevent linebreak \\n\\r replace.
#ifdef WINNT
	setmode(fileno(stdout),O_BINARY);
	setmode(fileno(stdin),O_BINARY);
#endif
	bool bInfo = false, finished = false;
	int instance = 0;
	int sub_input = 0, sub_output = 0;
	char mask = 0;
	string function;
	//1. parse cmdline
	for (int i=1;i<argc;++i)
	{
		string arg_key = argv[i], arg_value = argv[i];
		int idx = arg_key.find('=');
		if (idx>=0 && idx<arg_key.size())
		{
			arg_key = arg_key.substr(0,idx);
			arg_value = arg_value.substr(idx+1);
		}
		if (arg_key=="--function")
			function = arg_value;
		else if (arg_key=="--information")
			bInfo = true;
		else if (arg_key=="--instance")
			instance = atoi(arg_value.c_str());
		else if (arg_key=="--data_in")
			sub_input = atoi(arg_value.c_str());
		else if (arg_key=="--data_out")
			sub_output = atoi(arg_value.c_str());
		else if (arg_key=="--mask")
			mask = atoi(arg_value.c_str());
		fprintf(stderr,"%s:%s\n",arg_key.c_str(),arg_value.c_str());
		fflush(stderr);
	}
	//2. function case
	if (bInfo)
	{
		//In this example, json file will be published with exe file.
		//We will return directly.  Or, you can output json here to stdout,
		//If you do not want to publish your json file.
		return 0;
	}
	else if (instance<=0 || function.length()==0)
		return -1;
	else
	{
		char header[4], data[MAX_DTALEN+1];
		memset(data,0,MAX_DTALEN+1);
		int n_sub = 0, n_path = 0, n_len = 0;
		while(false==finished)
		{
			fread(header,1,4,stdin);	//2.1 read header
			if (header[0]!=0x3C || header[1]!=0x5A || header[2]!=0x7E || header[3]!=0x69)
			{
				fprintf(stderr,"Bad header\n");
				break;
			}
			fread(&n_sub,sizeof(int),1,stdin);
			fread(&n_path,sizeof(int),1,stdin);
			fread(&n_len,sizeof(int),1,stdin);
			if (n_len<0 || n_len >MAX_DTALEN)
			{
				fprintf(stderr,"Bad length %d\n",n_len);
				break;
			}
			fread(data,sizeof(char),n_len,stdin);

			if (n_sub<=0)
			{
				if (strstr(data, "quit")!=nullptr)
				{
					finished = true;
					continue;
				}
			}
			else if (n_sub != sub_input)
				continue;

			if (function=="example_bitxor")
			{
				for (int i=0;i<n_len;++i)
					data[i] ^= mask;
			}
			else if (function=="example_reverse")
			{
				for (int i=0;i<n_len/2;++i)
				{
					char t = data[i];
					data[i] = data[n_len-1-i];
					data[n_len-1-i] = t;
				}
			}
			else
			{
				fprintf(stderr,"Unknown function %s\n",function.c_str());
				break;
			}
			fwrite(header,1,4,stdout);
			fwrite(&sub_output,sizeof(int),1,stdout);
			fwrite(&n_path,sizeof(int),1,stdout);
			fwrite(&n_len,sizeof(int),1,stdout);
			fwrite(data,sizeof(char),n_len,stdout);
			fflush(stdout);
		}
	}
	//3.exit
	return 0;
}

下一篇文章,我們通過一個示例來敘述客戶端模組的開發過程。