使用jedis實現Redis訊息佇列(MQ)的釋出(publish)和訊息監聽(subscribe)
前言:
本文基於jedis 2.9.0.jar、commons-pool2-2.4.2.jar以及json-20160810.jar
其中jedis連線池需要依賴commons-pool2包,json包用於物件例項和json字串的相互轉換
1、jedis的訊息佇列方法簡述
1.1、釋出訊息方法
(其中,channel是對應訊息通道,message是對應訊息體)
jedis.publish(channel, message);
1.2、監聽訊息方法
(其中,jedisPubSub用於處理監聽到的訊息,channels是對應的通道)
jedis.subscribe(jedisPubSub, channels);
2、釋出訊息
/** * 從jedis連線池獲取jedis操作例項 * @return */ public static Jedis getJedis() { return RedisPoolManager.getJedis(); } /** * 推入訊息到redis訊息通道 * * @param String * channel * @param String * message */ public static void publish(String channel, String message) { Jedis jedis = null; try { jedis = getJedis(); jedis.publish(channel, message); } finally { jedis.close(); } } /** * 推入訊息到redis訊息通道 * * @param byte[] * channel * @param byte[] * message */ public void publish(byte[] channel, byte[] message) { Jedis jedis = null; try { jedis = getJedis(); jedis.publish(channel, message); } finally { jedis.close(); } }
3、監聽訊息
3.1、監聽訊息主體方法
/** * 監聽訊息通道 * @param jedisPubSub - 監聽任務 * @param channels - 要監聽的訊息通道 */ public static void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) { Jedis jedis = null; try { jedis = getJedis(); jedis.subscribe(jedisPubSub, channels); } finally { jedis.close(); } } /** * 監聽訊息通道 * @param jedisPubSub - 監聽任務 * @param channels - 要監聽的訊息通道 */ public static void subscribe(JedisPubSub jedisPubSub, String... channels) { Jedis jedis = null; try { jedis = getJedis(); jedis.subscribe(jedisPubSub, channels); } finally { jedis.close(); } }
3.2、處理監聽到的訊息任務
class Tasker implements Runnable {
private String[] channel = null;//監聽的訊息通道
private JedisPubSub jedisPubSub = null;//訊息處理任務
public Tasker(JedisPubSub jedisPubSub, String ...channel) {
this.jedisPubSub = jedisPubSub;
this.channel = channel;
}
@Override
public void run() {
// 監聽channel通道的訊息
RedisMQ.subscribe(jedisPubSub, channel);
}
}
3.3、處理監聽到的訊息主體類實現
package cn.eguid.livePushServer.redisManager;
import java.util.Map;
import org.json.JSONObject;
import cc.eguid.livepush.PushManager;
import redis.clients.jedis.JedisPubSub;
public class RedisMQHandler extends JedisPubSub{
PushManager pushManager = null;
public RedisMQHandler(PushManager pushManager) {
super();
this.pushManager = pushManager;
}
@Override
// 接收到訊息後進行分發執行
public void onMessage(String channel, String message) {
JSONObject jsonObj = new JSONObject(message);
System.out.println(channel+","+message);
if ("push".equals(channel)) {
Map<String,Object> map=jsonObj.toMap();
System.out.println("接收到一條推流訊息,準備推流:"+map);
// String appName=pushManager.push(map);
//推流完成後還需要釋出一個成功訊息到返回佇列
} else if ("close".equals(channel)) {
String appName=jsonObj.getString("appName");
System.out.println("接收到一條關閉訊息,準備關閉應用:"+appName);
// pushManager.closePush(appName);
}
}
}
4、測試訊息佇列釋出和監聽
public static void main(String[] args) throws InterruptedException {
PushManager pushManager= new PushManagerImpl();
Thread t1 = new Thread(new Tasker(new RedisMQHandler (pushManager), "push"));
Thread t2 = new Thread(new Tasker(new RedisMQHandler (pushManager), "close"));
t1.start();
t2.start();
LivePushEntity livePushInfo=new LivePushEntity();
livePushInfo.setAppName("test1");
JSONObject json=new JSONObject(livePushInfo);
publish("push",json.toString());
publish("close", json.toString());
Thread.sleep(2000);
publish("push", json.toString());
publish("close",json.toString());
Thread.sleep(2000);
publish("push", json.toString());
publish("close",json.toString());
}
相關推薦
使用jedis實現Redis訊息佇列(MQ)的釋出(publish)和訊息監聽(subscribe)
前言: 本文基於jedis 2.9.0.jar、commons-pool2-2.4.2.jar以及json-20160810.jar 其中jedis連線池需要依賴commons-pool2包,json
訊息佇列MQ, rabbitMQ和rocketMQ的實現方式
MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入佇列的訊息(針對應用程式的資料)來通訊,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此來通訊,直接呼叫通常是用
linux 下C++實現 ARP釋出,和ARP監聽
改造自http://blog.csdn.net/xiaodao1986/article/details/6628250 g++ -o即可編譯通過。 ubuntu 14.04 可以用適當的方法,在寢室裡,讓室友不能上網。 #include <stdio.h>
訊息佇列mq的原理及實現方法
訊息佇列技術是分散式應用間交換資訊的一種技術。訊息佇列可駐留在記憶體或磁碟上,佇列儲存訊息直到它們被應用程式讀走。通過訊息佇列,應用程式可獨立地執行--它們不需要知道彼此的位置、或在繼續執行前不需要等待接收程式接收此訊息。 訊息中介軟體概述 訊息佇列技術是分散式應用間交
訊息佇列MQ實踐----實現Queue(佇列訊息)和Topic(主題訊息)兩種模式
之前有篇檔案介紹了生產消費者模式(http://blog.csdn.net/canot/article/details/51541920 ),當時是通過BlockingQueue阻塞佇列來實現,以及在Redis中使用pub/sub模式(http://blog.csdn.ne
訊息佇列MQ分析
做java開發的面試基本會遇到java基礎知識,設計模式,多執行緒,io,集合等,spring/springMvc/mybatis/springBoot,mysql/oracle/sql優化這些。現在僅僅會這些是不夠的,還會要求快取、訊息佇列、訊息中介軟體。springCloud/Dubbo
訊息佇列MQ選型 - Kafka、RabbitMQ對比
image.png 適應場景 非同步處理,應用解耦,流量削鋒和訊息通訊 對比 feature scenario Kafka RabbitMQ 備註 PUB-SUB 釋出訂閱模型
訊息佇列MQ技術的介紹和原理
訊息佇列技術是分散式應用間交換資訊的一種技術。訊息佇列可駐留在記憶體或磁碟上,佇列儲存訊息直到它們被應用程式讀走。通過訊息佇列,應用程式可獨立地執行--它們不需要知道彼此的位置、或在繼續執行前不需要等待接收程式接收此訊息。 訊息中介軟體概述 訊息佇列技術是分散式
轉載:訊息佇列MQ
本文大概圍繞如下幾點進行闡述: 為什麼使用訊息佇列? 使用訊息佇列有什麼缺點? 訊息佇列如何選型? 如何保證訊息佇列是高可用的? 如何保證訊息不被重複消費? 如何保證消費的可靠性傳輸? 如何保證訊息的順序性? 1、為什麼要使用訊息佇列? 分析:一個用訊息佇列的人,不知道
reids叢集學習(二)使用jedis實現redis叢集客戶端
上一節我記錄瞭如何搭建redis官方的叢集,這節我就開始講怎麼用jedis實現叢集環境下的客戶端。 jedis中實現叢集的客戶端類是redis.clients.jedis.JedisClust
jedis 實現 redis 統計一個使用者在一段時間內的登入次數
import java.util.BitSet; import redis.clients.jedis.Jedis; public class SetBitTest {public static void main(String[] args) {// TODO Auto
SpringBoot對訊息佇列(MQ)的支援
1.非同步訊息的定義 非同步訊息的主要目的是為了系統與系統之間的通訊,所謂非同步訊息即訊息傳送者無需等待訊息接收者的處理以及返回,甚至無需關心訊息是否傳送成功 在非同步訊息中有兩個很重要的概念,即訊息代理和目的地,當訊息傳送者傳送訊息之後,訊息將由訊
【訊息佇列MQ】各類MQ比較 【轉載】
原文地址:http://blog.csdn.net/sunxinhere/article/details/7968886目前業界有很多MQ產品,我們作如下對比: RabbitMQ 是使用Erlang編寫的一個開源的訊息佇列,本身支援很多的協議:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它
【訊息佇列MQ】各類MQ比較
目前業界有很多MQ產品,我們作如下對比: RabbitMQ 是使用Erlang編寫的一個開源的訊息佇列,本身支援很多的協議:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它變的非常重量級,更適合於企業級的開發。同時實現了一個經紀人(Broker)構架,這意味
詳解RPC遠端呼叫和訊息佇列MQ的區別
PC(Remote Procedure Call)遠端過程呼叫,主要解決遠端通訊間的問題,不需要了解底層網路的通訊機制。 RPC框架 知名度較高的有Thrift(FB的)、dubbo(阿里的)。 RPC的一般需要經歷4個步驟: 1、建立通訊 首先要
【訊息佇列MQ】【Kafka&Jafka】design-2
源:http://incubator.apache.org/kafka/design.html Message Persistence and Caching Don't fear the filesystem! Kafka在很大程度上依賴檔案系統來實現儲
Java執行緒實現Redis任務佇列(生產者消費者)
注:接上篇IDEA整合Redis,本篇實現Redis的任務佇列,Redis連線池具體配置看上篇。 一:寫一個Jedis的工具類JedisUtil,將Jedis中的部分方法實現,程式碼如下: package com.wq.Util; import com.wq.RedisP
Amazon SQS 訊息佇列服務_訊息佇列mq解決方案
Amazon Simple Queue Service (SQS) 是一種完全託管的訊息佇列服務,可讓您分離和擴充套件微服務、分散式系統和無伺服器應用程式。SQS 消除了與管理和運營訊息型中介軟體相關的複雜性和開銷,並使開發人員能夠專注於重要工作。藉助 SQS,您可以在軟體元件之間傳送、儲
訊息佇列 MQ
訊息佇列(Message Queue,簡稱 MQ)是阿里巴巴集團中介軟體技術部自主研發的專業訊息中介軟體,產品基於高可用分散式叢集技術,提供訊息釋出訂閱、訊息軌跡查詢、定時(延時)訊息、資源統計、監
訊息佇列mq總結(重點看,比較了主流訊息佇列框架)
RabbitMQ/Kafka/ZeroMQ 都能提供訊息佇列服務,但有很大的區別。在面向服務架構中通過訊息代理(比如 RabbitMQ / Kafka等),使用生產者-消費者模式在服務間進行非同步通訊是一種比較好的思想。因為服務間依賴由強耦合變成了鬆耦合。訊息代理都會提供持久化機制,在消費者負載高或者掉線的情