MQTT簡單介紹與實現
1. MQTT 介紹
它是一種 機器之間通訊 machine-to-machine (M2M)、物聯網 Internet of Things (IoT)常用的一種輕量級消息傳輸協議
適用於網絡帶寬較低的場合
包含發布、訂閱模式,通過一個代理服務器(broker),任何一個客戶端(client)都可以訂閱或者發布某個主題的消息,然後訂閱了該主題的客戶端則會收到該消息
1.1 消息主題
發布消息或者訂閱消息都要選定一個消息主題,消息主題可以任意定制,類似文件系統,用 “/” 進行分隔,例如主題為 /a/b/c/d 的消息
客戶端可以使用完全字符匹配消息,也可以使用通配符進行消息匹配
通配符 + :替換任意單個層級。比如訂閱 /a/b/c/d、/a/+/c/d 、+/+/+/+ 主題的消息即可收到主題為 /a/b/c/d 的消息,而 b/+/c/d 、 +/+/+ 不會匹配
長度為 0 的主題層級也是允許的。比如發布主題為 a//topic 的消息,客戶端可以用 a/+/topic 進行匹配。/a/topic 的主題用 +/a/topic、#、/# 可以匹配。
1.2 服務質量(Quality of Service,QoS)
MQTT 定義了三種客戶端與代理服務器之間消息到達的難度
0:broker/client 之間消息傳一次,並不確認傳到沒有,消息可能丟失
1:broker/client 之間消息至少一次,帶確認消息的傳輸,可能重復收到
2:broker/client 之間消息僅有一次,利用四次握手進行確認,網絡延遲可能會增加
發布等級為 2 ,客戶端訂閱等級為 0, 那麽客戶端接收到的 QoS = 0
發布等級為 0 ,訂閱等級為 2,那麽客戶端接收到的 QoS = 0
1.3 消息保留
即當 broker 正在發送消息給 client 時,消息會保存,如果此時有新的 client 訂閱了該主題的消息,那麽它也會收到消息。這種做法的好處就是當消息主題經常變換的時候,如果有新的 client 訂閱該消息,那麽它不用等待太長的時間就可以收到消息
1.4 會話清除
client 可以設置 clean session 標誌位,當 clean session = false 時,client 失去連接時, broker 會一直保留消息直到 client 重新連接。而 clean session = true 時,broker 會清除所有的消息當這個 client 失去連接。
當 client 連接上 broker 時,client 會提示 broker 它有一個意願消息,這個意願消息將會在 client 失去連接時,broker 發送出去。消息意願和普通消息一樣都包含主題和內容。
2. 實例
js實現過程
function RndNum(n){
var rnd="";
for(var i=0;i<n;i++)
rnd+=Math.floor(Math.random()*10);
return rnd;
}
var hostname = ‘服務器‘,
port = “端口號”,
clientId = RndNum(5),
keepAlive = 100,
cleanSession = false,
userName = ‘‘,
password = ‘‘,
topic = ‘訂閱消息‘;
client = new Paho.MQTT.Client(hostname, port, clientId);
//建立客戶端實例
var options = {
invocationContext: {
host: hostname,
port: port,
path: client.path,
clientId: clientId
},
keepAliveInterval: keepAlive,
cleanSession: cleanSession,
userName: userName,
password: password,
onSuccess: onConnect,
};
client.connect(options);
//連接服務器並註冊連接成功處理事件
function onConnect() {
console.log("onConnected");
client.subscribe(topic);
}
client.onConnectionLost = onConnectionLost;
//註冊連接斷開處理事件
client.onMessageArrived = onMessageArrived;
//註冊消息接收處理事件
function onConnectionLost(responseObject) {
console.log(responseObject);
if (responseObject.errorCode !== 0) {
console.log("onConnectionLost:" + responseObject.errorMessage);
console.log("連接已斷開");
}
}
function onMessageArrived(message) {
let msg = message.payloadString;
// 消息處理
}
var count = 0;
function start() {
window.tester = window.setInterval(function () {
if(count!=0){
if (client.isConnected) {
var s = "content:" + (count++) ;
message = new Paho.MQTT.Message(s);
message.destinationName = topic;
client.send(message);
}
}else
{
window.clearInterval(window.tester);
}
});
}
java spring boot實現過程
1.添加依賴
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.1</version>
</dependency>
2.創建類
public class mqttutil {
private String serviceURI = "服務器端口號";
private String clientID = "客戶端id";
private MqttClientPersistence persistence = new MemoryPersistence();
private String username = "";
private String password = "";
private String topic = "訂閱";
private String MessageStr = "";
private int qos = 0;
/**
* 消息訂閱
**/
public void subscribe() {
try {
MqttClient client = new MqttClient(serviceURI, clientID, persistence);
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("訂閱者連接丟失...");
System.out.println(cause.getMessage());
}
public void messageArrived(String topic, MqttMessage message) {
MessageStr = message.toString();
// System.out.println("訂閱者接收到消息:"+MessageStr);
}
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
MqttConnectOptions connectOptions = new MqttConnectOptions();
connectOptions.setUserName(username);
connectOptions.setPassword(password.toCharArray());
connectOptions.setCleanSession(false);
//訂閱者連接訂閱主題
client.connect(connectOptions);
client.subscribe(topic, qos);
System.out.println("訂閱者連接狀態: " + client.isConnected());
} catch (MqttException e) {
e.printStackTrace();
}
}
MQTT簡單介紹與實現