1. 程式人生 > >基於MQTT的訊息推送

基於MQTT的訊息推送

這段時間學習了推送技術,對xmpp和mqtt 協議做了下比較。

xmpp基於xml資訊傳遞,所以傳輸資訊量比較大,在保持長連結情況下功耗會比較大。

可能還是比較適合用來做聊天之類的通訊應用,而對於智慧和物聯低功耗裝置的推送來說,感覺比較笨重。

而mqtt協議就是針對網路頻寬低,高延時,通訊不穩定的環境設計的,特別適合物聯裝置。低通訊量連線保持,簡約輕便。

  • 提供了釋出/訂閱模式,只要訂閱了,即使釋出時客戶端離線,等再次上線時還能收到訊息。
  • 提供了釋出反饋,客戶端收到訊息反饋等機制。
  • 提供了釋出質量,比如至多一次,至少一次,只有一次。可以根據不同業務要求進行選擇。
  • 提供了心跳機制,可自行設定心跳
  • 提供了鑑權機制

可見,提供的功能已經很完整了。

mqtt 詳細協議可見:

中文版的:

現在有很多基於mqtt的開源實現,包含客戶端和伺服器端(或者說中介軟體)。

我做了一個demo,客戶選用paho 提供client-mqtt 和 android server庫,中介軟體是apollo。

apollo 下載地址:

下載完解壓按照裡面的readme 建立和執行自己的broker,依賴java執行環境。

client-mqtt 和 android server庫地址:

可以直接把jar檔案匯入工程,但是我選擇的是把原始碼放入,因為這樣可以根據自己的需要對原始碼做修改。 在android studio上:
1.把jar放入很簡單:放到model libs目錄下,右鍵 As libraries就可以了。 2. 原始碼放入,把sources.jar 分別解壓,放到對應java目錄下。
3. client.mqtt3 裡包含了java的.properties 檔案,它們是針對多語言的,這點和android的設計不一樣,所以android studio編譯時是不能直接把它們打包到class檔案裡面的。需要單獨把它們打包成jar放到lib目錄下,這樣在執行過程就不會報找不到它們了。

  打包很簡單,新建org\eclipse\paho\client\mqttv3\internal\nls 目錄,把它們都放到這個目錄下。

 然後在org 目錄外執行 jar -cvf  properties.jar ./*  , 之後把properties.jar 放入libs下。


  4. 在androidManifest 裡面新增

<service android:name="org.eclipse.paho.android.service.MqttService">
</service>
5. 在自己的activity或者service裡面通過呼叫paho.android.service MqttAndroidClient 類實現釋出和訂閱。
package com.tww.test;

import android.content.ComponentName;
import android.content.Context;
import android.content.ServiceConnection;
import android.os.Bundle;
import android.os.IBinder;
import android.os.PowerManager;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.Button;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.android.service.MqttTraceHandler;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;

import java.io.IOException;
import java.io.InputStream;

import javax.net.ssl.SSLSocketFactory;

public class MainActivity extends AppCompatActivity  implements View.OnClickListener,ServiceConnection, MqttCallback, IMqttActionListener,MqttTraceHandler {
    private static final String TAG = "MainActivity";
    private Button mButton;
    private MqttAndroidClient mMqttAndroidClient;
    private String host = "tcp://192.168.43.224:61613";
    private String userName = "admin";
    private String passWord = "password";
    private IMqttToken mConnectToken;
    private PowerManager.WakeLock mWakeLock;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        mButton = (Button) findViewById(R.id.button);
        mButton.setOnClickListener(this);
        mMqttAndroidClient = new MqttAndroidClient(this,host,"123456789",new MqttDefaultFilePersistence(getCacheDir().getAbsolutePath()));
        mMqttAndroidClient.setCallback(this);
        mMqttAndroidClient.setTraceEnabled(true);
        mMqttAndroidClient.setTraceCallback( this);

        Log.d(TAG,"onCreate");
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        options.setConnectionTimeout(60);
        options.setKeepAliveInterval(2*60);
        options.setMqttVersion(3);
        try {
            InputStream caInput = getResources().getAssets().open("keystore.bks");
            if(caInput!=null){
                Log.d(TAG,"do setSocketFactory");
                SSLSocketFactory sslSocketFactory = mMqttAndroidClient.getSSLSocketFactory(caInput,"password");
                options.setSocketFactory(sslSocketFactory);
            }
        } catch (IOException e) {
            e.printStackTrace();
            Log.e(TAG,"do connect IOException:"+e);
        }catch (MqttSecurityException e) {
            e.printStackTrace();
            Log.e(TAG,"do connect MqttSecurityException:"+e);
        }
        try {
            Log.d(TAG,"do connect");
            mConnectToken = mMqttAndroidClient.connect(options,this);
        } catch (MqttException e) {
            Log.e(TAG,"connect MqttException:"+e);
            e.printStackTrace();
        }
        PowerManager powerManager = (PowerManager)getSystemService(Context.POWER_SERVICE);
        mWakeLock = powerManager.newWakeLock(PowerManager.FULL_WAKE_LOCK, TAG);
        mWakeLock.setReferenceCounted(false);
        mWakeLock.acquire();

    }
    @Override
    protected void onDestroy(){
        super.onDestroy();
        if(mMqttAndroidClient != null){
            try {
                mMqttAndroidClient.disconnect();
            } catch (MqttException e) {
                e.printStackTrace();
            }
            mMqttAndroidClient.close();
        }
        mWakeLock.release();
    }

    @Override
    public void onClick(View v) {
        if(v.getId()==R.id.button){

        }
    }

    @Override
    public void onServiceConnected(ComponentName name, IBinder service) {
        Log.d(TAG,"onServiceConnected");
    }

    @Override
    public void onServiceDisconnected(ComponentName name) {
        Log.d(TAG,"onServiceDisconnected");
    }

    @Override
    public void connectionLost(Throwable throwable) {
        Log.d(TAG,"connectionLost");
    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        Log.d(TAG,"messageArrived:"+mqttMessage.toString());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        Log.d(TAG,"deliveryComplete");
    }

    @Override
    public void onSuccess(IMqttToken iMqttToken) {
        if(mConnectToken.equals(iMqttToken)){
            try {
                Log.d(TAG,"connect success, do subscribe");
                mMqttAndroidClient.subscribe("test",1);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
        if(mConnectToken.equals(iMqttToken)){
           Log.d(TAG,"onFailure success:"+throwable.toString());

        }
    }

    @Override
    public void traceDebug(String s, String s1) {
        Log.d(s,s1);
    }

    @Override
    public void traceError(String s, String s1) {
        Log.e(s,s1);
    }

    @Override
    public void traceException(String s, String s1, Exception e) {
        Log.e(s,s1,e);
    }
}