1. 程式人生 > >使用MQTT協議+Redis快取實現APP登入頂號功能

使用MQTT協議+Redis快取實現APP登入頂號功能

大家在玩遊戲或使用QQ等IM工具時,想必都見到過彈出被頂號或者是您的賬號於xx時間在另一裝置登入,您已被迫下線這樣的提示,然後不得不點退出按鈕退出整個應用,或者點選重新登入把另一裝置再頂下來。最近我參與的一個專案,正好就有這樣的需求,而且,由於我們專案中已經使用到了MQTT協議進行訊息推送,實現遠端控制,後臺用Java實現,快取使用了Redis,因此,正好可以利用現有的技術來實現這個功能。

實現的思路大概如下:首先,登入時不僅需要賬號密碼,還可以將裝置關鍵資訊記錄下來,如裝置型號(Android|iPhone)、登入時間、登入IP、裝置唯一標識(UUID)等,這就需要前臺登入功能與後臺介面一起配合實現,並在後臺把userId已經相關裝置資訊儲存到Redis中,當在另外一臺新裝置上登入同一帳號時,將userId對應的相關登入裝置資訊直接進行覆蓋,此時如果舊裝置進行重連時,因為該uuid已經不是當前服務端的uuid了,所以直接返回下線通知,為了進行友好提示,也可以將新登入裝置的主要資訊(裝置型號、登入時間)進行返回。

下面簡單介紹一下實現的方法。

軟體安裝

Linux下mqtt伺服器Apollo的安裝

下載

建立broker

一個broker例項是一個資料夾,其中包含所有的配置檔案及執行時的資料,不如日誌和訊息數
據。Apollo強烈建議不要把例項同安裝檔案放在一起。在linux作業系統下面,建議將例項建在
/var/lib/目錄下面

首先解壓:tar -zxvf apache-apollo-1.7.1-unix-distro.tar.gz
選擇一個目錄存放解壓後的檔案,我放在了/server/下,解壓後的資料夾為 apache-apollo-1.7.1

開始建立broker例項:

cd /var/lib
sudo /server/apache-apollo-1.7.1/bin/apollo create mybroker

下圖是Apache官方給的一些建議截圖:

這裡寫圖片描述

啟動broker例項

啟動broker例項可以有兩種方法,如下圖中所示:

這裡寫圖片描述

可以執行

/var/lib/mybroker/bin/apollo-broker run

或者

sudo ln -s "/var/lib/mybroker/bin/apollo-broker-service" /etc/init.d/
/etc/init.d/apollo-broker-service start

使其作為一個service進行啟動,以後系統重啟後只需執行/etc/init.d/apollo-broker-service start

訪問Apollo的監控頁面: http://localhost:61680/ 預設使用者名稱、密碼為為 admin/password

Linux下Redis的安裝與配置

Redis的安裝非常簡單,已經有現成的Makefile檔案,解壓後在src目錄下使用make命令完成編譯即可,redis-benchmark、redis-cli、redis-server、redis-stat 這四個檔案,加上一個 redis.conf 就構成了整個redis的最終可用包。它們的作用如下:

redis-server:Redis伺服器的daemon啟動程式
redis-cli:Redis命令列操作工具。當然,你也可以用telnet根據其純文字協議來操作
redis-benchmark:Redis效能測試工具,測試Redis在你的系統及你的配置下的讀寫效能
redis-stat:Redis狀態檢測工具,可以檢測Redis當前狀態引數及延遲狀況

下載安裝:

wget http://download.redis.io/redis-stable.tar.gz  
tar xzf redis-stable.tar.gz  
cd redis-stable  
make
make install

啟動

編譯後生成的可執行檔案:
redis-server 是Redis的伺服器,啟動Redis即執行redis-server
redis-cli 是Redis自帶的Redis命令列客戶端,學習Redis的重要工具

./redis-server & 不指定配置直接執行,這時採用預設配置,無密碼
./redis-server –port 6379 僅指定埠
./redis-server ../redis.conf 指定配置檔案

最好還是使用最後一種方式進行啟動

如果只是在本機連線,那麼使用預設配置檔案不會有什麼問題,但是,如果是連線遠端伺服器端的Redis,則需要對配置檔案進行一些修改:

requirepass foobared
#bind 127.0.0.1    ##註釋掉
protected-mode no  ##從yes改成no

至於如何將Redis設定後臺服務,開機自啟等,這裡就不介紹了,可以去搜索一下。

功能實現

後臺介面

Redis客戶端使用的是Jedis,如下程式碼是一個對Jedis簡單的封裝

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.exceptions.JedisException;

import java.util.ResourceBundle;

/**
 * Jedis Cache 工具類
 */
public class JedisUtils {

    private static Logger logger = LoggerFactory.getLogger(JedisUtils.class);

    private static JedisPool jedisPool;

    /**
     *  讀取相關的配置
     */
    static {
        ResourceBundle resourceBundle = ResourceBundle.getBundle("redis");
        int maxActive = Integer.parseInt(resourceBundle.getString("redis.pool.maxActive"));
        int maxIdle = Integer.parseInt(resourceBundle.getString("redis.pool.maxIdle"));
        int maxWait = Integer.parseInt(resourceBundle.getString("redis.pool.maxWait"));
        int port = Integer.parseInt(resourceBundle.getString("redis.port"));
        int timeout = Integer.parseInt(resourceBundle.getString("redis.timeout"));
        String ip = resourceBundle.getString("redis.ip");
        String auth = resourceBundle.getString("redis.auth");

        JedisPoolConfig config = new JedisPoolConfig();
        //設定最大連線數
        config.setMaxTotal(maxActive);
        //設定最大空閒數
        config.setMaxIdle(maxIdle);
        //設定超時時間
        config.setMaxWaitMillis(maxWait);

        //初始化連線池
        jedisPool = new JedisPool(config, ip, port, timeout, auth);
    }

    /**
     * 獲取快取
     * @param key 鍵
     * @return 值
     */
    public static String get(String key) {
        String value = null;
        Jedis jedis = null;
        try {
            jedis = getResource();
            if (jedis.exists(key)) {
                value = jedis.get(key);
                value = StringUtils.isNotBlank(value) && !"nil".equalsIgnoreCase(value) ? value : null;
                logger.debug("get {} = {}", key, value);
            }
        } catch (Exception e) {
            logger.warn("get {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return value;
    }


    /**
     * 設定快取
     * @param key 鍵
     * @param value 值
     * @param cacheSeconds 超時時間,0為不超時
     * @return
     */
    public static String set(String key, String value, int cacheSeconds) {
        String result = null;
        Jedis jedis = null;
        try {
            jedis = getResource();
            result = jedis.set(key, value);
            if (cacheSeconds != 0) {
                jedis.expire(key, cacheSeconds);
            }
            logger.debug("set {} = {}", key, value);
        } catch (Exception e) {
            logger.warn("set {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }


    /**
     * 刪除快取
     * @param key 鍵
     * @return
     */
    public static long del(String key) {
        long result = 0;
        Jedis jedis = null;
        try {
            jedis = getResource();
            if (jedis.exists(key)){
                result = jedis.del(key);
                logger.debug("del {}", key);
            }else{
                logger.debug("del {} not exists", key);
            }
        } catch (Exception e) {
            logger.warn("del {}", key, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }


    /**
     * 快取是否存在
     * @param key 鍵
     * @return
     */
    public static boolean exists(String key) {
        boolean result = false;
        Jedis jedis = null;
        try {
            jedis = getResource();
            result = jedis.exists(key);
            logger.debug("exists {}", key);
        } catch (Exception e) {
            logger.warn("exists {}", key, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }


    /**
     * 獲取資源
     * @return
     * @throws JedisException
     */
    public static Jedis getResource() throws JedisException {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
        } catch (JedisException e) {
            logger.warn("getResource.", e);
            returnBrokenResource(jedis);
            throw e;
        }
        return jedis;
    }

    /**
     * 歸還資源
     * @param jedis
     */
    public static void returnBrokenResource(Jedis jedis) {
        if (jedis != null) {
            jedisPool.returnBrokenResource(jedis);
        }
    }

    /**
     * 釋放資源
     * @param jedis
     */
    public static void returnResource(Jedis jedis) {
        if (jedis != null) {
            jedisPool.returnResource(jedis);
        }
    }
}

然後在登入介面中,當判斷完登入的使用者名稱密碼正確後,可以參考如下程式碼的思路去實現,首先判斷Redis中是否已儲存有這個userId對用的值,有的話說明當前已經有登入,需要被替換到,同時使用MQTT傳送訊息給客戶端使其退出,Redis中不存在則只需儲存userId和uuidStr即可

String uuidStr = "";  //這個值從APP端傳過來

// 先判斷Redis中是否已經有,有的話需要替換掉
if(JedisUtils.get(userId) != null && !JedisUtils .get(userId).equals(uuidStr)) {
    MqttClient client = MyMqttClient.getInstance();
    String topic = "TOPIC/LOGIN_LOGOUT";
    client.subscribe(topic, 1);
    MyMqttClient.sendMessage("Log out", topic);
    client.unsubscribe(topic);
}

JedisUtils.set(userId, uuidStr, 0);    

至於MQTT協議的實現,這裡使用的是Paho,如果後臺專案是使用Maven構建的話,在pom.xml中加入如下幾行即可:

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.1.0</version>
</dependency>

然後對其進行了一個簡單的封裝

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MyMqttClient {

    private MyMqttClient() {}

    private static MqttClient mqttClientInstance = null;

    private static MqttConnectOptions options;

    //靜態工廠方法
    public static synchronized MqttClient getInstance() {
        try {
            if (mqttClientInstance == null) {
                mqttClientInstance = new MqttClient("tcp://125.216.242.151:61613",
                        MqttClient.generateClientId(), new MemoryPersistence());

                options = new MqttConnectOptions();
                //設定是否清空session,這裡如果設定為false表示伺服器會保留客戶端的連線記錄,這裡設定為true表示每次連線到伺服器都以新的身份連線
                options.setCleanSession(true);
                //設定連線的使用者名稱
                options.setUserName("admin");
                //設定連線的密碼
                options.setPassword("password".toCharArray());
                // 設定超時時間 單位為秒
                options.setConnectionTimeout(10);
                // 設定會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端傳送個訊息判斷客戶端是否線上,但這個方法並沒有重連的機制
                options.setKeepAliveInterval(20);

                mqttClientInstance.connect(options);
            }

            return mqttClientInstance;

        }catch (Exception e){
            e.printStackTrace();
            return null;
        }
    }

    public static void sendMessage(String content, String myTopic) {
        MqttTopic topic = getInstance().getTopic(myTopic);
        MqttMessage message = new MqttMessage();
        message.setQos(1);
        message.setRetained(false);
        message.setPayload(content.getBytes());

        try {
            MqttDeliveryToken token = topic.publish(message);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    public static MqttConnectOptions getOptions(){
        return options;
    }

}

app端

客戶端的做法思路也很簡單,由於使用了MQTT,因此客戶端和伺服器端其實已經保持了一個長連線,可以為客戶端寫一個MQTTService,隨時監聽伺服器推送過來的訊息進行處理

//為MTQQ client設定回撥
client.setCallback(new MqttCallback() {

    @Override
    public void connectionLost(Throwable cause) {
        //連線丟失後,一般在這裡面進行重連
    }


    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        //publish後會執行到這裡
    }

    @Override
    public void messageArrived(String topicName, MqttMessage message) throws Exception {

        if(message.toString().equals("Log out")) {

            handler.post(new Runnable() {
                @Override
                public void run() {
                    AlertDialog.Builder builder = new AlertDialog.Builder(getApplicationContext());
                    builder.setMessage("被頂號了");
                    builder.setNegativeButton("退出", new DialogInterface.OnClickListener() {
                        @Override
                        public void onClick(DialogInterface dialog, int which) {
                            // TODO 退出當前賬號,在這裡簡單粗暴的結束了應用
                            stopSelf();
                            android.os.Process.killProcess(android.os.Process.myPid());
                        }
                    });
                    Dialog dialog = builder.create();
                    dialog.setCanceledOnTouchOutside(false);
                    dialog.getWindow().setType(WindowManager.LayoutParams.TYPE_SYSTEM_ALERT);
                    dialog.show();
                }
            });
        }
    }
});

總結

上述程式碼可能在嚴謹性和可靠性上還會存在一些問題,還需要經過不斷的完善,但思路是很明確的。在這裡尤其要安利一下MTQQ,現在越來越多的產品都是基於這個協議進行開發,進行訊息推送等。它開銷很小,支援各種流行程式語言,能夠適應不穩定的網路傳輸需求,在未來幾年,相信MQTT的應用會越來越廣。