Java 連線 MQTT 伺服器
阿新 • • 發佈:2018-11-20
可用方案
方案 | 描述 | GitHub連結 |
---|---|---|
mqtt-client | 由 Red Hat 子公司 FuseSource 開發 | mqtt-client |
paho.mqtt | 由獨立的非營利性組織 Eclipse Foundation 開發 | paho.mqtt |
Maven 依賴
<!--paho.mqtt-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
<!--mqtt-client-->
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>1.14</version>
</dependency>
樣例
公共配置檔案
/**
 * Connection settings shared by the MQTT samples: account, password, topic,
 * client id and broker address.
 *
 * <p>All fields are {@code null} until {@link #init()} or the setters are called.
 */
public class Config {

    private String ac;
    private String pw;
    private String topic;
    private String clientId;
    private String host;

    /** Creates a configuration with every field unset. */
    public Config() {
    }

    /** Fills every field with the sample's placeholder values. */
    public void init() {
        host = "tcp://XXXX:1883";
        clientId = "XXXX";
        topic = "XX";
        ac = "XX";
        pw = "XX";
    }

    /** @return the account (user name) value */
    public String getAc() {
        return this.ac;
    }

    /** @param ac the account (user name) value to store */
    public void setAc(String ac) {
        this.ac = ac;
    }

    /** @return the password value */
    public String getPw() {
        return this.pw;
    }

    /** @param pw the password value to store */
    public void setPw(String pw) {
        this.pw = pw;
    }

    /** @return the topic value */
    public String getTopic() {
        return this.topic;
    }

    /** @param topic the topic value to store */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /** @return the client id value */
    public String getClientId() {
        return this.clientId;
    }

    /** @param clientId the client id value to store */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /** @return the broker address value */
    public String getHost() {
        return this.host;
    }

    /** @param host the broker address value to store */
    public void setHost(String host) {
        this.host = host;
    }
}
paho.mqtt
/**
 * Paho sample: connects to the configured broker, subscribes to the configured
 * topic, waits two seconds for incoming messages, then disconnects and closes
 * the client.
 *
 * @param args unused
 * @throws MqttException if creating, connecting, subscribing, disconnecting or
 *     closing the client fails
 * @throws InterruptedException if the thread is interrupted while waiting
 */
public static void main(String[] args) throws MqttException, InterruptedException {
    Config config = new Config();
    config.init();

    MqttConnectOptions options = new MqttConnectOptions();
    options.setUserName(config.getAc());
    options.setPassword(config.getPw().toCharArray());
    options.setCleanSession(true);
    options.setConnectionTimeout(10);

    MemoryPersistence persistence = new MemoryPersistence();
    MqttClient client = new MqttClient(config.getHost(), config.getClientId(), persistence);
    try {
        // DeviceMqttCallback only declares a no-argument constructor.
        client.setCallback(new DeviceMqttCallback());
        client.connect(options);
        client.subscribe(config.getTopic());

        // Optional: clear a previously retained message by publishing an empty
        // retained payload to the topic.
        // MqttTopic topic = client.getTopic(config.getTopic());
        // MqttMessage message = new MqttMessage(new byte[0]);
        // message.setRetained(true);
        // topic.publish(message);

        Thread.sleep(2000);
    } finally {
        // Release the client even when connect/subscribe fails or the wait is interrupted.
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
        } finally {
            client.close();
        }
    }
}
回撥類
/**
 * {@link MqttCallback} implementation that prints every client event to
 * standard output.
 */
public class DeviceMqttCallback implements MqttCallback {

    /** Creates a callback with no state. */
    public DeviceMqttCallback() {
    }

    /**
     * Prints the message of the error that caused the connection loss.
     *
     * @param cause the reason reported by the client
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("connectionLost:" + cause.getMessage());
    }

    /**
     * Prints the topic and the string form of a received message.
     *
     * @param topic the topic the message was received on
     * @param message the received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("receive: [topic]:" + topic + " [msg]:" + message.toString());
    }

    /**
     * Prints the completion flag and the topics of a delivery token.
     *
     * @param token the delivery token passed by the client
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // getTopics() returns a String[]; concatenating the array directly would print
        // its identity (e.g. "[Ljava.lang.String;@1b2c3d") rather than the topic names.
        System.out.println("[isComplete]:" + token.isComplete() + " "
                + java.util.Arrays.toString(token.getTopics()));
    }
}
mqtt-client
/**
 * mqtt-client sample: connects to the configured broker through the callback
 * API, subscribes to the configured topic once connected, and keeps the main
 * thread alive for ten seconds.
 *
 * @param args unused
 * @throws URISyntaxException if the configured host is not a valid URI
 * @throws InterruptedException if the thread is interrupted while waiting
 */
public static void main(String[] args) throws URISyntaxException, InterruptedException {
    Config config = new Config();
    config.init();

    MQTT mqtt = new MQTT();
    mqtt.setHost(config.getHost());
    mqtt.setUserName(config.getAc());
    mqtt.setPassword(config.getPw());
    // The previous random-UUID client id was always overwritten by this value,
    // so only the configured id is set.
    mqtt.setClientId(config.getClientId());

    CallbackConnection connection = mqtt.callbackConnection();
    // Register the listener before connecting so it is already in place when
    // the first messages arrive (same order as the mqtt-client README).
    connection.listener(new DeviceMsgListener());
    connection.connect(new Callback<Void>() {
        @Override
        public void onSuccess(Void value) {
            System.out.println("連線成功");
            Topic topic = new Topic(config.getTopic(), QoS.AT_MOST_ONCE);
            connection.subscribe(new Topic[]{topic}, new Callback<byte[]>() {
                @Override
                public void onSuccess(byte[] value) {
                    System.out.println("訂閱成功");
                }

                @Override
                public void onFailure(Throwable value) {
                    System.out.println("訂閱失敗");
                    // Surface the cause, as the connect-failure callback does.
                    value.printStackTrace();
                }
            });
        }

        @Override
        public void onFailure(Throwable value) {
            System.out.println("連線失敗");
            value.printStackTrace();
        }
    });

    // Keep the main thread alive so the connection is not closed as soon as main returns.
    Thread.sleep(1000 * 10);
}