解讀Paho MQTT原始碼
阿新 • • 發佈:2018-12-30
這兩天要重點突破一下MQTT的東西, 找到了它的原始碼,解讀一下,作為下一步優化的路標。
Paho是基於socket開做的,本質上還是維持一個長socket。
以TCP socket為例:(org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule)
發起連線
SocketAddress sockaddr = new InetSocketAddress(host, port);
socket = factory.createSocket();
socket.connect(sockaddr, conTimeout*1000);
//傳出 接收訊息
socket.getInputStream();
//傳入 心跳、釋出訊息
socket.getOutputStream();
中斷連線
if (socket != null) {
socket.close();
}
這麼簡單的程式碼怎麼搭起一個資訊管理中心呢?
那怎麼知道socket什麼時候斷呢?這時候就不得不提到一個介面:
public interface MqttCallback {
public void connectionLost(Throwable cause);
public void messageArrived(String topic, MqttMessage message) throws Exception;
public void deliveryComplete(IMqttDeliveryToken token);
}
“connectionLost”
好在connectionLost 只有一個入口:org.eclipse.paho.client.mqttv3.internal.CommsCallback.connectionLost
這個入口也只有一次呼叫:org.eclipse.paho.client.mqttv3.internal.CommsCallback.shutdownConnection
找了一下呼叫,發現就是在“每個訊息接受/發出”時(即getInputStream、getOutputStream 讀寫處),檢查Exception,有Exception就提示客戶端中斷連線。
由這樣的設計可以知道,這個函式connectionLost 的最大誤差在一個“心跳週期”。
(當然可以通過各種 “優化手段” 去優化,不過最惡劣的情況就是這樣了。)
基於getInputStream、getOutputStream,Paho封裝了兩個方法:CommsReceiver 、CommsSender。很直觀的名字,裡面也很大方的開了兩條執行緒,以CommsSender為例子:
執行緒在這裡開啟:(發現這個庫很多地方都是這樣開執行緒)
public void start(String threadName) {
synchronized (lifecycle) {
if (!running) {
running = true;
sendThread = new Thread(this, threadName);
sendThread.start();
}
}
}
然後這樣做:
out = new MqttOutputStream(clientState, OutputStream );
public void run() {
while (running && (out != null)) {
message = clientState.get();
out.write(message);
out.flush();
}
}
簡單的說 就是不斷的從clientState裡面拿資訊,往socket裡面送。其實也不是“不斷”,在clientState.get()裡面有鎖,一個訊息解一次鎖,解開了就發一次,不然就在clientState.get死迴圈等 鎖,那怎麼等呢?
protected MqttWireMessage get() throws MqttException {
synchronized (queueLock) {
while (result == null) {
queueLock.wait();
result = (MqttWireMessage)pendingMessages.elementAt(0);
}
}
}
看到queueLock.wait();就是在這裡 等,pendingMessages就是訊息佇列。要找什麼時候發訊息?就找什麼時候這個鎖被解開了。
寫了這個多,其實也就想問一個簡單的問題,怎麼發起一次心跳?
最直接就找名字,找到了這個介面 org.eclipse.paho.client.mqttv3.MqttPingSender。
這個介面在庫裡面只看到一個實現org.eclipse.paho.client.mqttv3.TimerPingSender。
注意到裡面有這樣的程式碼:
public void schedule(long delayInMilliseconds) {
timer.schedule(new PingTask(), delayInMilliseconds);
}
private class PingTask extends TimerTask {
public void run() {
comms.checkForActivity();
}
}
checkForActivity進去,又包了一層,
public MqttToken checkForActivity(){
MqttToken token = null;
token = clientState.checkForActivity();
return token;
}
再進去:
public MqttToken checkForActivity() throws MqttException {
...
pingSender.schedule(nextPingTime);
return token;
}
看到pingSender.schedule,這裡定時,準備做下一次心跳。
這個是維持心跳的方法。
直到後面我看了Paho給的android的例子,才知道原來MqttPingSender這樣做是為了易於擴充套件。
org.eclipse.paho.android.service.AlarmPingSender
@Override
public void schedule(long delayInMilliseconds) {
long nextAlarmInMilliseconds = System.currentTimeMillis()
+ delayInMilliseconds;
AlarmManager alarmManager = (AlarmManager) service
.getSystemService(Service.ALARM_SERVICE);
alarmManager.set(AlarmManager.RTC_WAKEUP, nextAlarmInMilliseconds,
pendingIntent);
}
class AlarmReceiver extends BroadcastReceiver {
@Override
public void onReceive(Context context, Intent intent) {
...
}
}
在android裡面,休眠後Timer無效,只能用這種迂迴的方式去定時,也是因為有這種迂迴的方式,Paho庫才把傳送Ping的方法收得這麼深。
也是因為這麼深,很多之前用IBM的MQTT庫的人,比如我,才不得不去看看具體的程式碼怎麼跑。其實上面還有一個問題,就是AlarmManager定時在小米系統上無效,被定義為300S對齊喚醒,程式碼中貌似沒有看到對“這種行為”的處理。
有一個小問題,不知道是不是設計上的遺漏,org.eclipse.paho.client.mqttv3.internal.ClientComms,ClientComms是internal的方法,居然可以在外面使用。
回到問題原點,怎麼發起一次心跳?在上面的分析中,看到checkForActivity好像有點痕跡,checkForActivity的註釋裡面也有寫明。(Check and send a ping if needed and check for ping timeout)簡化那一堆程式碼,簡單的就是這樣做:
public MqttToken checkForActivity() throws MqttException {
pendingFlows.insertElementAt(pingCommand, 0);
notifyQueueLock(); //Wake sender thread since it may be in wait state (in ClientState.get()) 這裡解鎖
return token;
}
心跳包怎麼做呢?
//org.eclipse.paho.client.mqttv3.internal.CommsSender
public void run() {
MqttWireMessage message = clientState.get();
out.write(message);
out.flush();
}
//MqttOutputStream
public void write(MqttWireMessage message) throws IOException, MqttException {
final String methodName = "write";
byte[] bytes = message.getHeader();
byte[] pl = message.getPayload();
out.write(bytes,0,bytes.length);
clientState.notifySentBytes(bytes.length);
int offset = 0;
int chunckSize = 1024;
while (offset < pl.length) {
int length = Math.min(chunckSize, pl.length - offset);
out.write(pl, offset, length);
offset += chunckSize;
clientState.notifySentBytes(length);
}
}
//MqttWireMessage
public byte[] getHeader() throws MqttException {
try {
int first = ((getType() & 0x0f) << 4) ^ (getMessageInfo() & 0x0f);
byte[] varHeader = getVariableHeader();
int remLen = varHeader.length + getPayload().length;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
dos.writeByte(first);
dos.write(encodeMBI(remLen));
dos.write(varHeader);
dos.flush();
return baos.toByteArray();
} catch(IOException ioe) {
throw new MqttException(ioe);
}
}
//MqttPingReq
public class MqttPingReq extends MqttWireMessage {
protected byte[] getVariableHeader() throws MqttException {
return new byte[0];
}
}
皮很厚,MqttWireMessage getHeader中,一共有兩個位元組(不知道有沒有數錯...)
第一部分有Type和Message組成,ping的type是12,資訊長度是0,記作first(頭部),兩個位元組,回頭看看協議:
怎麼收到訊息?
public void run() {
final String methodName = "run";
MqttToken token = null;
while (running && (in != null)) {
try {
receiving = in.available() > 0;
MqttWireMessage message = in.readMqttWireMessage();
receiving = false;
if (message instanceof MqttAck) {
token = tokenStore.getToken(message);
if (token!=null) {
synchronized (token) {
clientState.notifyReceivedAck((MqttAck)message);
}
} else {
throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
}
} else {
clientState.notifyReceivedMsg(message);
}
}finally {
receiving = false;
}
}
}
意思大概就是不斷的迴圈讀InputStream的資料流。這裡有一個問題,休眠了,整個android世界是停止工作了。執行緒什麼的都會被掛起。
摘要就是:通訊協議棧運行於BP,一旦收到資料包,BP會將AP喚醒,喚醒的時間足夠AP執行程式碼完成對收到的資料包的處理過程。
就是有資料包來的時候,BP會喚醒CPU,CPU起來幹活之後就接著迴圈讀,就讀到推送送過來的新鮮的訊息了。BP耗電低於AP的1/10,類似收到簡訊、電話都是由BP進行監控。
讀了一輪,有幾點收穫:
1、重新認識了socket。
2、對MQTT PING的認知多了一點。
3、維持一條執行緒的開關,以及Paho的封裝方式都挺有意思的。
4、通過原始碼,理解的MQTT的部分效能底線。例如,以前覺得MQTT lostconnect應該是準的,看完程式碼才知道原來還有一個心跳週期的誤差,不是設計沒做好,而是“設計+實際情況”使然。
參考實現:(官網)
git clone https://git.eclipse.org/r/paho/org.eclipse.paho.apps
轉載請保留本文地址-http://blog.csdn.net/yeshennet/article/details/50708115
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte 1 | Message Type | DUP flag | QoS level | RETAIN | ||||
byte 2 | Remaining Length |