基於MQTT的訊息推送
阿新 • • 發佈:2019-01-01
這段時間學習了推送技術,對xmpp和mqtt 協議做了下比較。
xmpp基於xml資訊傳遞,所以傳輸資訊量比較大,在保持長連結情況下功耗會比較大。
可能還是比較適合用來做聊天之類的通訊應用,而對於智慧和物聯低功耗裝置的推送來說,感覺比較笨重。
而mqtt協議就是針對網路頻寬低,高延時,通訊不穩定的環境設計的,特別適合物聯裝置。低通訊量連線保持,簡約輕便。
- 提供了釋出/訂閱模式,只要訂閱了,即使釋出時客戶端離線,等再次上線時還能收到訊息。
- 提供了釋出反饋,客戶端收到訊息反饋等機制。
- 提供了釋出質量,比如至多一次,至少一次,只有一次。可以根據不同業務要求進行選擇。
- 提供了心跳機制,可自行設定心跳
- 提供了鑑權機制
可見,提供的功能已經很完整了。
mqtt 詳細協議可見:
中文版的:
現在有很多基於mqtt的開源實現,包含客戶端和伺服器端(或者說中介軟體)。
我做了一個demo,客戶選用paho 提供client-mqtt 和 android server庫,中介軟體是apollo。
apollo 下載地址:
下載完解壓按照裡面的readme 建立和執行自己的broker,依賴java執行環境。
client-mqtt 和 android server庫地址:
3. client.mqtt3 裡包含了java的.properties 檔案,它們是針對多語言的,這點和android的設計不一樣,所以android studio編譯時是不能直接把它們打包到class檔案裡面的。需要單獨把它們打包成jar放到lib目錄下,這樣在執行過程就不會報找不到它們了。
打包很簡單,新建org\eclipse\paho\client\mqttv3\internal\nls 目錄,把它們都放到這個目錄下。
然後在org 目錄外執行 jar -cvf properties.jar ./* , 之後把properties.jar 放入libs下。
4. 在androidManifest 裡面新增
<service android:name="org.eclipse.paho.android.service.MqttService"> </service>5. 在自己的activity或者service裡面通過呼叫paho.android.service MqttAndroidClient 類實現釋出和訂閱。
package com.tww.test;
import android.content.ComponentName;
import android.content.Context;
import android.content.ServiceConnection;
import android.os.Bundle;
import android.os.IBinder;
import android.os.PowerManager;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.android.service.MqttTraceHandler;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import java.io.IOException;
import java.io.InputStream;
import javax.net.ssl.SSLSocketFactory;
public class MainActivity extends AppCompatActivity implements View.OnClickListener,ServiceConnection, MqttCallback, IMqttActionListener,MqttTraceHandler {
private static final String TAG = "MainActivity";
private Button mButton;
private MqttAndroidClient mMqttAndroidClient;
private String host = "tcp://192.168.43.224:61613";
private String userName = "admin";
private String passWord = "password";
private IMqttToken mConnectToken;
private PowerManager.WakeLock mWakeLock;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
mButton = (Button) findViewById(R.id.button);
mButton.setOnClickListener(this);
mMqttAndroidClient = new MqttAndroidClient(this,host,"123456789",new MqttDefaultFilePersistence(getCacheDir().getAbsolutePath()));
mMqttAndroidClient.setCallback(this);
mMqttAndroidClient.setTraceEnabled(true);
mMqttAndroidClient.setTraceCallback( this);
Log.d(TAG,"onCreate");
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
options.setConnectionTimeout(60);
options.setKeepAliveInterval(2*60);
options.setMqttVersion(3);
try {
InputStream caInput = getResources().getAssets().open("keystore.bks");
if(caInput!=null){
Log.d(TAG,"do setSocketFactory");
SSLSocketFactory sslSocketFactory = mMqttAndroidClient.getSSLSocketFactory(caInput,"password");
options.setSocketFactory(sslSocketFactory);
}
} catch (IOException e) {
e.printStackTrace();
Log.e(TAG,"do connect IOException:"+e);
}catch (MqttSecurityException e) {
e.printStackTrace();
Log.e(TAG,"do connect MqttSecurityException:"+e);
}
try {
Log.d(TAG,"do connect");
mConnectToken = mMqttAndroidClient.connect(options,this);
} catch (MqttException e) {
Log.e(TAG,"connect MqttException:"+e);
e.printStackTrace();
}
PowerManager powerManager = (PowerManager)getSystemService(Context.POWER_SERVICE);
mWakeLock = powerManager.newWakeLock(PowerManager.FULL_WAKE_LOCK, TAG);
mWakeLock.setReferenceCounted(false);
mWakeLock.acquire();
}
@Override
protected void onDestroy(){
super.onDestroy();
if(mMqttAndroidClient != null){
try {
mMqttAndroidClient.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
mMqttAndroidClient.close();
}
mWakeLock.release();
}
@Override
public void onClick(View v) {
if(v.getId()==R.id.button){
}
}
@Override
public void onServiceConnected(ComponentName name, IBinder service) {
Log.d(TAG,"onServiceConnected");
}
@Override
public void onServiceDisconnected(ComponentName name) {
Log.d(TAG,"onServiceDisconnected");
}
@Override
public void connectionLost(Throwable throwable) {
Log.d(TAG,"connectionLost");
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
Log.d(TAG,"messageArrived:"+mqttMessage.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
Log.d(TAG,"deliveryComplete");
}
@Override
public void onSuccess(IMqttToken iMqttToken) {
if(mConnectToken.equals(iMqttToken)){
try {
Log.d(TAG,"connect success, do subscribe");
mMqttAndroidClient.subscribe("test",1);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
@Override
public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
if(mConnectToken.equals(iMqttToken)){
Log.d(TAG,"onFailure success:"+throwable.toString());
}
}
@Override
public void traceDebug(String s, String s1) {
Log.d(s,s1);
}
@Override
public void traceError(String s, String s1) {
Log.e(s,s1);
}
@Override
public void traceException(String s, String s1, Exception e) {
Log.e(s,s1,e);
}
}