Java Jedis操作Redis示例(一)——pub/sub模式實現訊息佇列
轉載:http://blog.csdn.net/shaobingj126/article/details/50585035
轉載:http://blog.csdn.net/abcd898989/article/details/51697596
一 訊息佇列
1. 定義
訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用耦合,非同步訊息,流量削鋒等問題。實現高效能,高可用,可伸縮和最終一致性架構。是大型分散式系統不可缺少的中介軟體。目前在生產環境,使用較多的訊息佇列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。
2. 訊息佇列的適用場景
訊息佇列的適用場景包括非同步處理,應用解耦,流量削鋒和訊息通訊四個場景
1. 非同步處理:非同步處理中訊息佇列儲存了當前處理操作,使得動作請求方可以在發出動作請求/寫入訊息佇列後理解返回,非同步獲取結果,關注點在於請求的友好程度。
2. 應用解耦:應用解耦用於消除請求發起方和請求處理方的耦合,提升系統的健壯性。
3. 流量削鋒:流量削峰一般指秒殺或搶購場景,訊息佇列用於控制活動人數,緩解高訪問壓力。
4. 日誌處理:日誌處理是指將訊息佇列用在日誌處理中,比如Kafka的應用,解決大量日誌傳輸的問題。
3. 訊息模型
在JMS標準中,有兩種訊息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。
P2P模式:
P2P模式包含三個角色:訊息佇列(Queue),傳送者(Sender),接收者(Receiver)。每個訊息都被髮送到一個特定的佇列,接收者從佇列中獲取訊息。佇列保留著訊息,直到他們被消費或超時。
P2P的特點
- 每個訊息只有一個消費者(Consumer)(即一旦被消費,訊息就不再在訊息佇列中)
- 傳送者和接收者之間在時間上沒有依賴性,也就是說當傳送者傳送了訊息之後,不管接收者有沒有正在執行,它不會影響到訊息被髮送到佇列
- 接收者在成功接收訊息之後需向佇列應答成功
Pub/sub模式:
包含三個角色主題(Topic),釋出者(Publisher),訂閱者(Subscriber) 。多個釋出者將訊息傳送到Topic,系統將這些訊息傳遞給多個訂閱者。
Pub/Sub的特點
- 每個訊息可以有多個消費者
- 釋出者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個訂閱者之後,才能消費釋出者的訊息。
- 為了消費訊息,訂閱者必須保持執行的狀態。
如果希望傳送的訊息可以不被做任何處理、或者只被一個訊息者處理、或者可以被多個消費者處理的話,那麼可以採用Pub/Sub模型。
4. 訊息消費
在JMS中,訊息的產生和消費都是非同步的。對於消費來說,JMS的訊息者可以通過兩種方式來消費訊息。(1)同步:訂閱者或接收者通過receive方法來接收訊息,receive方法在接收到訊息之前(或超時之前)將一直阻塞;
(2)非同步:訂閱者或接收者可以註冊為一個訊息監聽器。當訊息到達之後,系統自動呼叫監聽器的onMessage方法。
二 Redis 釋出-訂閱模式(pub/sub)
Pub/Sub功能(means Publish, Subscribe)即釋出及訂閱功能。基於事件的系統中,Pub/Sub是目前廣泛使用的通訊模型,它採用事件作為基本的通訊機制,提供大規模系統所要求的鬆散耦合的互動模式:
訂閱者(如客戶端)以事件訂閱的方式表達出它有興趣接收的一個事件或一類事件;
釋出者(如伺服器)可將訂閱者感興趣的事件隨時通知相關訂閱者。
1. 時間非耦合:釋出者和訂閱者不必同時線上,它們不必同時參與互動。
2. 空間非耦合:釋出者和訂閱者不必相互知道對方所在的位置。釋出者通過事件服務釋出事件,訂閱者通過事件服務間接獲得事件。釋出者和訂閱者不需要擁有直接到對方的引用,也不必知道有多少個訂閱者或者是釋出者參與互動。
3. 同步非耦合:釋出者/訂閱者是非同步模式。釋出者可不斷地生產事件,而訂閱者(通過一個回撥)則可非同步地得到產生事件的通知。
分類:
按照訂閱方式分為基於主題(topic-based)、基於內容(content-based)、基於型別(type-based)的pub/sub方式。
三 Redis pub/sub的實現(非持久)
Redis通過publish和subscribe命令實現訂閱和釋出的功能。訂閱者可以通過subscribe向redis server訂閱自己感興趣的訊息型別。redis將資訊型別稱為通道(channel)。當釋出者通過publish命令向redis server傳送特定型別的資訊時,訂閱該訊息型別的全部訂閱者都會收到此訊息。1. 匯入Redis依賴(以Maven工程為例子):
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.0</version>
</dependency>
2. 增加日誌配置檔案,這裡使用系統輸出代替日至
log4j.rootLogger=info,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n
3. 建立訊息的釋出者Publisher.java
package com.zenhobby.redis_pub_sub;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import redis.clients.jedis.Jedis;
public class Publisher {
private Jedis publisherJedis;
private String channel;
public Publisher(Jedis publishJedis,String channel){
this.publisherJedis=publishJedis;
this.channel=channel;
}
public void startPublish(){
try{
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
while(true){
System.out.println("請輸入message:");
String line = reader.readLine();
if(!"quit".equals(line)){
publisherJedis.publish(channel, line);
}else{
break;
}
}
}catch(Exception e){
e.printStackTrace();
}
}
}
4. 實現訊息的接收者Subscriber.java,實現JedisPubSub介面
package com.zenhobby.redis_pub_sub;
import redis.clients.jedis.JedisPubSub;
public class Subscriber extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
System.out.println("Channel:" + channel + ",Message:" + message);
}
@Override
public void onPMessage(String pattern, String channel, String message) {
System.out.println("Pattern:" + pattern + ",Channel:" + channel + ",Message:" + message);
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("onSubscribe---channel:"+channel+",subscribedChannels:"+subscribedChannels);
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
System.out.println("onPUnsubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels);
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
System.out.println("onPSubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels);
}
}
JedisPubSub是Redis提供的抽象類,繼承這個類就完成了對客戶端對訂閱的監聽。
抽象類中存在六個方法。分別表示
- 監聽到訂閱模式接受到訊息時的回撥 (onPMessage)
- 監聽到訂閱頻道接受到訊息時的回撥 (onMessage )
- 訂閱頻道時的回撥( onSubscribe )
- 取消訂閱頻道時的回撥( onUnsubscribe )
- 訂閱頻道模式時的回撥 ( onPSubscribe )
- 取消訂閱模式時的回撥( onPUnsubscribe )
5. 建立測試Main.java
package com.zenhobby.redis_pub_sub;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class TestMain {
public static final String CHANNEL = "mychannel";
public static final String HOST = "127.0.0.1";
public static final int PORT = 6379;
private final static JedisPoolConfig POOL_CONFIG = new JedisPoolConfig();
private final static JedisPool JEDIS_POOL = new JedisPool(POOL_CONFIG, HOST, PORT, 0);
public static void main(String[] args) {
final Jedis subscriberJedis = JEDIS_POOL.getResource();
final Jedis publisherJedis = JEDIS_POOL.getResource();
final Subscriber subscriber = new Subscriber();
new Thread(new Runnable() {
public void run() {
try {
System.out.println("Subscribing to mychannel,this thread will be block");
subscriberJedis.subscribe(subscriber, CHANNEL);
System.out.println("subscription ended");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Publisher(publisherJedis, CHANNEL).startPublish();
publisherJedis.close();
subscriber.unsubscribe();
subscriberJedis.close();
}
}
6.測試方法:首先,啟動main方法中所示地址的Redis伺服器;然後,執行main方法,觀察控制檯輸出。並且我們是以控制檯輸入內容作為訊息釋出的內容,各位看官可以在控制檯輸入任意內容,點選回車鍵,觀察控制檯輸出。示例如下(直接把原博的圖借過來啦):
注意:此方法實現的釋出與訂閱功能,訊息不會在Redis客戶端進行快取。
四 Redis的pub/sub實現(持久)
Redis的pub/sub的持久主要通過,在非持久化的基礎上需要作如下處理:
1. 重新實現Publisher
package com.zenhobby.redis.persistence;
import java.util.Set;
import redis.clients.jedis.Jedis;
public class PPubClient {
private Jedis jedis;
private String CONSTANT_CLIENTSET = "clientSet";
public PPubClient(String host,int port){
jedis = new Jedis(host,port);
}
private void put(String message){
Set<String> subClients = jedis.smembers(CONSTANT);
for(String clientKey:subClients){
jedis.rpush(clientKey, message);
}
}
public void pub(String channel,String message){
Long txid = jedis.incr("MAXID");
String content = txid+"/"+message;
this.put(content);
jedis.publish(channel, message);
}
public void close(String channel){
jedis.publish(channel, "quit");
jedis.del(channel);
}
}
在新實現的Publisher中使用Jedis儲存釋出的訊息。
2. 重新實現SubClient
package com.zenhobby.redis.persistence; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; public class PPSubClient { private Jedis jedis; private JedisPubSub listener; private String CONSTANT_CLIENTSET="clientSet"; public PPSubClient(String host,int port,String clientId){ jedis = new Jedis(host,port); listener = new PPrintListener(clientId,new Jedis(host,port)); jedis.sadd(CONSTANT_CLIENTSET, clientId); } public void sub(String channel){ jedis.subscribe(listener, channel); } public void unsubscribe(String channel){ listener.unsubscribe(channel); } }
這個客戶端並沒有繼承JedisPubSub類,轉而在如下的輸出類進行Listener的處理
3. Listener類用於處理訊息
package com.zenhobby.persistence;
import java.util.Date;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class PPrintListener extends JedisPubSub {
private String clientId;
private PSubHandler handler;
private String CONSTANT = "clientSet";
public PPrintListener(String clientId, Jedis jedis) {
this.clientId = clientId;
handler = new PSubHandler(jedis);
}
@Override
public void onMessage(String channel, String message) {
if (message.equalsIgnoreCase("quit")) {
this.unsubscribe(channel);
}
handler.handle(channel, message);
System.out.println("message receive:" + message + ",channel:" + channel);
}
private void message(String channel, String message) {
Date time = new Date();
System.out.println("message receive:" + message + ",channel:" + channel + time.toString());
}
@Override
public void onPMessage(String pattern, String channel, String message) {
System.out.println("message receive:" + message + ",pattern channel:" + channel);
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
handler.subscribe(channel);
System.out.println("subscribe:" + channel + ",total channels:" + subscribedChannels);
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
handler.unsubscribe(channel);
System.out.println("unsubscribe:" + channel + ",total channels:" + subscribedChannels);
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
System.out.println("subscribe pattern:" + pattern + ",total channels:" + subscribedChannels);
}
@Override
public void unsubscribe(String... channels) {
super.unsubscribe(channels);
for (String channel : channels) {
handler.unsubscribe(channel);
}
}
class PSubHandler {
private Jedis jedis;
PSubHandler(Jedis jedis) {
this.jedis = jedis;
}
public void handle(String channel, String message) {
int index = message.indexOf("/");
if (index < 0) {
return;
}
Long txid = Long.valueOf(message.substring(0, index));
String key = clientId + "/" + channel;
while (true) {
String lm = jedis.lindex(key, 0);
if (lm == null) {
break;
}
int li = lm.indexOf("/");
if(li<0){
String result = jedis.lpop(key);
if(result == null){
break;
}
message(channel, lm);
continue;
}
Long lxid = Long.valueOf(lm.substring(0, li));
if(txid>=lxid){
jedis.lpop(key);
message(channel,lm);
continue;
}else{
break;
}
}
}
public void subscribe(String channel){
String key = clientId+"/"+channel;
boolean exist = jedis.sismember(CONSTANT, key);
if(!exist){
jedis.sadd(CONSTANT, key);
}
}
public void unsubscribe(String channel){
String key = clientId+"/"+channel;
jedis.srem(CONSTANT, key);
jedis.del(key);
}
}
}
其中jedis.sismember(CONSTANT, Key)用於判斷當前使用者是否存在,如果不存在則新增(和Redis快取的思路相同)。
4. 建立測試Main方法,具體內容如下:
package com.zenhobby.redis.persistence;
public class PPubSubTestMain {
public static void main(String[] args) throws Exception {
String host = "127.0.0.1";
int port = 6379;
String clientId = "myclient";
PPubClient pubClient = new PPubClient(host, port);
final String channel = "mychannel";
final PPSubClient subClient = new PPSubClient(host, port, clientId);
Thread subThread = new Thread(new Runnable() {
public void run() {
System.out.println("------------sub----start------------");
subClient.sub(channel);
System.out.println("------------sub----end------------");
}
});
subThread.setDaemon(true);
subThread.start();
int i = 0;
while (i < 20) {
String message = "message--" + i;
pubClient.pub(channel, message);
i++;
Thread.sleep(100);
}
subClient.unsubscribe(channel);
}
}
5.測試方法:首先,啟動main方法中所示地址的Redis伺服器;然後,執行main方法,觀察控制檯輸出。這次我們是以迴圈呼叫作為輸入內容作為訊息釋出的內容,各位看官觀察控制檯輸出。示例如下:然後,開啟Redis客戶端,觀察當前Redis中保留的所有資料:
題外的話:
Redis目前提供的釋出與訂閱功能,將會完全阻塞訂閱者的客戶端,在java實現時,即需要保留一個執行緒來專門處理髮布者與訂閱者的連線。因此,在實際應用時,更加推薦的做法是使用MQ元件來實現該功能。
至此,NoSQL之Redis---PUB/SUB(訂閱與釋出)---JAVA實現 結束
在此,對以下參考資料的作者表示感謝!:
參考資料:
redis官網:
其他博文:
http://my.oschina.net/itblog/blog/601284?fromerr=FiejlElw
http://www.sxrczx.com/pages/shift-alt-ctrl.iteye.com/blog/1867454.html
相關推薦
Java Jedis操作Redis示例(一)——pub/sub模式實現訊息佇列
轉載:http://blog.csdn.net/shaobingj126/article/details/50585035 轉載:http://blog.csdn.net/abcd898989/article/details/51697596 一 訊息佇列 1. 定義 訊息
Java Jedis操作Redis示例(五)——Redis的事務、管道和指令碼
一 Redis的事務 在資料庫系統中,一個事務是指:由一系列資料庫操作組成的一個完整的邏輯過程。例如銀行轉帳,從原賬戶扣除金額,以及向目標賬戶新增金額,這兩個資料庫操作的總和,構成一個完整的邏輯過程,不可拆分。這個過程被稱為一個事務,具有ACID特性。 0. A
JAVA並行框架Fork/Join(一):簡介和代碼示例
over 框架設計 put 分割 gif 得到 java owa trace 一、背景 雖然目前處理器核心數已經發展到很大數目,但是按任務並發處理並不能完全充分的利用處理器資源,因為一般的應用程序沒有那麽多的並發處理任務。基於這種現狀,考慮把一個任務拆分成多個單元,每個單元
Jedis操作單節點redis,叢集及redisTemplate操作redis叢集(三)
package com.dream21th.dream21thredis.controller;import java.util.List;import java.util.Map;import org.springframework.beans.factory.annotation.Autowired;im
Jedis操作單節點redis,叢集及redisTemplate操作redis叢集(二)
package com.dream21th.dream21thredis.redis;import java.util.List;import java.util.Map;import java.util.Set;import com.dream21th.dream21thredis.key.KeyPrefi
《大話設計模式》Java程式碼示例(一)之簡單工廠方法
簡單工廠模式(Simple Factory):也叫靜態工廠模式,就是建立一個工廠類,對實現了同一介面的一些類進行例項的建立。 package simplefactory; /** * 簡單工廠方法(Simple Factory) * 簡單運算工廠類 */ p
Java中常用到的文件操作那些事(一)——替換doc文檔模板,生成真實合同案例
代碼 sta ring site hashmap i++ illegal puts except 工作中,我們時常會遇到一些操作文件的操作,比如在線生成合同模板,上傳/下載/解析Excel,doc文檔轉為pdf等操作。本文就已工作中遇到的在線生成合同為例,簡要地介紹一種
Java中的反射機制(一)
erl void port 令行 sage [0 ray 輸出 我們 基本概念 在Java運行時環境中,對於任意一個類,能否知道這個類有哪些屬性和方法?對於任意一個對象,能否調用它的任意一個方法? 答案是肯定的。 這種動態獲取類的信息以及動態調用對象的方法的功能
JAVA中的枚舉(一)
enum 枚舉 在實際編程中,往往存在著這樣的“數據集”,它們的數值在程序中是穩定的,而且“數據集”中的元素是有限的。例如星期一到星期日七個數據元素組成了一周的“數據集”,春夏秋冬四個數據元素組成了四季的“數據集”。在Java中想表示這種數據集最容易想到的寫法可能是這樣,我們以表示一周五天的工作日來舉
vue-router單頁應用簡單示例(一)
問題 clas 做了 設置 new scope 文件的 log target 請先完成了項目初始化,具體請看我另一篇博文。vue項目初始化 看一下完成的效果圖,很典型的單頁應用。 .vue後綴名的單文件組件 這裏先說一下我對組件的理解。組件,顧名思義就是一組元素組成的
(轉)Redis研究(一)—簡介
創始人 存儲結構 隊列 cached tar 寫入 關系 退出 使用 http://blog.csdn.net/wtyvhreal/article/details/41855327 Redis是一個開源的高性能鍵值對數據庫。它通過提供多種鍵值數據類型來適應不同場景下的
Java並發編程(一)
implement 返回 tile 對象 not seconds dex note 系統調用 1、定義
Java導出txt模板——(一)
qps ogl iar i++ vnr snv bho vra dsr 導出txt文件時候\r\n才能換行 java代碼 package DRDCWordTemplates; import java.io.BufferedWriter; import java.io.F
.NET中使用Redis之ServiceStack.Redis學習(一)安裝與簡單的運行
arraylist write client cli ring blog 控制臺 創建 spa 1.下載ServiceStack.Redis PM> Install-Package ServiceStack.Redis 2.vs中創建一個控制臺程序 class Pro
【JAVA秒會技術之秒殺面試官】秒殺Java面試官——集合篇(一)
tails category tail java cat 秒殺 試題 面試官 java面試 【JAVA秒會技術之秒殺面試官】秒殺Java面試官——集合篇(一) 【JAVA秒會技術之秒殺面試官】JavaEE常見面試題(三) http://blog.csdn.net/qq296
Java核心技術 卷二(一)
rgs code 一個 dir 字節 per workspace spa 核心技術 書:《Java核心技術 卷二 高級特性 9》 時間:2017.9.4 17:13 1.流 輸入流:可以從其中讀取一個字節序列的對象;抽象類(InputStream) 輸出流:可以向其
Java中的線程(一)
java線程一、線程與進程 談到線程,那就不得不提進程,很久之前其實並沒有線程,只有進程,當一個程序需要運行的時候,必然需要使用系統資源和CPU,因此進程就擔任了對一個應用程序進行資源分配以及CPU調度這兩項職責。後來,為了進一步提高並發執行和資源利用的效率,提出了線程的概念,將進程作了細分,進程將負責資源
Java集合幹貨系列-(一)ArrayList源碼解析
div imp ins bject 增加 toa tof capacity == 前言 今天來介紹下ArrayList,在集合框架整體框架一章中,我們介紹了List接口,ArrayList繼承了AbstractList,實現了List。ArrayList在工作中經常用到,所
Java語言中的----繼承(一)
java語言中的----繼承(一)day10 Java語言中的繼承(一)一、繼承概述: 繼承:什麽是繼承,程序中的繼承與生活中的繼承還是有區別的,在程序中繼承以後,你的父類和你的子類同樣的也具有某一成員變量。那麽我們為什麽藥學習繼承?是因為我們在編程的時候我們會有大量的代碼需要重寫,從而導致我們代碼比較
Python操作rabbitmq系列(一)
targe 紅色 入門 web 之間 cap ssa 隊列 技術 從本文開始,接下來的內容,我們將討論rabbitmq的相關功能。我的這些文章,最終是要實現一個項目(具體是什麽暫不透露)。前面每一篇,都是在為這個系統做準備。rabbitmq,是我們這個項目的關鍵部分之一。所