mosquitto MQTT安裝測試
阿新 • • 發佈:2018-11-05
測試環境為centOS 7
#安裝libwebsockets
wget https://dl.fedoraproject.org/pub/epel/7/x86_64/Packages/l/libwebsockets-1.7.5-1.el7.x86_64.rpm
rpm -ivh libwebsockets-1.7.5-1.el7.x86_64.rpm
#安裝mosquitto
wget https://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-7/home:oojah:mqtt.repo
mv home\:oojah\:mqtt.repo /etc/yum.repos.d/mosquitto.repo
yum install mosquitto mosquitto-clients
cd /etc/mosquitto/
cp mosquitto.conf.example conf.d/mosquitto.conf
vim conf.d/mosquitto.conf
# 修改內容為: allow_anonymous false
#password_file /etc/mosquitto/pwfile
cp pwfile.example pwfile
#建立admin使用者
mosquitto_passwd -c /etc/mosquitto/pwfile admin
#使用【mosquitto -c /etc/mosquitto/mosquitto.conf -d 】可以後臺執行,第一次啟用不要加-d,這樣如果執行出錯可以立即看到反饋
mosquitto -c /etc/mosquitto/mosquitto.conf
#測試
mosquitto_pub -t test -u admin -P password -m "ok"
mosquitto_sub -t test -u admin -P password
# 如果sub收到ok說明測試成功
Publisher
package server;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class Publisher {
/**
* 代理伺服器ip地址
*/
// public static final String MQTT_BROKER_HOST = "tcp://192.168.1.236:61613"; //apollo
public static final String MQTT_BROKER_HOST = "tcp://192.168.1.236:1883";
/**
* 訂閱標識
*/
public static final String MQTT_TOPIC = "testTopic";
/**
* 客戶端唯一標識
*/
public static final String MQTT_CLIENT_ID = "publisher01";
private static String userName = "admin";
private static String password = "password";
private static MqttTopic topic;
private static MqttClient client;
public static void main(String... args) {
// 推送訊息
try {
client = new MqttClient(MQTT_BROKER_HOST, MQTT_CLIENT_ID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
topic = client.getTopic(MQTT_TOPIC);
client.connect(options);
client.setCallback(new MqttCallback() {
@Override public void connectionLost(Throwable throwable) {
System.out.println("連線斷開,正在重連......");
try {
client.connect(options);
}catch (Exception exception){
System.out.println(exception.getMessage());
}
}
@Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
}
@Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
int i=0;
while (true) {
MqttMessage message = new MqttMessage();
//0 最多一次傳送 (只負責傳送,傳送過後就不管資料的傳送情況)
//1 至少一次傳送 (確認資料交付)
//2 正好一次傳送 (保證資料交付成功)
message.setQos(2);
message.setRetained(true);
message.setPayload((""+i++).getBytes());
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
System.out.println(i);
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
Subscriber
package client;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class Subscriber {
/**
* 代理伺服器ip地址
*/
// public static final String MQTT_BROKER_HOST = "tcp://192.168.1.236:61613";
public static final String MQTT_BROKER_HOST = "tcp://192.168.1.236:1883";
/**
* 客戶端唯一標識
*/
public static final String MQTT_CLIENT_ID = "subscriber01";
/**
* 訂閱標識
*/
public static final String MQTT_TOPIC = "testTopic";
/**
*
*/
public static final String USERNAME = "admin";
/**
* 密碼
*/
public static final String PASSWORD = "password";
private volatile static MqttClient mqttClient;
private static MqttConnectOptions options;
public static void main(String... args) {
try {
// host為主機名,clientid即連線MQTT的客戶端ID,一般以客戶端唯一識別符號表示,
// MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存
mqttClient = new MqttClient(MQTT_BROKER_HOST, MQTT_CLIENT_ID, new MemoryPersistence());
// 配置引數資訊
options = new MqttConnectOptions();
// 設定是否清空session,這裡如果設定為false表示伺服器會保留客戶端的連線記錄,
// 這裡設定為true表示每次連線到伺服器都以新的身份連線
options.setCleanSession(false);
// 設定使用者名稱
options.setUserName(USERNAME);
// 設定密碼
options.setPassword(PASSWORD.toCharArray());
// 設定超時時間 單位為秒
options.setConnectionTimeout(10);
// 設定會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端傳送個訊息判斷客戶端是否線上,但這個方法並沒有重連的機制
options.setKeepAliveInterval(20);
// 連線
mqttClient.connect(options);
// 訂閱
mqttClient.subscribe(MQTT_TOPIC);
// 設定回撥
mqttClient.setCallback(new MqttCallback() {
@Override public void connectionLost(Throwable throwable) {
// 連線丟失後,一般在這裡面進行重連
System.out.println("連線斷開,正在重連......");
try {
mqttClient.connect(options);
}catch (Exception exception){
exception.printStackTrace();
}
}
@Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println("toppic:"+s+"\t message:\t"+ mqttMessage.toString());
}
@Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}