1. 程式人生 > >解讀Paho MQTT原始碼

解讀Paho MQTT原始碼

這兩天要重點突破一下MQTT的東西, 找到了它的原始碼,解讀一下,作為下一步優化的路標。 Paho是基於socket開做的,本質上還是維持一個長socket。 以TCP socket為例:(org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule) 發起連線 SocketAddress sockaddr = new InetSocketAddress(host, port); socket = factory.createSocket(); socket.connect(sockaddr, conTimeout*1000); //傳出 接收訊息 socket.getInputStream();
//傳入 心跳、釋出訊息 socket.getOutputStream(); 中斷連線 if (socket != null) {     socket.close(); } 這麼簡單的程式碼怎麼搭起一個資訊管理中心呢? 那怎麼知道socket什麼時候斷呢?這時候就不得不提到一個介面: public interface MqttCallback {     public void connectionLost(Throwable cause);     public void messageArrived(String topic, MqttMessage message) throws Exception;
    public void deliveryComplete(IMqttDeliveryToken token); } “connectionLost”  好在connectionLost 只有一個入口:org.eclipse.paho.client.mqttv3.internal.CommsCallback.connectionLost 這個入口也只有一次呼叫:org.eclipse.paho.client.mqttv3.internal.CommsCallback.shutdownConnection 找了一下呼叫,發現就是在“每個訊息接受/發出”時(即getInputStream、getOutputStream 讀寫處),檢查Exception,有Exception就提示客戶端中斷連線。
由這樣的設計可以知道,這個函式connectionLost 的最大誤差在一個“心跳週期”。 (當然可以通過各種 “優化手段” 去優化,不過最惡劣的情況就是這樣了。) 基於getInputStream、getOutputStream,Paho封裝了兩個方法:CommsReceiver 、CommsSender。很直觀的名字,裡面也很大方的開了兩條執行緒,以CommsSender為例子: 執行緒在這裡開啟:(發現這個庫很多地方都是這樣開執行緒) public void start(String threadName) {     synchronized (lifecycle) {         if (!running) {             running = true;             sendThread = new Thread(this, threadName);             sendThread.start();         }     } } 然後這樣做: out = new MqttOutputStream(clientState, OutputStream ); public void run() {     while (running && (out != null)) {         message = clientState.get();         out.write(message);         out.flush();     } } 簡單的說 就是不斷的從clientState裡面拿資訊,往socket裡面送。其實也不是“不斷”,在clientState.get()裡面有鎖,一個訊息解一次鎖,解開了就發一次,不然就在clientState.get死迴圈等 鎖,那怎麼等呢? protected MqttWireMessage get() throws MqttException {     synchronized (queueLock) {         while (result == null) {             queueLock.wait();             result = (MqttWireMessage)pendingMessages.elementAt(0);         }     } } 看到queueLock.wait();就是在這裡 等,pendingMessages就是訊息佇列。要找什麼時候發訊息?就找什麼時候這個鎖被解開了。 寫了這個多,其實也就想問一個簡單的問題,怎麼發起一次心跳? 最直接就找名字,找到了這個介面 org.eclipse.paho.client.mqttv3.MqttPingSender。 這個介面在庫裡面只看到一個實現org.eclipse.paho.client.mqttv3.TimerPingSender。 注意到裡面有這樣的程式碼: public void schedule(long delayInMilliseconds) {     timer.schedule(new PingTask(), delayInMilliseconds);        } private class PingTask extends TimerTask {     public void run() {         comms.checkForActivity();                } } checkForActivity進去,又包了一層, public MqttToken checkForActivity(){     MqttToken token = null;     token = clientState.checkForActivity();     return token; } 再進去: public MqttToken checkForActivity() throws MqttException {     ...     pingSender.schedule(nextPingTime);     return token; } 看到pingSender.schedule,這裡定時,準備做下一次心跳。 這個是維持心跳的方法。 直到後面我看了Paho給的android的例子,才知道原來MqttPingSender這樣做是為了易於擴充套件。 org.eclipse.paho.android.service.AlarmPingSender @Override public void schedule(long delayInMilliseconds) {     long nextAlarmInMilliseconds = System.currentTimeMillis()             + delayInMilliseconds;     AlarmManager alarmManager = (AlarmManager) service             .getSystemService(Service.ALARM_SERVICE);     alarmManager.set(AlarmManager.RTC_WAKEUP, nextAlarmInMilliseconds,             pendingIntent); } class AlarmReceiver extends BroadcastReceiver {     @Override     public void onReceive(Context context, Intent intent) {         ...     } } 在android裡面,休眠後Timer無效,只能用這種迂迴的方式去定時,也是因為有這種迂迴的方式,Paho庫才把傳送Ping的方法收得這麼深。 也是因為這麼深,很多之前用IBM的MQTT庫的人,比如我,才不得不去看看具體的程式碼怎麼跑。其實上面還有一個問題,就是AlarmManager定時在小米系統上無效,被定義為300S對齊喚醒,程式碼中貌似沒有看到對“這種行為”的處理。 有一個小問題,不知道是不是設計上的遺漏,org.eclipse.paho.client.mqttv3.internal.ClientComms,ClientComms是internal的方法,居然可以在外面使用。 回到問題原點,怎麼發起一次心跳?在上面的分析中,看到checkForActivity好像有點痕跡,checkForActivity的註釋裡面也有寫明。(Check and send a ping if needed and check for ping timeout)簡化那一堆程式碼,簡單的就是這樣做: public MqttToken checkForActivity() throws MqttException {     pendingFlows.insertElementAt(pingCommand, 0);     notifyQueueLock(); //Wake sender thread since it may be in wait state (in ClientState.get()) 這裡解鎖     return token; } 心跳包怎麼做呢? //org.eclipse.paho.client.mqttv3.internal.CommsSender public void run() {     MqttWireMessage message = clientState.get();     out.write(message);     out.flush(); } //MqttOutputStream public void write(MqttWireMessage message) throws IOException, MqttException {     final String methodName = "write";     byte[] bytes = message.getHeader();     byte[] pl = message.getPayload();     out.write(bytes,0,bytes.length);     clientState.notifySentBytes(bytes.length);     int offset = 0;     int chunckSize = 1024;     while (offset < pl.length) {         int length = Math.min(chunckSize, pl.length - offset);         out.write(pl, offset, length);         offset += chunckSize;         clientState.notifySentBytes(length);     }        } //MqttWireMessage public byte[] getHeader() throws MqttException {     try {         int first = ((getType() & 0x0f) << 4) ^ (getMessageInfo() & 0x0f);         byte[] varHeader = getVariableHeader();         int remLen = varHeader.length + getPayload().length;         ByteArrayOutputStream baos = new ByteArrayOutputStream();         DataOutputStream dos = new DataOutputStream(baos);         dos.writeByte(first);         dos.write(encodeMBI(remLen));         dos.write(varHeader);         dos.flush();         return baos.toByteArray();     } catch(IOException ioe) {         throw new MqttException(ioe);     } } //MqttPingReq public class MqttPingReq extends MqttWireMessage {     protected byte[] getVariableHeader() throws MqttException {         return new byte[0];     } } 皮很厚,MqttWireMessage getHeader中,一共有兩個位元組(不知道有沒有數錯...) 第一部分有Type和Message組成,ping的type是12,資訊長度是0,記作first(頭部),兩個位元組,回頭看看協議:
bit 7 6 5 4 3 2 1 0
byte 1 Message Type DUP flag QoS level RETAIN
byte 2 Remaining Length
怎麼收到訊息? public void run() {     final String methodName = "run";     MqttToken token = null;     while (running && (in != null)) {         try {             receiving = in.available() > 0;             MqttWireMessage message = in.readMqttWireMessage();             receiving = false;             if (message instanceof MqttAck) {                 token = tokenStore.getToken(message);                 if (token!=null) {                     synchronized (token) {                         clientState.notifyReceivedAck((MqttAck)message);                     }                 } else {                     throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);                 }             } else {                 clientState.notifyReceivedMsg(message);             }         }finally {             receiving = false;         }     } } 意思大概就是不斷的迴圈讀InputStream的資料流。這裡有一個問題,休眠了,整個android世界是停止工作了。執行緒什麼的都會被掛起。 摘要就是:通訊協議棧運行於BP,一旦收到資料包,BP會將AP喚醒,喚醒的時間足夠AP執行程式碼完成對收到的資料包的處理過程。 就是有資料包來的時候,BP會喚醒CPU,CPU起來幹活之後就接著迴圈讀,就讀到推送送過來的新鮮的訊息了。BP耗電低於AP的1/10,類似收到簡訊、電話都是由BP進行監控。 讀了一輪,有幾點收穫: 1、重新認識了socket。 2、對MQTT PING的認知多了一點。 3、維持一條執行緒的開關,以及Paho的封裝方式都挺有意思的。 4、通過原始碼,理解的MQTT的部分效能底線。例如,以前覺得MQTT lostconnect應該是準的,看完程式碼才知道原來還有一個心跳週期的誤差,不是設計沒做好,而是“設計+實際情況”使然。 參考實現:(官網) git clone https://git.eclipse.org/r/paho/org.eclipse.paho.apps 轉載請保留本文地址-http://blog.csdn.net/yeshennet/article/details/50708115