Docker+ActiveMQ+MQTT例子
阿新 • • 發佈:2018-12-26
1.啟動容器:docker run -p 1883:1883 -p 8161:8161 -d leo/activemq
如果沒有Docker,直接啟動ActiveMQ也可以.
ActiveMQ啟動好後,我們只需要寫好publisher和subscriber就好了.
2.需要新增2個maven依賴:
<!-- ActiveMQ all-in-one artifact -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.13.3</version>
</dependency>
<!-- Eclipse Paho MQTT v3 client, used by the Publisher and Subscriber classes -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.1.0</version>
</dependency>
第一個依賴是ActiveMQ的.
第二個依賴是Eclipse Paho的MQTT客戶端,詳情請參考:https://eclipse.org/paho/clients/java/
3.Publisher程式碼如下:
package leo.mqtt;

import java.nio.charset.StandardCharsets;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * Minimal MQTT publisher demo.
 *
 * <p>Connects to the broker at {@code tcp://127.0.0.1:1883}, publishes the single message
 * {@code "Hello Leo!"} to the topic {@code TopicOfLeo} with QoS 1, then disconnects and exits.
 */
public class Publisher {

    /**
     * Publishes one message and terminates the JVM with status 0 on success.
     *
     * <p>On an {@link MqttException} the details are printed and the client is still
     * disconnected/closed before {@code main} returns.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String topic = "TopicOfLeo";
        String content = "Hello Leo!";
        // QoS 0: the message is sent once and is not acknowledged.
        // QoS 1: the message is sent at least once and must be acknowledged.
        // QoS 2: the message is delivered exactly once, using a four-step handshake.
        int qos = 1;
        String broker = "tcp://127.0.0.1:1883";
        String clientId = "publisher1";
        MemoryPersistence persistence = new MemoryPersistence();

        MqttClient client = null;
        boolean published = false;
        try {
            client = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            // false = do not start from a clean session (see Paho's setCleanSession docs).
            connOpts.setCleanSession(false);
            client.connect(connOpts);

            // Encode explicitly as UTF-8 so the payload does not depend on the platform charset.
            MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));
            message.setQos(qos);
            client.publish(topic, message);
            published = true;
        } catch (MqttException me) {
            report(me);
        } finally {
            // Runs on both the success and the failure path, so the connection is never leaked.
            release(client);
        }

        if (published) {
            // Kept from the original demo: exit explicitly once the message has been sent.
            // Placed after the finally block because System.exit would skip it.
            System.exit(0);
        }
    }

    /** Disconnects (if connected) and closes the client; a {@code null} client is ignored. */
    private static void release(MqttClient client) {
        if (client == null) {
            return;
        }
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
        } catch (MqttException me) {
            report(me);
        }
        try {
            client.close();
        } catch (MqttException me) {
            report(me);
        }
    }

    /** Prints the diagnostic fields of an MQTT failure followed by its stack trace. */
    private static void report(MqttException me) {
        System.out.println("reason " + me.getReasonCode());
        System.out.println("msg " + me.getMessage());
        System.out.println("loc " + me.getLocalizedMessage());
        System.out.println("cause " + me.getCause());
        System.out.println("excep " + me);
        me.printStackTrace();
    }
}
4.subscriber程式碼如下:
package leo.mqtt; import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class Subscriber { public static void main(String[] args) { String topic = "TopicOfLeo"; String broker = "tcp://127.0.0.1:1883"; String clientId = "Subscriber1"; MemoryPersistence persistence = new MemoryPersistence(); try { MqttClient client = new MqttClient(broker, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(false); client.connect(connOpts); client.subscribe(topic, new IMqttMessageListener() { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println(topic + " -> " + message); } }); } catch (MqttException me) { System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("loc " + me.getLocalizedMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); } } }
5.先執行subscriber,再執行publisher,即可看到效果:subscriber的控制檯會輸出"TopicOfLeo -> Hello Leo!".
入門demo到此結束.