1. 程式人生 > >[Python開發] ctypes+struct實現類c的結構化資料序列處理

[Python開發] ctypes+struct實現類c的結構化資料序列處理

1. 用C/C++實現的結構化資料處理


在涉及到比較底層的通訊協議開發過程中, 往往需要開發語言能夠有效的表達和處理所定義的通訊協議的資料結構. 在這方面是C/C++語言是具有天然優勢的: 通過struct, union, 和bit-fields, C/C++能夠以一種最有效率也最自然的方式處理此類問題.

舉例說明一下, 下圖是智慧電網用於遠端自動抄表的通訊協議的一部分 
這裡寫圖片描述 
這裡寫圖片描述

用C可以描述如下:

  struct
  {
    unsigned char  uRouter:1;  //路由標識
    unsigned char   uSubNode:1;//附屬節點標識
    unsigned char   uCM:1;     //通訊模組標識
    unsigned char   uCD:1;     //衝突檢測
    unsigned char   uLevel:4;  //中繼級別
    unsigned char   uChannel:4;//通道標識
    unsigned char   uErrBate:4;//糾錯編碼標識
    unsigned char   uResBytes; //預計應答位元組數
    unsigned short  uSpeed:15; //通訊波特率,BIN格式
    unsigned short  uUnit:1;   //0:bps;1:kbps
    unsigned char  uReserve;
  } Req;

這樣不僅清楚的描述了完全符合通訊協議要求的報文資料結構, 而且還有至少以下兩個優點: 
1. 對結構中的任意變數取址賦值取值極其方便, 如

    struct Req r;
    r.uCD = 0;
    r.uChannel = 0x0F;

並不必費心的計算偏移量. 而且如果以後通訊協議升級了, 只需要將資料結構定義更改即可, 其餘程式碼完全不用變動. 
2. 更重要的是, 這個資料結構在計算機記憶體中天然的就是按照通訊協議的序列結構排列的(假設大端小端問題已設定正確), 只需要

    struct Req r;
    ...
    send((unsigned char *)&r, sizeof(r));

就可以以通訊協議完全一致的格式將資料轉換成位元組流傳送出去了. 而接收解析也同樣方便:

    struct Req rs;
    unsigned char rcv_buffer[100];
    ...
    rcv(rcv_buffer sizeof(Req));
    memcpy((unsigned char *)&rs, rcv_buffer, sizeof(r));

2. 用Python實現的結構化資料處理


現在問題來了: 如果用Python, 還能夠同樣方便的實現上述的結構化資料處理嗎? 也就是需要實現以下功能:

  1. 能夠以變數名訪問資料段, 不需要手動計算偏移量
  2. 能夠處理bit級的資料段
  3. 能夠方便的形成序列化通訊位元組流, 也能方便的從接收的位元組流中解析資料;

有人可能覺得這不是問題: 用python的字典不是也能實現嗎? 仔細想一想, 字典只能夠提供第一種需求, 即以變數名訪問資料段. 但python因為是高階語言, 整數只提供int一種資料結構, 而協議中很多時候資料段是bit級的, 或單位元組, 兩位元組, 三位元組的. 只用python原生的資料結構是不能直接訪問bit級的資料段的, 甚至連資料體最後到底佔了幾字節, 都不能方便的統計.

為了解決這個問題, 本質還是要退回到C語言的級別來. 好在python提供了ctypes這個庫, 能夠讓我們在python中實現類似C語言的功能.

>>> from ctypes import *
>>> class Req(Structure):
    _fields_=[('uRouter',c_ubyte,1),
            ('uSubNode',c_ubyte,1),
            ('uCM',c_ubyte,1),
            ('uCD',c_ubyte,1),
            ('uLevel',c_ubyte,4),
            ('uChannel',c_ubyte,4),
            ('uErrBate',c_ubyte,4),
            ('uResBytes',c_ubyte),
            ('uSpeed',c_ushort,15),
            ('uUnit',c_ushort,1),
            ('uReserve',c_ubyte)]
>>> r=Req()
>>> sizeof(r)
8
>>> r.uUnit=1
>>> print r.uUnit
1
>>> r.uUnit=2
>>> print r.uUnit
0

ctypes庫的最主要作用其實是用於python程式呼叫c編譯器生成的庫和dll, 但我們這裡只用到資料結構這一塊.

ctypes在使用時有以下注意事項:

  • 自定義的結構體類必須繼承Structure或Union類;
  • 自定義的結構體類中必須定義一個名為fields的列表變數, 其中每個元素是一個tuple, 定義了結構體每個資料單元資訊, 格式是(‘變數名字串’, 變數資料型別 [, 位元數])
  • 定義了class後, 可以用sizeof(類名)檢視資料體位元組數, 和c語言一樣. 然後用例項名.成員名進行相應資料單元的訪問, 如果繼承後定義了init()方法, 還可以進行類的初始化操作

3. 序列資料流處理


有了結構體, 上面的三條要求滿足了倆個, 關於第三個要求, ctypes雖然提供了cast()方法, 但經過我研究, 發現cast其實只能實現簡單的陣列等結構的資料型別指標轉換, 但無法像c那樣將結構體物件地址轉換成位元組地址的. 這種情況下就需要python的另一個庫:struct

struct是專門用於結構體與資料流轉換的庫, 我們用到的主要方法是pack()和unpack(). pack()的使用說明如下:

struct.pack(fmt, v1, v2, …) 
Return a string containing the values v1, v2, … packed according to the given format. The arguments must match the values required by the format exactly.

舉個例子:

>>> pack('BHB',1,2,3)
'\x01\x00\x02\x00\x03'
  • pack()的用法和format()很像, 第一個引數用一個字串指明瞭要轉換的格式, 例如’B’表示8位無符號整數, ‘H’表示16位無符號整數等等, 具體詳見python幫助裡關於struct庫的說明. 這裡的’BHB’就等於指明瞭, 將後面的三個數轉成位元組流, 第一個數以8位無符號數表示, 第二個以16位無符號數表示, 第三個以8位無符號數表示.

等等! 哪裡不對啊? 兩個8位無符號數, 一個16位無符號數, 加起來應該4個位元組才對. 可是我們看轉換結果’\x01\x00\x02\x00\x03’一共是五個位元組, 最後一個3也被當16無符號數處理了, 難道是bug了?

這個問題其實在幫助文件裡也說的很清楚了, 這是所謂machine’s native format和standard format的區別. 簡而言之就是, 對於有些C編譯器, 如果沒有做特殊編譯約束, 出於處理字寬的考慮, 對類似unsigned char這樣的資料, 並非真的用1位元組表示, 而是用處理時最適合cpu暫存器的長度表示, 比如跟在一個無符號16位數後面的一個無符號8位數, 就同樣用16位位寬表示. 這樣儘管浪費了記憶體, 但在定址賦值等處理起來更有效率… 總而言之, 如果一定要求嚴格的8位和16位, 就需要使用standard format, 就是在格式字串的首字母加以限定, 如:

>>> pack('>BhB',1,2,3)
'\x01\x00\x02\x03'

這裡的>表示: 位元組流轉換使用standard format, 而且使用大端模式.

4. 結構體的位元組流轉換


有了pack()這個工具, 再回到前面的結構體位元組流轉換上… 發現還是有問題啊, 因為pack()可以實現單位元組, 雙位元組, 卻沒法對bit field這種東西操作. 又該怎麼解決呢.

其實這個問題, 我也沒找到好的解決辦法, 畢竟pack()需要我們手工一個個指定變數, 定義順序和位元組長度. 這裡我提供一種解決方案, 那就是借用Union.

仍以前面的結構體為例, 換一種寫法:

>>> class Flag_Struct(Structure):
    _fields_=[('uRouter',c_ubyte,1),
            ('uSubNode',c_ubyte,1),
            ('uCM',c_ubyte,1),
            ('uCD',c_ubyte,1),
            ('uLevel',c_ubyte,4)]


>>> class Flag_Union(Union):
    _fields_=[('whole',c_ubyte),
            ('flag_struct',Flag_Struct)]


>>> class Channel_Struct(Structure):
    _fields_=[('uChannel',c_ubyte,4),
            ('uErrBate',c_ubyte,4)]


>>> class Channel_Union(Union):
    _fields_=[('whole',c_ubyte),
            ('channel_struct',Channel_Struct)]


>>> class Speed_Struct(Structure):
    _fields_=[('uSpeed',c_ushort,15),
            ('uUnit',c_ushort,1)]

>>> class Speed_Union(Union):
    _fields_=[('whole',c_ushort),
            ('speed_struct',Speed_Struct)]



>>> class Req(Structure):
    _pack_=1
    _fields_=[('flag',Flag_Union),
            ('channel',Channel_Union),
            ('uResBytes',c_ubyte),
            ('speed',Speed_Union),
            ('uReserve',c_ubyte)]

簡而言之, 就是所有涉及bit-field的欄位都用一個union和子struct來表示. (其中pack是為了1位元組對齊, 原因與上一節介紹過的native format和standard format類似). 這樣做的目的是為了折中位元欄位訪問與整位元組的轉化處理, 例如:

>>> r=Req()
>>> r.speed.speed_struct.uUnit=1
>>> r.flag.flag_struct.uLevel=0xf
>>> ack('>BBBHB',r.flag.whole,r.channel.whole,r.uResBytes,r.speed.whole,r.uReserve)
'\xf0\x00\x00\x80\x00\x00'

5. 一種更簡單的位元組流轉化方法


後來通過仔細檢視文件, 發現其實ctypes裡提供了一種更簡單的位元組流轉化方法:

string_at(addressof(r),sizeof(r))

addressof()和string_at都是ctypes裡提供的方法. 這是最接近於原生c的處理方法, 這樣連union都不用定義了

>>> class Req(Structure):
    _pack_=1
    _fields_=[('uRouter',c_ubyte,1),
            ('uSubNode',c_ubyte,1),
            ('uCM',c_ubyte,1),
            ('uCD',c_ubyte,1),
            ('uLevel',c_ubyte,4),
            ('uChannel',c_ubyte,4),
            ('uErrBate',c_ubyte,4),
            ('uResBytes',c_ubyte),
            ('uSpeed',c_ushort,15),
            ('uUnit',c_ushort,1),
            ('uReserve',c_ubyte)]


>>> sizeof(Req)
6
>>> r=Req()
>>> r.uUnit=1
>>> r.uCM=1
>>> string_at(addressof(r),sizeof(r))
'\x04\x00\x00\x00\x80\x00'

如果需要大端的資料結構, 超類需要選擇BigEndianStructure, 此時bit-field的定義也是從高到低的, 需要重新調整定義的順序, 如下:

>>> class Req(BigEndianStructure):
    _pack_=1
    _fields_=[('uLevel',c_ubyte,4),
            ('uCD',c_ubyte,1),
            ('uCM',c_ubyte,1),
            ('uSubNode',c_ubyte,1),
            ('uRouter',c_ubyte,1),
            ('uErrBate',c_ubyte,4),
            ('uChannel',c_ubyte,4),
            ('uResBytes',c_ubyte),
            ('uUnit',c_ushort,1),
            ('uSpeed',c_ushort,15),
            ('uReserve',c_ubyte)]


>>> r=Req()
>>> r.uLevel=0xf
>>> r.uUnit=1
>>> string_at(addressof(r),sizeof(r))
'\xf0\x00\x00\x80\x00\x00'

最後有人要問了: Python是一種高階語言, 為啥要做這麼低階的事情呢? 其實術業有專攻, 對於嵌入式通訊, 用python做高層的輔助測試工具是非常方便的.

(2015-12-14 補充)將位元組流灌注到結構體中實現解析的方法:

r = Req()
s = io_rcv()        #receive byte stream from io
memmove(addressof(r),s,sizeof(Req))
...