基於Netty的物聯網應用
物聯網是將無處不在(Ubiquitous)的末端裝置(Devices)和設施(Facilities),包括具備“內在智慧”的感測器、移動終端、工業系統、樓控系統、家庭智慧設施、視訊監控系統等、和“外在使能”(Enabled)的,如貼上RFID的各種資產(Assets)、攜帶無線終端的個人與車輛等等“智慧化物件或動物”或“智慧塵埃”(Mote),通過各種無線和/或有線的長距離和/或短距離通訊網路。這次我們要說的是智慧農業的一個專案。
本專案是基於區域網和即將到來的5G為資訊載體,以終端節點(EndNodes)、閘道器(Gateway)、雲伺服器(LoRaWAN Server)和客戶端(Client)組成。用於監測溫室,大棚等區域性環境變化。做到實時監控,提前預防。
先讓我們一起看一下ChannelPipeline對事件流的攔截和處理流程
每個ChannelHandler 被新增到ChannelPipeline 後,都會建立一個ChannelHandlerContext 並與之建立的ChannelHandler 關聯繫結。
在ChannelHandler 新增到 ChannelPipeline 時會建立一個例項,就是介面 ChannelHandlerContext,它代表了 ChannelHandler 和ChannelPipeline 之間的關聯。介面ChannelHandlerContext 主要是對通過同一個 ChannelPipeline 關聯的 ChannelHandler 之間的互動進行管理
那麼我就不多說了,直接開幹。
第一步:先啟動執行緒。ServerServletListener
publicvoid contextInitialized(ServletContextEvent arg0) {
Thread thread = new Thread(new Runnable() {
@Override
publicvoid run() {
try {
nc = NettyClient.getInstance();
nc.setRecvCallback(new RecvData());
nc.setSendCallback(new SendData());
nc.connect("127.0.0.1", 9001);
} catch (Exception e) {
nc.shutdown();
log.error("啟動Netty服務失敗:" + e);
}
}
});
thread.start();
}
TcpHandler負責與管道打交道,是整個專案的最底層,他繼承自ChannelInboundHandlerAdapter
是接收LoRa終端回傳資料最底層的類。
TcpHandler
//recvMessage方法接收從LoRa客戶端傳過來的引數
privatevoid recvMessage(ByteBuf buf) {
byte[] cbBuf = newbyte[buf.readableBytes()];
buf.readBytes(cbBuf);
logger.debug("硬體型別:" + cbBuf[0]);
switch (cbBuf[0]) {
case 3:
recvTHSensor(cbBuf);
break;
}
}
//cbBuf是包含了最原始的資料資訊
//這個類的主要作用是對原始資料包進行處理
privatevoid recvTHSensor(byte[] cbBuf) {
System.out.println("========TcpHandler==recvTHSensor==========");
intlength = cbBuf.length;
if (6 == length) {
MsgTHSensorStateNotify msg = new MsgTHSensorStateNotify();
booleanb = msg.Unpacking(cbBuf);
ICallbackRecv callback = NettyClient.getInstance().getRecvCallback();
if ((b) && (callback != null))
callback.onTHSensorStateNotify(msg);
else
logger.info("解包出錯!");
}
elseif (10 == length) {
MsgTHSensorNotify msg = new MsgTHSensorNotify();
booleanb = msg.Unpacking(cbBuf);
ICallbackRecv callback = NettyClient.getInstance().getRecvCallback();
if ((b) && (callback != null))
callback.onTHSensorNotify(msg);
else
logger.info("解包出錯!");
}
}
MsgTHSensorNotify
//這個類的主要作用是對原始資料包進行拆包處理處理,獲取溫度和溼度資訊
publicboolean Unpacking(byte[] data)
{
intlength = data.length;
if (10 != length) returnfalse;
this.humidity = (Byte.toString(data[(length - 4)]) + "." + Byte.toString(data[(length - 3)]));
this.temperature = (Byte.toString(data[(length - 2)]) + "." + Byte.toString(data[(length - 1)]));
returntrue;
}
RecvData
//收到感測器觸發資料,把MsgTHSensorNotify解析出來的資料存入到資料庫中
publicvoid onTHSensorNotify(MsgTHSensorNotify arg0) {
logger.debug(arg0.toString());
try {
IDataRecordSetDao idrs = new DataRecordSetDao();
DataRecordSet drs = new DataRecordSet();
drs.setTemperature(arg0.getTemperature());
System.out.println("======onTHSensorNotify中getTemperature的值為"+arg0.getTemperature()+"==========");
drs.setHumidity(arg0.getHumidity());
System.out.println("=============onTHSensorNotify中getHumidity的值為"+arg0.getHumidity()+"=====");
drs.setData_time((new Date()).getTime());
idrs.add(drs);
} catch (SQLException e) {
logger.warn("資料新增出錯"+e);
}
}
需要說明的一點是DataRecordSet是對MsgTHSensorNotify資料的封裝,他的作用是將MsgTHSensorNotify的資料進行進一步的封裝,以便資料庫查詢資料,接收資料,用在資料層。MsgTHSensorNotify是對TcpHandler傳過來的資料進行提取封裝的類。
DataRecordSetDao是資料庫操作類
IDataRecordSetDao
DataRecordSet是對MsgTHSensorNotify資料的封裝,他的作用是將MsgTHSensorNotify的資料進行進一步的封裝,以便資料庫查詢資料,接收資料,用在資料層。
@Override
publicvoid add(DataRecordSet drs) throws SQLException {
String sql = "insert into `data_record`(`temperature`,`humidity`,`dt`) values (?,?,?);";
PreparedStatement pstmt = connection.prepareStatement(sql);
pstmt.setString(1, drs.getTemperature());
pstmt.setString(2, drs.getHumidity());
pstmt.setLong(3, valueToLongs(drs.getData_time()));
System.out.println("===============插入資料庫==============");
pstmt.executeUpdate();
pstmt.close();
connection.close();
}
這裡必須強調一點的是drs.setData_time((new Date()).getTime());//設定的是毫秒數
他的數值已經很大了,細細一數已經到13位數了(1525513938762),這就涉及到儲存的問題了。首先說java部分吧。
publicvoid setData_time(longdata_time),set方法是long型別的引數,然後從Long的包裝類中valueOf(Long l)獲得啟發,便自己也寫了一個類似的方法,在網上很多人都在問如何儲存一個超過9位數的商品編號,那麼這就是一個很好的例子。
publicstatic Long valueToLongs(longl) {
returnnew Long(l);
}
資料庫部分你需要定義一個bigint型別的time值。
現在資料已經儲存到資料庫了,接下來我們在通過其他類來取出資料,更加客觀的展現給人們。
我們通過JSON傳過來請求引數"code":0,"device":"Web","expression":{"field":"","start_time":start,"end_time":end,"length":length},"signature":"LoRa"
建立相應的實體類來接收資料,要強調一點的是實體類屬性最好與AJAX傳過來的鍵一致。類似POJO。這點很重要,否則會發生意想不到的錯誤,而且還不好修改。具體原因請了解@ResponseBody和@RequestBody的匹配規則。
先來看看AJAX請求的資料。
Url部分一定要修改成自己的url地址
Spring是位於前端控制器部分,他負責請求轉發和資料處理。
@RequestMapping(value ="/getSensorRecord", method = {RequestMethod.POST }, produces = "application/json;charset=utf-8")
public@ResponseBody Map<String, Object> getSensorRecord(@RequestBody RecvJson recv) {
Map<String, Object> result = new HashMap<String, Object>();
if (null != recv) {
if (recv.getCode() == QueryCode.TemperatureHumidityCode && "LoRa".equals(recv.getSignature())) {
try {
//資料庫接收資料,查詢資料的類
IDataRecordSetDao drd = new DataRecordSetDao();
intlength = (0 != recv.getExpression().getLength())?recv.getExpression().getLength():20 ;
ArrayList<DataRecordSet> list = drd.query(recv.getExpression().getStart_time(),
recv.getExpression().getEnd_time(), length);
intlistLength = (list.size() > 20)?length:list.size();
for (inti = 0; i < listLength; i++) {
Map<String, Object> unit = new HashMap<String, Object>();
unit.put("id", i + 1);
if ("T".equals(recv.getExpression().getField().trim())) {
unit.put("temperature", list.get(i).getTemperature());
} elseif ("H".equals(recv.getExpression().getField().trim())) {
unit.put("humidity", list.get(i).getHumidity());
} else {
unit.put("temperature", list.get(i).getTemperature());
unit.put("humidity", list.get(i).getHumidity());
}
unit.put("time",
(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")).format(new Date(list.get(i).getData_time())));
result.put(Integer.toString(i + 1), unit);
unit = null;
}
result.put("error", 0);
result.put("data", recv.getCode());
list = null;
drd = null;
} catch (Exception e) {
logger.error("查詢資料出錯:" + e);
}
} else {
// {"error":1,"data":"0","message":"code或signature錯誤!"}
result.put("error", 1);
result.put("data", recv.getCode());
result.put("message", "code或signature錯誤!");
}
}
returnresult;
}
這部分我來詳細解釋一下。
@RequestMapping
RequestMapping是一個用來處理請求地址對映的註解,可用於類或方法上。用於類上,表示類中的所有響應請求的方法都是以該地址作為父路徑。
RequestMapping註解有六個屬性,下面我們把她分成三類進行說明。
1、 value, method
value:指定請求的實際地址,指定的地址可以是URI Template 模式;
method:指定請求的method型別, GET、POST、PUT、DELETE等;
2、 consumes,produces;
consumes: 指定處理請求的提交內容型別(Content-Type),例如application/json, text/html;
produces: 指定返回的內容型別,僅當request請求頭中的(Accept)型別中包含該指定型別才返回;
3、 params,headers;
params: 指定request中必須包含某些引數值是,才讓該方法處理。
headers: 指定request中必須包含某些指定的header值,才能讓該方法處理請求。
Bat啟動類
入口函式IotHubServer,這個啟動類是由Netty框架實現的,適合有一定基礎的朋友,在這裡我是參考李林鋒編著的《Netty權威指南》當知識積累到一定程度,要學會看書,找資料,看論文,這是培養思維方式。很多同學有一個誤區,當有一個新技術出現的時候,如果腦海裡第一時間想到的是有沒有視訊,這就完了,出視訊的時候基本上是有人已經研究透這東西了,隨著時間和經驗的增長,要去當領跑者,而不是侷限於跟隨者,要提升自己的認知。當然,遇到問題最好先看原始碼,這樣提升很快。
首先讓我們看一下入口main函式:
publicstaticvoid main(String[] args) {
PropertyConfigurator.configure("config/log4j.properties");
System.out.println("===============IotHubServer==》》》main============================");
newIotHubServer(Port).run();
}
/**
*用於啟動服務端 IotHubChannelInitializer
*/
publicvoidrun() {
try {
IotHubChannelInitializer iothub = new IotHubChannelInitializer();
iothub.run(this.port);
System.out.println("=================run============================");
} catch (Exception e) {
logger.error("服務啟動失敗->" + e.getMessage());
}
}
IotHubChannelInitializer
需要說明的是,Netty協議通訊雙方鏈路建立成功之後,雙方可以進行全雙工通訊,無論客戶端還是服務端,都可以主動傳送訊息給對方,通訊方式可以是TWO WAY或者ONE WAY。雙方之間的心跳採用的是Ping-Pong機制,當鏈路處於空閒狀態時,客戶端主動傳送Ping訊息給服務端,服務端接收到訊息後傳送應答訊息Pong給客戶端。
如果客戶端連續傳送N條Ping訊息都沒有收到服務端返回的Pong訊息,說明鏈路已經掛死或者雙方處於異常狀態,客戶端主動關閉連線,間隔週期T後發起重連操作,直到重連成功。
publicvoidrun(intport) throws Exception {
//配置伺服器端的NIO執行緒組,接受客戶端的連線、TCP資料的讀寫
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(newIotHubChannelInitializer());
bootstrap<