利用ActiveMQ與mqttv3傳送訊息,並在Android端接收訊息
阿新 • • 發佈:2019-01-27
在Linux伺服器端安裝好並啟動ActiveMQ後,即可使用下面的程式碼傳送與接收訊息。
一、傳送訊息
/**
 * Callback through which a publisher reports the outcome of a push.
 */
public interface PushCallBack {

    /**
     * Invoked when the outcome of a push is known.
     *
     * @param isOk {@code true} when the push succeeded, {@code false} otherwise
     * @return an implementation-defined status code
     */
    int saveOnDone(boolean isOk);
}
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
/**
 * Small MQTT publisher built on the Eclipse Paho synchronous {@code MqttClient}.
 *
 * <p>The constructor connects to the broker at {@code host} with the configured
 * user name and password. {@link #send(String, String, PushCallBack)} publishes a
 * retained QoS 2 message, waits for the delivery to complete and retries once on
 * a fresh connection if anything goes wrong.
 */
public class MQTT {
    private String host = "tcp://192.168.0.1:1883";
    private String userName = "admin";
    private String passWord = "admin";
    private MqttClient client = null;

    private static MQTT mqtt = null;

    /**
     * Demo entry point: publishes two test messages and then releases the
     * connection.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MQTT mq = new MQTT();
        // The constructor has already connected, so this value is only read if
        // send() has to re-initialise the client.
        mq.passWord = "admin";
        try {
            for (int i = 0; i < 2; i++) {
                mq.send("Jaycekon-MQ", "HelloTest" + i, new PushCallBack() {
                    @Override
                    public int saveOnDone(boolean isOk) {
                        System.out.println("MQTT.main(...).new PushCallBack() {...}.saveOnDone()");
                        return 0;
                    }
                });
            }
        } finally {
            // Release the broker connection instead of leaving it open after the demo.
            mq.close();
        }
    }

    /**
     * Returns the shared instance, creating (and connecting) it on first use.
     *
     * @return the lazily created shared publisher
     */
    public static MQTT get() {
        if (mqtt == null) {
            mqtt = new MQTT();
        }
        return mqtt;
    }

    private MQTT() {
        init();
    }

    /**
     * Creates a new client with file persistence, installs a logging callback and
     * connects to the broker. Failures are printed and otherwise ignored, so
     * {@code client} may be left unconnected (or null) afterwards.
     */
    private void init() {
        try {
            String clientid = MqttClient.generateClientId();
            // Keep only the last 23 characters of the generated id
            // (presumably the MQTT 3.1 client-id length limit; confirm).
            if (clientid.length() > 23) {
                clientid = clientid.substring(clientid.length() - 23);
            }
            client = new MqttClient(host, clientid,
                    new MqttDefaultFilePersistence());

            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(false);
            options.setUserName(userName);
            options.setPassword(passWord.toCharArray());
            options.setConnectionTimeout(10);
            options.setKeepAliveInterval(20);

            client.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    System.out
                            .println("MqttCallback connectionLost-----------連線丟失");
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    System.out
                            .println("MqttCallback deliveryComplete---------交付完成"
                                    + token.getMessageId()
                                    + "|"
                                    + token.isComplete());
                }

                @Override
                public void messageArrived(String topic, MqttMessage arg1)
                        throws Exception {
                    System.out
                            .println("MqttCallback messageArrived----------訊息到達");
                }
            });
            client.connect(options);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Disconnects (when connected) and closes the current client, then forgets it.
     *
     * <p>Paho refuses to close a client that is still connected, so the disconnect
     * has to happen first; the reference is cleared even if either step fails so a
     * later {@link #init()} always starts from a clean state.
     */
    private void close() {
        if (client == null) {
            return;
        }
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        try {
            client.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            client = null;
        }
    }

    /**
     * Publishes {@code notify} (UTF-8 encoded) to {@code topics} as a retained
     * QoS 2 message and blocks until the delivery has completed.
     *
     * @param topics       name of the topic to publish to
     * @param notify       message text
     * @param pushCallBack receives the final outcome exactly once; may be null
     */
    public void send(final String topics, final String notify,
            PushCallBack pushCallBack) {
        send(topics, notify, 1, pushCallBack);
    }

    /**
     * Performs one publish attempt. On any failure of the first attempt the client
     * is torn down, re-initialised and the publish is tried once more; only the
     * final outcome is reported to {@code pushCallBack}.
     *
     * @param topics       name of the topic to publish to
     * @param notify       message text
     * @param trycnt       1-based attempt counter; attempts with trycnt &lt; 2 retry
     * @param pushCallBack receives the final outcome; may be null
     */
    private void send(final String topics, final String notify,
            final int trycnt, final PushCallBack pushCallBack) {
        int msgid = -1; // stays -1 when the failure happens before publish()
        try {
            if (client == null) {
                // init() failed earlier; fail this attempt so the retry re-initialises.
                throw new IllegalStateException("MQTT client is not initialised");
            }
            MqttTopic topic = client.getTopic(topics);
            MqttMessage message = new MqttMessage();
            message.setQos(2);
            message.setRetained(true);
            message.setPayload(notify.getBytes("UTF-8"));

            MqttDeliveryToken token = topic.publish(message);
            msgid = token.getMessageId();
            System.out
                    .println(message.isRetained() + "------ratained|" + msgid);

            // The outcome is taken from waitForCompletion() rather than from an
            // action listener registered after publish(): a late-registered
            // listener can miss the completion, and it could report a failure for
            // an attempt that is subsequently retried successfully.
            token.waitForCompletion();
        } catch (Exception e) {
            if (trycnt < 2) { // try one more time on a fresh connection
                close();
                init();
                send(topics, notify, trycnt + 1, pushCallBack);
            } else {
                e.printStackTrace();
                report(pushCallBack, msgid, false);
            }
            return;
        }
        // Reported outside the try block so an exception thrown by the callback
        // itself cannot trigger a second publish.
        report(pushCallBack, msgid, true);
    }

    /** Notifies the optional callback of the final outcome and logs it. */
    private static void report(PushCallBack pushCallBack, int msgid, boolean isOk) {
        if (pushCallBack != null) {
            pushCallBack.saveOnDone(isOk);
        }
        System.out.println(msgid + "------ActionCallback " + isOk);
    }
}
二、Android接收訊息
/**
 * Entry activity: shows the main layout and starts {@code MQTTService}.
 */
public class MainActivity extends Activity {

    /**
     * Inflates {@code R.layout.activity_main} and launches the MQTT service.
     *
     * @param savedInstanceState state passed through to the superclass
     */
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        // Start the service that receives the MQTT messages.
        startService(new Intent(this, MQTTService.class));
    }
}
package com.lgz.mq;
import android.app.Service;
import android.content.Intent;
import android.os.IBinder;
import android.widget.Toast;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Started service that connects to the MQTT broker at {@link #BROKER_URL} and
 * subscribes to {@link #TOPIC}; incoming messages are handled by
 * {@code PushCallback}.
 *
 * <p>NOTE(review): {@code connect()} is a blocking call and is issued directly
 * from {@code onStartCommand}; consider moving it off the calling thread.
 */
public class MQTTService extends Service {
    // URL of the message broker
    public static final String BROKER_URL = "tcp://192.168.199.174:1883";
    // Client id that identifies this client to the broker; it can be generated
    // with whatever strategy suits the application
    public static final String clientId = "admin";
    // Name of the topic to subscribe to
    public static final String TOPIC = "Jaycekon-MQ";
    // MQTT client
    private MqttClient mqttClient;
    // MQTT connection options
    private MqttConnectOptions options;
    private String userName = "admin";
    private String passWord = "admin";

    /** This service does not support binding. */
    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }

    /**
     * Connects to the broker and subscribes to {@link #TOPIC}. Errors are shown
     * in a toast and printed.
     */
    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        // onStartCommand is invoked for every startService() call; reuse a live
        // connection instead of opening a second client with the same client id.
        if (mqttClient != null && mqttClient.isConnected()) {
            return super.onStartCommand(intent, flags, startId);
        }
        // Drop any stale, unconnected client left over from a failed start.
        releaseClient();
        try {
            // Create the client; the third argument selects the persistence
            // implementation (in-memory here).
            mqttClient = new MqttClient(BROKER_URL, clientId, new MemoryPersistence());
            // Connection settings
            options = new MqttConnectOptions();
            // cleanSession=false asks the broker to keep this client's session so
            // that messages published while offline can still be received;
            // true would connect with a fresh identity every time.
            options.setCleanSession(false);
            // User name for the connection
            options.setUserName(userName);
            // Password for the connection
            options.setPassword(passWord.toCharArray());
            // Connection timeout in seconds
            options.setConnectionTimeout(10);
            // Keep-alive interval in seconds. Per the original note this is used to
            // detect whether the client is still online and does not provide any
            // reconnect mechanism. NOTE(review): confirm details against Paho docs.
            options.setKeepAliveInterval(20);
            // Callback that receives connection-lost / message / delivery events
            mqttClient.setCallback(new PushCallback());
            MqttTopic topic = mqttClient.getTopic(TOPIC);
            // Last-will message: lets other parties learn that this client dropped
            // off unexpectedly.
            options.setWill(topic, "close".getBytes(), 2, true);
            // Connect to the broker
            mqttClient.connect(options);
            // Subscribe to the topic. MQTT quality-of-service levels:
            //   QoS 0: delivered at most once, may be lost
            //   QoS 1: delivered at least once, may be duplicated
            //   QoS 2: delivered exactly once
            int[] qos = {1};
            String[] topics = {TOPIC};
            mqttClient.subscribe(topics, qos);
        } catch (MqttException e) {
            Toast.makeText(getApplicationContext(), "Something went wrong!" + e.getMessage(), Toast.LENGTH_LONG).show();
            e.printStackTrace();
        }
        return super.onStartCommand(intent, flags, startId);
    }

    /** Disconnects from the broker and releases the client. */
    @Override
    public void onDestroy() {
        releaseClient();
        super.onDestroy();
    }

    /**
     * Disconnects (when connected) and closes the current client, if any, and
     * clears the field. Safe to call when no client was ever created.
     */
    private void releaseClient() {
        if (mqttClient == null) {
            return;
        }
        try {
            if (mqttClient.isConnected()) {
                mqttClient.disconnect(0);
            }
            mqttClient.close();
        } catch (MqttException e) {
            Toast.makeText(getApplicationContext(), "Something went wrong!" + e.getMessage(), Toast.LENGTH_LONG).show();
            e.printStackTrace();
        } finally {
            mqttClient = null;
        }
    }
}
package com.lgz.mq;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * {@link MqttCallback} that logs connection loss, incoming messages and
 * completed deliveries to standard output.
 */
public class PushCallback implements MqttCallback {

    /** Called when the connection to the broker is lost. */
    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("連線失敗---");
    }

    /**
     * Called when a message arrives; prints its topic, text and QoS.
     *
     * @param topic   topic the message was published to
     * @param message the received message
     * @throws Exception if the payload cannot be decoded
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println(" 有新訊息到達時的回撥方法");
        System.out.println(" topic = " + topic);
        // Decode explicitly as UTF-8 (the encoding the publisher uses) instead of
        // relying on the platform default charset.
        String msg = new String(message.getPayload(), "UTF-8");
        System.out.println("msg = " + msg);
        System.out.println("qos = " + message.getQos());
    }

    /** Called after a published message has been delivered. */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("--deliveryComplete--成功釋出某一訊息後的回撥方法");
    }
}
別忘了在AndroidManifest.xml中新增服務宣告和網路許可權等:
<!-- Network access: MQTTService opens a TCP connection to the MQTT broker. -->
<uses-permission android:name="android.permission.INTERNET" />
<!-- NOTE(review): the two state permissions below are not referenced by the code shown in this article; presumably intended for connectivity checks. Confirm before keeping. -->
<uses-permission android:name="android.permission.ACCESS_WIFI_STATE" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<!-- Application declaration: one launcher activity plus the MQTT service. -->
<application
android:allowBackup="true"
android:icon="@mipmap/ic_launcher"
android:label="@string/app_name"
android:supportsRtl="true"
android:theme="@style/AppTheme">
<!-- Launcher activity; its onCreate starts MQTTService. -->
<activity android:name=".MainActivity">
<intent-filter>
<action android:name="android.intent.action.MAIN" />
<category android:name="android.intent.category.LAUNCHER" />
</intent-filter>
</activity>
<!-- Declares the service so that startService() can launch it. -->
<service android:name=".MQTTService" />
</application>