基於TableStore的物聯網元數據管理
背景
常見的企業級無線接入方案有兩種,分別被稱作廋AP和胖AP。瘦AP(AC+AP)架構為比較傳統的企業級無線接入方案,主要優點就是漫遊體驗好,但是AC宕機的話會導致所屬的AP全部無法工作。對於大型的辦公場所,漫遊的需求相對較弱,新型的胖AP(無AC,不會因為AC宕機導致網絡不可用)+ 雲端控制器架構成為了新興的一種企業無線接入方案,運維人員通過雲端對AP進行監控與管理。
某公司擁有無線AP約10,000臺、接入終端(STA)100,000個。設備以一定的周期上報其狀態到雲端,雲端將監控數據持久化後供用戶查看。
業務描述
每個AP設備會以10s的周期上報其當前狀態,上報數據格式為json,其格式如下:
AP狀態:
{
"ap_mac": "11:22:33:86:D9:E8", // AP WAN口MAC地址,AP設備唯一標識
"report_time": 1532315501985, // 上報時間戳,毫秒
"on_time": 1531417181972, // 設備上線時間戳,毫秒
"sta_cnt": 2, // 終端數量
"memory_usage": 38, // CPU使用率
"wan_recv_speed": 280, // WAN口下行速率 單位bps
"wan_sent_speed": 45348, // WAN口上行速率 單位bps
}
需求以及架構選型
需求
通過MAC地址查看每個AP最新的狀態。
用戶需要在管理系統上對基於各種條件對設備進行查詢。
需要對AP的各種指標進行排序,以便找出故障設備。
我們將上述的需求分為兩種:
排序。
基於這兩大類需求,我們給出如下的架構選型比較。
架構選型
針對這種IOT場景的設備狀態監控數據,下面針對幾種常見方案做比較。
MySQL
將設備上報的狀態數據直接寫入MySQL,並使用MySQL自帶的查詢、排序語句對數據進行分析,這種架構最為簡單,用戶運維成本較低。
這種架構僅僅適用於小規模量的數據,在大規模數據的情況下,MySQL的內部架構也導致了無法創建出一種萬用的索引來滿足多維查詢的需求。並且MySQL底層使用的是B+數作為存儲結構,會有隨機寫的問題,寫入性能較差。
MySQL在使用前必須指定表結構,也就是說後續新增需求的話,必須要修改表結構,在數據量較大的情況下修改表結構很容易造成鎖表導致線上故障。
MySQL + 自建Elasticsearch
由於MySQL的檢索能力較弱,MySQL + Elasticsearch也是業界比較常見的方案。用戶將數據寫入MySQL,並使用binlog訂閱工具(如canal)將數據異步寫入Elasticsearch,架構如下圖所示:
其中Canal Client需要用戶自己編寫與部署。相比單MySQL的架構,該方案很好地解決了MySQL在多維查詢和指定列排序能力弱的問題。但是帶了的問題也比較多:
Canal與Elasticsearch需要用戶自己部署,帶來的運維成本也相對提升。
Canal Client側負責讀取Canal傳輸過來MySQL增量改變數據,數據的一致性是需要用戶自己保證的。
使用表格存儲的SearchIndex功能
表格存儲底層存儲使用的LSM模型,很好地解決了MySQL寫入性能差的問題,特別適合IOT這種寫多讀少的場景。
用戶將數據寫入表格存儲後系統內部會將數據異步同步到SearchIndex,數據寫入TableStore到數據可查約有毫秒到秒級別的延遲,用戶無需關註運維相關的問題,數據一致性也有系統內部保證,做到了開箱即用。
結論
基於上面的比較,表格存儲更適合存儲AP的狀態數據,並且通過SearchIndex可以很容易地完成多維查詢以及排序。簡明的系統整體架構如下圖所示:
表結構設計
表格存儲底層使用主鍵的第一列將數據均分到對應的分區上,以達到負載均衡的目的。我們知道MAC地址的前3個字節為廠商碼,也就是說如果同一個廠家生產出來的設備MAC地址前3個字節大多會是相同的,如果直接使用MAC地址做主鍵的話可能會導致數據熱點,所以我們推薦對MAC地址做MD5之後做第一列主鍵。關於表結構設計的最佳實踐詳見這裏。
最新狀態數據
AP狀態
表名:wifi_ap_status
列類型 列名 類型 示例 備註
主鍵列 pk0 String 1b5de627b4a25553baf1f72af9afb96d MD5(ap_mac),對ap_mac做MD5
值列 ap_mac String 11:22:33:44:55:66 AP MAC地址
report_time Integer? 1537363646533 UTC時間戳,毫秒
on_time I?nteger 1537363646533 同上
sta_cnt I?nteger 10 所連接終端數
cpu_usage I?nteger 20 CPU使用率
memory_usage I?nteger 50 內存使用率
wan_recv_speed I?nteger 817 收數據速率,單位bps
wan_sent_speed I?nteger 2411 發數據速率,單位bps
代碼示例
下面將以AP狀態作為例子,給出全流程的代碼示例。
初始化
創建TableStore client
SyncClient syncClient = new SyncClient(
"$endpoint",
"$accessKeyId",
"$accessKeySecret",
"$instanceName"
);
SyncClient對象為線程安全,如果使用Spring的話可以將其作為一個單例Bean註入到其他對象中使用
創建TableStore表
表的創建可以在控制臺上完成,也可以通過SDK完成,如果使用SDK的話代碼示例如下
創建AP狀態表
// 指定表名
TableMeta tableMeta = new TableMeta("wifi_ap_status");
// 指定主鍵列,根據上面的表結構設計,這邊只有pk0一個主鍵列
tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("pk0", PrimaryKeyType.STRING));
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta, new TableOptions(-1, 1));
syncClient.createTable(createTableRequest);
創建SearchIndex
與創建表相同,SearchIndex的創建可以通過控制臺完成,如果使用SDK的話,示例如下:
創建AP狀態SearchIndex
CreateSearchIndexRequest createSearchIndexRequest = new CreateSearchIndexRequest();
createSearchIndexRequest.setIndexName("wifi_ap_status");
createSearchIndexRequest.setTableName("wifi_ap_status");
IndexSchema indexSchema = new IndexSchema();
indexSchema.setIndexSetting(new IndexSetting(5));
indexSchema.setFieldSchemas(Arrays.asList(
new FieldSchema("ap_mac", FieldType.TEXT).setIndex(true), // 可搜索
new FieldSchema("report_time", FieldType.LONG).setIndex(true).setEnableSortAndAgg(true), // 可搜索並可排序
new FieldSchema("sta_cnt", FieldType.LONG).setIndex(true).setEnableSortAndAgg(true),
new FieldSchema("cpu_usage", FieldType.LONG).setIndex(true).setEnableSortAndAgg(true),
new FieldSchema("memory_usage", FieldType.LONG).setIndex(true).setEnableSortAndAgg(true)
));
createSearchIndexRequest.setIndexSchema(indexSchema);
CreateSearchIndexResponse resp = syncClient.createSearchIndex(createSearchIndexRequest);
數據寫入
用戶只需使用原表格存儲的寫入功能將數據寫入即可,表格存儲內部會自動將數據導入SearchIndex,無需關心內部實現。
PutRowRequest putRowRequest = new PutRowRequest();
RowPutChange rowPutChange = new RowPutChange("wifi_ap_status");
String apMac = "11:22:33:86:D9:E8";
// 通過AP MAC計算MD5,防止產生數據熱點,這邊使用了apache的commons-codec庫
String pk0 = DigestUtils.md5Hex(apMac);
PrimaryKey pk = new PrimaryKey(new PrimaryKeyColumn[]{
new PrimaryKeyColumn("pk0", PrimaryKeyValue.fromString(pk0))
});
rowPutChange.setPrimaryKey(pk);
rowPutChange.addColumns(new Column[]{
new Column("ap_mac", ColumnValue.fromString(apMac)),
new Column("report_time", ColumnValue.fromLong(System.currentTimeMillis())),
new Column("on_time", ColumnValue.fromLong(System.currentTimeMillis())),
new Column("cpu_usage", ColumnValue.fromLong(56)),
new Column("sta_cnt", ColumnValue.fromLong(4)),
new Column("memory_usage", ColumnValue.fromLong(43)),
new Column("wan_recv_speed", ColumnValue.fromLong(280)),
new Column("wan_sent_speed", ColumnValue.fromLong(45348)),
});
putRowRequest.setRowChange(rowPutChange);
syncClient.putRow(putRowRequest);
數據讀取
數據讀取分為兩種:
1.基於原生的表格存儲的主鍵獲取
2.基於SearchIndex功能獲取
下面對於這兩種不通模式的讀取分別舉例說明
通過主鍵讀取
通過主鍵獲取AP狀態的話是直接從表格存儲的表中直接獲取的。也就是說,在通過主鍵獲取數據的時候是不需要通過SearchIndex功能的,代碼示例如下:
GetRowRequest getRowRequest = new GetRowRequest();
String apMac = "11:22:33:86:D9:E8";
// 通過AP MAC計算MD5,防止產生數據熱點,這邊使用了apache的commons-codec庫
String pk0 = DigestUtils.md5Hex(apMac);
// 設置主鍵
PrimaryKey pk = new PrimaryKey(new PrimaryKeyColumn[]{
new PrimaryKeyColumn("pk0", PrimaryKeyValue.fromString(pk0))
});
SingleRowQueryCriteria singleRowQueryCriteria = new SingleRowQueryCriteria("wifi_ap_status", pk);
singleRowQueryCriteria.setMaxVersions(1);
getRowRequest.setRowQueryCriteria(singleRowQueryCriteria);
GetRowResponse rowResponse = syncClient.getRow(getRowRequest);
Row row = rowResponse.getRow();
// 獲取主鍵列
PrimaryKey primaryKey = row.getPrimaryKey();
for (PrimaryKeyColumn primaryKeyColumn : primaryKey.getPrimaryKeyColumns()) {
System.out.println("PrimaryKeyColumn:(" + primaryKeyColumn.getName() + ":" + primaryKeyColumn.getValue() + ")");
}
// 獲取值列
for (Column column : row.getColumns()) {
System.out.println("Column:(" + column.getName() + ":" + column.getValue() + ")");
}
通過SearchIndex功能讀取
為了方便描述,下面通過SQL(僅為表示具體需求,SearchIndex暫不支持SQL語句)+代碼的形式給出示例來描述我們的場景。
多維查詢
如果需要通過非主鍵列進行多維查詢,我們可以使用syncClient的search方法,在上面的例子中,我們為wifi_ap_status表創建了SearchIndex,並且指定了索引列。
如果要實現下面的SQL:
SELECT
*
FROM wifi_ap_status
WHERE ap_mac LIKE ‘%86:D9:E8%‘ AND sta_cnt >= 2
用java語言實現的話,代碼如下
SearchQuery searchQuery = new SearchQuery();
// 使用BoolQuery來實現組合條件查詢,本例搜索了ap_mac包含86:D9:E8並且sta_cnt大於等於2的數據
BoolQuery query = new BoolQuery();
// 使用短語搜索模糊匹配ap_mac
MatchPhraseQuery macQuery = new MatchPhraseQuery();
macQuery.setFieldName("ap_mac");
macQuery.setText("86:D9:E8");
// 使用範圍查詢sta_cnt
RangeQuery staCntQuery = new RangeQuery();
staCntQuery.setFieldName("sta_cnt");
staCntQuery.setFrom(ColumnValue.fromLong(2), true);
query.setMustQueries(Arrays.asList(
macQuery,
staCntQuery
));
searchQuery.setQuery(query);
// 構建搜索請求
SearchRequest searchRequest = new SearchRequest(
"wifi_ap_status", // 表格存儲表名
"wifi_ap_status", // SearchIndex索引名
searchQuery
);
// 設置需要返回的表列
SearchRequest.ColumnsToGet columnsToGet = new SearchRequest.ColumnsToGet();
// 設置返回所有列
columnsToGet.setReturnAll(true);
searchRequest.setColumnsToGet(columnsToGet);
// 搜索請求
SearchResponse searchResponse = syncClient.search(searchRequest);
List<Row> rows = searchResponse.getRows();
for (Row row : rows) {
PrimaryKey primaryKey = row.getPrimaryKey();
for (PrimaryKeyColumn primaryKeyColumn : primaryKey.getPrimaryKeyColumns()) {
System.out.println("PrimaryKeyColumn:(" + primaryKeyColumn.getName() + ":" + primaryKeyColumn.getValue() + ")");
}
for (Column column : row.getColumns()) {
System.out.println("Column:(" + column.getName() + ":" + column.getValue() + ")");
}
}
排序
排序功能也是我們的常見需求,比如我們需要查看在某個條件下掛載終端數最多的幾個AP,如果用SQL語句描述的話如下:
SELECT
*
FROM wifi_ap_status
WHERE ap_mac LIKE ‘%11:22:33%‘
ORDER BY sta_cnt DESC
如果用代碼表示的話,如下:
SearchQuery searchQuery = new SearchQuery();
// 使用短語搜索模糊匹配ap_mac
MatchPhraseQuery macQuery = new MatchPhraseQuery();
macQuery.setFieldName("ap_mac");
macQuery.setText("11:22:33");
searchQuery.setQuery(macQuery);
// 排序選項,sta_cnt降序
FieldSort staCntSorter = new FieldSort("sta_cnt");
staCntSorter.setOrder(SortOrder.DESC);
searchQuery.setSort(new Sort(Collections.singletonList(
staCntSorter
)));
// 構建搜索請求
SearchRequest searchRequest = new SearchRequest(
"wifi_ap_status",
"wifi_ap_status",
searchQuery
);
// 設置需要返回的表列
SearchRequest.ColumnsToGet columnsToGet = new SearchRequest.ColumnsToGet();
// 設置返回所有列
columnsToGet.setReturnAll(true);
searchRequest.setColumnsToGet(columnsToGet);
// 搜索請求
SearchResponse searchResponse = syncClient.search(searchRequest);
List<Row> rows = searchResponse.getRows();
原文鏈接
本文為雲棲社區原創內容,未經允許不得轉載。
基於TableStore的物聯網元數據管理