Redis(十五)Redis 的一些常用技術(Spring 環境下)
一、Redis 事務與鎖機制
1.Redis的基礎事務
在Redis中開啟事務的命令是 multi 命令, 而執行事務的命令是 exec 命令。multi 到 exec 命令之間的 Redis 命令將采取進入隊列的形式,直至 exec 命令的出現,才會一次性發送隊列裏的命令去執行,而在執行這些命令的時候其他客戶端就不能再插入任何命令了。
127.0.0.1:6379> multi OK 127.0.0.1:6379> set key1 value1 QUEUED 127.0.0.1:6379> get key1 QUEUED 127.0.0.1:6379> exec1) OK 2) "value1"
如果回滾事務,可以使用 discard 命令取消事務中所有命令,使事務中的方法不會被執行了。
127.0.0.1:6379> multi OK 127.0.0.1:6379> set key1 value1 QUEUED 127.0.0.1:6379> get key1 QUEUED 127.0.0.1:6379> discard OK 127.0.0.1:6379> exec (error) ERR EXEC without MULTI
2.在Spring中使用Redis事務
SessionCallback接口可以保證所有的命令都是通過同一個 Redis 連接進行操作的。
public static void testTransaction() { ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml"); RedisTemplate redisTemplate = applicationContext.getBean(RedisTemplate.class); SessionCallback callBack = (SessionCallback) (RedisOperations ops) -> { ops.multi();// 開啟事務 ops.boundValueOps("key1").set("value1"); // 註意由於命令只是進入隊列,而沒有被執行,所以此處采用get命令返回值為null String value = (String) ops.boundValueOps("key1").get(); System.out.println("value = " + value); // list保存之前進入隊列的所有命令的結果 List list = ops.exec();// 執行事務 // 事務結束後,取出value1 value = (String) redisTemplate.opsForValue().get("key1"); return value; }; // 執行Redis命令 String value = (String) redisTemplate.execute(callBack); System.out.println(value); }
返回結果:
value = null
value1
3.Redis 事務回滾的兩種情況
- 命令格式正確,而數據類型錯誤時,僅回滾數據類型錯誤的那條命令
127.0.0.1:6379> multi OK 127.0.0.1:6379> set key1 value1 QUEUED 127.0.0.1:6379> set key2 value2 QUEUED 127.0.0.1:6379> incr key1 QUEUED 127.0.0.1:6379> del key2 QUEUED 127.0.0.1:6379> exec 1) OK 2) OK 3) (error) ERR value is not an integer or out of range 4) (integer) 1
- 命令格式不正確時,直接回滾所有命令
127.0.0.1:6379> multi OK 127.0.0.1:6379> set key1 value1 QUEUED 127.0.0.1:6379> incr (error) ERR wrong number of arguments for ‘incr‘ command 127.0.0.1:6379> set key2 value2 QUEUED 127.0.0.1:6379> exec (error) EXECABORT Transaction discarded because of previous errors. 127.0.0.1:6379> get key1 (nil) 127.0.0.1:6379> get key2 (nil)
4.使用 watch 命令監控事務
在 Redis 中使用 watch 命令可以決定事務是執行還是回滾。一般而言,可以在 multi 命令之前使用 watch 命令監控某些鍵值對,然後使用 multi 命令開啟事務。當Redis 使用 exec 命令執行事務的時候,它首先會去對比被 watch 命令所監控的鍵值對,如果沒有發生變化,那麽它會執行事務隊列中的命令,提交事務;如果發生變化,那麽它不會執行任何事務中的命令,而去事務回滾。無論事務是否回滾,Redis都會去取消執行事務前的watch命令:
Redis 參考了多線程中使用的 CAS (比較與交換,Compare and Swap)去執行的。當一條線程去執行某些業務邏輯,但是這些業務邏輯操作的數據可能被其他線程共享了,這樣會引發多線程中數據不一致的情況。為了克服這個問題,在線程開始時讀取這些多線程共享的數據,並將其保存到當前線程的副本中,稱為舊值(old value),watch命令就是這樣的一個功能。然後,開啟線程業務邏輯,由multi命令提供這個功能。在執行更新即exec命令前,比較當前線程副本保存的舊值和當前線程共享的值是否一致,如果不一致,那麽該數據已經被其他線程操作過,此次更新失敗,事務回滾;否則就認為它沒有被其他線程操作過,就執行對應的業務邏輯。在數據高並發環境的操作中,把這樣的機制稱為樂觀鎖。
CAS 會產生 ABA 問題,而 Redis不會產生 ABA 問題。
產生ABA問題的根本原因就是僅僅只記錄一個舊值,解決辦法例如有Hibernate中對緩存的持久對象加入字段 version 值,每操作一次持久對象,就令version++,可以解決ABA問題。
Redis多個事務完全可以在非阻塞的多線程環境下並發執行,而且Redis的機制是不會產生ABA問題的。
例如:成功提交事務的例子:
127.0.0.1:6379> flushdb OK 127.0.0.1:6379> set key1 value1 OK 127.0.0.1:6379> watch key1 OK 127.0.0.1:6379> multi OK 127.0.0.1:6379> set key2 value2 QUEUED 127.0.0.1:6379> get key2 QUEUED 127.0.0.1:6379> exec 1) OK 2) "value2" 127.0.0.1:6379> get key1 "value1" 127.0.0.1:6379> get key2 "value2"
二、流水線(PipeLined)
當需要使用隊列批量執行一系列的命令時,Pipelined可以提高系統性能。
Redis執行讀/寫速度非常快,但是系統的瓶頸往往是在網絡通信中的時延:
為了解決這個問題,可以使用Redis的流水線,Redis的流水線是一種通信協議:
1.使用 Java API
public static void testJedisPipeline() { JedisPool pool = getPool(); Jedis jedis = pool.getResource(); long start = System.currentTimeMillis(); // 開啟流水線 Pipeline pipeline = jedis.pipelined(); // 測試十萬條讀/寫操作 for (int i = 0; i < 100000; i++) { int j = i + 1; pipeline.set("pipeline_key_" + j, "pipeline_value_" + j); pipeline.get("pipeline_key_" + j); } // pipeline.sync();// 只執行同步,不返回結果 // pipeline.syncAndReturnAll(); 將返回執行過的命令放入List列表中 List result = pipeline.syncAndReturnAll(); long end = System.currentTimeMillis(); System.err.println("耗時: " + (end - start) + "毫秒"); }
返回:耗時: 499毫秒
2.在Spring中使用流水線
public static void testPipeline() { ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml"); RedisTemplate redisTemplate = applicationContext.getBean(RedisTemplate.class); SessionCallback callBack = (SessionCallback) (RedisOperations ops) -> { for (int i = 0; i < 100000; i++) { int j = i + 1; ops.boundValueOps("pipeline_key_" + j).set("pipeline_value_" + j); ops.boundValueOps("pipeline_key_" + j).get(); } return null; }; long start = System.currentTimeMillis(); // 執行 Redis 的流水線命令 List resultList = redisTemplate.executePipelined(callBack); long end = System.currentTimeMillis(); System.out.println(end - start); }
返回:511
三、發布訂閱
當使用銀行卡消費的時候,銀行往往會通過微信、短信或者郵件通知用戶這筆交易的信息,這便是一種發布/訂閱模式。
發布訂閱模式首先需要消息源,也就是要有消息發布出來,比如銀行通知。首先是銀行的記賬系統收到了交易的命令,交易成功後,就會把消息發送出來,訂閱者就可以接收到這個消息。
發布訂閱需要兩點:
- 要有發送的消息渠道,讓記賬系統能夠發送消息
- 要有訂閱者訂閱這個渠道的消息
1.Redis中的發布訂閱
客戶端1監聽一個叫做chat的頻道:SUBSCRIBE chat
客戶端2在chat上發送消息:publish chat “hello”
此時,客戶端1就收到了客戶端2發送到chat上面的消息:“hello”
2.在Spring環境下使用發布訂閱
(1)Spring中,接收者需要實現MessageListener接口,並實現其中的onMessage方法
package com.ssm.chapter19.redis.listener; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; public class RedisMessageListener implements MessageListener { private RedisTemplate redisTemplate; public RedisTemplate getRedisTemplate() { return redisTemplate; } public void setRedisTemplate(RedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } @Override public void onMessage(Message message, byte[] bytes) { // 獲取消息 byte[] body = message.getBody(); // 使用值反序列化其轉換 String msgBody = (String) getRedisTemplate().getValueSerializer().deserialize(body); System.err.println(msgBody); // 獲取頻道 byte[] channel = message.getChannel(); // 使用字符串序列化器轉換 String channelStr = (String) getRedisTemplate().getStringSerializer().deserialize(channel); System.err.println(channelStr); // 將頻道名稱的字節數組轉換成字符串 String bytesStr = new String(bytes); System.err.println(bytesStr); } }
(2)在Spring 配置文件中配置這個類
<bean id="redisMsgListener" class="com.ssm.chapter19.redis.listener.RedisMessageListener"> <property name="redisTemplate" ref="redisTemplate" /> </bean>
(3)還需要配置監聽容器RedisMessageListenerContainer可以用於監聽Redis的發布訂閱消息,指定頻道名稱為chat
當消息通過chat發送時,就會使用redisMsgListener進行處理。
<bean id="topicContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer" destroy-method="destroy"> <!--Redis連接工廠 --> <property name="connectionFactory" ref="connectionFactory" /> <!--連接池,這裏只要線程池生存,才能繼續監聽 --> <property name="taskExecutor"> <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler"> <property name="poolSize" value="2" /> </bean> </property> <!--消息監聽Map --> <property name="messageListeners"> <map> <!--配置監聽者,key-ref和bean id定義一致 --> <entry key-ref="redisMsgListener"> <!--監聽類 --> <bean class="org.springframework.data.redis.listener.ChannelTopic"> <constructor-arg value="chat" /> </bean> </entry> </map> </property> </bean>
(4)測試:執行下面的方法後,控制臺輸出結果為:
public static void testPubSub() { ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml"); RedisTemplate redisTemplate = applicationContext.getBean(RedisTemplate.class); String channel = "chat"; redisTemplate.convertAndSend(channel, "I am lazy!!"); }
控制臺輸出結果為:
I am lazy!!
chat
chat
四、超時命令
對於Redis而言,del命令可以刪除一些鍵值對,所以Redis比Java虛擬機更加靈活,與此同時,當內存運行空間滿了之後,還可以按照回收機制自動回收一些鍵值對。
但是,當垃圾進行回收的時候,又有可能執行回收而引發系統停頓,因此選擇適當的回收機制和時間將有利於系統性能的提高。
Redis可以給對應的鍵值設置超時:
1.在Redis中測試超時命令
127.0.0.1:6379> set key1 value1 OK 127.0.0.1:6379> get key1 "value1" 127.0.0.1:6379> ttl key1 (integer) -1 127.0.0.1:6379> expire key1 120 (integer) 1 127.0.0.1:6379> ttl key1 (integer) 112 127.0.0.1:6379> ttl key1 (integer) 110 127.0.0.1:6379> ttl key1 (integer) 110 127.0.0.1:6379> ttl key1 (integer) 108 127.0.0.1:6379> ttl key1 (integer) 65 127.0.0.1:6379> persist key1 (integer) 1 127.0.0.1:6379> persist key1 (integer) 0 127.0.0.1:6379> ttl key1 (integer) -1
2.在Spring中使用超時命令
public static void testExpire() { ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml"); RedisTemplate redisTemplate = applicationContext.getBean(RedisTemplate.class); redisTemplate.execute((RedisOperations ops) -> { ops.boundValueOps("key1").set("value1"); String keyValue = (String) ops.boundValueOps("key1").get(); Long expSecond = ops.getExpire("key1"); System.err.println(expSecond); boolean b = false; b = ops.expire("key1", 120L, TimeUnit.SECONDS); b = ops.persist("key1"); Long l = 0L; l = ops.getExpire("key1"); Long now = System.currentTimeMillis(); Date date = new Date(); date.setTime(now + 120000); ops.expireAt("key", date); return null; }); }
3.問題:如果key超時了,Redis 會回收key的存儲空間嗎?
不會。Redis的key超時不會被其自動回收,它只會標識哪些鍵值對超時了。
這樣做的好處是,如果一個很大的鍵值對超時,必須一個列表或者哈希結構,存在數以百萬個元素,要對其回收需要很長時間。如果采用超時回收,則可能產生系統停頓。壞處也很明顯,就是超時的鍵值對會浪費比較多的空間。
Redis 提供兩種方式回收超時鍵值對:
- 定時回收:在確定的某個時間觸發一段代碼,回收超時的鍵值對。定時回收可以完全回收那些超時的鍵值對,但是缺點也很明顯,如果這些鍵值對比較多,則Redis需要運行較長的時間,從而導致停頓。一般會選擇在沒有業務發生的時刻觸發Redis的定時回收,以便清理超時的鍵值對。
- 惰性回收:當一個超時的鍵,被再次用get命令訪問時,將觸發Redis將其從內存中情況。優勢是可以指定回收超時的鍵值對,缺點是要執行一個get操作,或者在某些時候,難以判斷哪些鍵值對已經超時。
五、使用Lua語言
Redis 命令的計算能力不算很強大,而使用Lua語言則在很大程度上彌補了 Redis 這個不足。只是在 Redis中,執行 Lua 語言是原子性的,也就是Redis執行Lua的時候是不會被中斷的。
Redis支持閬中方式運行Lua,一種是直接輸入;另外一種是將 Lua 語言編寫成文件。
1.執行輸入Lua程序代碼
eval lua-script key-num [key1 key2 key3 ...] [value1 value2 value3 ...] eval:執行Lua語言的命令 Lua-script:代表Lua語言腳本 key-num:代表參數中有多少個key,沒有為0 [key1 key2 key3 ...]:以key為參數 [value1 value2 value3 ...]:將這些參數傳遞給Lua
例如:
127.0.0.1:6379> eval "return ‘hello java‘" 0 "hello java" 127.0.0.1:6379> eval "redis.call(‘set‘,KEYS[1],ARGV[1])" 1 lua-key lua-value (nil) 127.0.0.1:6379> get lua-key "lua-value"
有時可能需要多次執行同一段腳本,在Redis中腳本會通過SHA-1簽名算法加密腳本,返回一個標識字符串,可以通過這個字符串執行加密後的腳本。這樣的好處是,如果腳本很長,從客戶端傳輸可能需要很長的時間,那麽使用標識字符串,則只需要傳遞32位字符串即可,這樣可以提高傳輸的效率,從而提高性能。
127.0.0.1:6379> script load "redis.call(‘set‘,KEYS[1],ARGV[1])" "7cfb4342127e7ab3d63ac05e0d3615fd50b45b06" 127.0.0.1:6379> evalsha 7cfb4342127e7ab3d63ac05e0d3615fd50b45b06 1 sha-key sha-value (nil) 127.0.0.1:6379> get sha-key "sha-value"
2.在Spring 中使用 Lua 腳本存儲簡單字符串
public static void testLuaScript() { ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml"); RedisTemplate redisTemplate = applicationContext.getBean(RedisTemplate.class); Jedis jedis = (Jedis) redisTemplate.getConnectionFactory().getConnection().getNativeConnection(); // 執行簡單的腳本 String helloJava = (String) jedis.eval("return ‘hello java‘"); System.out.println(helloJava); // 執行帶參數的腳本 jedis.eval("redis.call(‘set‘,KEYS[1], ARGV[1])", 1, "lua-key", "lua-value"); String luaKey = (String) jedis.get("lua-key"); System.out.println(luaKey); // 緩存腳本,返回SHA1簽名標識字符串 String sha1 = jedis.scriptLoad("redis.call(‘set‘,KEYS[1], ARGV[1])"); // 執行腳本 jedis.evalsha(sha1, 1, new String[] { "sha-key", "sha-val" }); // 獲取執行腳本後的數據 String shaVal = jedis.get("sha-key"); System.out.println(shaVal); // ?關閉連接 jedis.close(); }
3.在Spring中使用Lua腳本存儲對象
Spring 提供了 RedisScript 接口和一個實現類 DefaultRedisScript ,通過這個對象就可以通過Lua腳本操作對象。
public static void testRedisScript() { ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml"); RedisTemplate redisTemplate = applicationContext.getBean(RedisTemplate.class); // 定義默認腳本封裝類 DefaultRedisScript<Role> redisScript = new DefaultRedisScript<Role>(); // 設置腳本 redisScript.setScriptText("redis.call(‘set‘, KEYS[1], ARGV[1]) return redis.call(‘get‘, KEYS[1])"); // 定義操作的key列表 List<String> keyList = new ArrayList<String>(); keyList.add("role1"); // 需要序列化保存和讀取的對象 Role role = new Role(); role.setId(1L); role.setRoleName("role_name_1"); role.setNote("note_1"); // 獲得標識字符串 String sha1 = redisScript.getSha1(); System.out.println(sha1); // 設置返回結果類型為Role類型 redisScript.setResultType(Role.class); // 使用JdkSerializationRedisSerializer進行序列化 JdkSerializationRedisSerializer serializer = new JdkSerializationRedisSerializer(); // 執行腳本 // DefaultRedisScript接口對象,參數序列化器,結果序列化器,key列表,參數列表 Role obj = (Role) redisTemplate.execute(redisScript, serializer, serializer, keyList, role); // 打印返回結果 System.out.println(obj.getId()); }
返回:
731429de653665577edb661a6741c4083e103b77
1
4.執行Lua文件
新建Lua文件test.lua
redis.call(‘set‘, KEYS[1], ARGV[1]) redis.call(‘set‘, KEYS[2], ARGV[2]) local n1 = tonumber(redis.call(‘get‘, KEYS[1])) local n2 = tonumber(redis.call(‘get‘, KEYS[2])) if n1 > n2 then return 1 end if n1 == n2 then return 0 end if n1 < n2 then return 2 end
在命令行輸入 redis-cli --eval test.lua key1 key2 , 2 4 會返回:2
在 Spring 中,只能通過evalsha的方式執行Lua文件,例如:
public static void testLuaFile() { ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml"); RedisTemplate redisTemplate = applicationContext.getBean(RedisTemplate.class); // 讀入文件流 File file = new File("D:\\BaiduNetdiskDownload\\ssm\\Chapter19\\src\\test.lua"); byte[] bytes = getFileToByte(file); Jedis jedis = (Jedis) redisTemplate.getConnectionFactory().getConnection().getNativeConnection(); // 發送二進制文件給Redis服務器,得到標識數組 byte[] sha1 = jedis.scriptLoad(bytes); // 傳遞參數,執行Lua文件 Object obj = jedis.evalsha(sha1, 2, "key1".getBytes(), "key2".getBytes(), "2".getBytes(), "4".getBytes()); System.out.println(obj); } /** * 把文件轉化為二進制數組 * * @param file * * @return 二進制數組 */ public static byte[] getFileToByte(File file) { byte[] by = new byte[(int) file.length()]; try { InputStream is = new FileInputStream(file); ByteArrayOutputStream bytestream = new ByteArrayOutputStream(); byte[] bb = new byte[2048]; int ch; ch = is.read(bb); while (ch != -1) { bytestream.write(bb, 0, ch); ch = is.read(bb); } by = bytestream.toByteArray(); } catch (Exception ex) { ex.printStackTrace(); } return by; }
六、在Spring中使用 Redis 哨兵模式
1.配置文件
主服務器192.168.11.128,兩個從服務器192.168.11.129、192.168.11.130。
然後在三臺機器上分別啟動哨兵服務。
<?xml version=‘1.0‘ encoding=‘UTF-8‘ ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd"> <!--配置Redis連接池 --> <bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig"> <property name="maxIdle" value="50" /> <!--最大空閑數 --> <property name="maxTotal" value="100" /> <!--最大連接數 --> <property name="maxWaitMillis" value="3000" /> <!--最大等待時間3s --> </bean> <!--jdk序列化器,可保存對象 --> <bean id="jdkSerializationRedisSerializer" class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" /> <!--String序列化器 --> <bean id="stringRedisSerializer" class="org.springframework.data.redis.serializer.StringRedisSerializer" /> <!--哨兵配置 --> <bean id="sentinelConfig" class="org.springframework.data.redis.connection.RedisSentinelConfiguration"> <!--服務名稱 --> <property name="master"> <bean class="org.springframework.data.redis.connection.RedisNode"> <property name="name" value="mymaster" /> </bean> </property> <!--哨兵服務IP和端口 --> <property name="sentinels"> <set> <bean class="org.springframework.data.redis.connection.RedisNode"> <constructor-arg name="host" value="192.168.11.128" /> <constructor-arg name="port" value="26379" /> </bean> <bean class="org.springframework.data.redis.connection.RedisNode"> <constructor-arg name="host" value="192.168.11.129" /> <constructor-arg name="port" value="26379" /> </bean> <bean class="org.springframework.data.redis.connection.RedisNode"> <constructor-arg name="host" value="192.168.11.130" /> <constructor-arg name="port" value="26379" /> </bean> </set> </property> </bean> <!--連接池設置 --> <bean id="connectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"> <constructor-arg name="sentinelConfig" ref="sentinelConfig" /> <constructor-arg name="poolConfig" ref="poolConfig" /> <property name="password" value="abcdefg" /> </bean> <!--配置RedisTemplate --> <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="keySerializer" ref="stringRedisSerializer" /> <property name="defaultSerializer" ref="stringRedisSerializer" /> <property name="valueSerializer" ref="jdkSerializationRedisSerializer" /> </bean> </beans>
2.驗證哨兵模式
關閉192.168.11.128主服務其上的Redis服務,然後3分鐘後,哨兵會進行投票切換新的主機,然後執行下面的方法。
public static void testSpringSentinel() { ApplicationContext ctx = new ClassPathXmlApplicationContext("com/ssm/chapter20/config/spring-cfg.xml"); RedisTemplate redisTemplate = ctx.getBean(RedisTemplate.class); String retVal = (String) redisTemplate.execute((RedisOperations ops) -> { ops.boundValueOps("mykey").set("myvalue"); String value = (String) ops.boundValueOps("mykey").get(); return value; }); System.out.println(retVal); }
七、Spring 緩存機制和Redis的結合
Redis(十五)Redis 的一些常用技術(Spring 環境下)