redis 2 基本使用+Pipeline+事務
一 redis key的設計
越短,而且要完整表達含義,可以縮寫,但必須文件留存好說明
user:001
tm:order:001 order:1
一般以業務,功能模組或者表名開頭,後跟主鍵(或能表示資料唯一性的值)
二 客戶端連線redis
普通實現
import redis.clients.jedis.Jedis; public class jedistest { public static void main(String[] args) { try { String host = "xx.kvstore.aliyuncs.com";//控制檯顯示訪問地址 int port = 6379View Code; //直接new一個jedis物件 Jedis jedis = new Jedis(host, port); //鑑權資訊 jedis.auth("password");//password String key = "redis"; String value = "aliyun-redis"; //select db預設為0 jedis.select(1); //set一個key jedis.set(key, value); System.out.println("Set Key " + key + "Value: " + value); //get 設定進去的key String getvalue = jedis.get(key); System.out.println("Get Key " + key + " ReturnValue: " + getvalue); jedis.quit(); jedis.close(); } catch (Exception e) { e.printStackTrace(); } } }
使用了連線池
package com.springboot.demo.base.config; import lombok.View Codeextern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; @Configuration @PropertySource("classpath:redis.properties") @Slf4j public class RedisConfig { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private int port; @Value("${spring.redis.timeout}") private int timeout; @Value("${spring.redis.jedis.pool.max-idle}") private int maxIdle; @Value("${spring.redis.jedis.pool.max-wait}") private long maxWaitMillis; @Value("${spring.redis.password}") private String password; @Value("${spring.redis.block-when-exhausted}") private boolean blockWhenExhausted; @Bean public JedisPool redisPoolFactory() throws Exception{ log.info("JedisPool注入成功!!"); log.info("redis地址:" + host + ":" + port); JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxIdle(maxIdle); jedisPoolConfig.setMaxWaitMillis(maxWaitMillis); // 連線耗盡時是否阻塞, false報異常,ture阻塞直到超時, 預設true jedisPoolConfig.setBlockWhenExhausted(blockWhenExhausted); // 是否啟用pool的jmx管理功能, 預設true jedisPoolConfig.setJmxEnabled(true); JedisPool jedisPool = new JedisPool(jedisPoolConfig, host, port, timeout, password); return jedisPool; } }
從連線池中獲取jedis物件
/** * 通過key獲取儲存在redis中的value * 並釋放連線 * * @param key * @param indexdb 選擇redis庫 0-15 * @return 成功返回value 失敗返回null */ public String get(String key,int indexdb) { Jedis jedis = null; String value = null; try { jedis = jedisPool.getResource(); jedis.select(indexdb); value = jedis.get(key); log.info(value); } catch (Exception e) { log.error(e.getMessage()); } finally { //歸還redis物件 returnResource(jedisPool, jedis); //或者 jedis.close() } return value; }View Code
redis的resp協議,無論是傳輸還是儲存,都是按這個資料格式
如 set name james ----> resp協議包 -----> redis 服務端
resp協議包的格式如下
*3 //組數 $3 //欄位長度 set // 欄位名 $4 //key長度 name //key的欄位名 $5 //值的長度 james //值
手寫redis
//基於tcp協議 socket實現 redis客戶端 public static String set(Socket socket, String key, String value) throws Exception { StringBuilder str = new StringBuilder(); //1個resp協議格式的資料 str.append("*3").append("\r\n"); //組數 str.append("$3").append("\r\n");//欄位長度 str.append("set").append("\r\n");//欄位 str.append("$").append(key.getBytes().length).append("\r\n");//key長度 str.append(key).append("\r\n"); //key 欄位名 str.append("$").append(value.getBytes().length).append("\r\n");//value長度 str.append(value).append("\r\n"); //value的值 socket.getOutputStream().write(str.toString().getBytes()); byte[] response = new byte[2048]; socket.getInputStream().read(response); return new String(response); }View Code
三 Pipeline詳解
1 pipeline出現的背景:
redis客戶端執行一條命令分4個過程:
傳送命令--->命令排隊--->命令執行--->返回結果
這個過程稱為Round trip time(簡稱RTT, 往返時間),mget mset等批量此操作有效節約了RTT,但大部分命令(如hgetall,並沒有mhgetall)不支援批量操作,需要消耗N次RTT ,這個時候需要pipeline來解決這個問題。
2 pepeline的效能
1) 未使用pipeline執行N條命令
特別頻繁的redis操作,大部分時間都是花在了網路上,網路時間比redis執行時間要長很多,特別是異地機房更加明顯
2)、使用了pipeline執行N條命令
3)、兩者效能對比
4 )具體實現
@Test public void pipeCompare() { Jedis redis = new Jedis("192.168.1.111", 6379); redis.auth("12345678");//授權密碼 對應redis.conf的requirepass密碼 Map<String, String> data = new HashMap<String, String>(); redis.select(8);//使用第8個庫 redis.flushDB();//清空第8個庫所有資料 // hmset long start = System.currentTimeMillis(); // 直接hmset 迴圈10000次往redis裡寫資料 for (int i = 0; i < 10000; i++) { data.clear(); //清空map data.put("k_" + i, "v_" + i); redis.hmset("key_" + i, data); //迴圈執行10000條資料插入redis } long end = System.currentTimeMillis(); System.out.println(" 共插入:[" + redis.dbSize() + "]條 .. "); System.out.println("1,未使用PIPE批量設值耗時" + (end - start) / 1000 + "秒.."); redis.select(8); redis.flushDB(); // 使用pipeline Pipeline pipe = redis.pipelined(); start = System.currentTimeMillis(); //迴圈10000次,值都放到了Pileline裡了 for (int i = 0; i < 10000; i++) { data.clear(); data.put("k_" + i, "v_" + i); pipe.hmset("key_" + i, data); //將值封裝到PIPE物件,此時並未執行,還停留在客戶端 } pipe.sync(); //將封裝後的PIPE一次性發給redis end = System.currentTimeMillis(); System.out.println(" PIPE共插入:[" + redis.dbSize() + "]條 .. "); System.out.println("2,使用PIPE批量設值耗時" + (end - start) / 1000 + "秒 ..");View Code
效能對比
使用了Pipeline效能高效了很多倍,可以優化吞吐量
四、原生批命令(mset, mget)與Pipeline對比
1 原生批命令是原子性,pipeline是非原子性
(原子性概念:一個事務是一個不可分割的最小工作單位,要麼都成功要麼都失敗。原子操作是指你的一個業務邏輯必須是不可拆分的. 處理一件事情要麼都成功,要麼都失敗,原子不可拆分)
2 原生批命令一命令多個key, 但pipeline支援多命令(存在事務),非原子性
3 原生批命令是服務端實現,而pipeline需要服務端與客戶端共同完成
五、Pipeline正確使用方式
使用pipeline組裝的命令個數不能太多,不然資料量過大,增加客戶端的等待時間,還可能造成網路阻塞,可以將大量命令的拆分多個小的pipeline命令完成。
六 具體實現
/** * 刪除多個字串key 並釋放連線 * * @param keys* * @return 成功返回value 失敗返回null */ public boolean mdel(List<String> keys) { Jedis jedis = null; boolean flag = false; try { jedis = pool.getResource();//從連線借用Jedis物件 Pipeline pipe = jedis.pipelined();//獲取jedis物件的pipeline物件 for(String key:keys){ pipe.del(key); //將多個key放入pipe刪除指令中 } pipe.sync(); //執行命令,完全此時pipeline物件的遠端呼叫 flag = true; } catch (Exception e) { pool.returnBrokenResource(jedis); e.printStackTrace(); } finally { returnResource(pool, jedis); } return flag; }View Code
七 Redis事務
Redis事務的概念:
Redis 事務的本質是一組命令的集合。事務支援一次執行多個命令,一個事務中所有命令都會被序列化。在事務執行過程,會按照順序序列化執行佇列中的命令,其他客戶端提交的命令請求不會插入到事務執行命令序列中。
總結說:redis事務就是一次性、順序性、排他性的執行一個佇列中的一系列命令。
Redis事務沒有隔離級別的概念:
批量操作在傳送 EXEC 命令前被放入佇列快取,並不會被實際執行,也就不存在事務內的查詢要看到事務裡的更新,事務外查詢不能看到。
Redis不保證原子性:
Redis中,單條命令是原子性執行的,但事務不保證原子性,且沒有回滾。事務中任意命令執行失敗,其餘的命令仍會被執行。
Redis事務的三個階段:
- 開始事務
- 命令入隊
- 執行事務
Redis事務相關命令:
watch key1 key2 ... : 監視一或多個key,如果在事務執行之前,被監視的key被其他命令改動,則事務被打斷 ( 類似樂觀鎖 )
multi : 標記一個事務塊的開始( queued )
exec : 執行所有事務塊的命令 ( 一旦執行exec後,之前加的監控鎖都會被取消掉 )
discard : 取消事務,放棄事務塊中的所有命令
unwatch : 取消watch對所有key的監控
Redis事務使用案例:
(1)正常執行
(2)放棄事務
(3)若在事務佇列中存在命令性錯誤(類似於java編譯性錯誤),則執行EXEC命令時,所有命令都不會執行
(4)若在事務佇列中存在語法性錯誤(類似於java的1/0的執行時異常),則執行EXEC命令時,其他正確命令會被執行,錯誤命令丟擲異常。
(5)使用watch
案例一:使用watch檢測balance,事務期間balance資料未變動,事務執行成功
案例二:使用watch檢測balance,在開啟事務後(標註1處),在新視窗執行標註2中的操作,更改balance的值,模擬其他客戶端在事務執行期間更改watch監控的資料,然後再執行標註1後命令,執行EXEC後,事務未成功執行。
一但執行 EXEC 開啟事務的執行後,無論事務使用執行成功, WARCH 對變數的監控都將被取消。
故當事務執行失敗後,需重新執行WATCH命令對變數進行監控,並開啟新的事務進行操作。
總結:
watch指令類似於樂觀鎖,在事務提交時,如果watch監控的多個KEY中任何KEY的值已經被其他客戶端更改,則使用EXEC執行事務時,事務佇列將不會被執行,同時返回Nullmulti-bulk應答以通知呼叫者事務執行失敗。