SpringBoot(9) 基於Redis訊息佇列實現非同步操作
阿新 • • 發佈:2019-01-03
什麼是訊息佇列?
所謂訊息佇列,就是一個以佇列資料結構為基礎的一個真實存在的實體,如陣列,redis中的佇列集合等等,都可以。為什麼要使用佇列?
主要原因是由於在高併發環境下,由於來不及同步處理,請求往往會發生堵塞,比如說,大量的insert,update之類的請求同時到達MySQL,直接導致無數的行鎖表鎖,甚至最後請求會堆積過多,從而觸發too many connections錯誤。通過使用訊息佇列,我們可以非同步處理請求,從而緩解系統的壓力。比如說點贊這個功能,這個在高併發的情況下,很容易造成資料庫連線數佔滿,到時整個網站響應緩慢,才是就是想到要解決資料庫的壓力問題,一般就是兩種方案,一是提高資料庫本身的能力(如增加連線數,讀寫分離等),但是資料庫總是有極限的,到達了極限是沒有辦法在提升了的,此時就要考慮第二種方案,釋放資料庫的壓力,將壓力轉移到快取裡面。就拿實際的點贊來說吧,使用者的點贊請求到來,我只是將點贊請求投遞到訊息佇列裡面,後續的點贊請求可以將訊息合併,即只更新點贊數,不產生新的任務,此時有個進行再不斷的輪訓訊息佇列,將點贊訊息消耗,並將值更新到資料庫裡面,這樣就有效的降低了資料庫的壓力,因為在快取層將數個數據庫更新請求合併成一個,大大提高了效率,降低了負載。
Redis實現的訊息佇列
生產者消費模式會讓一個或者多個客戶端監聽訊息佇列,一旦訊息到達,消費者馬上消費,誰先搶到算誰的,如果佇列裡沒有訊息,則消費者繼續監聽。 其實在生產者消費模式中生產者是一堆執行緒,消費者是另一堆執行緒,記憶體緩衝區可以使用List陣列佇列,資料型別只需要定義一個簡單的類就好。關鍵是如何處理多執行緒之間的協作。
釋出訂閱者模式也是一個或多個客戶端訂閱訊息頻道,只要釋出者釋出訊息,所有訂閱者都能收到訊息,訂閱者都是平等的。
這裡使用的是生產者消費模式。
基於Redis的訊息佇列實現的非同步操作原理圖如下:
具體的實現程式碼如下:
1.首先定義事件的型別,使用列舉類,便於取出各種事件
package com.springboot.springboot.async; /** * @author WilsonSong * @date 2018/6/3 * 列舉類,就是事件的各種型別 */ public enum EventType { LIKE(0), COMMENT(1), LOGIN(2),MAIL(3); private int value; EventType(int value){ this.value = value; } public int getValue(){ return value; } }
2.定義事件的具體實現類
類裡面很多的實現方法都是返回的是EventModel這個類,是為了以後點讚的時候能夠鏈式的取出與這個事件相關的引數
package com.springboot.springboot.async;
import java.util.HashMap;
import java.util.Map;
/**
* @author WilsonSong
* @date 2018/6/3
* 不同的事件肯定是有不同的型別的
*/
public class EventModel {
//例如,有人評論了一個問題,那type就是評論, actorId就是誰評論的,
// entityId和entityType就是評論的是那個問題,entityOwnerId就是那個問題關聯的物件
private EventType type; //事件的型別
private int actorId; //事件的觸發者
private int entityType; //觸發事件的載體
private int entityId; //和entityType組合成觸發事件的載體 可以使任何一個實體的id,問題,評論,使用者,站內信等等
private int entityOwnerId; //載體關聯的物件,當我們給一個人點贊時,系統要給那個人(也就是entityOwnerId)傳送一個站內信,通知那個人他被點讚了。
public EventModel(){
}
public EventModel(EventType type){
this.type = type;
}
//定義可擴充套件的欄位
private Map<String, String> exts = new HashMap<>();
public EventModel setExts(String key, String value){
exts.put(key,value);
return this;
}
public String getExts(String key){
return exts.get(key);
}
public EventType getType() {
return type;
}
//為了能夠實現鏈狀的設定
public EventModel setType(EventType type) {
this.type = type;
return this; //這個就是為了實現這個xxx.setType().setXX();
}
public int getActorId() {
return actorId;
}
public EventModel setActorId(int actorId) {
this.actorId = actorId;
return this;
}
public int getEntityType() {
return entityType;
}
public EventModel setEntityType(int entityType) {
this.entityType = entityType;
return this;
}
public int getEntityId() {
return entityId;
}
public EventModel setEntityId(int entityId) {
this.entityId = entityId;
return this;
}
public int getEntityOwnerId() {
return entityOwnerId;
}
public EventModel setEntityOwnerId(int entityOwnerId) {
this.entityOwnerId = entityOwnerId;
return this;
}
public Map<String, String> getExts() {
return exts;
}
public EventModel setExts(Map<String, String> exts) {
this.exts = exts;
return this;
}
}
3.EventProducer的實現--生產者,作用是把事件分發到佇列中
package com.springboot.springboot.async;
import com.alibaba.fastjson.JSONObject;
import com.springboot.springboot.utils.JedisAdapter;
import com.springboot.springboot.utils.RedisKeyUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author WilsonSong
* @date 2018/6/3
* 事件的入口,用來統一分發事件,就是在佇列中插入
*/
@Service
public class EventProducer {
@Autowired
JedisAdapter jedisAdapter;
//把事件分發出去 EventProducer
public boolean fireEvent(EventModel eventModel){
try{
//序列化,將EventModel 轉換WieJSON的字串
String json = JSONObject.toJSONString(eventModel);
String key = RedisKeyUtil.getEventQueueKey();
jedisAdapter.lpush(key, json);
return true;
}catch (Exception e){
return false;
}
}
//事件的取出與消費
}
4、Redis的統一封裝 --佇列
因為這裡是基於Redis的佇列實現非同步操作,需要對Redis的一些函式重新封裝,並與redis快取進行資料互動
package com.springboot.springboot.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.springboot.springboot.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;
import redis.clients.jedis.BinaryClient;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Tuple;
import java.util.List;
/**
* @author WilsonSong
* @date 2018/6/1
*/
@Service
public class JedisAdapter implements InitializingBean {
private static final Logger logger = LoggerFactory.getLogger(JedisAdapter.class);
private JedisPool pool;
public static void print(int index, Object object) {
System.out.println(String.format("%d, %s", index, object.toString()));
}
@Override
public void afterPropertiesSet() throws Exception {
pool = new JedisPool("redis://localhost:6379/10");
}
//增加
public long sadd(String key, String value) {
Jedis jedis = null;
try {
jedis = pool.getResource();
return jedis.sadd(key, value);
} catch (Exception e) {
logger.error("Redis新增資料異常" + e.getMessage());
} finally {
if (jedis != null) {
jedis.close();
}
}
return 0;
}
public long srem(String key, String value){
Jedis jedis = null;
try{
jedis = pool.getResource();
return jedis.srem(key,value);
}catch (Exception e){
logger.error("Redis刪除資料異常");
}finally {
if (jedis != null){
jedis.close();
}
}
return 0;
}
//查詢數量
public long scard(String key){
Jedis jedis = null;
try{
jedis = pool.getResource();
return jedis.scard(key);
}catch (Exception e){
logger.error("Redis統計數量異常" + e.getMessage());
}finally {
if (jedis != null){
jedis.close();
}
}
return 0;
}
public boolean sismember(String key, String value){
Jedis jedis = null;
try{
jedis = pool.getResource();
return jedis.sismember(key,value);
}catch (Exception e){
logger.error("Redis查詢異常" + e.getMessage());
}finally {
if (jedis != null){
jedis.close();
}
}
return false;
}
public long lpush(String key, String value){
Jedis jedis = null;
try {
jedis = pool.getResource();
return jedis.lpush(key,value);
}catch (Exception e){
logger.error("Redis佇列新增異常");
}finally {
if (jedis != null){
jedis.close();
}
}
return 0;
}
public List<String> brpop(int timeout, String key){
Jedis jedis = null;
try{
jedis = pool.getResource();
return jedis.brpop(timeout,key);
}catch (Exception e){
logger.error("Redis佇列彈出資料異常");
}finally {
if (jedis != null){
jedis.close();
}
}
return null;
}
因為Redis是key--value的模式,每一個事件都應該有與其對應的key,為了統一管理並且不產生混淆,定義統一的key的生成
package com.springboot.springboot.utils;
/**
* @author WilsonSong
* @date 2018/6/2
* 為了防止生成的key有衝突
*/
public class RedisKeyUtil {
private static String SPLIT = ":";
private static String BIZ_LIKE = "LIKE";
private static String BIZ_DISLIKE = "DISLIKE";
private static String BIZ_EVENTQUEUE = "EVENTQUEUE";
//獲取點讚的key
public static String getLikeKey(int entityType, int entityId){
return BIZ_LIKE + SPLIT + String.valueOf(entityType) + SPLIT +String.valueOf(entityId);
}
//獲取點踩的key
public static String getDislikeKey(int entityType, int entityId){
return BIZ_DISLIKE +SPLIT + String.valueOf(entityType) + SPLIT + String.valueOf(entityId);
}
public static String getEventQueueKey(){
return BIZ_EVENTQUEUE;
}
}
5.EventHandler介面
在消費者與事件之間寫一個handler的介面,實現Consumer和handler之間的互動,因為消費者就是找到哪些EventHandler對當前的事件感興趣
package com.springboot.springboot.async;
import java.util.List;
/**
* @author WilsonSong
* @date 2018/6/3
* 用來處理事件的,誰關心這個事件,誰來做這個事件
*/
public interface EventHandler {
void doHander(EventModel model); //誰來處理事件
List<EventType> getSupportEventTypes(); //有哪些關心這些事件的
}
6. EventConsumer的實現---消費者
建立一個型別為Map<EventType, List<EventHandler>>的map,用於存放所有的Handler,然後將所有的事件註冊到config中,即通過applicationContext獲取實現了EventHandler介面的全部Handler。
啟動執行緒去不斷的去佇列中查詢事件並用brpop把事件拉出來,通過序列化和反序列化將取出的JSON轉化為EventModel,尋找是否有能處理EventModel的Handler,呼叫每一個對該事件感興趣的EventType的doHandle方法去處理事件
package com.springboot.springboot.async;
import com.alibaba.fastjson.JSON;
import com.springboot.springboot.utils.JedisAdapter;
import com.springboot.springboot.utils.RedisKeyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author WilsonSong
* @date 2018/6/3
* 處理佇列中的事件並與各個handler溝通
* InitializingBean介面的作用在spring 初始化後,執行完所有屬性設定方法(即setXxx)將
* 自動呼叫 afterPropertiesSet(), 在配置檔案中無須特別的配置
*/
@Service
public class EventConsumer implements InitializingBean,ApplicationContextAware{
private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);
private Map<EventType, List<EventHandler>> config = new HashMap<>();
private ApplicationContext applicationContext; //sping的上下文
@Autowired
JedisAdapter jedisAdapter;
//這個方法將在所有的屬性被初始化後呼叫
@Override
public void afterPropertiesSet() throws Exception {
//獲取現在有多少個eventHandler初始化了
Map<String, EventHandler> beans = applicationContext.getBeansOfType(EventHandler.class);
if (beans != null){
for (Map.Entry<String,EventHandler> entry : beans.entrySet()){
List<EventType> eventTypes = entry.getValue().getSupportEventTypes(); //找到那些handler對當前的事件感興趣
for (EventType type : eventTypes){
if (!config.containsKey(type)){ //有可能是第一次註冊這個事件,所以就可能初始的時候是null
//把handler放到config中
config.put(type, new ArrayList<EventHandler>()); //把event註冊到config中
}
config.get(type).add(entry.getValue()); //把對這些event感興趣的handler新增到config中
}
}
}
//開啟執行緒去找佇列中的事件
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
while (true){ //一直取
String key = RedisKeyUtil.getEventQueueKey();
List<String> events = jedisAdapter.brpop(0,key); //若佇列中沒有這個事件的話就一直等待
for (String message : events){
if (message.equals(key)){ //返回的第一個值可能是key,把他先過濾掉,取後面的event
continue;
}
//通過JSon的方式反序列化
EventModel eventModel = JSON.parseObject(message,EventModel.class);
if (!config.containsKey(eventModel.getType())){ //是不是有對這個事件有處理的handler
logger.error("不能識別的事件");
continue;
}
for (EventHandler handler : config.get(eventModel.getType())){
handler.doHander(eventModel);
}
}
}
}
});
thread.start();
}
//將config中所有的配置的介面
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
7.處理具體事件的具體的XXXhandler
例如這裡寫的點讚的handler
package com.springboot.springboot.async.handler;
import com.springboot.springboot.async.EventHandler;
import com.springboot.springboot.async.EventModel;
import com.springboot.springboot.async.EventType;
import com.springboot.springboot.model.Message;
import com.springboot.springboot.model.User;
import com.springboot.springboot.service.MessageService;
import com.springboot.springboot.service.userService;
import com.springboot.springboot.utils.WendaUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
/**
* @author WilsonSong
* @date 2018/6/4
* 處理點贊事件的handler
*/
@Component //就是把普通的物件在spring容器中初始化
public class LikeHandler implements EventHandler {
@Autowired
MessageService messageService;
@Autowired
userService uService;
@Override
public void doHander(EventModel model) {
Message message = new Message();
message.setFrom_id(WendaUtil.SYSTEMCONTROLLER_userId); //以系統管理員的額身份給你發訊息說誰給你點了贊
message.setTo_id(model.getEntityOwnerId()); //發給誰,就是那個entity擁有者的id
message.setCreated_date(new Date());
User user = uService.getUser(model.getActorId()); //觸發這個事件的使用者id
message.setContent("使用者" + user.getName() + "讚了你的評論,http://127.0.0.1:8080/question" + model.getExts("questionId"));
message.setConversationId(message.getConversationId());
messageService.addMessage(message);
}
@Override
public List<EventType> getSupportEventTypes() {
return Arrays.asList(EventType.LIKE); //只需要返回點讚的事件即可
}
}