使用記憶體對映開發高效能程序間訊息通訊元件
一、背景
專案開發中免不了各模組或系統之間進行訊息通訊,目前熱門的訊息中介軟體有Redis、RabbitMQ、Kafka、RocketMQ等等。
以上幾種元件中Redis在訊息佇列方面表現還可以,但是如果涉及釋出訂閱功能,就不行了,最近專案就使用了redis的釋出訂閱,
每秒只能發出幾千條,雖然目前綽綽有餘,但是瓶頸可以預期。
其餘的幾種都是比較重量級的訊息中介軟體,什麼跨平臺、分散式、叢集、支援N種協議等等,很高大全,
我們可能就只使用了其中1、2個功能。嚴格來說,專案中整合這幾種MQ的工作量是不小的,對於中小型系統來說,可能維護MQ
穩定的工作量都比專案還大,難度也高,所有功能用全了的程式設計師恐怕不多。
從長遠考慮出發,選擇重量級MQ恐怕是板上釘釘的事,但是專案一開始就上這幾種,我覺得那也是欠缺考慮的。如果專案
根本不要求跨機器通訊,那殺雞就不要用牛刀了。比如,你只是在模組之間、執行緒之間、程序之間,或者是在同一主機的各種不同系統之間,
其實都可以不用重量級MQ。當然你使用了也沒事,看個人選擇。
最近的專案有這麼個場景,採集近所有底層裝置,每個裝置有點3000個,總共20多萬個點需要採集上來。剛開始使用了Redis的釋出訂閱,
但是程式毫無疑問地掛了,根本帶不起來;因為程式啟動時每個點的值都是從0變成N,就需要發訊息出來,那一開始訊息是很多的,redis根本
處理不完,而且有很高頻率的超時斷線。以至於想換RabbitMQ,後來想想還是算了,因為那樣增加專案難度不說,後期維護也是個難題。
說到底這是模組之間的通訊,是主程式(Winform)呼叫採集C++的DLL類庫,發出訊息後主程式和web端訂閱,在主程式與DLL這邊,在DLL
方法上增加一個回撥函式就搞定了,完全不用走訊息中介軟體,Web端要哪些點的實時值就先ASK,先請求需要看哪些點,如何在主程式這邊
釋出那些點的實時值訊息,這樣釋出訂閱的資料量少了2、3個數量級不止。
二、需求
針對上邊的業務場景,因為是模組之間的執行緒間通訊,這樣搞問題不大;如果是程序之間也要那麼高頻率的通訊,那就不好辦了,我們
不想使用重量級MQ,又想高頻率傳輸訊息,怎麼辦呢?網上搜索了一番,貌似沒看到有成熟的速度又快、體量又小,部署又簡單的中介軟體。
所以在下不才,針對這個問題拋磚引玉,做一個demo出來供大家討論一下。
三、原理
應題,就是使用記憶體對映來做同一個機器下各種訊息的通訊,之前也寫過一篇關於使用共享記憶體實現快速讀寫的文章,點選前往瀏覽
“.net環境下跨程序、高頻率讀寫資料”,但是記憶體對映比較適合做訊息佇列,因為訊息可以持久化在本地,沒讀完下次進來還可以接著讀。
我預想是這樣設計:
1、釋出訂閱涉及到2個主要方法:Publish(string channel)、Subscribe(string channel, Callback callback);
2、為每個channel生成一個檔案:channel.db,預設每個db可以儲存1000個同類型的結構體訊息作為訊息佇列,從頭部寫入,尾部讀出。
每個db檔案前面留一個索引區作為釋出方與訂閱方各自的讀寫位置。釋出與訂閱前,先讀寫這個索引區,因為是一對一讀寫,所以
可以完美避開讀寫鎖,大大提高效能。
3、針對一對多需求,單獨設計一個config.db檔案儲存種channel與其相關訂閱資訊,大概原理圖如下:
4、解決讀寫不加鎖問題
我們看結構體:SIndex有三個屬性
1) WriteIndex 記錄釋出方(Pubish)最後寫入資料的位置
2) ReadIndex 記錄訂閱方(Subscribe)最後讀取資料的位置
3) Over 表示WriteIndex已達到佇列最大值,再WriteIndex小於等於佇列最大值前,讀寫如下圖:
WriteIndex達到最大值後再往下寫Over就要取反,如由False變為True。WriteIndex=0
如果此時沒有訂閱方,那新訊息就會被拋棄,因為已無空間儲存。
4) 如果ReadIndex數值到佇列最大值,Over也取反,此時ReadIndex = 0,讀寫又變成圖1所示
5) 讀寫過程中並不存在互斥的情況,只要管理好讀寫位置,就可以避免加鎖。
四、介面設計
4.1、主要引數定義
#define FM_MAX_CHANNEL 100 // 暫定最多100個不同頻道 #define FM_MAX_SUBSCRIBE 3 // 暫定最多3個訂閱使用者 #define FM_MAX_ROWS 1000 // 暫定最多佇列大小為1000 #define FM_DISCONNECT_TIME 5000 // 超過5000毫秒無心跳更新視為訂閱斷開 #define FM_KEEP_CONN_CYCLE 1000 // 保持心跳連線的時間週期 #define FM_NOTHING -1 // 空白,陣列為0等 #define FM_WORD_SIZE sizeof(WORD) // WORD長度 #define FM_INDEX_SIZE sizeof(SIndex) // SIndex長度
4.2、結構體
1 // 索引 2 typedef struct 3 { 4 WORD WriteIndex; 5 WORD ReadIndex; 6 WORD Over; // 當W或R超過MAX一次,Over取反一次,Over預設為False 7 }SIndex; 8 9 // 記憶體對映引數 10 typedef struct 11 { 12 HANDLE FileHandle; 13 HANDLE FileMappingHandle; 14 LPVOID MapViewOfFileHandle; 15 UINT StructSize; 16 char FileName[20]; 17 UINT SubscribeIndex; 18 WORD Conned; 19 }SDbConnInfo; 20 21 // 頻道 22 typedef struct 23 { 24 char ChannelName[20]; 25 UINT StructSize; 26 DWORD Subscribe1LastTime; 27 DWORD Subscribe2LastTime; 28 DWORD Subscribe3LastTime; 29 }SChannel; 30 31 // 頻道與訂閱對映 32 typedef struct 33 { 34 char ChannelName[20]; 35 SDbConnInfo DbConnInfo[FM_MAX_SUBSCRIBE]; 36 }SChannelMapDbConnInfo;View Code
4.3、主要方法
// 釋出資訊 template<typename T> int Publish(const char *channel, T* data); // 訂閱資訊 template<typename T> void Subscribe(const char *channel, SubscribeCallBackHandle callback);
五、程式碼實現
5.1 、FMDBManager,主要管理記憶體對映相關操作,因為是讀寫位置不一樣,所以不需要加互斥量
1 class FMDBManager 2 { 3 public: 4 FMDBManager() {}; 5 ~FMDBManager() {}; 6 7 public: 8 static int Create(SDbConnInfo *info) 9 { 10 CString fileName(info->FileName); 11 DWORD totalSize = (FM_MAX_ROWS * info->StructSize) + FM_INDEX_SIZE; 12 13 info->FileHandle = CreateFile(fileName, (GENERIC_READ | GENERIC_WRITE), (FILE_SHARE_READ | FILE_SHARE_WRITE), 14 NULL, OPEN_ALWAYS, FILE_FLAG_SEQUENTIAL_SCAN, NULL); 15 16 info->FileMappingHandle = CreateFileMapping(info->FileHandle, NULL, PAGE_READWRITE, 0, totalSize, NULL); 17 18 if(info->FileMappingHandle == NULL || info->FileMappingHandle == INVALID_HANDLE_VALUE) 19 { 20 Log(""); 21 CloseHandle(info->FileHandle); 22 return enumFail; 23 } 24 25 if(GetLastError() == ERROR_ALREADY_EXISTS) 26 { 27 Log(""); 28 return enumFail; 29 } 30 31 // init 32 info->MapViewOfFileHandle = MapViewOfFile(info->FileMappingHandle, FILE_MAP_ALL_ACCESS, 0, 0, totalSize); 33 34 if(info->MapViewOfFileHandle == NULL) 35 { 36 Log(""); 37 CloseHandle(info->FileMappingHandle); 38 CloseHandle(info->FileHandle); 39 return enumFail; 40 } 41 42 return enumSuccess; 43 } 44 45 protected: 46 int Write(void *data, UINT order, SDbConnInfo *info) 47 { 48 if(info->MapViewOfFileHandle == NULL) 49 { 50 Log(""); 51 return enumFail; 52 } 53 else 54 memcpy((char *)info->MapViewOfFileHandle + (order * info->StructSize) + FM_INDEX_SIZE, data, info->StructSize); 55 56 return enumSuccess; 57 } 58 int Read(void *data, UINT order, SDbConnInfo *info) 59 { 60 if(info->MapViewOfFileHandle == NULL) 61 { 62 Log(""); 63 return enumFail; 64 } 65 else 66 memcpy(data, (char *)info->MapViewOfFileHandle + (order * info->StructSize) + FM_INDEX_SIZE, info->StructSize); 67 68 return enumSuccess; 69 } 70 int Delete(UINT order, SDbConnInfo *info) 71 { 72 if(info->MapViewOfFileHandle == NULL) 73 { 74 Log(""); 75 return enumFail; 76 } 77 else 78 memset((char *)info->MapViewOfFileHandle + (order * info->StructSize) + FM_INDEX_SIZE, 0, info->StructSize); 79 80 return enumSuccess; 81 } 82 83 int WriteConfig(void *data, UINT order, UINT pos, UINT size, SDbConnInfo *info) 84 { 85 if(info->MapViewOfFileHandle == NULL) 86 { 87 Log(""); 88 return enumFail; 89 } 90 else 91 memcpy((char *)info->MapViewOfFileHandle + (order * info->StructSize) + FM_INDEX_SIZE + pos, data, size); 92 93 return enumSuccess; 94 } 95 int WriteIndex(void *data, UINT pos, UINT size, SDbConnInfo *info) 96 { 97 if(info->MapViewOfFileHandle == NULL) 98 { 99 Log(""); 100 return enumFail; 101 } 102 else 103 memcpy((char *)info->MapViewOfFileHandle + pos, data, size); 104 105 return enumSuccess; 106 } 107 int ReadIndex(SIndex *sIndex, SDbConnInfo *info) 108 { 109 if(info->MapViewOfFileHandle == NULL) 110 { 111 Log(""); 112 return enumFail; 113 } 114 else 115 memcpy(sIndex, (char *)info->MapViewOfFileHandle, FM_INDEX_SIZE); 116 117 return enumSuccess; 118 } 119 };View Code
5.2、FMDBClient,記憶體對映客戶端,主要封裝Publish與Subscribe方法給前端呼叫,遮蔽複雜性
1 class FMDBClient : public FMDBManager 2 { 3 private: 4 mutable std::mutex mut; 5 SChannelMapDbConnInfo channelMapDbConnInfo = { 0 }; 6 7 bool CanWrite(SIndex *sIndex) 8 { 9 int nextWriteIndex = sIndex->WriteIndex + 1; 10 if(nextWriteIndex > FM_MAX_ROWS) nextWriteIndex = 0; 11 12 return nextWriteIndex != sIndex->ReadIndex; 13 } 14 bool CanRead(SIndex *sIndex) { 15 if(sIndex->Over) return sIndex->ReadIndex > sIndex->WriteIndex; 16 else return sIndex->ReadIndex + 1 <= sIndex->WriteIndex; 17 } 18 19 int GetDbConnInfo(const char *channel, int size) 20 { 21 int rest = enumFail; 22 23 for(int i = 0; i < FM_MAX_CHANNEL; i++) 24 { 25 char channelNameTmp[20] = { 0 }; 26 sprintf_s(channelNameTmp, "%s", channel); 27 28 if(0 == strcmp(channelNameTmp, channelMapDbConnInfoArray[i].ChannelName)) 29 { 30 channelMapDbConnInfo = channelMapDbConnInfoArray[i]; 31 rest = enumSuccess; 32 break; 33 } 34 } 35 36 return rest; 37 } 38 int SetDbConnInfo(const char *channel, UINT *subscribeIndex, SDbConnInfo *dbConnInfo) 39 { 40 std::lock_guard<std::mutex> lk(mut); 41 42 int nextSubscribeIndex = fmdbConfig->GetNextSubscribeIndex(channel); 43 if(nextSubscribeIndex == FM_NOTHING) 44 { 45 SChannel sChannel = { 0 }; 46 sprintf_s(sChannel.ChannelName, "%s", channel); 47 sChannel.Subscribe1LastTime = GetTickCount(); 48 sChannel.StructSize = dbConnInfo->StructSize; 49 50 sprintf_s(dbConnInfo->FileName, "%s.1.db", channel); 51 if(!fmdbConfig->IsFM_NOTHING(dbConnInfo->FileName) && dbConnInfo->StructSize > 0) 52 { 53 dbConnInfo->SubscribeIndex = 1; 54 *subscribeIndex = dbConnInfo->SubscribeIndex; 55 56 if(Create(dbConnInfo) == enumSuccess) return fmdbConfig->Insert(&sChannel); 57 else sprintf_s(dbConnInfo->FileName, "%s", channel); //還原名稱 58 } 59 } 60 61 if(nextSubscribeIndex > 1) 62 { 63 sprintf_s(dbConnInfo->FileName, "%s.%d.db", channel, nextSubscribeIndex); 64 if(!fmdbConfig->IsFM_NOTHING(dbConnInfo->FileName) && dbConnInfo->StructSize > 0) 65 { 66 dbConnInfo->SubscribeIndex = nextSubscribeIndex; 67 *subscribeIndex = nextSubscribeIndex; 68 69 if(Create(dbConnInfo) == enumSuccess) return fmdbConfig->Save(channel, nextSubscribeIndex); 70 else sprintf_s(dbConnInfo->FileName, "%s", channel); //還原名稱 71 } 72 } 73 74 return enumFail; 75 } 76 bool SetSubscribeConned(const char *channel, int subscribeIndex, SDbConnInfo *dbConnInfo) 77 { 78 int rest = enumFail; 79 80 if(subscribeIndex <= 0) return rest; 81 82 for(int i = 0; i < FM_MAX_CHANNEL; i++) 83 { 84 char channelNameTmp[20] = { 0 }; 85 sprintf_s(channelNameTmp, "%s", channel); 86 87 if(0 == strcmp(channelNameTmp, channelMapDbConnInfoArray[i].ChannelName)) 88 { 89 channelMapDbConnInfoArray[i].DbConnInfo[subscribeIndex - 1].SubscribeIndex = dbConnInfo->SubscribeIndex; 90 channelMapDbConnInfoArray[i].DbConnInfo[subscribeIndex - 1].Conned = 1; 91 rest = enumSuccess; 92 break; 93 } 94 } 95 96 return rest; 97 } 98 bool IsConning(SDbConnInfo *dbConnInfo) { return true; }; 99 100 public: 101 FMDBClient() 102 { 103 while(!fmdbConfigLoadFinish) { Sleep(200); } 104 }; 105 ~FMDBClient() {}; 106 107 public: 108 int failTimes = 0; 109 110 template<typename T> 111 int Publish(const char *channel, T* data) 112 { 113 int rest = enumFail; 114 115 // 查詢 116 if(GetDbConnInfo(channel, sizeof(T)) == enumFail) 117 { 118 printf_s("釋出%s失敗.\n", channel); 119 return enumFail; 120 } 121 122 for(int i = 0; i < FM_MAX_SUBSCRIBE; i++) 123 { 124 if(channelMapDbConnInfo.DbConnInfo[i].FileHandle == NULL) continue; 125 126 while(IsConning(&channelMapDbConnInfo.DbConnInfo[i])) 127 { 128 SIndex sIndex = { 0 }; 129 if(ReadIndex(&sIndex, &channelMapDbConnInfo.DbConnInfo[i]) == enumFail) 130 { 131 throw "對映檔案載入失敗"; 132 } 133 134 if(CanWrite(&sIndex)) 135 { 136 WORD writeIndex = sIndex.WriteIndex; 137 if(Write(data, writeIndex, &channelMapDbConnInfo.DbConnInfo[i]) == enumSuccess) 138 { 139 writeIndex++; 140 if(writeIndex > FM_MAX_ROWS) 141 { 142 writeIndex = 0; 143 144 WORD Over = TRUE; 145 WriteIndex(&Over, (FM_WORD_SIZE * 2), FM_WORD_SIZE, &channelMapDbConnInfo.DbConnInfo[i]); 146 } 147 148 rest = WriteIndex(&writeIndex, 0, FM_WORD_SIZE, &channelMapDbConnInfo.DbConnInfo[i]); 149 break; 150 } 151 } 152 else 153 { 154 failTimes++; 155 } 156 } 157 } 158 159 return rest; 160 } 161 162 template<typename T> 163 void Subscribe(const char *channel, SubscribeCallBackHandle callback) 164 { 165 SDbConnInfo dbConnInfo = { 0 }; 166 dbConnInfo.StructSize = sizeof(T); 167 168 UINT subscribeIndex = 0; 169 if(SetDbConnInfo(channel, &subscribeIndex, &dbConnInfo) == enumFail) 170 { 171 printf_s("訂閱%s失敗.\n", channel); 172 return; 173 } 174 175 while(IsConning(&dbConnInfo)) 176 { 177 SetSubscribeConned(channel, subscribeIndex, &dbConnInfo); 178 179 SIndex sIndex = { 0 }; 180 if(ReadIndex(&sIndex, &dbConnInfo) == enumFail) throw "對映檔案載入失敗"; 181 if(!CanRead(&sIndex)) continue; 182 183 T t = { 0 }; 184 int readIndex = sIndex.ReadIndex; 185 if(Read(&t, readIndex, &dbConnInfo) == enumSuccess) 186 { 187 readIndex++; 188 if(readIndex > FM_MAX_ROWS) 189 { 190 readIndex = 0; 191 192 WORD Over = FALSE; 193 WriteIndex(&Over, (FM_WORD_SIZE * 2), FM_WORD_SIZE, &dbConnInfo); 194 } 195 196 if(WriteIndex(&readIndex, FM_WORD_SIZE, FM_WORD_SIZE, &dbConnInfo) == enumSuccess) 197 if(Delete(sIndex.ReadIndex, &dbConnInfo) == enumSuccess) 198 if(callback(&t) == enumBreak) break; 199 } 200 } 201 } 202 };View Code
請注意上邊控制讀寫的2個方法
bool CanWrite(SIndex *sIndex) { int nextWriteIndex = sIndex->WriteIndex + 1; if(nextWriteIndex > FM_MAX_ROWS) nextWriteIndex = 0; return nextWriteIndex != sIndex->ReadIndex; } bool CanRead(SIndex *sIndex) { if(sIndex->Over) return sIndex->ReadIndex > sIndex->WriteIndex; else return sIndex->ReadIndex + 1 <= sIndex->WriteIndex; }
我們可以分析一下,下一個WriteIndex值如果大於佇列最大值 WriteIndex置0,下一個WriteIndex數值如果不等於
正在讀的位置ReadIndex就能寫;如果WriteIndex沒有超出最大值,只要ReadIndex小於等於WriteIndex就能讀,
如果超出,就判斷ReadIndex大於WriteIndex就能讀。WriteIndex與ReadIndex數值在Publish與Subscribe中維護
5.3、建立新執行緒獲取最新訂閱的客戶端資訊,這個功能主要是動態地像多個Subscribe端發生訊息,比如訂閱發生在釋出之後,
也應該能收到訊息。
1 void Update() 2 { 3 while(true) 4 { 5 if(fmdbConfig->GetChannelArray() == enumSuccess) 6 { 7 for(int i = 0; i < FM_MAX_CHANNEL; i++) 8 { 9 if(fmdbConfig->IsFM_NOTHING(channelMapDbConnInfoArray[i].ChannelName)) continue; 10 11 for(int j = 0; j < FM_MAX_SUBSCRIBE; j++) 12 { 13 if(channelMapDbConnInfoArray[i].DbConnInfo[j].StructSize <= 0) continue; 14 15 // KeepConned 16 if(channelMapDbConnInfoArray[i].DbConnInfo[j].Conned) 17 { 18 fmdbConfig->KeepConned(channelMapDbConnInfoArray[i].ChannelName, 19 channelMapDbConnInfoArray[i].DbConnInfo[j].SubscribeIndex); 20 21 channelMapDbConnInfoArray[i].DbConnInfo[j].Conned = 0; 22 //printf_s("%s.KeepConned.\n", channelDbParsArray[i].SDbPars[j].Channel); 23 } 24 25 if(!fmdbConfig->Exists(channelMapDbConnInfoArray[i].DbConnInfo[j].FileName)) 26 { 27 FMDBManager::Create(&channelMapDbConnInfoArray[i].DbConnInfo[j]); 28 fmdbConfig->AddChannel(channelMapDbConnInfoArray[i].DbConnInfo[j].FileName); 29 } 30 } 31 } 32 } 33 34 fmdbConfigLoadFinish = true; 35 Sleep(1000); 36 } 37 } 38 thread th(Update);View Code
六、Demo測試
6.1、Producer.cpp
1 #include "pch.h" 2 #include "../FMDB.h" 3 4 using namespace std; 5 6 int main() 7 { 8 FMClient * client = new FMClient(); 9 10 int times = 0; 11 int index = 0; 12 int total = 0; 13 UINT structSize = sizeof(SPerson); 14 DWORD dwStartTmp = GetTickCount(); 15 16 while(TRUE) 17 { 18 times++; 19 if(index == 0) 20 { 21 dwStartTmp = GetTickCount(); 22 } 23 24 SPerson sPerson = { 0 }; 25 sPerson.Idx = index; 26 sprintf_s(sPerson.Name, "Name.%d", index); 27 sPerson.Age = index; 28 29 if(client->Publish("Person", &sPerson) == enumSuccess) 30 { 31 if(index % 2 == 0) total = total + sPerson.Idx; 32 else total = total - sPerson.Idx; 33 34 index++; 35 if(index % 50000 == 0) 36 printf_s("傳送條數: %d, 耗時:%d \n", index, (GetTickCount() - dwStartTmp)); 37 } 38 39 if(index >= 2000000) break; 40 } 41 42 printf_s("呼叫次數: %d, 成功條數: %d, 檢驗值: %d \n", times, index, total); 43 system("pause"); 44 }
6.2、Consumer.cpp
1 #include "pch.h" 2 #include "../FMDB.h" 3 4 using namespace std; 5 6 int index = 0; 7 int total = 0; 8 DWORD dwStartTmp = GetTickCount(); 9 10 int SubscribeCallback(void *msg) 11 { 12 SPerson * person = (SPerson *)msg; 13 14 if(index == 0) 15 { 16 dwStartTmp = GetTickCount(); 17 } 18 19 if(index % 2 == 0) total = total + person->Idx; 20 else total = total - person->Idx; 21 22 index++; 23 if(index % 50000 == 0) 24 { 25 printf("接收條數: %d, 耗時:%d, Idx:%d, Name:%s, Age:%d\n", 26 index, (GetTickCount() - dwStartTmp), person->Idx, person->Name, person->Age); 27 } 28 29 if(index >= 2000000) 30 { 31 return enumBreak; 32 } 33 34 return enumSuccess; 35 }; 36 37 int main() 38 { 39 FMClient * client = new FMClient(); 40 client->Subscribe<SPerson>("Person", SubscribeCallback); 41 42 printf("接收條數: %d, 檢驗值: %d \n", index, total); 43 system("pause"); 44 }
6.3、執行,測試用例中使用了向佇列傳送200萬條資料,訊息大小128位元組,訂閱端也是接受到200萬資料後退出,並且列印檢驗值。
1) 檢驗值計算:0+1-2+3-4+ --------- - 2000000 = -1000000,如果佇列執行正常,那兩邊的檢驗值應該都是是 -1000000.
2) 每5萬條列印一次日誌,執行情況如下
一對一方式執行三次,分別耗時(毫秒):2886、2979、2871
3) 一對二方式執行三次,分別耗時(毫秒):4087、4009、4040
4)執行過程中產生的檔案
6.4、200萬資料一對一耗時近3秒,貌似也不是非常快是不是?但是這就是最大速度了嗎?
當然不是哦,別忘了這是debug版本,我們切換到release版本看速度會不會有所提升。
一對一執行三次耗時分別是:1224、1373、1326
厲害了,
SPerson結構體128位元組,每秒可以處理180萬資料,當然實際運用肯定達不到,因為處理其他業務邏輯也要耗時間。
好了,為了這個demo腦殼都想疼了,思考模型,除錯BUG,期間各種問題,實在茶壺煮餃子,有苦說不出。
你看,又浪費我週末2天時間,期間就吃了一餐,今天的還沒吃呢,等下去旁邊山上走走,不然就要發黴了。拜拜。。。