1. 程式人生 > >基於Netty的物聯網應用

基於Netty的物聯網應用

物聯網是將無處不在(Ubiquitous)的末端裝置(Devices)和設施(Facilities),包括具備“內在智慧”的感測器、移動終端、工業系統、樓控系統、家庭智慧設施、視訊監控系統等、和“外在使能”(Enabled)的,如貼上RFID的各種資產(Assets)、攜帶無線終端的個人與車輛等等“智慧化物件或動物”或“智慧塵埃”(Mote),通過各種無線和/或有線的長距離和/或短距離通訊網路。這次我們要說的是智慧農業的一個專案。

本專案是基於區域網和即將到來的5G為資訊載體,以終端節點(EndNodes)、閘道器(Gateway)、雲伺服器(LoRaWAN Server)和客戶端(Client)組成。用於監測溫室,大棚等區域性環境變化。做到實時監控,提前預防。

先讓我們一起看一下ChannelPipeline對事件流的攔截和處理流程

每個ChannelHandler 被新增到ChannelPipeline 後,都會建立一個ChannelHandlerContext 並與之建立的ChannelHandler 關聯繫結。

在ChannelHandler 新增到 ChannelPipeline 時會建立一個例項,就是介面 ChannelHandlerContext,它代表了 ChannelHandler 和ChannelPipeline 之間的關聯。介面ChannelHandlerContext 主要是對通過同一個 ChannelPipeline 關聯的 ChannelHandler 之間的互動進行管理

那麼我就不多說了,直接開幹。

第一步:先啟動執行緒。ServerServletListener

publicvoid contextInitialized(ServletContextEvent arg0) {

Thread thread = new Thread(new Runnable() {

@Override

publicvoid run() {

try {

nc = NettyClient.getInstance();

nc.setRecvCallback(new RecvData());

nc.setSendCallback(new SendData());

nc.connect("127.0.0.1", 9001);

} catch (Exception e) {

nc.shutdown();

log.error("啟動Netty服務失敗:" + e);

}

}

});

thread.start();

}

TcpHandler負責與管道打交道,是整個專案的最底層,他繼承自ChannelInboundHandlerAdapter

是接收LoRa終端回傳資料最底層的類。

TcpHandler

//recvMessage方法接收從LoRa客戶端傳過來的引數

privatevoid recvMessage(ByteBuf buf) {

byte[] cbBuf = newbyte[buf.readableBytes()];

buf.readBytes(cbBuf);

logger.debug("硬體型別:" + cbBuf[0]);

switch (cbBuf[0]) {

case 3:

       recvTHSensor(cbBuf);

break;

     }

   }

//cbBuf是包含了最原始的資料資訊

//這個類的主要作用是對原始資料包進行處理

privatevoid recvTHSensor(byte[] cbBuf) {

System.out.println("========TcpHandler==recvTHSensor==========");

intlength = cbBuf.length;

if (6 == length) {

       MsgTHSensorStateNotify msg = new MsgTHSensorStateNotify();

booleanb = msg.Unpacking(cbBuf);

       ICallbackRecv callback = NettyClient.getInstance().getRecvCallback();

if ((b) && (callback != null))

callback.onTHSensorStateNotify(msg);

else

logger.info("解包出錯!");

     }

elseif (10 == length) {

       MsgTHSensorNotify msg = new MsgTHSensorNotify();

booleanb = msg.Unpacking(cbBuf);

       ICallbackRecv callback = NettyClient.getInstance().getRecvCallback();

if ((b) && (callback != null))

callback.onTHSensorNotify(msg);

else

logger.info("解包出錯!");

     }  

}

MsgTHSensorNotify

//這個類的主要作用是對原始資料包進行拆包處理處理獲取溫度和溼度資訊

publicboolean Unpacking(byte[] data)

   {

intlength = data.length;

if (10 != length) returnfalse;

this.humidity = (Byte.toString(data[(length - 4)]) + "." + Byte.toString(data[(length - 3)]));

this.temperature = (Byte.toString(data[(length - 2)]) + "." + Byte.toString(data[(length - 1)]));

returntrue;

   }

RecvData

//收到感測器觸發資料MsgTHSensorNotify解析出來的資料存入到資料庫中

publicvoid onTHSensorNotify(MsgTHSensorNotify arg0) {

logger.debug(arg0.toString());

try {

IDataRecordSetDao idrs = new DataRecordSetDao();

DataRecordSet drs = new DataRecordSet();

drs.setTemperature(arg0.getTemperature());

System.out.println("======onTHSensorNotifygetTemperature的值為"+arg0.getTemperature()+"==========");

drs.setHumidity(arg0.getHumidity());

System.out.println("=============onTHSensorNotifygetHumidity的值為"+arg0.getHumidity()+"=====");

drs.setData_time((new Date()).getTime());

idrs.add(drs);

} catch (SQLException e) {

logger.warn("資料新增出錯"+e);

}

}

需要說明的一點是DataRecordSet是對MsgTHSensorNotify資料的封裝,他的作用是將MsgTHSensorNotify的資料進行進一步的封裝,以便資料庫查詢資料接收資料用在資料層。MsgTHSensorNotify是對TcpHandler傳過來的資料進行提取封裝的類。

DataRecordSetDao是資料庫操作類

IDataRecordSetDao

DataRecordSet是對MsgTHSensorNotify資料的封裝,他的作用是將MsgTHSensorNotify的資料進行進一步的封裝,以便資料庫查詢資料接收資料用在資料層。

@Override

publicvoid add(DataRecordSet drs) throws SQLException {

String sql = "insert into `data_record`(`temperature`,`humidity`,`dt`) values (?,?,?);";

PreparedStatement pstmt = connection.prepareStatement(sql);

pstmt.setString(1, drs.getTemperature());

pstmt.setString(2, drs.getHumidity());

pstmt.setLong(3, valueToLongs(drs.getData_time()));

System.out.println("===============插入資料庫==============");

pstmt.executeUpdate();

pstmt.close();

connection.close();

}

這裡必須強調一點的是drs.setData_time((new Date()).getTime());//設定的是毫秒數

他的數值已經很大了,細細一數已經到13位數了(1525513938762),這就涉及到儲存的問題了。首先說java部分吧。

publicvoid setData_time(longdata_time)set方法是long型別的引數,然後從Long的包裝類中valueOf(Long l)獲得啟發,便自己也寫了一個類似的方法,在網上很多人都在問如何儲存一個超過9位數的商品編號,那麼這就是一個很好的例子。

publicstatic Long valueToLongs(longl) {

returnnew Long(l);

}

資料庫部分你需要定義一個bigint型別的time值。

現在資料已經儲存到資料庫了,接下來我們在通過其他類來取出資料,更加客觀的展現給人們。

我們通過JSON傳過來請求引數"code":0,"device":"Web","expression":{"field":"","start_time":start,"end_time":end,"length":length},"signature":"LoRa"

建立相應的實體類來接收資料,要強調一點的是實體類屬性最好與AJAX傳過來的鍵一致。類似POJO。這點很重要,否則會發生意想不到的錯誤,而且還不好修改。具體原因請了解@ResponseBody@RequestBody的匹配規則。

先來看看AJAX請求的資料。

Url部分一定要修改成自己的url地址

Spring是位於前端控制器部分,他負責請求轉發和資料處理。

@RequestMapping(value ="/getSensorRecord", method = {RequestMethod.POST }, produces = "application/json;charset=utf-8")

public@ResponseBody Map<String, Object> getSensorRecord(@RequestBody RecvJson recv) {

Map<String, Object> result = new HashMap<String, Object>();

if (null != recv) {

if (recv.getCode() == QueryCode.TemperatureHumidityCode && "LoRa".equals(recv.getSignature())) {

try {

//資料庫接收資料,查詢資料的類

IDataRecordSetDao drd = new DataRecordSetDao();

intlength = (0 != recv.getExpression().getLength())?recv.getExpression().getLength():20 ;

ArrayList<DataRecordSet> list = drd.query(recv.getExpression().getStart_time(),

recv.getExpression().getEnd_time(), length);

intlistLength = (list.size() > 20)?length:list.size();

for (inti = 0; i < listLength; i++) {

Map<String, Object> unit = new HashMap<String, Object>();

unit.put("id", i + 1);

if ("T".equals(recv.getExpression().getField().trim())) {

unit.put("temperature", list.get(i).getTemperature());

} elseif ("H".equals(recv.getExpression().getField().trim())) {

unit.put("humidity", list.get(i).getHumidity());

} else {

unit.put("temperature", list.get(i).getTemperature());

unit.put("humidity", list.get(i).getHumidity());

}

unit.put("time",

(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")).format(new Date(list.get(i).getData_time())));

result.put(Integer.toString(i + 1), unit);

unit = null;

}

result.put("error", 0);

result.put("data", recv.getCode());

list = null;

drd = null;

} catch (Exception e) {

logger.error("查詢資料出錯:" + e);

}

} else {

// {"error":1,"data":"0","message":"codesignature錯誤!"}

result.put("error", 1);

result.put("data", recv.getCode());

result.put("message", "codesignature錯誤!");

}

}

returnresult;

}

這部分我來詳細解釋一下。

@RequestMapping

RequestMapping是一個用來處理請求地址對映的註解,可用於類或方法上。用於類上,表示類中的所有響應請求的方法都是以該地址作為父路徑。

RequestMapping註解有六個屬性,下面我們把她分成三類進行說明。

1valuemethod

value指定請求的實際地址,指定的地址可以是URI Template 模式;

method指定請求的method型別, GETPOSTPUTDELETE等;

2consumesproduces

consumes: 指定處理請求的提交內容型別(Content-Type),例如application/json, text/html;

produces:    指定返回的內容型別,僅當request請求頭中的(Accept)型別中包含該指定型別才返回;

3paramsheaders

params: 指定request中必須包含某些引數值是,才讓該方法處理。

headers: 指定request中必須包含某些指定的header值,才能讓該方法處理請求。

Bat啟動類

入口函式IotHubServer,這個啟動類是由Netty框架實現的,適合有一定基礎的朋友,在這裡我是參考李林鋒編著的《Netty權威指南》當知識積累到一定程度,要學會看書,找資料,看論文,這是培養思維方式。很多同學有一個誤區,當有一個新技術出現的時候,如果腦海裡第一時間想到的是有沒有視訊,這就完了,出視訊的時候基本上是有人已經研究透這東西了,隨著時間和經驗的增長,要去當領跑者,而不是侷限於跟隨者,要提升自己的認知。當然,遇到問題最好先看原始碼,這樣提升很快。

首先讓我們看一下入口main函式:

publicstaticvoid main(String[] args) {

PropertyConfigurator.configure("config/log4j.properties");

System.out.println("===============IotHubServer==》》》main============================");

newIotHubServer(Port).run();

}

/**

 *用於啟動服務端 IotHubChannelInitializer

 */

publicvoidrun() {

try {

IotHubChannelInitializer iothub = new IotHubChannelInitializer();

iothub.run(this.port);

System.out.println("=================run============================");

} catch (Exception e) {

logger.error("服務啟動失敗->" + e.getMessage());

}

}

IotHubChannelInitializer

需要說明的是,Netty協議通訊雙方鏈路建立成功之後,雙方可以進行全雙工通訊,無論客戶端還是服務端,都可以主動傳送訊息給對方,通訊方式可以是TWO WAY或者ONE WAY。雙方之間的心跳採用的是Ping-Pong機制,當鏈路處於空閒狀態時,客戶端主動傳送Ping訊息給服務端,服務端接收到訊息後傳送應答訊息Pong給客戶端。

如果客戶端連續傳送NPing訊息都沒有收到服務端返回的Pong訊息,說明鏈路已經掛死或者雙方處於異常狀態,客戶端主動關閉連線,間隔週期T後發起重連操作,直到重連成功。

publicvoidrun(intport) throws Exception {

//配置伺服器端的NIO執行緒組,接受客戶端的連線、TCP資料的讀寫

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workerGroup = new NioEventLoopGroup();

try {

ServerBootstrap bootstrap = new ServerBootstrap();

bootstrap.group(bossGroup, workerGroup);

bootstrap.channel(NioServerSocketChannel.class);

bootstrap.childHandler(newIotHubChannelInitializer()); 

bootstrap<