1. 程式人生 > >Redis的Keyspace notifications功能初探

Redis的Keyspace notifications功能初探

本文出處:http://blog.csdn.net/chaijunkun/article/details/27361453,轉載請註明。由於本人不定期會整理相關博文,會對相應內容作出完善。因此強烈建議在原始出處檢視此文。

最近在做一套系統,其中要求若干個Worker伺服器將心跳資訊都上報給中央伺服器。當一定時間中央伺服器沒有得到心跳資訊時則認為該Worker失效了,發出告警。

滿足這種需求的解決方法多種多樣,我開始想到了memcache,上報一次心跳資訊就重新整理一次快取,當快取內心跳資訊物件超時被刪除,即認為對應的Worker失效。然而由於memcache的工作原理,刪除都是被動的,我們無法及時判斷資料是否過期,即便知道了資料過期,也沒有一種機制來回調方法來執行自定義的處理動作。難道快取架構就真的不行了嗎?

答案是否定的。在Redis 2.8.0版本起,加入了“Keyspace notifications”(即“鍵空間通知”)的功能。如何理解該功能呢?我們來看下官方是怎麼說的:

鍵空間通知,允許Redis客戶端從“釋出/訂閱”通道中建立訂閱關係,以便客戶端能夠在Redis中的資料因某種方式受到影響時收到相應事件。

可能接收到的事件舉例如下:

影響一個給出的鍵的所有命令(會告訴你哪個鍵被執行了一個命令,這個命令是什麼);

接收到了一個LPUSH操作的所有鍵(LPUSH命令:key v1 [v2 v3..]將指定的所有值從左到右進行壓棧操作,形成一個棧,並將該棧命名為指定的key);

在資料庫0中失效的所有鍵(不一定非得是資料庫0,這裡這樣表述其實想表達可以知道影響的哪個資料庫)。

看到這裡我聯想到,如果一條快取資料失效了,通過訂閱關係,客戶端會收到訊息,通過分析訊息可以得知何種訊息,分析訊息內容可以知道是哪個key失效了。這樣就可以間接實現開頭所描述的功能。

接下來我們來看下實驗的步驟:

1.準備redis伺服器

作為開源軟體,redis下載後得到的是原始碼,使用tar -xzvf redis-2.8.9.tar.gz解壓後對其進行編譯,過程也很簡單,make就可以了。編譯完成之後可以使用自帶的runtest進行測試,看是否編譯成功。然後就是安裝了,執行make PREFIX=/usr/local/redis-2.8.9 install,PREFIX引數指定的就是安裝路徑。現在安裝的只有可執行檔案,還沒有配置檔案。其實在原始碼目錄中有一個模板redis.conf,我們對它進行改動就可以了。
其他配置我們不關心,但是官方文件中說Keyspace notifications功能預設是關閉的(預設地,Keyspace 時間通知功能是禁用的,因為它或多或少會使用一些CPU的資源),我們需要開啟它。開啟的方法也很簡單,配置屬性:notify-keyspace-events 預設配置是這樣的:notify-keyspace-events "" 根據文件中的說明:
K     Keyspace events, published with [email protected]<db>__ prefix.
E     Keyevent events, published with [email protected]<db>__ prefix.
g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
$     String commands
l     List commands
s     Set commands
h     Hash commands
z     Sorted set commands
x     Expired events (events generated every time a key expires)
e     Evicted events (events generated when a key is evicted for maxmemory)
A     Alias for g$lshzxe, so that the "AKE" string means all the events.
我們配置為:notify-keyspace-events Ex,含義為:釋出key事件,使用過期事件(當每一個key失效時,都會生成該事件)。

2.準備客戶端和連線配置

本文中使用的客戶端是Jedis,版本為2.4.2,為了程式碼的通用性,我使用Spring來管理連線:

<bean id="pool" class="redis.clients.jedis.JedisPool">
    	<constructor-arg>
    		<bean id="config" class="redis.clients.jedis.JedisPoolConfig">
    			<property name="maxIdle" value="0" />
    			<property name="maxTotal" value="20" />
    			<property name="maxWaitMillis" value="1000" />
    			<property name="testOnBorrow" value="true" />
   	 	</bean>
    	</constructor-arg>
    	<constructor-arg>
    		<value>192.168.1.100</value>
    	</constructor-arg>
    	<constructor-arg>
		<value>6379</value>
	</constructor-arg>
</bean>
然後使用Spring Test和Junit來測試程式碼
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/applicationContext*.xml")
public class RedisSubscribeDemo {
	
	private static final Log Log= LogFactory.getLog(RedisSubscribeDemo.class);
	
	@Resource
	private JedisPool pool;
	
	@Test
	public void doTest() throws InterruptedException {
		for (int i = 0; i < 1; i++) {
			TestThread thread= new TestThread(pool);
			thread.start();
		}
		Thread.sleep(50000L);
		Log.info("Test finish...");
    }
}

由於要使用一定的延遲,我們把主要測試程式碼放到了TestThread中。當測試執行緒啟動後,主執行緒停滯50秒,讓我們有足夠的時間來操作。
public class TestThread extends Thread {
	
	private Log log= LogFactory.getLog(TestThread.class);
	
	private JedisPool pool;
	
	public TestThread(JedisPool pool){
		log.info("loading test thread");
		this.pool= pool;
	}

	@Override
	public void run() {
		Jedis jedis= pool.getResource();
		jedis.psubscribe(new MySubscribe(), "*");
		try {
			Thread.sleep(10000L);
		} catch (InterruptedException e) {
			log.info("延時失敗", e);
		}
		jedis.close();
		log.info("Test run finished");
	}
}
在測試執行緒中,我們將自定義的MySubscribe加入到了Jedis的模板訂閱(即psubscribe,因為模板訂閱的channel是支援星號'*'通配的,這樣可以收集到多個通配通道的訊息,而與之相反的還有一個subscribe,此訂閱只能指定嚴格匹配的通道)中,同樣為了測試過程能夠將結果顯示出來,在綁定了訂閱後,對該執行緒進行了延時10秒。
public class MySubscribe extends JedisPubSub {
	
	private static final Log log= LogFactory.getLog(MySubscribe.class);

	// 初始化按表示式的方式訂閱時候的處理  
	public void onPSubscribe(String pattern, int subscribedChannels) {  
    		log.info(pattern + "=" + subscribedChannels);
	}
	
	// 取得按表示式的方式訂閱的訊息後的處理  
	public void onPMessage(String pattern, String channel, String message) {  
    		log.info(pattern + "=" + channel + "=" + message);
	}

	...其他未用到的重寫方法忽略
}
作為Jedis自定義訂閱,必須繼承redis.clients.jedis.JedisPubSub類,在psubscribe模式下,重點重寫onPMessage方法,該方法為接收到模板訂閱後處理事件的重要程式碼。pattern為在繫結訂閱時使用的通配模板,channel為通配後符合條件的實際通道名稱,message就不用多說了,就是事件訊息內容。

3.實戰

通過Redis自帶的redis-cli命令,我們可以在服務端通過命令列的方式直接操作。我們執行上面的示例程式碼,然後迅速切換到redis-cli命令中,建立一個生命週期很短暫的資料:
127.0.0.1:6379> set chaijunkun 123 PX 100
PX引數指定生命週期單位為毫秒,100即宣告週期,即100毫秒。key為chaijunkun的資料,其值為123。 當執行語句後,回顯:
OK

這時我們看例項程式的輸出:
*[email protected]__:expired=chaijunkun
從輸出可以看出,之前指定的萬用字元為*,通配任何通道;之後是實際的通道名稱:[email protected]__:expired,這裡我們可以看到訂閱收到了一個keyevent位於資料庫0,事件型別為:expired,是一個過期事件;最後是chaijunkun,這個是過期資料的key。 在官方文件中,keyevent通道的格式永遠是這樣的: [email protected]<db>__:prefix
對於資料過期事件,我們在繫結訂閱時通配模板也可以精確地寫成: [email protected]*__:expired
通過示例程式碼,我們可以看到確實印證了之前的構想,實現了資料過期的事件觸發(event)或者說回撥(callback)。

4.其他應用

之前的程式碼中,對於事件的釋出都是由Redis自己生成的。實際上在命令中主動釋出自定義訊息也是可以的,在publish命令的幫助中我們看到:
127.0.0.1:6379> help publish

  PUBLISH channel message
  summary: Post a message to a channel
  since: 2.0.0
  group: pubsub
通過引數,可以自定義通道名稱和通道訊息。而在Jedis中,釋出API甚至做到了位元組資料的級別: jedis.publish(byte[] channel, byte[] message)
因此我們可以構想,自定一套通訊協議,channel為命令字,message為訊息體,我們可以通過redis這種簡單的釋出/訂閱機制實現訊息的分發。

相關推薦

Redis的Keyspace notifications功能初探

本文出處:http://blog.csdn.net/chaijunkun/article/details/27361453,轉載請註明。由於本人不定期會整理相關博文,會對相應內容作出完善。因此強烈建議在原始出處檢視此文。 最近在做一套系統,其中要求若干個Worker伺服器

一個Linux平臺PM功能初探

-c padding start out att void ifd acer slab WatchDog 用戶空間節點 /dev/watchdog /sys/bus/platform/devices/zx29_ap_wdt.0 /sys/bus/platform/drive

net.sz.framework 框架 輕鬆搭建服務---讓你更專注邏輯功能---初探

前言  在之前的文章中,講解過 threadmodel,socket tcp ,socket http,log,astart ,scripts; 都是分片講解,從今天開始,將帶大家,一窺 net.sz.framework 框架; net.sz.framework 框架分為java版本和C#.net 版本,兩種

HTML5中的prefetch預載入功能初探

 在HTML5中,有個很有用但常被忽略的特性,就是預先載入(prefetch),它的原理是: 利用瀏覽器的空閒時間去先下載使用者指定需要的內容,然後快取起來,這樣使用者下次載入時,就直接從快取中取出來,效率就快了.   目前,只有firefox和chrome支援這兩個特性,

Altium Designer高階功能初探之:匹配線長

Matched Length design in Altium Designer Altium Designer中的線長匹配設計 Along with high speed signal existing in the PCB Board, so we need

工作流框架Activiti常用功能初探

import java.io.File; import java.io.InputStream; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import

Azure中國CDN全球覆蓋功能初探

在不久前的4月初,Azure中國官網上簡短地釋出了其CDN中“標準版 Zone 2”功能。一開始筆者尚有些摸不著頭腦,這個“Zone 2”具體指的是什麼呢?好在後來官網更新了資訊描述如下: 這下就比較清楚了,原來是指Azure中國區CDN開始支援全球覆蓋。顧名思義,全球覆蓋指的就是中國區CDN的內容可以被

SpringCloud框架初探(七) :Zuul API路由功能

API閘道器是一個更為智慧的應用伺服器,它的定義類似於面向物件設計模式中的Facade模式,它的存在就像是整個微服務架構系統的門面一樣,所有的外部客戶端訪問都需要經過它來進行排程和過濾。它除了要實現請求路由、 負載均衡、 校驗過濾等功能之外,還需要更多能力,比

jQuery部分功能使用js進行實現(初探JQ)

init splice {} pla .ajax substr rep 使用 turn 今天使用js寫了段jq中的html()方法。我的原則是廢話不多說,直接放代碼。。 <!DOCTYPE html> <html lang="en"> <he

Angularjs演示Service功能

lar 控制器 com 函數 splay click log ica multipl 在angularjs中,我們可以自定義自己的service。可以說得是自定義的方法,函數。下面我們一步一步來演示吧:首先為angularjs定義一個app: var demoAp

自學前端開發:模擬Array功能 不是擴展子類

自學 下使用 .cn 解決 shift this var 擴展 method function MyArray(){};//創建模擬數組功能的構造函數 MyArray.prototype.length=0;//解決IE下使用擴展子類

STL初探——第一級配置 __malloc_alloc_template的學習心得

exception template 定義 stl 空間 似的 strong cep 對象   在第一級配置器中,一開始就定義了內存分配出錯的宏接口,如下: #ifndef __THROW_BAD_ALLOC # if defined(__STL_NO_BAD_ALLO

STL初探——第二級配置器 __default_alloc_template的學習心得

空間配置 def 管理 使用 函數 效率 需求 typename []   SGI STL 第二級配置器使用的是memory pool,即內存池,相比較於第一級空間配置器,第二級空間配置器多了許多限制,主要是為了防止申請小額區塊過多而造成內存碎片。當然小額區塊在配置時實際上

使用canvas進行圖片裁剪簡單功能

getc page nload 一個 use over load 基本 height 1.html部分 使用一個input[type="file"]進行圖片上傳; canvas進行圖片的裁剪展示 <div> <input type="f

vue2.0的常用功能簡介

span nbsp color -i highlight href out con router 路由跳轉 當我們想要實現點擊鏈接跳轉時,可以使用$router來進行跳轉 語法如下: this.$router.push({path:"/www",query:{id:

vue2.0 代碼功能片段

vue lag taf 數組 select 括號 light oar brush 1、代碼片段截取 checkAll: function(flag){ this.checkAllFlag = flag; this.productLi

Libcurl的初步實現tfp上傳下載功能

rtmp 細致 helib art download close 2.3 article 可能 該學習筆記的目標是利用libcurl實現ftp文件上傳和下載功能 一、Libcurlde的簡單介紹 Libcurl是一個免費的而且易於使用的利用url進行文件傳輸的庫。,

使用NGINX+Openresty實現WAF功能

dev 安裝nginx 404頁 echo eight 並不是 where erro ip報文 使用NGINX+Openresty實現WAF功能 一、了解WAF1.1 什麽是WAF Web應用防護系統(也稱:網站應用級入侵防禦系統 。英文:Web Application F

PyQt4 模擬記事本基本功能(保存,打開文件)

int .sh idt img 問題 top 我們 文件 pyqt 1. 默認【保存】按鈕enable 2. 修改文本的內容後,【enable】 3. 解決字符亂碼問題:utf-8 # -*- coding: utf-8 -*- import sys from PyQt

jquery實現全選/反選功能

click demo lar sim llc res rip rop 個數 <!DOCTYPE html><html><head> <meta http-equiv="Content-Type" content="text/h