使用Redis實現非同步訊息佇列
阿新 • • 發佈:2019-01-22
@Service public class EventConsumer implements InitializingBean, ApplicationContextAware { private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class); private Map<EventType, List<EventHandler>> config = new HashMap<>(); private ApplicationContext applicationContext; @Autowired private JedisAdapter jedisAdapter; @Override public void afterPropertiesSet() throws Exception { Map<String, EventHandler> beans = applicationContext.getBeansOfType(EventHandler.class); if (beans != null) { for (Map.Entry<String, EventHandler> entry : beans.entrySet()) { List<EventType> eventTypes = entry.getValue().getSupportEventTypes(); for (EventType type : eventTypes) { if (!config.containsKey(type)) { config.put(type, new ArrayList<EventHandler>()); } // 註冊每個事件的處理函式 config.get(type).add(entry.getValue()); } } } // 啟動執行緒去消費事件 Thread thread = new Thread(new Runnable() { @Override public void run() { // 從佇列一直消費 while (true) { String key = RedisKeyUtil.getEventQueueKey(); List<String> messages = jedisAdapter.brpop(0, key); // 第一個元素是佇列名字 for (String message : messages) { if (message.equals(key)) { continue; } EventModel eventModel = JSON.parseObject(message, EventModel.class); // 找到這個事件的處理handler列表 if (!config.containsKey(eventModel.getType())) { logger.error("不能識別的事件"); continue; } for (EventHandler handler : config.get(eventModel.getType())) { handler.doHandle(eventModel); } } } } }); thread.start(); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
6. 寫一個 EventHandler 介面的實現類別
@Component public class LikeHandler implements EventHandler { @Autowired MessageService messageService; @Autowired UserService userService; @Override public void doHandle(EventModel model) { Message message = new Message(); User user = userService.getUser(model.getActorId()); message.setToId(model.getEntityOwnerId()); message.setContent("使用者" + user.getName() + " 讚了你的資訊,http://127.0.0.1:8080/news/" + String.valueOf(model.getEntityId())); // SYSTEM ACCOUNT message.setFromId(3); message.setCreatedDate(new Date()); messageService.addMessage(message); } @Override public List<EventType> getSupportEventTypes() { return Arrays.asList(EventType.LIKE); } }