1. 程式人生 > >Android Mqtt 消息推送使用

Android Mqtt 消息推送使用

time lean listen tid let valueof idc ktr ans

初始化SDK:

/**
     * 初始化SDK
     *
     * @param context context
     */
    public void initSDK(Context context) {
        String clientId = String.valueOf(System.currentTimeMillis()+userId);
        mqttAndroidClient = new MqttAndroidClient(mContext, serverUri, clientId);
        subscriptionTopics = new ArrayList<>();
        mqttAndroidClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {

                if (reconnect) {

                    Log.d(TAG, "Reconnected to : " + serverURI);
                    // Because Clean Session is true, we need to re-subscribe
//                    subscribeToTopic();
                    //publishMessage();
                } else {
                    Log.d(TAG, "Connected to: " + serverURI);

                }
                connectSuccess = true;

                subscribeToTopic();
            }

            @Override
            public void connectionLost(Throwable cause) {
                connectSuccess = false;
                Log.e(TAG, "The Connection was lost." + cause.getLocalizedMessage());

            }

            // THIS DOES NOT WORK!
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                Log.d(TAG, "Incoming message: " +topic+ new String(message.getPayload()));

            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {

            }
        });


    }

連接遠程服務:

/**
     * 連接遠程服務
     */
    public void connectServer() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setCleanSession(false);

        try {
            //addToHistory("Connecting to " + serverUri);

            mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    connectSuccess = true;
                    DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
                    disconnectedBufferOptions.setBufferEnabled(true);
                    disconnectedBufferOptions.setBufferSize(100);
                    disconnectedBufferOptions.setPersistBuffer(false);
                    disconnectedBufferOptions.setDeleteOldestMessages(false);
                    mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.e(TAG, "Failed to connect to: " + serverUri);
                    exception.printStackTrace();
                    Log.d(TAG, "onFailure: " + exception.getCause());
                    connectSuccess = false;

                }
            });


        } catch (MqttException ex) {
            ex.printStackTrace();
        }
    }

獲取訂閱信息:

  /**
     *獲取訂閱信息
   */

public void connectGateway(String gatewayId, String userId) {
//獲取訂閱信息
        if (!subscriptionTopics.contains(gatewayId)) {
            subscriptionTopics.add(gatewayId);
        }
        Log.d(TAG, "pre sub topic: connect status=" + connectSuccess);
        Log.d(TAG, "subtopic " + subscriptionTopics);
        subscribeToTopic();
    }

  

訂閱mqtt消息:

/**
     * 訂閱mqtt消息
     */
    private void subscribeToTopic() {
        try {
            if(subscriptionTopics.size()==0)
                return;
            String[] topics = new String[subscriptionTopics.size()];
            subscriptionTopics.toArray(topics);
            int[] qoc = new int[topics.length];
            IMqttMessageListener[] mqttMessageListeners = new IMqttMessageListener[topics.length];
            for (int i = 0; i < topics.length; i++) {
                IMqttMessageListener mqttMessageListener = new IMqttMessageListener() {
                    @Override
                    public void messageArrived(String topic, MqttMessage message) throws Exception {
                        // message Arrived!消息送達後做出的處理
                        Log.d(TAG, topic + " : " + new String(message.getPayload()));
                        handleReceivedMessage(new String(message.getPayload()), topic);
                    }
                };
                mqttMessageListeners[i] = mqttMessageListener;
                Log.d(TAG, "subscribeToTopic: qoc= " + qoc[i]);
            }
            mqttAndroidClient.subscribe(topics, qoc, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken iMqttToken) {
                    Log.d(TAG, "Subscribed!");
                }

                @Override
                public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
                    Log.d(TAG, "Failed to subscribe");
                }
            }, mqttMessageListeners);

        } catch (MqttException ex) {
            System.err.println("Exception whilst subscribing");
            ex.printStackTrace();
        }

    }

處理收到的消息:

private void handleReceivedMessage(String message, String gatewayId) {
//可以發送一條廣播通知程序
}    

  

發送mqtt消息:

/**
     * 發送 mqtt 消息
     *
     * @param publishMessage 要發送的信息的 字符串
     */
    private void publishMessage(String publishMessage, String publishTopic) {
            try {
                publishTopic = userId + "/" + publishTopic;
                MqttMessage message = new MqttMessage();
                message.setPayload(publishMessage.getBytes());
                mqttAndroidClient.publish(publishTopic, message);
                Log.d(TAG, "publishMessage:Message Published \n" + publishTopic + ":" + message);
                if (!mqttAndroidClient.isConnected()) {
                    Log.d(TAG, mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
                }
            } catch (MqttException e) {
                System.err.println("Error Publishing: " + e.getMessage());
                e.printStackTrace();
            }
    }

  

沒有封裝的類:

public class SubscribeClient {
    private final static String CONNECTION_STRING = "tcp://mqtt地址:mqtt端口";
    private final static boolean CLEAN_START = true;
    private final static short KEEP_ALIVE = 30;//低耗網絡,但是又需要及時獲取數據,心跳30s
    private final static String CLIENT_ID = "client1";
    private final static String[] TOPICS = {
            //訂閱信息
    };
    private final static int[] QOS_VALUES = {0, 0, 2, 0};

    private MqttClient mqttClient = null;

    public SubscribeClient(String i) {
        try {
            mqttClient = new MqttClient(CONNECTION_STRING);
            SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler();
            mqttClient.registerSimpleHandler(simpleCallbackHandler);//註冊接收消息方法
            mqttClient.connect(CLIENT_ID + i, CLEAN_START, KEEP_ALIVE);
            mqttClient.subscribe(TOPICS, QOS_VALUES);//訂閱接主題

            /**
             * 完成訂閱後,可以增加心跳,保持網絡通暢,也可以發布自己的消息
             */

            mqttClient.publish(PUBLISH_TOPICS, "keepalive".getBytes(), QOS_VALUES[0], true);

        } catch (MqttException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    /**
     * 簡單回調函數,處理client接收到的主題消息
     *
     * @author pig
     */
    class SimpleCallbackHandler implements MqttSimpleCallback {

        /**
         * 當客戶機和broker意外斷開時觸發
         * 可以再此處理重新訂閱
         */
        @Override
        public void connectionLost() throws Exception {
            // TODO Auto-generated method stub
            System.out.println("客戶機和broker已經斷開");
        }

        /**
         * 客戶端訂閱消息後,該方法負責回調接收處理消息
         */
        @Override
        public void publishArrived(String topicName, byte[] payload, int Qos, boolean retained) throws Exception {
            // TODO Auto-generated method stub
            System.out.println("訂閱主題: " + topicName);
            System.out.println("消息數據: " + new String(payload));
            System.out.println("消息級別(0,1,2): " + Qos);
            System.out.println("是否是實時發送的消息(false=實時,true=服務器上保留的最後消息): " + retained);
        }

    }

    /**
     * 高級回調
     *
     * @author pig
     */
    class AdvancedCallbackHandler implements MqttSimpleCallback {

        @Override
        public void connectionLost() throws Exception {
            // TODO Auto-generated method stub

        }

        @Override
        public void publishArrived(String arg0, byte[] arg1, int arg2,
                                   boolean arg3) throws Exception {
            // TODO Auto-generated method stub

        }

    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        new SubscribeClient("" + i);

    }

}

  

Android Mqtt 消息推送使用