1. 程式人生 > >利用ActiveMQ使用mqttv3發訊息Android接收訊息

利用ActiveMQ使用mqttv3發訊息Android接收訊息

在Linux伺服器端安裝好並啟動ActiveMQ後,即可使用以下程式碼收發訊息。

一、傳送訊息

/**
 * Completion hook handed to a publisher so the caller can react once a
 * publish attempt has finished.
 */
public interface PushCallBack {

    /**
     * Invoked with the outcome of a publish.
     *
     * @param isOk {@code true} when the publish was reported as successful,
     *             {@code false} when it was reported as failed
     * @return an implementation-defined status code
     */
    int saveOnDone(boolean isOk);
}
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
public class MQTT {
    private String host = "tcp://192.168.0.1:1883";
    private
String userName = "admin"; private String passWord = "admin"; private MqttClient client = null; public static void main(String[] args) { //傳送訊息 MQTT mq = new MQTT(); mq.passWord = "admin"; for (int i = 0; i < 2; i++) { mq.send("Jaycekon-MQ", "HelloTest"
+ i, new PushCallBack() { @Override public int saveOnDone(boolean isOk) { System.out.println("MQTT.main(...).new PushCallBack() {...}.saveOnDone()"); return 0; } }); } } private static
MQTT mqtt = null; public static MQTT get() { if (mqtt == null) { mqtt = new MQTT(); } return mqtt; } private MQTT() { init(); } private void init() { try { String clientid = MqttClient.generateClientId(); if (clientid.length() > 23) { clientid = clientid.substring(clientid.length() - 23); } client = new MqttClient(host, clientid, new MqttDefaultFilePersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(passWord.toCharArray()); options.setConnectionTimeout(10); options.setKeepAliveInterval(20); client.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { System.out .println("MqttCallback connectionLost-----------連線丟失"); } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out .println("MqttCallback deliveryComplete---------交付完成" + token.getMessageId() + "|" + token.isComplete()); } @Override public void messageArrived(String topic, MqttMessage arg1) throws Exception { System.out .println("MqttCallback messageArrived----------訊息到達"); } }); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } private void close() { try { if (client != null) { client.close(); client = null; } } catch (Exception e) { e.printStackTrace(); } } public void send(final String topics, final String notify, PushCallBack pushCallBack) { send(topics, notify, 1, pushCallBack); } private void send(final String topics, final String notify, final int trycnt, final PushCallBack pushCallBack) { try { MqttDeliveryToken token; MqttMessage message; MqttTopic topic = client.getTopic(topics); message = new MqttMessage(); message.setQos(2); message.setRetained(true); message.setPayload(notify.getBytes("UTF-8")); token = topic.publish(message); final int msgid = token.getMessageId(); System.out .println(message.isRetained() + "------ratained|" + msgid); token.setActionCallback(new IMqttActionListener() { @Override public void 
onFailure(IMqttToken arg0, Throwable arg1) { if (pushCallBack != null) { pushCallBack.saveOnDone(false); } System.out.println(msgid + "------ActionCallback false"); } @Override public void onSuccess(IMqttToken arg0) { if (pushCallBack != null) pushCallBack.saveOnDone(true); System.out.println(msgid + "------ActionCallback true"); } }); token.waitForCompletion(); } catch (Exception e) { if (trycnt < 2) {// 繼續嘗試一次 close(); init(); send(topics, notify, trycnt + 1, pushCallBack); } else { e.printStackTrace(); } } } }

二、Android接收訊息

/**
 * Entry activity: shows the main layout and starts {@link MQTTService}.
 */
public class MainActivity extends Activity {

    /**
     * Inflates {@code activity_main} and requests that {@link MQTTService} be started.
     *
     * @param savedInstanceState state passed through to the superclass
     */
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        // Start the MQTT service as soon as the activity is created.
        startService(new Intent(this, MQTTService.class));
    }
}
package com.lgz.mq;

import android.app.Service;
import android.content.Intent;
import android.os.IBinder;
import android.widget.Toast;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * Started service that connects to the MQTT broker at {@link #BROKER_URL} and
 * subscribes to {@link #TOPIC}; incoming messages are handled by {@code PushCallback}.
 *
 * <p>NOTE(review): the blocking {@code MqttClient} is used directly inside
 * {@link #onStartCommand}, so the calling thread waits for the connect to
 * finish (timeout configured below as 10 s) — consider moving this to a worker thread.
 */
public class MQTTService extends Service {
    // URL of the message broker.
    public static final String BROKER_URL = "tcp://192.168.199.174:1883";
    // Client ID identifying this client; it may be generated by any strategy.
    public static final String clientId = "admin";
    // Name of the topic to subscribe to.
    public static final String TOPIC = "Jaycekon-MQ";
    // MQTT client.
    private MqttClient mqttClient;
    // MQTT connection options.
    private MqttConnectOptions options;

    private String userName = "admin";
    private String passWord = "admin";

    /** Binding is not supported; always returns {@code null}. */
    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }

    /**
     * Connects to the broker and subscribes to {@link #TOPIC}.
     *
     * <p>If a connected client already exists (the service was started again),
     * nothing is rebuilt. A stale, disconnected client is released before a
     * new one is created so that repeated starts do not leak clients.
     *
     * @return whatever {@code Service.onStartCommand} returns for these arguments
     */
    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        if (mqttClient != null && mqttClient.isConnected()) {
            return super.onStartCommand(intent, flags, startId);
        }
        releaseClient();

        try {
            // Create the client with the fixed clientId. The third argument selects the
            // persistence implementation (in-memory here).
            mqttClient = new MqttClient(BROKER_URL, clientId, new MemoryPersistence());
            // MQTT connection settings.
            options = new MqttConnectOptions();
            // cleanSession=false asks the server to keep this client's session so that
            // messages published while it was offline can be received; true would
            // connect with a fresh identity every time.
            options.setCleanSession(false);
            // User name for the connection.
            options.setUserName(userName);
            // Password for the connection.
            options.setPassword(passWord.toCharArray());
            // Connection timeout, in seconds.
            options.setConnectionTimeout(10);
            // Keep-alive interval, in seconds. This setting does not provide any
            // automatic reconnection.
            options.setKeepAliveInterval(20);
            // Callback that receives connection-lost / message / delivery events.
            mqttClient.setCallback(new PushCallback());
            MqttTopic topic = mqttClient.getTopic(TOPIC);
            // Last-will message: lets other parties learn that this client dropped off.
            options.setWill(topic, "close".getBytes(), 2, true);
            // Connect to the broker.
            mqttClient.connect(options);

            // Subscribe. QoS levels in MQTT:
            //   0 - delivered at most once, may be lost
            //   1 - delivered at least once, may be duplicated
            //   2 - delivered exactly once
            int[] qos = {1};
            String[] topics = {TOPIC};
            mqttClient.subscribe(topics, qos);

        } catch (MqttException e) {
            Toast.makeText(getApplicationContext(), "Something went wrong!" + e.getMessage(), Toast.LENGTH_LONG).show();
            e.printStackTrace();
        }
        return super.onStartCommand(intent, flags, startId);
    }

    /** Disconnects and closes the MQTT client, if any, when the service is destroyed. */
    @Override
    public void onDestroy() {
        releaseClient();
        super.onDestroy();
    }

    /**
     * Disconnects (when connected) and closes the current client, then clears
     * the field. Safe to call when no client was ever created.
     */
    private void releaseClient() {
        if (mqttClient == null) {
            return;
        }
        try {
            // Only disconnect a live connection; disconnecting twice is an error in Paho.
            if (mqttClient.isConnected()) {
                mqttClient.disconnect(0);
            }
        } catch (MqttException e) {
            Toast.makeText(getApplicationContext(), "Something went wrong!" + e.getMessage(), Toast.LENGTH_LONG).show();
            e.printStackTrace();
        }
        try {
            mqttClient.close();
        } catch (MqttException e) {
            e.printStackTrace();
        }
        mqttClient = null;
    }
}
package com.lgz.mq;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * {@code MqttCallback} implementation that logs connection loss, incoming
 * messages and completed deliveries to standard output.
 */
public class PushCallback implements MqttCallback {

    /**
     * Called when the connection to the broker is lost.
     *
     * @param cause reason for the loss; its stack trace is printed when present
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("連線失敗---");
        // Surface the reason instead of discarding it.
        if (cause != null) {
            cause.printStackTrace();
        }
    }

    /**
     * Called when a message arrives; prints its topic, payload text and QoS.
     *
     * @param topic   topic the message was published on
     * @param message the received message
     * @throws Exception if the payload cannot be decoded
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println(" 有新訊息到達時的回撥方法");
        System.out.println(" topic = " + topic);
        // Decode explicitly as UTF-8 rather than relying on the platform default charset.
        String msg = new String(message.getPayload(), "UTF-8");
        System.out.println("msg = " + msg);
        System.out.println("qos = " + message.getQos());
    }

    /**
     * Called after a message published by this client has been delivered.
     *
     * @param token delivery token of the completed publish (unused)
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("--deliveryComplete--成功釋出某一訊息後的回撥方法");
    }

}

別忘了在AndroidManifest.xml中新增服務和網路許可權等

<!-- Permissions requested by the app: internet access, Wi-Fi state and network state. -->
<uses-permission android:name="android.permission.INTERNET" />
    <uses-permission android:name="android.permission.ACCESS_WIFI_STATE" />
    <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />

    <!-- Application declaration: one launcher activity plus the MQTT service. -->
    <application
        android:allowBackup="true"
        android:icon="@mipmap/ic_launcher"
        android:label="@string/app_name"
        android:supportsRtl="true"
        android:theme="@style/AppTheme">
        <!-- MainActivity is the launcher entry point (MAIN action + LAUNCHER category). -->
        <activity android:name=".MainActivity">
            <intent-filter>
                <action android:name="android.intent.action.MAIN" />

                <category android:name="android.intent.category.LAUNCHER" />
            </intent-filter>
        </activity>
        <!-- Declares MQTTService, which MainActivity starts via startService(). -->
        <service android:name=".MQTTService" />
    </application>