MQTT協議之moquette 安裝使用
阿新 • • 發佈:2019-01-30
在MQTT 官網 (http://mqtt.org/software)中有眾多MQTT的實現方式。具體參看官網,Moquette是基於netty(老版本使用的是mina) 的模型的一個Java MQTT broker,支援websocket,SSL。
如果想直接啟動 moquette-broker-0.4-jar-with-dependencies.jar的jar檔案方式
可以執行一些命令實現
java -jar moquette-broker-0.4-jar-with-dependencies.jar
google code 下載MQTT moquette Broker 地址:
如果想直接啟動 moquette-broker-0.4-jar-with-dependencies.jar的jar檔案方式
可以執行一些命令實現
java -jar moquette-broker-0.4-jar-with-dependencies.jar
google code 下載MQTT moquette Broker 地址:
MQTT moquette 的broker服務啟動程式碼(啟動類org.dna.mqtt.moquette.server.Server)如下:
/* * Copyright (c) 2012-2014 The original author or authors * ------------------------------------------------------ * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Apache License v2.0 which accompanies this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * * The Apache License v2.0 is available at * http://www.opensource.org/licenses/apache2.0.php * * You may elect to redistribute this code under either of these licenses. */ package org.dna.mqtt.moquette.server; import java.io.File; import java.io.IOException; import java.text.ParseException; import java.util.Properties; import org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging; import org.dna.mqtt.moquette.server.netty.NettyAcceptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Launch a configured version of the server. * @author andrea */ public class Server { private static final Logger LOG = LoggerFactory.getLogger(Server.class); //資料持久化資料目錄,使用mapdb /*MapDB是一個快速、易用的嵌入式Java資料庫引擎,它提供了基於磁碟或者堆外(off- heap允許Java直接操作記憶體空間, * 類似於C的malloc和free)儲存的併發的Maps、Sets、Queues。MapDB的前身是JDBM,已經有15年的歷史。 * MapDB支援 ACID事務、MVCC隔離,它的jar包只有200KB,且無其它依賴,非常輕量。 * 相對來說功能已經穩定,並有全職 的開發者支援開發。*/ public static final String STORAGE_FILE_PATH = System.getProperty("user.home") + File.separator + "moquette_store.mapdb"; private ServerAcceptor m_acceptor; SimpleMessaging messaging; public static void main(String[] args) throws IOException { final Server server = new Server(); server.startServer(); System.out.println("Server started, version 0.7-SNAPSHOT"); //程序關閉前,釋放資源 Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { server.stopServer(); } }); } /** * Starts Moquette bringing the configuration from the file * located at config/moquette.conf */ public void startServer() throws IOException { String configPath = System.getProperty("moquette.path", null); startServer(new File(configPath, "config/moquette.conf")); } /** * Starts Moquette bringing the configuration from the given file */ public void startServer(File configFile) throws IOException { ConfigurationParser confParser = new ConfigurationParser(); try { confParser.parse(configFile); } catch (ParseException pex) { LOG.warn("An error occurred in parsing configuration, fallback on default configuration", pex); } Properties configProps = confParser.getProperties(); startServer(configProps); } /** * Starts the server with the given properties. * * Its need at least the following properties: * <ul> * <li>port</li> * <li>password_file</li> * </ul> */ public void startServer(Properties configProps) throws IOException { messaging = SimpleMessaging.getInstance(); messaging.init(configProps); m_acceptor = new NettyAcceptor(); m_acceptor.initialize(messaging, configProps); } public void stopServer() { System.out.println("Server stopping..."); messaging.stop(); m_acceptor.close(); System.out.println("Server stopped"); } }
下載moquette-mqtt原始碼,匯入eclipse中,執行啟動類。預設埠:1883
配置說明:config/moquette.conf
密碼檔案password_file.conf,使用者名稱密碼採用冒號分割":"。############################################## # Moquette configuration file. # # The synthax is equals to mosquitto.conf # ############################################## #啟動服務埠 port 1883 #websocket 埠 websocket_port 8080 #啟動主機的IP host 0.0.0.0 #密碼檔案 password_file password_file.conf ##支援SSL #ssl_port 8883 #jks_path serverkeystore.jks #key_store_password passw0rdsrv #key_manager_password passw0rdsrv
該專案使用distribution打包成產品包。下面使用mqtt-client採用阻塞式實現訊息的釋出並接收。
傳送訊息:
package cn.smartslim.mqtt.demo.fusesource;
import java.net.URISyntaxException;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
/**
* MQTT moquette 的Server 段用於釋出主題,併發布主題資訊
* 採用阻塞式 釋出主題
*/
public class MQTTServer {
private final static String CONNECTION_STRING = "tcp://192.168.36.215:1883";
private final static boolean CLEAN_START = true;
private final static short KEEP_ALIVE = 30;// 低耗網路,但是又需要及時獲取資料,心跳30s
public final static long RECONNECTION_ATTEMPT_MAX=6;
public final static long RECONNECTION_DELAY=2000;
public final static int SEND_BUFFER_SIZE=2*1024*1024;//傳送最大緩衝為2M
public static void main(String[] args) {
MQTT mqtt = new MQTT();
try {
//設定服務端的ip
mqtt.setHost(CONNECTION_STRING);
//連線前清空會話資訊
mqtt.setCleanSession(CLEAN_START);
//設定重新連線的次數
mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
//設定重連的間隔時間
mqtt.setReconnectDelay(RECONNECTION_DELAY);
//設定心跳時間
mqtt.setKeepAlive(KEEP_ALIVE);
//設定緩衝的大小
mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
//建立連線 ,使用阻塞式
BlockingConnection connection = mqtt.blockingConnection();
//開始連線
connection.connect();
try {
int count=0;
while(true){
count++;
//訂閱的主題
String topic="mqtt/test";
//主題的內容
String message="hello "+count+" mqtt!";
connection.publish(topic, message.getBytes(), QoS.AT_LEAST_ONCE, false);
System.out.println("MQTTServer Message Topic="+topic+" Content :"+message);
Thread.sleep(2000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (URISyntaxException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
接收訊息:
package cn.smartslim.mqtt.demo.fusesource;
import java.net.URISyntaxException;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
/**
* MQTT moquette 的Client 段用於訂閱主題,並接收主題資訊
* 採用阻塞式 訂閱主題
*/
public class MQTTClient {
private final static String CONNECTION_STRING = "tcp://192.168.36.215:1883";
private final static boolean CLEAN_START = true;
private final static short KEEP_ALIVE = 30;// 低耗網路,但是又需要及時獲取資料,心跳30s
public static Topic[] topics = {
new Topic("china/beijing", QoS.EXACTLY_ONCE)};
public final static long RECONNECTION_ATTEMPT_MAX=6;
public final static long RECONNECTION_DELAY=2000;
public final static int SEND_BUFFER_SIZE=2*1024*1024;//傳送最大緩衝為2M
public static void main(String[] args) {
//建立MQTT物件
MQTT mqtt = new MQTT();
BlockingConnection connection=null;
try {
//設定mqtt broker的ip和埠
mqtt.setHost(CONNECTION_STRING);
//連線前清空會話資訊
mqtt.setCleanSession(CLEAN_START);
//設定重新連線的次數
mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
//設定重連的間隔時間
mqtt.setReconnectDelay(RECONNECTION_DELAY);
//設定心跳時間
mqtt.setKeepAlive(KEEP_ALIVE);
//設定緩衝的大小
mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
//獲取mqtt的連線物件BlockingConnection
connection = mqtt.blockingConnection();
//MQTT連線的建立
connection.connect();
//建立相關的MQTT 的主題列表
Topic[] topics = {new Topic("mqtt/test", QoS.AT_LEAST_ONCE)};
//訂閱相關的主題資訊
byte[] qoses = connection.subscribe(topics);
//
while(true){
//接收訂閱的訊息內容
Message message = connection.receive();
//獲取訂閱的訊息內容
byte[] payload = message.getPayload();
// process the message then:
System.out.println("MQTTClient Message Topic="+message.getTopic()+" Content :"+new String(payload));
//簽收訊息的回執
message.ack();
Thread.sleep(2000);
}
} catch (URISyntaxException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}finally{
try {
connection.disconnect();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}