1. 程式人生 > 實用技巧 >redis 2 基本使用+Pipeline+事務

redis 2 基本使用+Pipeline+事務

一 redis key的設計

越短,而且要完整表達含義,可以縮寫,但必須文件留存好說明

user:001

tm:order:001 order:1

一般以業務,功能模組或者表名開頭,後跟主鍵(或能表示資料唯一性的值)

二 客戶端連線redis

普通實現

import redis.clients.jedis.Jedis;
public class jedistest {
public static void main(String[] args) {
try {
     String host = "xx.kvstore.aliyuncs.com";//控制檯顯示訪問地址
     int port = 6379
; //直接new一個jedis物件 Jedis jedis = new Jedis(host, port); //鑑權資訊 jedis.auth("password");//password String key = "redis"; String value = "aliyun-redis"; //select db預設為0 jedis.select(1); //set一個key jedis.set(key, value); System.out.println("Set Key " + key + "
Value: " + value); //get 設定進去的key String getvalue = jedis.get(key); System.out.println("Get Key " + key + " ReturnValue: " + getvalue); jedis.quit(); jedis.close(); } catch (Exception e) { e.printStackTrace(); } } }
View Code

使用了連線池

package com.springboot.demo.base.config;
 
import lombok.
extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; @Configuration @PropertySource("classpath:redis.properties") @Slf4j public class RedisConfig { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private int port; @Value("${spring.redis.timeout}") private int timeout; @Value("${spring.redis.jedis.pool.max-idle}") private int maxIdle; @Value("${spring.redis.jedis.pool.max-wait}") private long maxWaitMillis; @Value("${spring.redis.password}") private String password; @Value("${spring.redis.block-when-exhausted}") private boolean blockWhenExhausted; @Bean public JedisPool redisPoolFactory() throws Exception{ log.info("JedisPool注入成功!!"); log.info("redis地址:" + host + ":" + port); JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxIdle(maxIdle); jedisPoolConfig.setMaxWaitMillis(maxWaitMillis); // 連線耗盡時是否阻塞, false報異常,ture阻塞直到超時, 預設true jedisPoolConfig.setBlockWhenExhausted(blockWhenExhausted); // 是否啟用pool的jmx管理功能, 預設true jedisPoolConfig.setJmxEnabled(true); JedisPool jedisPool = new JedisPool(jedisPoolConfig, host, port, timeout, password); return jedisPool; } }
View Code

從連線池中獲取jedis物件

/**
     * 通過key獲取儲存在redis中的value
     * 並釋放連線
     *
     * @param key
     * @param indexdb 選擇redis庫 0-15
     * @return 成功返回value 失敗返回null
     */
    public String get(String key,int indexdb) {
        Jedis jedis = null;
        String value = null;
        try {
            jedis = jedisPool.getResource();
            jedis.select(indexdb);
            value = jedis.get(key);
            log.info(value);
        } catch (Exception e) {
            log.error(e.getMessage());
        } finally {
                        //歸還redis物件
            returnResource(jedisPool, jedis);
                        //或者 jedis.close()
        }
        return value;
    }
View Code

redis的resp協議,無論是傳輸還是儲存,都是按這個資料格式

如 set name james ----> resp協議包 -----> redis 服務端

resp協議包的格式如下

*3   //組數  
$3   //欄位長度
set  // 欄位名
$4   //key長度
name  //key的欄位名
$5     //值的長度
james  //

手寫redis

   //基於tcp協議  socket實現 redis客戶端  
  public static String set(Socket socket, String key, String value) throws Exception {
        StringBuilder str = new StringBuilder();

        //1個resp協議格式的資料
        str.append("*3").append("\r\n"); //組數
        str.append("$3").append("\r\n");//欄位長度
        str.append("set").append("\r\n");//欄位
        str.append("$").append(key.getBytes().length).append("\r\n");//key長度
        str.append(key).append("\r\n"); //key 欄位名
        str.append("$").append(value.getBytes().length).append("\r\n");//value長度
        str.append(value).append("\r\n"); //value的值
     
        socket.getOutputStream().write(str.toString().getBytes());
        byte[] response = new byte[2048];
        socket.getInputStream().read(response);
        return new String(response);
    }
View Code

三 Pipeline詳解

1 pipeline出現的背景:
redis客戶端執行一條命令分4個過程:

傳送命令--->命令排隊--->命令執行--->返回結果

這個過程稱為Round trip time(簡稱RTT, 往返時間),mget mset等批量此操作有效節約了RTT,但大部分命令(如hgetall,並沒有mhgetall)不支援批量操作,需要消耗N次RTT ,這個時候需要pipeline來解決這個問題。

2 pepeline的效能

1) 未使用pipeline執行N條命令

特別頻繁的redis操作,大部分時間都是花在了網路上,網路時間比redis執行時間要長很多,特別是異地機房更加明顯

2)、使用了pipeline執行N條命令

3)、兩者效能對比

4 )具體實現

@Test
    public void pipeCompare() {
        Jedis redis = new Jedis("192.168.1.111", 6379);
        redis.auth("12345678");//授權密碼 對應redis.conf的requirepass密碼
        Map<String, String> data = new HashMap<String, String>();
        redis.select(8);//使用第8個庫
        redis.flushDB();//清空第8個庫所有資料
        // hmset
        long start = System.currentTimeMillis();
        // 直接hmset   迴圈10000次往redis裡寫資料
        for (int i = 0; i < 10000; i++) {
            data.clear();  //清空map
            data.put("k_" + i, "v_" + i);
            redis.hmset("key_" + i, data); //迴圈執行10000條資料插入redis
        }
        long end = System.currentTimeMillis();
        System.out.println("    共插入:[" + redis.dbSize() + "]條 .. ");
        System.out.println("1,未使用PIPE批量設值耗時" + (end - start) / 1000 + "秒..");
        redis.select(8);
        redis.flushDB();


        // 使用pipeline 
        Pipeline pipe = redis.pipelined();
        start = System.currentTimeMillis();
        //迴圈10000次,值都放到了Pileline裡了
        for (int i = 0; i < 10000; i++) {
            data.clear();
            data.put("k_" + i, "v_" + i);
            pipe.hmset("key_" + i, data); //將值封裝到PIPE物件,此時並未執行,還停留在客戶端
        }
        pipe.sync(); //將封裝後的PIPE一次性發給redis

        end = System.currentTimeMillis();
        System.out.println("    PIPE共插入:[" + redis.dbSize() + "]條 .. ");
        System.out.println("2,使用PIPE批量設值耗時" + (end - start) / 1000 + "秒 ..");
View Code

效能對比

使用了Pipeline效能高效了很多倍,可以優化吞吐量

四、原生批命令(mset, mget)與Pipeline對比

1 原生批命令是原子性,pipeline是非原子性

(原子性概念:一個事務是一個不可分割的最小工作單位,要麼都成功要麼都失敗。原子操作是指你的一個業務邏輯必須是不可拆分的. 處理一件事情要麼都成功,要麼都失敗,原子不可拆分)

2 原生批命令一命令多個key, 但pipeline支援多命令(存在事務),非原子性

3 原生批命令是服務端實現,而pipeline需要服務端與客戶端共同完成

五、Pipeline正確使用方式

使用pipeline組裝的命令個數不能太多,不然資料量過大,增加客戶端的等待時間,還可能造成網路阻塞,可以將大量命令的拆分多個小的pipeline命令完成。

六 具體實現

/**
     * 刪除多個字串key 並釋放連線
     * 
     * @param keys*
     * @return 成功返回value 失敗返回null
     */
    public boolean mdel(List<String> keys) {
        Jedis jedis = null;
        boolean flag = false;
        try {
            jedis = pool.getResource();//從連線借用Jedis物件
            Pipeline pipe = jedis.pipelined();//獲取jedis物件的pipeline物件
            for(String key:keys){
                pipe.del(key); //將多個key放入pipe刪除指令中
            }
            pipe.sync(); //執行命令,完全此時pipeline物件的遠端呼叫 
            flag = true;
        } catch (Exception e) {
            pool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            returnResource(pool, jedis);
        }
        return flag;
    }
View Code

七 Redis事務

Redis事務的概念:

  Redis 事務的本質是一組命令的集合。事務支援一次執行多個命令,一個事務中所有命令都會被序列化。在事務執行過程,會按照順序序列化執行佇列中的命令,其他客戶端提交的命令請求不會插入到事務執行命令序列中。

  總結說:redis事務就是一次性、順序性、排他性的執行一個佇列中的一系列命令。  

Redis事務沒有隔離級別的概念:

  批量操作在傳送 EXEC 命令前被放入佇列快取,並不會被實際執行,也就不存在事務內的查詢要看到事務裡的更新,事務外查詢不能看到。

Redis不保證原子性:

  Redis中,單條命令是原子性執行的,但事務不保證原子性,且沒有回滾。事務中任意命令執行失敗,其餘的命令仍會被執行。

Redis事務的三個階段:

  • 開始事務
  • 命令入隊
  • 執行事務

Redis事務相關命令:

  watch key1 key2 ... : 監視一或多個key,如果在事務執行之前,被監視的key被其他命令改動,則事務被打斷 ( 類似樂觀鎖 )

  multi : 標記一個事務塊的開始( queued )

  exec : 執行所有事務塊的命令 ( 一旦執行exec後,之前加的監控鎖都會被取消掉 ) 

  discard : 取消事務,放棄事務塊中的所有命令

  unwatch : 取消watch對所有key的監控

Redis事務使用案例:

(1)正常執行

(2)放棄事務

(3)若在事務佇列中存在命令性錯誤(類似於java編譯性錯誤),則執行EXEC命令時,所有命令都不會執行

(4)若在事務佇列中存在語法性錯誤(類似於java的1/0的執行時異常),則執行EXEC命令時,其他正確命令會被執行,錯誤命令丟擲異常。

(5)使用watch

案例一:使用watch檢測balance,事務期間balance資料未變動,事務執行成功

案例二:使用watch檢測balance,在開啟事務後(標註1處),在新視窗執行標註2中的操作,更改balance的值,模擬其他客戶端在事務執行期間更改watch監控的資料,然後再執行標註1後命令,執行EXEC後,事務未成功執行。

一但執行 EXEC 開啟事務的執行後,無論事務使用執行成功, WARCH 對變數的監控都將被取消。

故當事務執行失敗後,需重新執行WATCH命令對變數進行監控,並開啟新的事務進行操作。

總結:

  watch指令類似於樂觀鎖,在事務提交時,如果watch監控的多個KEY中任何KEY的值已經被其他客戶端更改,則使用EXEC執行事務時,事務佇列將不會被執行,同時返回Nullmulti-bulk應答以通知呼叫者事務執行失敗。