使用MQTT協議+Redis快取實現APP登入頂號功能
大家在玩遊戲或使用QQ等IM工具時,想必都見到過彈出被頂號或者是您的賬號於xx時間在另一裝置登入,您已被迫下線這樣的提示,然後不得不點退出按鈕退出整個應用,或者點選重新登入把另一裝置再頂下來。最近我參與的一個專案,正好就有這樣的需求,而且,由於我們專案中已經使用到了MQTT協議進行訊息推送,實現遠端控制,後臺用Java實現,快取使用了Redis,因此,正好可以利用現有的技術來實現這個功能。
實現的思路大概如下:首先,登入時不僅需要賬號密碼,還可以將裝置關鍵資訊記錄下來,如裝置型號(Android|iPhone)、登入時間、登入IP、裝置唯一標識(UUID)等,這就需要前臺登入功能與後臺介面一起配合實現,並在後臺把userId已經相關裝置資訊儲存到Redis中,當在另外一臺新裝置上登入同一帳號時,將userId對應的相關登入裝置資訊直接進行覆蓋,此時如果舊裝置進行重連時,因為該uuid已經不是當前服務端的uuid了,所以直接返回下線通知,為了進行友好提示,也可以將新登入裝置的主要資訊(裝置型號、登入時間)進行返回。
下面簡單介紹一下實現的方法。
軟體安裝
Linux下mqtt伺服器Apollo的安裝
下載
建立broker
一個broker例項是一個資料夾,其中包含所有的配置檔案及執行時的資料,不如日誌和訊息數
據。Apollo強烈建議不要把例項同安裝檔案放在一起。在linux作業系統下面,建議將例項建在
/var/lib/目錄下面
首先解壓:tar -zxvf apache-apollo-1.7.1-unix-distro.tar.gz
選擇一個目錄存放解壓後的檔案,我放在了/server/下,解壓後的資料夾為 apache-apollo-1.7.1
開始建立broker例項:
cd /var/lib
sudo /server/apache-apollo-1.7.1/bin/apollo create mybroker
下圖是Apache官方給的一些建議截圖:
啟動broker例項
啟動broker例項可以有兩種方法,如下圖中所示:
可以執行
/var/lib/mybroker/bin/apollo-broker run
或者
sudo ln -s "/var/lib/mybroker/bin/apollo-broker-service" /etc/init.d/
/etc/init.d/apollo-broker-service start
使其作為一個service進行啟動,以後系統重啟後只需執行/etc/init.d/apollo-broker-service start
訪問Apollo的監控頁面: http://localhost:61680/ 預設使用者名稱、密碼為為 admin/password
Linux下Redis的安裝與配置
Redis的安裝非常簡單,已經有現成的Makefile檔案,解壓後在src目錄下使用make命令完成編譯即可,redis-benchmark、redis-cli、redis-server、redis-stat 這四個檔案,加上一個 redis.conf 就構成了整個redis的最終可用包。它們的作用如下:
redis-server:Redis伺服器的daemon啟動程式
redis-cli:Redis命令列操作工具。當然,你也可以用telnet根據其純文字協議來操作
redis-benchmark:Redis效能測試工具,測試Redis在你的系統及你的配置下的讀寫效能
redis-stat:Redis狀態檢測工具,可以檢測Redis當前狀態引數及延遲狀況
下載安裝:
wget http://download.redis.io/redis-stable.tar.gz
tar xzf redis-stable.tar.gz
cd redis-stable
make
make install
啟動
編譯後生成的可執行檔案:
redis-server 是Redis的伺服器,啟動Redis即執行redis-server
redis-cli 是Redis自帶的Redis命令列客戶端,學習Redis的重要工具
./redis-server & 不指定配置直接執行,這時採用預設配置,無密碼
./redis-server –port 6379 僅指定埠
./redis-server ../redis.conf 指定配置檔案
最好還是使用最後一種方式進行啟動
如果只是在本機連線,那麼使用預設配置檔案不會有什麼問題,但是,如果是連線遠端伺服器端的Redis,則需要對配置檔案進行一些修改:
requirepass foobared
#bind 127.0.0.1 ##註釋掉
protected-mode no ##從yes改成no
至於如何將Redis設定後臺服務,開機自啟等,這裡就不介紹了,可以去搜索一下。
功能實現
後臺介面
Redis客戶端使用的是Jedis,如下程式碼是一個對Jedis簡單的封裝
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.exceptions.JedisException;
import java.util.ResourceBundle;
/**
* Jedis Cache 工具類
*/
public class JedisUtils {
private static Logger logger = LoggerFactory.getLogger(JedisUtils.class);
private static JedisPool jedisPool;
/**
* 讀取相關的配置
*/
static {
ResourceBundle resourceBundle = ResourceBundle.getBundle("redis");
int maxActive = Integer.parseInt(resourceBundle.getString("redis.pool.maxActive"));
int maxIdle = Integer.parseInt(resourceBundle.getString("redis.pool.maxIdle"));
int maxWait = Integer.parseInt(resourceBundle.getString("redis.pool.maxWait"));
int port = Integer.parseInt(resourceBundle.getString("redis.port"));
int timeout = Integer.parseInt(resourceBundle.getString("redis.timeout"));
String ip = resourceBundle.getString("redis.ip");
String auth = resourceBundle.getString("redis.auth");
JedisPoolConfig config = new JedisPoolConfig();
//設定最大連線數
config.setMaxTotal(maxActive);
//設定最大空閒數
config.setMaxIdle(maxIdle);
//設定超時時間
config.setMaxWaitMillis(maxWait);
//初始化連線池
jedisPool = new JedisPool(config, ip, port, timeout, auth);
}
/**
* 獲取快取
* @param key 鍵
* @return 值
*/
public static String get(String key) {
String value = null;
Jedis jedis = null;
try {
jedis = getResource();
if (jedis.exists(key)) {
value = jedis.get(key);
value = StringUtils.isNotBlank(value) && !"nil".equalsIgnoreCase(value) ? value : null;
logger.debug("get {} = {}", key, value);
}
} catch (Exception e) {
logger.warn("get {} = {}", key, value, e);
} finally {
returnResource(jedis);
}
return value;
}
/**
* 設定快取
* @param key 鍵
* @param value 值
* @param cacheSeconds 超時時間,0為不超時
* @return
*/
public static String set(String key, String value, int cacheSeconds) {
String result = null;
Jedis jedis = null;
try {
jedis = getResource();
result = jedis.set(key, value);
if (cacheSeconds != 0) {
jedis.expire(key, cacheSeconds);
}
logger.debug("set {} = {}", key, value);
} catch (Exception e) {
logger.warn("set {} = {}", key, value, e);
} finally {
returnResource(jedis);
}
return result;
}
/**
* 刪除快取
* @param key 鍵
* @return
*/
public static long del(String key) {
long result = 0;
Jedis jedis = null;
try {
jedis = getResource();
if (jedis.exists(key)){
result = jedis.del(key);
logger.debug("del {}", key);
}else{
logger.debug("del {} not exists", key);
}
} catch (Exception e) {
logger.warn("del {}", key, e);
} finally {
returnResource(jedis);
}
return result;
}
/**
* 快取是否存在
* @param key 鍵
* @return
*/
public static boolean exists(String key) {
boolean result = false;
Jedis jedis = null;
try {
jedis = getResource();
result = jedis.exists(key);
logger.debug("exists {}", key);
} catch (Exception e) {
logger.warn("exists {}", key, e);
} finally {
returnResource(jedis);
}
return result;
}
/**
* 獲取資源
* @return
* @throws JedisException
*/
public static Jedis getResource() throws JedisException {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
} catch (JedisException e) {
logger.warn("getResource.", e);
returnBrokenResource(jedis);
throw e;
}
return jedis;
}
/**
* 歸還資源
* @param jedis
*/
public static void returnBrokenResource(Jedis jedis) {
if (jedis != null) {
jedisPool.returnBrokenResource(jedis);
}
}
/**
* 釋放資源
* @param jedis
*/
public static void returnResource(Jedis jedis) {
if (jedis != null) {
jedisPool.returnResource(jedis);
}
}
}
然後在登入介面中,當判斷完登入的使用者名稱密碼正確後,可以參考如下程式碼的思路去實現,首先判斷Redis中是否已儲存有這個userId對用的值,有的話說明當前已經有登入,需要被替換到,同時使用MQTT傳送訊息給客戶端使其退出,Redis中不存在則只需儲存userId和uuidStr即可
String uuidStr = ""; //這個值從APP端傳過來
// 先判斷Redis中是否已經有,有的話需要替換掉
if(JedisUtils.get(userId) != null && !JedisUtils .get(userId).equals(uuidStr)) {
MqttClient client = MyMqttClient.getInstance();
String topic = "TOPIC/LOGIN_LOGOUT";
client.subscribe(topic, 1);
MyMqttClient.sendMessage("Log out", topic);
client.unsubscribe(topic);
}
JedisUtils.set(userId, uuidStr, 0);
至於MQTT協議的實現,這裡使用的是Paho,如果後臺專案是使用Maven構建的話,在pom.xml中加入如下幾行即可:
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.0</version>
</dependency>
然後對其進行了一個簡單的封裝
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MyMqttClient {
private MyMqttClient() {}
private static MqttClient mqttClientInstance = null;
private static MqttConnectOptions options;
//靜態工廠方法
public static synchronized MqttClient getInstance() {
try {
if (mqttClientInstance == null) {
mqttClientInstance = new MqttClient("tcp://125.216.242.151:61613",
MqttClient.generateClientId(), new MemoryPersistence());
options = new MqttConnectOptions();
//設定是否清空session,這裡如果設定為false表示伺服器會保留客戶端的連線記錄,這裡設定為true表示每次連線到伺服器都以新的身份連線
options.setCleanSession(true);
//設定連線的使用者名稱
options.setUserName("admin");
//設定連線的密碼
options.setPassword("password".toCharArray());
// 設定超時時間 單位為秒
options.setConnectionTimeout(10);
// 設定會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端傳送個訊息判斷客戶端是否線上,但這個方法並沒有重連的機制
options.setKeepAliveInterval(20);
mqttClientInstance.connect(options);
}
return mqttClientInstance;
}catch (Exception e){
e.printStackTrace();
return null;
}
}
public static void sendMessage(String content, String myTopic) {
MqttTopic topic = getInstance().getTopic(myTopic);
MqttMessage message = new MqttMessage();
message.setQos(1);
message.setRetained(false);
message.setPayload(content.getBytes());
try {
MqttDeliveryToken token = topic.publish(message);
} catch (MqttException e) {
e.printStackTrace();
}
}
public static MqttConnectOptions getOptions(){
return options;
}
}
app端
客戶端的做法思路也很簡單,由於使用了MQTT,因此客戶端和伺服器端其實已經保持了一個長連線,可以為客戶端寫一個MQTTService,隨時監聽伺服器推送過來的訊息進行處理
//為MTQQ client設定回撥
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
//連線丟失後,一般在這裡面進行重連
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//publish後會執行到這裡
}
@Override
public void messageArrived(String topicName, MqttMessage message) throws Exception {
if(message.toString().equals("Log out")) {
handler.post(new Runnable() {
@Override
public void run() {
AlertDialog.Builder builder = new AlertDialog.Builder(getApplicationContext());
builder.setMessage("被頂號了");
builder.setNegativeButton("退出", new DialogInterface.OnClickListener() {
@Override
public void onClick(DialogInterface dialog, int which) {
// TODO 退出當前賬號,在這裡簡單粗暴的結束了應用
stopSelf();
android.os.Process.killProcess(android.os.Process.myPid());
}
});
Dialog dialog = builder.create();
dialog.setCanceledOnTouchOutside(false);
dialog.getWindow().setType(WindowManager.LayoutParams.TYPE_SYSTEM_ALERT);
dialog.show();
}
});
}
}
});
總結
上述程式碼可能在嚴謹性和可靠性上還會存在一些問題,還需要經過不斷的完善,但思路是很明確的。在這裡尤其要安利一下MTQQ,現在越來越多的產品都是基於這個協議進行開發,進行訊息推送等。它開銷很小,支援各種流行程式語言,能夠適應不穩定的網路傳輸需求,在未來幾年,相信MQTT的應用會越來越廣。