msquitto在java中的應用,一個小Demo
阿新 • • 發佈:2019-09-09
訊息釋出者
package com.gaofei.utils; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * 釋出服務 * @author gaofei * */ public class ServerMQTT {//服務端MQTT //tcp://MQTT安裝的伺服器地址:MQTT定義的埠號 public static final String HOST = "tcp://localhost:1883"; //定義一個主題 public static final String TOPIC = "pos_message_all"; //定義MQTT的ID,可以在MQTT服務配置中指定 private static final String clientid = "server11"; private MqttClient client; //定義MQTT釋出者客戶端物件 private MqttTopic topic11; //釋出主題 private String userName = "admin"; //非必須-使用者名稱 private String passWord = "123456"; //非必須-密碼 private MqttMessage message;//釋出者釋出的訊息 public ServerMQTT() throws MqttException { // 初始化與伺服器連線,Url/cliendId/MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存 client=new MqttClient(HOST,clientid,new MemoryPersistence()); connect(); } private void connect() { MqttConnectOptions options = new MqttConnectOptions(); //連線伺服器支援的選項設定 options.setCleanSession(false);//每次會話完畢session儲存 options.setUserName(userName); options.setPassword(passWord.toCharArray()); // 設定超時時間 options.setConnectionTimeout(10); // 設定會話心跳時間 options.setKeepAliveInterval(20); //時間為 20*1.5s傳送一次 try { client.setCallback(new PushCallback());//處理髮布的訊息 client.connect(options); //客戶端連線上 topic11 = client.getTopic(TOPIC); //獲取主題 } catch (Exception e) { e.printStackTrace(); } } /** * 釋出訊息 * @param topic * @param message * @throws MqttPersistenceException * @throws MqttException */ public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException, MqttException { MqttDeliveryToken token = topic.publish(message); //釋出訊息 token.waitForCompletion(); //等待完成 System.out.println("message is published completely! " + token.isComplete()); } /** * 啟動入口 * @param args * @throws MqttException */ public static void main(String[] args) throws MqttException { ServerMQTT server = new ServerMQTT();//建立MQTT釋出客戶端物件 server.message = new MqttMessage(); //建立Mqqt訊息物件 server.message.setQos(1); //設定服務質量QOS:1保證訊息至少能到達一次 server.message.setRetained(true); server.message.setPayload("這是推送訊息的內容".getBytes()); server.publish(server.topic11 , server.message); System.out.println(server.message.isRetained() + "------ratained狀態"); } }
訊息訂閱者
package com.gaofei.utils; import java.util.concurrent.ScheduledExecutorService; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * 訂閱服務 * 模擬一個客戶端訂閱服務 * @author rao * */ public class ClientMQTT { public static final String HOST = "tcp://localhost:1883"; public static final String TOPIC1 = "pos_message_all"; private static final String clientid = "client11"; private MqttClient client; private MqttConnectOptions options; private String userName = "admin"; //非必須 private String passWord = "123456"; //非必須 @SuppressWarnings("unused") private ScheduledExecutorService scheduler; private void start() { try { // host為主機名,clientid即連線MQTT的客戶端ID,一般以唯一識別符號表示,MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存 client = new MqttClient(HOST, clientid, new MemoryPersistence()); // MQTT的連線設定 options = new MqttConnectOptions(); // 設定是否清空session,這裡如果設定為false表示伺服器會保留客戶端的連線記錄,設定為true表示每次連線到伺服器都以新的身份連線 options.setCleanSession(false); // 設定連線的使用者名稱 options.setUserName(userName); // 設定連線的密碼 options.setPassword(passWord.toCharArray()); // 設定超時時間 單位為秒 options.setConnectionTimeout(10); // 設定會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端傳送個訊息判斷客戶端是否線上,但這個方法並沒有重連的機制 options.setKeepAliveInterval(20); // 設定回撥 client.setCallback(new PushCallback()); MqttTopic topic = client.getTopic(TOPIC1); //setWill方法,如果專案中需要知道客戶端是否掉線可以呼叫該方法。設定最終埠的通知訊息 //遺囑 options.setWill(topic, "close".getBytes(), 2, true); client.connect(options); //訂閱訊息 int[] Qos = {1}; String[] topic1 = {TOPIC1}; client.subscribe(topic1, Qos); //訂閱主題 } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws MqttException { ClientMQTT client = new ClientMQTT(); client.start(); } }
訊息的處理
package com.gaofei.utils;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class PushCallback implements MqttCallback {
@Override
public void connectionLost(Throwable arg0) {
// 連線丟失後,一般在這裡面進行重連
System.out.println("連線斷開,可以做重連");
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {//訊息傳送完
System.out.println("deliveryComplete---------" + token.isComplete());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe後得到的訊息會執行到這裡面
System.out.println("接收訊息主題 : " + topic);
System.out.println("接收訊息Qos : " + message.getQos());
System.out.println("接收訊息內容 : " + new String(message.getPayload()));
}
}