MQTT釋出訂閱程式完整程式碼
阿新 • • 發佈:2019-01-26
- 工具類，包含釋出者方法和訂閱者方法。
package cn.com.bonc.wholeCode;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * MQTT publish and subscribe helpers built on the Eclipse Paho client.
 *
 * <p>{@link #publish()} connects to the broker and publishes numbered text messages to the
 * configured topic in an endless loop. {@link #subscribe()} connects, subscribes to the same
 * topic and prints every message it receives. Payloads are encoded and decoded as UTF-8 on
 * both sides.
 *
 * @author wzq
 */
public class PublishSubscribe {

    /** Broker address. */
    private static final String SERVICE_URI = "tcp://172.16.22.160:1883";

    /**
     * Client identifier, generated once per class load.
     *
     * <p>NOTE(review): because this id is random, a session created with
     * {@code cleanSession=false} (see {@link #newConnectOptions()}) cannot be resumed after a
     * restart. Consider a stable id or {@code cleanSession=true}; confirm intent.
     */
    private static final String CLIENT_ID = UUID.randomUUID().toString();

    /** In-memory store for in-flight messages. */
    private static final MqttClientPersistence PERSISTENCE = new MemoryPersistence();

    /** Topic used by both the publisher and the subscriber. */
    private static final String TOPIC = "cebPark";

    /**
     * Quality of service level:
     * 0 = at most once (best effort; a message may be lost but is never duplicated),
     * 1 = at least once (never lost, may be duplicated),
     * 2 = exactly once.
     */
    private static final int QOS = 0;

    /**
     * Connects to the broker and publishes {@code "hello,智慧公廁-<n>"} messages (n = 1, 2, ...)
     * until a failure occurs.
     *
     * <p>Each publish is awaited before its outcome is printed, so the success/failure line
     * reflects the real result of the delivery. Any exception ends the loop, is printed to
     * standard error, and the client is disconnected and closed before the method returns.
     *
     * @author wzq
     */
    public static void publish() {
        MqttClient client = null;
        try {
            client = new MqttClient(SERVICE_URI, CLIENT_ID, PERSISTENCE);
            client.connect(newConnectOptions());
            System.out.println("釋出者連線狀態: " + client.isConnected());

            MqttTopic mqttTopic = client.getTopic(TOPIC);
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setQos(QOS);

            String message = "hello,智慧公廁-";
            int i = 1;
            while (true) {
                String payLoad = message + i++;
                // Explicit charset: the payload contains non-ASCII text and must not depend
                // on the platform default encoding.
                mqttMessage.setPayload(payLoad.getBytes(StandardCharsets.UTF_8));
                MqttDeliveryToken deliveryToken = mqttTopic.publish(mqttMessage);
                try {
                    // publish() is asynchronous; an incomplete token only means the message
                    // is still in flight, so wait before judging the outcome.
                    deliveryToken.waitForCompletion();
                } catch (MqttException e) {
                    System.out.println("釋出者釋出訊息: " + payLoad + " 失敗");
                    throw e;
                }
                System.out.println("釋出者釋出訊息: " + payLoad + " 成功");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // The loop only ends through an exception; release the connection either way.
            closeQuietly(client);
        }
    }

    /**
     * Connects to the broker, subscribes to the topic and prints each received message.
     *
     * <p>On success the method returns while the client stays connected, so the callback keeps
     * receiving messages. If creating, connecting or subscribing fails, the exception is
     * printed to standard error and the client is disconnected and closed.
     *
     * @author wzq
     */
    public static void subscribe() {
        MqttClient client = null;
        try {
            client = new MqttClient(SERVICE_URI, CLIENT_ID, PERSISTENCE);
            client.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    System.out.println("訂閱者連線丟失...");
                    System.out.println(cause.getMessage());
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) {
                    // Decode with the same charset the publisher encodes with.
                    String text = new String(message.getPayload(), StandardCharsets.UTF_8);
                    System.out.println("訂閱者接收到訊息: " + text);
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    // The subscriber publishes nothing, so there is nothing to do here.
                }
            });

            client.connect(newConnectOptions());
            client.subscribe(TOPIC, QOS);
            System.out.println("訂閱者連線狀態: " + client.isConnected());
        } catch (MqttException e) {
            e.printStackTrace();
            // Do not leave a half-initialised client (and its threads) behind on failure.
            closeQuietly(client);
        }
    }

    /**
     * Builds the connect options shared by the publisher and the subscriber.
     *
     * <p>If the broker does not allow anonymous access, set the credentials here with
     * {@code setUserName(...)} and {@code setPassword(...)}.
     *
     * @return options with {@code cleanSession} set to {@code false}
     */
    private static MqttConnectOptions newConnectOptions() {
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setCleanSession(false);
        return connectOptions;
    }

    /**
     * Disconnects (if connected) and closes the client, printing any failure instead of
     * throwing it.
     *
     * @param client the client to release; may be {@code null}
     */
    private static void closeQuietly(MqttClient client) {
        if (client == null) {
            return;
        }
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
            client.close();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
- 釋出者
package cn.com.bonc.wholeCode;

/**
 * Command-line entry point for the MQTT publisher.
 *
 * <p>Created 2018-07-27 16:43.
 *
 * @author wzq
 */
public class Publish {

    /**
     * Delegates to {@link PublishSubscribe#publish()}.
     *
     * @param args command-line arguments; not used
     */
    public static void main(final String[] args) {
        PublishSubscribe.publish();
    }
}
- 訂閱者
package cn.com.bonc.wholeCode;

/**
 * Command-line entry point for the MQTT subscriber.
 *
 * <p>Created 2018-07-27 16:43.
 *
 * @author wzq
 */
public class Subscribe {

    /**
     * Delegates to {@link PublishSubscribe#subscribe()}.
     *
     * @param args command-line arguments; not used
     */
    public static void main(final String[] args) {
        PublishSubscribe.subscribe();
    }
}