1. 程式人生 > >mosquitto MQTT安裝測試

mosquitto MQTT安裝測試

測試環境為centOS 7

#安裝libwebsockets
wget https://dl.fedoraproject.org/pub/epel/7/x86_64/Packages/l/libwebsockets-1.7.5-1.el7.x86_64.rpm
rpm -ivh libwebsockets-1.7.5-1.el7.x86_64.rpm

#安裝mosquitto
wget https://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-7/home:oojah:mqtt.repo
mv home\:oojah\:mqtt.repo /etc/yum.repos.d/mosquitto.repo
yum install
mosquitto mosquitto-clients cd /etc/mosquitto/ cp mosquitto.conf.example conf.d/mosquitto.conf vim conf.d/mosquitto.conf # 修改內容為: allow_anonymous false #password_file /etc/mosquitto/pwfile cp pwfile.example pwfile #建立admin使用者 mosquitto_passwd -c /etc/mosquitto/pwfile admin #使用【mosquitto -c /etc/mosquitto/mosquitto.conf -d 】可以後臺執行,第一次啟用不要加-d,這樣如果執行出錯可以立即看到反饋
mosquitto -c /etc/mosquitto/mosquitto.conf #測試 mosquitto_pub -t test -u admin -P password -m "ok" mosquitto_sub -t test -u admin -P password # 如果sub收到ok說明測試成功

Publisher

package server;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public
class Publisher { /** * 代理伺服器ip地址 */ // public static final String MQTT_BROKER_HOST = "tcp://192.168.1.236:61613"; //apollo public static final String MQTT_BROKER_HOST = "tcp://192.168.1.236:1883"; /** * 訂閱標識 */ public static final String MQTT_TOPIC = "testTopic"; /** * 客戶端唯一標識 */ public static final String MQTT_CLIENT_ID = "publisher01"; private static String userName = "admin"; private static String password = "password"; private static MqttTopic topic; private static MqttClient client; public static void main(String... args) { // 推送訊息 try { client = new MqttClient(MQTT_BROKER_HOST, MQTT_CLIENT_ID, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(password.toCharArray()); options.setConnectionTimeout(10); options.setKeepAliveInterval(20); topic = client.getTopic(MQTT_TOPIC); client.connect(options); client.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable throwable) { System.out.println("連線斷開,正在重連......"); try { client.connect(options); }catch (Exception exception){ System.out.println(exception.getMessage()); } } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } }); int i=0; while (true) { MqttMessage message = new MqttMessage(); //0 最多一次傳送 (只負責傳送,傳送過後就不管資料的傳送情況) //1 至少一次傳送 (確認資料交付) //2 正好一次傳送 (保證資料交付成功) message.setQos(2); message.setRetained(true); message.setPayload((""+i++).getBytes()); MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); System.out.println(i); Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } } }

Subscriber

package client;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class Subscriber {

  /**
   * 代理伺服器ip地址
   */
//  public static final String MQTT_BROKER_HOST = "tcp://192.168.1.236:61613";
  public static final String MQTT_BROKER_HOST = "tcp://192.168.1.236:1883";

  /**
   * 客戶端唯一標識
   */
  public static final String MQTT_CLIENT_ID = "subscriber01";

  /**
   * 訂閱標識
   */
  public static final String MQTT_TOPIC = "testTopic";

  /**
   *
   */
  public static final String USERNAME = "admin";
  /**
   * 密碼
   */
  public static final String PASSWORD = "password";

  private volatile static MqttClient mqttClient;
  private static MqttConnectOptions options;

  public static void main(String... args) {
    try {
      // host為主機名,clientid即連線MQTT的客戶端ID,一般以客戶端唯一識別符號表示,
      // MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存
      mqttClient = new MqttClient(MQTT_BROKER_HOST, MQTT_CLIENT_ID, new MemoryPersistence());
      // 配置引數資訊
      options = new MqttConnectOptions();
      // 設定是否清空session,這裡如果設定為false表示伺服器會保留客戶端的連線記錄,
      // 這裡設定為true表示每次連線到伺服器都以新的身份連線
      options.setCleanSession(false);
      // 設定使用者名稱
      options.setUserName(USERNAME);
      // 設定密碼
      options.setPassword(PASSWORD.toCharArray());
      // 設定超時時間 單位為秒
      options.setConnectionTimeout(10);
      // 設定會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端傳送個訊息判斷客戶端是否線上,但這個方法並沒有重連的機制
      options.setKeepAliveInterval(20);
      // 連線
      mqttClient.connect(options);
      // 訂閱
      mqttClient.subscribe(MQTT_TOPIC);
      // 設定回撥
      mqttClient.setCallback(new MqttCallback() {

        @Override public void connectionLost(Throwable throwable) {
          // 連線丟失後,一般在這裡面進行重連
          System.out.println("連線斷開,正在重連......");
          try {
            mqttClient.connect(options);
          }catch (Exception exception){
            exception.printStackTrace();
          }
        }

        @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
          System.out.println("toppic:"+s+"\t message:\t"+ mqttMessage.toString());
        }

        @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

        }
      });
    } catch (Exception e) {
      e.printStackTrace();
    }

  }

}