1. 程式人生 > >使用記憶體對映開發高效能程序間訊息通訊元件

使用記憶體對映開發高效能程序間訊息通訊元件

一、背景

  專案開發中免不了各模組或系統之間進行訊息通訊,目前熱門的訊息中介軟體有Redis、RabbitMQ、Kafka、RocketMQ等等。

以上幾種元件中Redis在訊息佇列方面表現還可以,但是如果涉及釋出訂閱功能,就不行了,最近專案就使用了redis的釋出訂閱,

每秒只能發出幾千條,雖然目前綽綽有餘,但是瓶頸可以預期。

  其餘的幾種都是比較重量級的訊息中介軟體,什麼跨平臺、分散式、叢集、支援N種協議等等,很高大全,

我們可能就只使用了其中1、2個功能。嚴格來說,專案中整合這幾種MQ的工作量是不小的,對於中小型系統來說,可能維護MQ

穩定的工作量都比專案還大,難度也高,所有功能用全了的程式設計師恐怕不多。

  從長遠考慮出發,選擇重量級MQ恐怕是板上釘釘的事,但是專案一開始就上這幾種,我覺得那也是欠缺考慮的。如果專案

根本不要求跨機器通訊,那殺雞就不要用牛刀了。比如,你只是在模組之間、執行緒之間、程序之間,或者是在同一主機的各種不同系統之間,

其實都可以不用重量級MQ。當然你使用了也沒事,看個人選擇。

  最近的專案有這麼個場景,採集近所有底層裝置,每個裝置有點3000個,總共20多萬個點需要採集上來。剛開始使用了Redis的釋出訂閱,

但是程式毫無疑問地掛了,根本帶不起來;因為程式啟動時每個點的值都是從0變成N,就需要發訊息出來,那一開始訊息是很多的,redis根本

處理不完,而且有很高頻率的超時斷線。以至於想換RabbitMQ,後來想想還是算了,因為那樣增加專案難度不說,後期維護也是個難題。

說到底這是模組之間的通訊,是主程式(Winform)呼叫採集C++的DLL類庫,發出訊息後主程式和web端訂閱,在主程式與DLL這邊,在DLL

方法上增加一個回撥函式就搞定了,完全不用走訊息中介軟體,Web端要哪些點的實時值就先ASK,先請求需要看哪些點,如何在主程式這邊

釋出那些點的實時值訊息,這樣釋出訂閱的資料量少了2、3個數量級不止。

二、需求

  針對上邊的業務場景,因為是模組之間的執行緒間通訊,這樣搞問題不大;如果是程序之間也要那麼高頻率的通訊,那就不好辦了,我們

不想使用重量級MQ,又想高頻率傳輸訊息,怎麼辦呢?網上搜索了一番,貌似沒看到有成熟的速度又快、體量又小,部署又簡單的中介軟體。

所以在下不才,針對這個問題拋磚引玉,做一個demo出來供大家討論一下。

三、原理

  應題,就是使用記憶體對映來做同一個機器下各種訊息的通訊,之前也寫過一篇關於使用共享記憶體實現快速讀寫的文章,點選前往瀏覽

“.net環境下跨程序、高頻率讀寫資料”,但是記憶體對映比較適合做訊息佇列,因為訊息可以持久化在本地,沒讀完下次進來還可以接著讀。

我預想是這樣設計:

1、釋出訂閱涉及到2個主要方法:Publish(string channel)、Subscribe(string channel, Callback callback);

2、為每個channel生成一個檔案:channel.db,預設每個db可以儲存1000個同類型的結構體訊息作為訊息佇列,從頭部寫入,尾部讀出。

   每個db檔案前面留一個索引區作為釋出方與訂閱方各自的讀寫位置。釋出與訂閱前,先讀寫這個索引區,因為是一對一讀寫,所以

      可以完美避開讀寫鎖,大大提高效能。

3、針對一對多需求,單獨設計一個config.db檔案儲存種channel與其相關訂閱資訊,大概原理圖如下:

 

4、解決讀寫不加鎖問題

我們看結構體:SIndex有三個屬性

1) WriteIndex 記錄釋出方(Pubish)最後寫入資料的位置

2) ReadIndex 記錄訂閱方(Subscribe)最後讀取資料的位置

3) Over 表示WriteIndex已達到佇列最大值,再WriteIndex小於等於佇列最大值前,讀寫如下圖:

WriteIndex達到最大值後再往下寫Over就要取反,如由False變為True。WriteIndex=0

如果此時沒有訂閱方,那新訊息就會被拋棄,因為已無空間儲存。

4) 如果ReadIndex數值到佇列最大值,Over也取反,此時ReadIndex = 0,讀寫又變成圖1所示

5) 讀寫過程中並不存在互斥的情況,只要管理好讀寫位置,就可以避免加鎖。

四、介面設計

4.1、主要引數定義

#define FM_MAX_CHANNEL		100		// 暫定最多100個不同頻道
#define FM_MAX_SUBSCRIBE	3		// 暫定最多3個訂閱使用者
#define FM_MAX_ROWS			1000	// 暫定最多佇列大小為1000
#define FM_DISCONNECT_TIME  5000	// 超過5000毫秒無心跳更新視為訂閱斷開
#define FM_KEEP_CONN_CYCLE  1000	// 保持心跳連線的時間週期
#define FM_NOTHING			-1		// 空白,陣列為0等
#define FM_WORD_SIZE		sizeof(WORD)	// WORD長度
#define FM_INDEX_SIZE		sizeof(SIndex)  // SIndex長度 

4.2、結構體

 1 // 索引
 2 typedef struct
 3 {
 4     WORD WriteIndex;
 5     WORD ReadIndex;
 6     WORD Over;         // 當W或R超過MAX一次,Over取反一次,Over預設為False
 7 }SIndex;
 8 
 9 // 記憶體對映引數
10 typedef struct
11 {
12     HANDLE FileHandle;
13     HANDLE FileMappingHandle;
14     LPVOID MapViewOfFileHandle;
15     UINT StructSize;
16     char FileName[20];
17     UINT SubscribeIndex;
18     WORD Conned;
19 }SDbConnInfo;
20 
21 // 頻道
22 typedef struct
23 {
24     char ChannelName[20];
25     UINT StructSize;
26     DWORD Subscribe1LastTime;
27     DWORD Subscribe2LastTime;
28     DWORD Subscribe3LastTime;
29 }SChannel;
30 
31 // 頻道與訂閱對映
32 typedef struct
33 {
34     char ChannelName[20];
35     SDbConnInfo DbConnInfo[FM_MAX_SUBSCRIBE];
36 }SChannelMapDbConnInfo;
View Code

 

4.3、主要方法

	// 釋出資訊
	template<typename T>
	int Publish(const char *channel, T* data);

	// 訂閱資訊
	template<typename T>
	void Subscribe(const char *channel, SubscribeCallBackHandle callback);

五、程式碼實現

5.1 、FMDBManager,主要管理記憶體對映相關操作,因為是讀寫位置不一樣,所以不需要加互斥量

  1 class FMDBManager
  2 {
  3 public:
  4     FMDBManager() {};
  5     ~FMDBManager() {};
  6 
  7 public:
  8     static int Create(SDbConnInfo *info)
  9     {
 10         CString fileName(info->FileName);
 11         DWORD totalSize = (FM_MAX_ROWS * info->StructSize) + FM_INDEX_SIZE;
 12 
 13         info->FileHandle = CreateFile(fileName, (GENERIC_READ | GENERIC_WRITE), (FILE_SHARE_READ | FILE_SHARE_WRITE),
 14             NULL, OPEN_ALWAYS, FILE_FLAG_SEQUENTIAL_SCAN, NULL);
 15 
 16         info->FileMappingHandle = CreateFileMapping(info->FileHandle, NULL, PAGE_READWRITE, 0, totalSize, NULL);
 17 
 18         if(info->FileMappingHandle == NULL || info->FileMappingHandle == INVALID_HANDLE_VALUE) 
 19         {
 20             Log("");
 21             CloseHandle(info->FileHandle);
 22             return enumFail;
 23         }
 24 
 25         if(GetLastError() == ERROR_ALREADY_EXISTS) 
 26         {
 27             Log("");
 28             return enumFail;
 29         }
 30 
 31         // init
 32         info->MapViewOfFileHandle = MapViewOfFile(info->FileMappingHandle, FILE_MAP_ALL_ACCESS, 0, 0, totalSize);
 33 
 34         if(info->MapViewOfFileHandle == NULL) 
 35         {
 36             Log("");
 37             CloseHandle(info->FileMappingHandle);
 38             CloseHandle(info->FileHandle);
 39             return enumFail;
 40         }
 41 
 42         return enumSuccess;
 43     }
 44 
 45 protected:
 46     int Write(void *data, UINT order, SDbConnInfo *info)
 47     {
 48         if(info->MapViewOfFileHandle == NULL) 
 49         {
 50             Log("");
 51             return enumFail;
 52         }
 53         else
 54             memcpy((char *)info->MapViewOfFileHandle + (order * info->StructSize) + FM_INDEX_SIZE, data, info->StructSize);
 55 
 56         return enumSuccess;
 57     }
 58     int Read(void *data, UINT order, SDbConnInfo *info)
 59     {
 60         if(info->MapViewOfFileHandle == NULL) 
 61         {
 62             Log("");
 63             return enumFail;
 64         }
 65         else
 66             memcpy(data, (char *)info->MapViewOfFileHandle + (order * info->StructSize) + FM_INDEX_SIZE, info->StructSize);
 67 
 68         return enumSuccess;
 69     }
 70     int Delete(UINT order, SDbConnInfo *info)
 71     {
 72         if(info->MapViewOfFileHandle == NULL) 
 73         {
 74             Log("");
 75             return enumFail;
 76         }
 77         else
 78             memset((char *)info->MapViewOfFileHandle + (order * info->StructSize) + FM_INDEX_SIZE, 0, info->StructSize);
 79 
 80         return enumSuccess;
 81     }
 82 
 83     int WriteConfig(void *data, UINT order, UINT pos, UINT size, SDbConnInfo *info)
 84     {
 85         if(info->MapViewOfFileHandle == NULL) 
 86         {
 87             Log("");
 88             return enumFail;
 89         }
 90         else
 91             memcpy((char *)info->MapViewOfFileHandle + (order * info->StructSize) + FM_INDEX_SIZE + pos, data, size);
 92 
 93         return enumSuccess;
 94     }
 95     int WriteIndex(void *data, UINT pos, UINT size, SDbConnInfo *info)
 96     {
 97         if(info->MapViewOfFileHandle == NULL) 
 98         {
 99             Log("");
100             return enumFail;
101         }
102         else
103             memcpy((char *)info->MapViewOfFileHandle + pos, data, size);
104 
105         return enumSuccess;
106     }
107     int ReadIndex(SIndex *sIndex, SDbConnInfo *info)
108     {
109         if(info->MapViewOfFileHandle == NULL) 
110         {
111             Log("");
112             return enumFail;
113         }
114         else
115             memcpy(sIndex, (char *)info->MapViewOfFileHandle, FM_INDEX_SIZE);
116 
117         return enumSuccess;
118     }
119 };
View Code

5.2、FMDBClient,記憶體對映客戶端,主要封裝Publish與Subscribe方法給前端呼叫,遮蔽複雜性

  1 class FMDBClient : public FMDBManager
  2 {
  3 private:
  4     mutable std::mutex mut;
  5     SChannelMapDbConnInfo channelMapDbConnInfo = { 0 };
  6 
  7     bool CanWrite(SIndex *sIndex)
  8     {
  9         int nextWriteIndex = sIndex->WriteIndex + 1;
 10         if(nextWriteIndex > FM_MAX_ROWS) nextWriteIndex = 0;
 11 
 12         return nextWriteIndex != sIndex->ReadIndex;
 13     }
 14     bool CanRead(SIndex *sIndex) {
 15         if(sIndex->Over) return sIndex->ReadIndex > sIndex->WriteIndex;
 16         else return sIndex->ReadIndex + 1 <= sIndex->WriteIndex;
 17     }
 18 
 19     int GetDbConnInfo(const char *channel, int size)
 20     {
 21         int rest = enumFail;
 22 
 23         for(int i = 0; i < FM_MAX_CHANNEL; i++)
 24         {
 25             char channelNameTmp[20] = { 0 };
 26             sprintf_s(channelNameTmp, "%s", channel);
 27 
 28             if(0 == strcmp(channelNameTmp, channelMapDbConnInfoArray[i].ChannelName))
 29             {
 30                 channelMapDbConnInfo = channelMapDbConnInfoArray[i];
 31                 rest = enumSuccess;
 32                 break;
 33             }
 34         }
 35 
 36         return rest;
 37     }
 38     int SetDbConnInfo(const char *channel, UINT *subscribeIndex, SDbConnInfo *dbConnInfo)
 39     {
 40         std::lock_guard<std::mutex> lk(mut);
 41 
 42         int nextSubscribeIndex = fmdbConfig->GetNextSubscribeIndex(channel);
 43         if(nextSubscribeIndex == FM_NOTHING)
 44         {
 45             SChannel sChannel = { 0 };
 46             sprintf_s(sChannel.ChannelName, "%s", channel);
 47             sChannel.Subscribe1LastTime = GetTickCount();
 48             sChannel.StructSize = dbConnInfo->StructSize;
 49 
 50             sprintf_s(dbConnInfo->FileName, "%s.1.db", channel);
 51             if(!fmdbConfig->IsFM_NOTHING(dbConnInfo->FileName) && dbConnInfo->StructSize > 0)
 52             {
 53                 dbConnInfo->SubscribeIndex = 1;
 54                 *subscribeIndex = dbConnInfo->SubscribeIndex;
 55 
 56                 if(Create(dbConnInfo) == enumSuccess) return fmdbConfig->Insert(&sChannel);
 57                 else sprintf_s(dbConnInfo->FileName, "%s", channel); //還原名稱
 58             }
 59         }
 60 
 61         if(nextSubscribeIndex > 1)
 62         {
 63             sprintf_s(dbConnInfo->FileName, "%s.%d.db", channel, nextSubscribeIndex);
 64             if(!fmdbConfig->IsFM_NOTHING(dbConnInfo->FileName) && dbConnInfo->StructSize > 0)
 65             {
 66                 dbConnInfo->SubscribeIndex = nextSubscribeIndex;
 67                 *subscribeIndex = nextSubscribeIndex;
 68 
 69                 if(Create(dbConnInfo) == enumSuccess) return fmdbConfig->Save(channel, nextSubscribeIndex);
 70                 else sprintf_s(dbConnInfo->FileName, "%s", channel); //還原名稱
 71             }
 72         }
 73 
 74         return enumFail;
 75     }
 76     bool SetSubscribeConned(const char *channel, int subscribeIndex, SDbConnInfo *dbConnInfo)
 77     {
 78         int rest = enumFail;
 79 
 80         if(subscribeIndex <= 0) return rest;
 81 
 82         for(int i = 0; i < FM_MAX_CHANNEL; i++)
 83         {
 84             char channelNameTmp[20] = { 0 };
 85             sprintf_s(channelNameTmp, "%s", channel);
 86 
 87             if(0 == strcmp(channelNameTmp, channelMapDbConnInfoArray[i].ChannelName))
 88             {
 89                 channelMapDbConnInfoArray[i].DbConnInfo[subscribeIndex - 1].SubscribeIndex = dbConnInfo->SubscribeIndex; 
 90                 channelMapDbConnInfoArray[i].DbConnInfo[subscribeIndex - 1].Conned = 1;
 91                 rest = enumSuccess;
 92                 break;
 93             }
 94         }
 95 
 96         return rest;
 97     }
 98     bool IsConning(SDbConnInfo *dbConnInfo) { return true; };
 99 
100 public:
101     FMDBClient()
102     {
103         while(!fmdbConfigLoadFinish) { Sleep(200); }
104     };
105     ~FMDBClient() {};
106 
107 public:
108     int failTimes = 0;
109 
110     template<typename T>
111     int Publish(const char *channel, T* data)
112     {
113         int rest = enumFail;
114 
115         // 查詢
116         if(GetDbConnInfo(channel, sizeof(T)) == enumFail)
117         {
118             printf_s("釋出%s失敗.\n", channel);
119             return enumFail;
120         }
121 
122         for(int i = 0; i < FM_MAX_SUBSCRIBE; i++)
123         {
124             if(channelMapDbConnInfo.DbConnInfo[i].FileHandle == NULL) continue;
125 
126             while(IsConning(&channelMapDbConnInfo.DbConnInfo[i]))
127             {
128                 SIndex sIndex = { 0 };
129                 if(ReadIndex(&sIndex, &channelMapDbConnInfo.DbConnInfo[i]) == enumFail)
130                 {
131                     throw "對映檔案載入失敗";
132                 }
133 
134                 if(CanWrite(&sIndex))
135                 {
136                     WORD writeIndex = sIndex.WriteIndex;
137                     if(Write(data, writeIndex, &channelMapDbConnInfo.DbConnInfo[i]) == enumSuccess)
138                     {
139                         writeIndex++;
140                         if(writeIndex > FM_MAX_ROWS)
141                         {
142                             writeIndex = 0;
143 
144                             WORD Over = TRUE;
145                             WriteIndex(&Over, (FM_WORD_SIZE * 2), FM_WORD_SIZE, &channelMapDbConnInfo.DbConnInfo[i]);
146                         }
147 
148                         rest = WriteIndex(&writeIndex, 0, FM_WORD_SIZE, &channelMapDbConnInfo.DbConnInfo[i]);
149                         break;
150                     }
151                 }
152                 else
153                 {
154                     failTimes++;
155                 }
156             }
157         }
158 
159         return rest;
160     }
161 
162     template<typename T>
163     void Subscribe(const char *channel, SubscribeCallBackHandle callback)
164     {
165         SDbConnInfo dbConnInfo = { 0 };
166         dbConnInfo.StructSize = sizeof(T);
167 
168         UINT subscribeIndex = 0;
169         if(SetDbConnInfo(channel, &subscribeIndex, &dbConnInfo) == enumFail)
170         {
171             printf_s("訂閱%s失敗.\n", channel);
172             return;
173         }
174 
175         while(IsConning(&dbConnInfo))
176         {
177             SetSubscribeConned(channel, subscribeIndex, &dbConnInfo);
178 
179             SIndex sIndex = { 0 };
180             if(ReadIndex(&sIndex, &dbConnInfo) == enumFail) throw "對映檔案載入失敗";
181             if(!CanRead(&sIndex)) continue;
182 
183             T t = { 0 };
184             int readIndex = sIndex.ReadIndex;
185             if(Read(&t, readIndex, &dbConnInfo) == enumSuccess)
186             {
187                 readIndex++;
188                 if(readIndex > FM_MAX_ROWS)
189                 {
190                     readIndex = 0;
191 
192                     WORD Over = FALSE;
193                     WriteIndex(&Over, (FM_WORD_SIZE * 2), FM_WORD_SIZE, &dbConnInfo);
194                 }
195 
196                 if(WriteIndex(&readIndex, FM_WORD_SIZE, FM_WORD_SIZE, &dbConnInfo) == enumSuccess)
197                     if(Delete(sIndex.ReadIndex, &dbConnInfo) == enumSuccess)
198                         if(callback(&t) == enumBreak) break;
199             }
200         }
201     }
202 };
View Code

 請注意上邊控制讀寫的2個方法

	bool CanWrite(SIndex *sIndex)
	{
		int nextWriteIndex = sIndex->WriteIndex + 1;
		if(nextWriteIndex > FM_MAX_ROWS) nextWriteIndex = 0;

		return nextWriteIndex != sIndex->ReadIndex;
	}
	bool CanRead(SIndex *sIndex) 
	{
		if(sIndex->Over) return sIndex->ReadIndex > sIndex->WriteIndex;
		else return sIndex->ReadIndex + 1 <= sIndex->WriteIndex;
	}

我們可以分析一下,下一個WriteIndex值如果大於佇列最大值 WriteIndex置0,下一個WriteIndex數值如果不等於

正在讀的位置ReadIndex就能寫;如果WriteIndex沒有超出最大值,只要ReadIndex小於等於WriteIndex就能讀,

如果超出,就判斷ReadIndex大於WriteIndex就能讀。WriteIndex與ReadIndex數值在Publish與Subscribe中維護

5.3、建立新執行緒獲取最新訂閱的客戶端資訊,這個功能主要是動態地像多個Subscribe端發生訊息,比如訂閱發生在釋出之後,

也應該能收到訊息。

 1 void Update()
 2 {
 3     while(true)
 4     {
 5         if(fmdbConfig->GetChannelArray() == enumSuccess)
 6         {
 7             for(int i = 0; i < FM_MAX_CHANNEL; i++)
 8             {
 9                 if(fmdbConfig->IsFM_NOTHING(channelMapDbConnInfoArray[i].ChannelName)) continue;
10 
11                 for(int j = 0; j < FM_MAX_SUBSCRIBE; j++)
12                 {
13                     if(channelMapDbConnInfoArray[i].DbConnInfo[j].StructSize <= 0) continue;
14 
15                     // KeepConned
16                     if(channelMapDbConnInfoArray[i].DbConnInfo[j].Conned)
17                     {
18                         fmdbConfig->KeepConned(channelMapDbConnInfoArray[i].ChannelName,
19                             channelMapDbConnInfoArray[i].DbConnInfo[j].SubscribeIndex);
20 
21                         channelMapDbConnInfoArray[i].DbConnInfo[j].Conned = 0;
22                         //printf_s("%s.KeepConned.\n", channelDbParsArray[i].SDbPars[j].Channel);
23                     }
24 
25                     if(!fmdbConfig->Exists(channelMapDbConnInfoArray[i].DbConnInfo[j].FileName))
26                     {
27                         FMDBManager::Create(&channelMapDbConnInfoArray[i].DbConnInfo[j]);
28                         fmdbConfig->AddChannel(channelMapDbConnInfoArray[i].DbConnInfo[j].FileName);
29                     }
30                 }
31             }
32         }
33 
34         fmdbConfigLoadFinish = true;
35         Sleep(1000);
36     }
37 }
38 thread th(Update);
View Code

 六、Demo測試

6.1、Producer.cpp

 1 #include "pch.h"
 2 #include "../FMDB.h"
 3 
 4 using namespace std;
 5 
 6 int main()
 7 {
 8     FMClient * client = new FMClient();
 9 
10     int times = 0;
11     int index = 0;
12     int total = 0;
13     UINT structSize = sizeof(SPerson);
14     DWORD dwStartTmp = GetTickCount();
15 
16     while(TRUE)
17     {
18         times++;
19         if(index == 0)
20         {
21             dwStartTmp = GetTickCount();
22         }
23 
24         SPerson sPerson = { 0 };
25         sPerson.Idx = index;
26         sprintf_s(sPerson.Name, "Name.%d", index);
27         sPerson.Age = index;
28 
29         if(client->Publish("Person", &sPerson) == enumSuccess)
30         {
31             if(index % 2 == 0) total = total + sPerson.Idx;
32             else total = total - sPerson.Idx;
33 
34             index++;
35             if(index % 50000 == 0)
36                 printf_s("傳送條數: %d, 耗時:%d \n", index, (GetTickCount() - dwStartTmp));
37         }
38 
39         if(index >= 2000000) break;
40     }
41 
42     printf_s("呼叫次數: %d, 成功條數: %d, 檢驗值: %d \n", times, index, total);
43     system("pause");
44 }

6.2、Consumer.cpp

 1 #include "pch.h"
 2 #include "../FMDB.h"
 3 
 4 using namespace std;
 5 
 6 int index = 0;
 7 int total = 0;
 8 DWORD dwStartTmp = GetTickCount();
 9 
10 int SubscribeCallback(void *msg)
11 {
12     SPerson * person = (SPerson *)msg;
13 
14     if(index == 0)
15     {
16         dwStartTmp = GetTickCount();
17     }
18 
19     if(index % 2 == 0) total = total + person->Idx;
20     else total = total - person->Idx;
21 
22     index++;
23     if(index % 50000 == 0)
24     {
25         printf("接收條數: %d, 耗時:%d, Idx:%d, Name:%s, Age:%d\n",
26             index, (GetTickCount() - dwStartTmp), person->Idx, person->Name, person->Age);
27     }
28 
29     if(index >= 2000000)
30     {
31         return enumBreak;
32     }
33 
34     return enumSuccess;
35 };
36 
37 int main()
38 {
39     FMClient * client = new FMClient();
40     client->Subscribe<SPerson>("Person", SubscribeCallback);
41 
42     printf("接收條數: %d, 檢驗值: %d \n", index, total);
43     system("pause");
44 }

6.3、執行,測試用例中使用了向佇列傳送200萬條資料,訊息大小128位元組,訂閱端也是接受到200萬資料後退出,並且列印檢驗值。

1) 檢驗值計算:0+1-2+3-4+ ---------  - 2000000 = -1000000,如果佇列執行正常,那兩邊的檢驗值應該都是是 -1000000.

2) 每5萬條列印一次日誌,執行情況如下

一對一方式執行三次,分別耗時(毫秒):2886、2979、2871

3) 一對二方式執行三次,分別耗時(毫秒):4087、4009、4040

4)執行過程中產生的檔案

6.4、200萬資料一對一耗時近3秒,貌似也不是非常快是不是?但是這就是最大速度了嗎?

當然不是哦,別忘了這是debug版本,我們切換到release版本看速度會不會有所提升。

 

一對一執行三次耗時分別是:1224、1373、1326

厲害了,

SPerson結構體128位元組,每秒可以處理180萬資料,當然實際運用肯定達不到,因為處理其他業務邏輯也要耗時間。

好了,為了這個demo腦殼都想疼了,思考模型,除錯BUG,期間各種問題,實在茶壺煮餃子,有苦說不出。

你看,又浪費我週末2天時間,期間就吃了一餐,今天的還沒吃呢,等下去旁邊山上走走,不然就要發黴了。拜拜。。。