程式人生 > Java 連線 MQTT 伺服器

Java 連線 MQTT 伺服器

可用方案

| 方案 | 描述 | GitHub 連結 |
| --- | --- | --- |
| mqtt-client | 由 Red Hat 子公司 FuseSource 開發 | mqtt-client |
| paho.mqtt | 由獨立的非營利組織 Eclipse Foundation 開發 | paho.mqtt |

Maven 依賴

        <!-- paho.mqtt: Eclipse Paho MQTT v3 client -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>
        <!-- mqtt-client: FuseSource MQTT client -->
        <dependency>
            <groupId>org.fusesource.mqtt-client</groupId>
            <artifactId>mqtt-client</artifactId>
            <version>1.14</version>
        </dependency>

樣例

公共配置檔案

/**
 * Connection settings shared by the MQTT client samples.
 *
 * <p>All fields start out {@code null}; call {@link #init()} to load the
 * placeholder sample values, or use the setters to supply real ones.
 */
public class Config {
    /** Account name; the samples pass it to the client as the user name. */
    private String ac;
    /** Password matching {@link #ac}. */
    private String pw;
    /** Topic the samples subscribe to. */
    private String topic;
    /** Client identifier presented to the broker. */
    private String clientId;
    /** Broker address, e.g. {@code tcp://host:1883}. */
    private String host;

    /** Creates an empty configuration; no field is populated. */
    public Config() {
    }

    /** Fills every field with the placeholder values used by the samples. */
    public void init() {
        ac = "XX";
        pw = "XX";
        topic = "XX";
        clientId = "XXXX";
        host = "tcp://XXXX:1883";
    }

    /** @return the account name */
    public String getAc() {
        return ac;
    }

    /** @param ac the account name */
    public void setAc(String ac) {
        this.ac = ac;
    }

    /** @return the password */
    public String getPw() {
        return pw;
    }

    /** @param pw the password */
    public void setPw(String pw) {
        this.pw = pw;
    }

    /** @return the topic */
    public String getTopic() {
        return topic;
    }

    /** @param topic the topic */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /** @return the client identifier */
    public String getClientId() {
        return clientId;
    }

    /** @param clientId the client identifier */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /** @return the broker address */
    public String getHost() {
        return host;
    }

    /** @param host the broker address */
    public void setHost(String host) {
        this.host = host;
    }
}

paho.mqtt

    /**
     * Paho sample: connects to the broker described by {@link Config}, subscribes to the
     * configured topic, waits two seconds for messages, then disconnects.
     *
     * @param args unused
     * @throws MqttException        if creating the client, connecting, subscribing,
     *                              disconnecting or closing fails
     * @throws InterruptedException if the wait is interrupted
     */
    public static void main(String[] args) throws MqttException, InterruptedException {
        Config config = new Config();
        config.init();
        MemoryPersistence persistence = new MemoryPersistence();

        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(config.getAc());
        options.setPassword(config.getPw().toCharArray());
        options.setCleanSession(true);
        options.setConnectionTimeout(10);

        MqttClient client = new MqttClient(config.getHost(), config.getClientId(), persistence);
        try {
            // DeviceMqttCallback only declares a no-arg constructor.
            client.setCallback(new DeviceMqttCallback());
            client.connect(options);
            client.subscribe(config.getTopic());

//            MqttTopic topic = client.getTopic(config.getTopic());
//            MqttMessage message = new MqttMessage(new byte[0]); // clears the previously retained message
//            message.setRetained(true);
//            topic.publish(message);

            // Give the callback a moment to receive messages before shutting down.
            Thread.sleep(2000);
        } finally {
            // Release the client even when connect/subscribe/sleep throws.
            try {
                if (client.isConnected()) {
                    client.disconnect();
                }
            } finally {
                client.close();
            }
        }
    }

回撥類

/**
 * {@link MqttCallback} implementation that prints connection-loss, message-arrival and
 * delivery-complete events to standard output.
 */
public class DeviceMqttCallback implements MqttCallback {

    /** Creates a callback with no state. */
    public DeviceMqttCallback() {
    }

    /**
     * Prints the message of the throwable that caused the connection loss.
     *
     * @param cause reason for the loss; a {@code null} cause is printed as {@code null}
     */
    @Override
    public void connectionLost(Throwable cause) {
        // Guard against a null cause so the handler itself cannot throw.
        String reason = (cause == null) ? null : cause.getMessage();
        System.out.println("connectionLost:" + reason);
    }

    /**
     * Prints the topic and the message's string form.
     *
     * @param topic   topic the message was received on
     * @param message the received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("receive: [topic]:" + topic + "  [msg]:" + message.toString());
    }

    /**
     * Prints the completion flag and the topics of the delivery token.
     *
     * @param token token of the delivery being reported
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // getTopics() returns a String[]; concatenating the array directly would print
        // its identity string, so render the elements explicitly (null-safe).
        String topics = java.util.Arrays.toString(token.getTopics());
        System.out.println("[isComplete]:" + token.isComplete() + "       " + topics);
    }
}

mqtt-client

    /**
     * mqtt-client sample: opens a callback-style connection to the broker described by
     * {@link Config}, subscribes to the configured topic once connected, and keeps the
     * main thread alive for ten seconds so messages can be delivered to the listener.
     *
     * @param args unused
     * @throws URISyntaxException   if the configured host is not a valid URI
     * @throws InterruptedException if the wait is interrupted
     */
    public static void main(String[] args) throws URISyntaxException, InterruptedException {
        Config config = new Config();
        config.init();

        MQTT mqtt = new MQTT();
        mqtt.setHost(config.getHost());
        // The client id is set once; an earlier random-UUID id was always overwritten by this call.
        mqtt.setClientId(config.getClientId());
        mqtt.setUserName(config.getAc());
        mqtt.setPassword(config.getPw());

        CallbackConnection connection = mqtt.callbackConnection();
        // Register the listener before connecting so it is in place when the connection comes up.
        connection.listener(new DeviceMsgListener());
        connection.connect(new Callback<Void>() {
            @Override
            public void onSuccess(Void value) {
                System.out.println("連線成功");
                Topic topic = new Topic(config.getTopic(), QoS.AT_MOST_ONCE);
                connection.subscribe(new Topic[]{topic}, new Callback<byte[]>() {
                    @Override
                    public void onSuccess(byte[] value) {
                        System.out.println("訂閱成功");
                    }

                    @Override
                    public void onFailure(Throwable value) {
                        System.out.println("訂閱失敗");
                        // Surface the cause, matching the connect failure handler.
                        value.printStackTrace();
                    }
                });
            }

            @Override
            public void onFailure(Throwable value) {
                System.out.println("連線失敗");
                value.printStackTrace();
            }
        });
        // Keep the main thread alive; exiting immediately would close the connection.
        Thread.sleep(1000 * 10);
    }