1. 程式人生 > >mqtt基於paho的訊息訂閱接收的JAVA程式碼

mqtt基於paho的訊息訂閱接收的JAVA程式碼

看到網上的部分程式碼,對於訂閱主題後,使用mqttCallBack介面來接收訊息,雖然這種方法也可以接收到訊息,但是mqtt Paho提供了正規的方法去接收訊息,這裡分享一下自己的demo。

使用callback介面訂閱類:


import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import edu.jia.pub.Publish;

public class Subscribe {
	public static final String HOST = "tcp://116.196.99.111:1883";
	public static final String TOPIC = "topic";
	private static final String clientid = "Client Subscribe";
	private MqttClient client;
	private MqttConnectOptions options;
	private String msg = null;

	// private String userName = "admin";
	// private String passWord = "password";

	// private ScheduledExecutorService scheduler;

	public MqttClient connect() throws MqttException {

		// host為主機名,clientid即連線MQTT的客戶端ID,一般以唯一識別符號表示,MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存
		this.client = new MqttClient(HOST, clientid, new MemoryPersistence());
		// MQTT的連線設定
		options = new MqttConnectOptions();
		// 設定是否清空session,這裡如果設定為false表示伺服器會保留客戶端的連線記錄,這裡設定為true表示每次連線到伺服器都以新的身份連線
		options.setCleanSession(true);
		// 設定連線的使用者名稱
		// options.setUserName(userName);
		// // 設定連線的密碼
		// options.setPassword(passWord.toCharArray());
		// 設定超時時間 單位為秒
		options.setConnectionTimeout(10);
		// 設定會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端傳送個訊息 判斷客戶端是否線上,但這個方法並沒有重連的機制
		options.setKeepAliveInterval(20);
		// 設定回撥
		this.client.setCallback(new MqttCallback() {

			public void connectionLost(Throwable cause) {
				// 連線丟失後,一般在這裡面進行重連
				System.out.println("連線斷開,可以做重連");
			}

			public void deliveryComplete(IMqttDeliveryToken token) {
				System.out.println("deliveryComplete---------" + token.isComplete());
			}

			public void messageArrived(String topic, MqttMessage message) throws Exception {
				// subscribe後得到的訊息會執行到這裡面
				System.out.println("接收訊息的主題 : " + topic);
				System.out.println("接收訊息的質量Qos : " + message.getQos());
				//

				msg = new String(message.getPayload());
				System.out.println(">>>>>>>>>>>>>>>>>>>" + msg);
			}

		});

		// MqttTopic topic = client.getTopic(TOPIC);
		// setWill方法,如果專案中需要知道客戶端是否掉線可以呼叫該方法。設定最終埠的通知訊息
		// options.setWill(topic, "close".getBytes(), 2, true);

		this.client.connect(options);

		return this.client;

	}

	public IMqttToken subscribe(MqttClient client) throws MqttException {
		// 訂閱訊息
		int Qos = 2;
		String topic1 = TOPIC;
		IMqttToken token = client.subscribeWithResponse(topic1, Qos);
		String str = new String(token.getResponse().getPayload());
		System.out.println("============================" + str);
		return token;
	}

	public static void main(String[] args) throws Throwable {
		System.out.println("下發配置");
		Thread.sleep(3000);
		System.out.println("配置已經下發");
		Thread.sleep(3000);
		System.out.println("監聽回傳訊息");
		Subscribe sub = new Subscribe();
		MqttClient client = sub.connect();
		Thread.sleep(3000);
		System.out.println("建立連線");
		IMqttToken token = sub.subscribe(client);
		if (token.isComplete()) {
			System.out.println("完成訂閱");
			Publish Publish = new Publish();

			Publish.setMessage(new MqttMessage());
			Publish.getMessage().setQos(2);
			Publish.getMessage().setRetained(true);
			Publish.getMessage().setPayload("2018/06-------mqtt服務端測試   msg2".getBytes());
			// 重寫publish方法
			Publish.publish(Publish.getTopic(), Publish.getMessage());
			Publish.getClient().disconnect();
		}

		if (sub.msg == null) {
			Thread.sleep(3000);
			if (sub.msg!=null) {
				System.out.println("====================" + sub.msg);
			}
			else {
				System.out.println("time out ");
			}
		
		}
	}

}

使用subscribe方法接收訊息:

package edu.jia.sub;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
public class TestSub {
 

     private static int qos = 2;
     private static String broker = "tcp://116.196.99.111:1883";

 
     private static MqttClient connect(String clientId) throws MqttException{
    	 MemoryPersistence persistence = new MemoryPersistence();
    	 MqttConnectOptions connOpts = new MqttConnectOptions();

    	 connOpts.setCleanSession(false);

         connOpts.setConnectionTimeout(10);
         connOpts.setKeepAliveInterval(20);

         MqttClient mqttClient = new MqttClient(broker, clientId, persistence);
         mqttClient.connect(connOpts);
    	 return mqttClient;
     }
     
     public static void sub(MqttClient mqttClient,String topic) throws MqttException{
         int[] Qos  = {qos};
         String[] topics = {topic};
         mqttClient.subscribe(topics, Qos);
     }
     
     
    private static void runsub(String clientId, String topic) throws MqttException{
    	MqttClient mqttClient = connect(clientId);
    	if(mqttClient != null){
			sub(mqttClient,topic);
    	}
    	mqttClient.subscribe(topic,2, new IMqttMessageListener() {
			
			@Override
			public void messageArrived(String topic, MqttMessage message) throws Exception {
				// TODO Auto-generated method stub
				System.out.println(new String(message.getPayload()));
			}
		});
    }
    public static void main(String[] args) throws MqttException{
    	
		runsub("testSub", "test");
    }
}

測試:在遠端主機,開啟mqtt服務:mosquitto -c /etc/mosquitto/mosquitto.conf

mostuitto_pub -p 1883 -q 2 -t "test " -m "test ---- ok " 

多說幾句:subscribe提供了三個過載方法,大家可以根據自己的需要選擇需要的過載函式,如果對於listener要求比較高的話,可以將內部類單獨寫出來繼承介面即可。