1. 程式人生 > >Redis學習三(進階功能).

Redis學習三(進階功能).

一、排序

redis 支援對 list,set 和 zset 元素的排序,排序的時間複雜度是 O(N+M*log(M))。(N 是集合大小,M 為返回元素的數量)

sort key [BY pattern] [LIMIT offset count] [GET pattern [GET pattern ...]] [ASC|DESC] [ALPHA] [STORE destination]
  • [BY pattern]:sort 命令預設使用集合元素進行排序,可以通過 “BY pattern” 使用外部 key 的資料作為權重排序。
  • [LIMIT offset count]:排序之後返回元素的數量可以通過 LIMIT 修飾符進行限制,修飾符接受 offset (要跳過的元素數量,即起始位置)和 count (返回的元素數量)兩個引數。
  • [GET pattern [GET pattern ...]]:get 可以根據排序的結果來取出相應的鍵值,“get #” 表示返回自身元素,“get pattern” 可以返回外部 key 的資料 。
  • [ASC|DESC] [ALPHA]:選擇按照順序、逆序或者字串排序,set 集合(本身沒有索引值)排序操作必須指定 ALPHA。
  • [STORE destination]:預設情況下, sort 操作只是簡單地返回排序結果,並不進行任何儲存操作。通過給 store 選項指定一個 key 引數,可以將排序結果儲存到給定的鍵上。

假設現在有使用者資料如下:

127.0.0.1:6379> sadd uid 1 2 3 4
127.0.0.1:6379> mset user_name_1 admin user_level_1 9999
127.0.0.1:6379> mset user_name_2 jack user_level_2 10
127.0.0.1:6379> mset user_name_3 peter user_level_3 25
127.0.0.1:6379> mset user_name_4 mary user_level_4 70

首先,直接利用集合內的元素做排序操作:

127.0.0.1:6379> sort uid
1) "1"
2) "2"
3) "3"
4) "4"

接著,我們來試試 [BY pattern] 和 [GET pattern [GET pattern ...]] 操作:

127.0.0.1:6379> sort uid by user_name_* get # get user_name_* get user_level_* alpha
 1) "1"
 2) "admin"
 3) "9999"
 4) "2"
 5) "jack"
 6) "10"
 7) "4"
 8) "mary"
 9) "70"
10) "3"
11) "peter"
12) "25"

這個語句有點晦澀,試著這麼理解 “by user_name_* ”, user_name_* 是一個佔用符,它先取出 uid 中的值,然後再用這個值拼接成外部鍵,而真正進行排序的正是這些外部鍵值;“get # get user_name_* get user_level_* ” 的含義也可以這麼理解,get # 表示返回自己元素,[get pattern] 表示返回外部鍵值。

二、事務

redis 的事務機制主要是由下面的幾個指令來完成:

  • multi:標記一個事務塊的開始
  • exec:執行所有事務塊中的命令
  • discard:取消事務,放棄執行事務塊中的所有指令
  • watch key [key...]:監視一個或多個 key,如果在事務執行之前這個(或這些key)被其他命令所改動,這個改動也被稱為 CAS 錯誤,那麼事務將被打斷
  • unwatch:取消 watch 命令對所有 key 的監視

當 redis 接受到 multi 指令時,這個連線會進入一個事務上下文,該連線後續的命令並不是立即執行,而是先放到一個佇列中;當從連線受到 exec 命令後,redis 會順序的執行佇列中的所有命令。並將所有命令的執行結果打包到一起返回給 client。然後此連線就結束事務上下文。

redis 將是否有 watch 命令分為普通型別事務和 CAS(Check And Set)型別事務,無 watch 命令的為普通型別事務,有 watch 命令的為 CAS型別事務。

事務失敗的原因可以分為靜態錯誤(如不存在的命令)和執行時錯誤(如 CAS 錯誤、對 string 用 lpop 操作等)。靜態錯誤會在提交 exec 時返回錯誤資訊,使事務不能執行;而除 CAS 以外的執行時錯誤不會阻止事務繼續執行。因此,Redis 的事務機制並不具有原子性。

127.0.0.1:6379> multi
OK
127.0.0.1:6379> lpush list java
QUEUED
127.0.0.1:6379> scard list
QUEUED
127.0.0.1:6379> exec
1) (integer) 1
2) (error) WRONGTYPE Operation against a key holding the wrong kind of value
127.0.0.1:6379> lrange list 0 -1
1) "java"

可以看到,即使命令錯誤,事務依然沒有被回滾。因此,redis 的事務機制過於原始,不建議使用。

需要注意的是,如果我們使用 AOF 的方式持久化,可能存在事務被部分寫入的情況(事務執行過程中 redis 掛掉等)從而導致 redis 啟動失敗退出,可以使用 redis-check-aof 工具進行修復。

三、流水線(pipeline)

在事務中 redis 提供了佇列,可以批量執行任務,這樣效能就比較高,但使用 multi…exec 事務命令是有系統開銷的,因為它會檢測對應的鎖和序列化命令。有時我們希望在沒有任何附加條件的情況下使用佇列批量執行一系列命令,這時可以使用 redis的流水線(pipeline)技術。

看看如何在 spring-data-redis 中使用 pipeline 功能,需要注意的是 RedisTemplate 的序列化需要使用 StringRedisSerializer,不能使用 JdkSerializationRedisSerializer,否則會導致序列化失敗。

    @Test
    public void pipeline() {
        // 1.executePipelined 重寫 入參 RedisCallback 的doInRedis方法
        List<Object> resultList = redisTemplate.executePipelined(new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection redisConnection) throws DataAccessException {
                // 2.redisConnection 給本次管道內新增 要一次性執行的多條命令
                // 2.1 一個set操作
                redisConnection.set("hello".getBytes(), "world".getBytes());
                // 2.2一個批量mset操作
                Map<byte[], byte[]> tuple = new HashMap();
                tuple.put("m_hello_1".getBytes(), "m_world_1".getBytes());
                tuple.put("m_hello_2".getBytes(), "m_world_2".getBytes());
                tuple.put("m_hello_3".getBytes(), "m_world_3".getBytes());
                redisConnection.mSet(tuple);
                // 2.3一個get操作
                redisConnection.get("m_hello_1".getBytes());
                // 3 這裡一定要返回null,最終pipeline的執行結果,才會返回給最外層
                return null;
            }
        });
        // 4. 最後對redis pipeline管道操作返回結果進行判斷和業務補償
        for (Object str : resultList) {
            System.out.println(str);
        }
    }

四、釋出訂閱

繼 RxJava、Reactor、CompletableFuture 以及 Spring 的事件驅動模型後,我們又要接觸一種觀察者模式啦!redis 作為一個pub/sub server,在訂閱者和釋出者之間起到了訊息路由的功能。訂閱者可以通過 subscribe 和 psubscribe 命令向 redis server 訂閱自己感興趣的channel ;釋出者通過 publish 命令向 redis server 的 channel 傳送訊息,訂閱該 channel 的全部 client 都會收到此訊息。一個 client 可以訂閱多個 channel,也可以向多個 channel 傳送訊息。

訂閱 channel:

127.0.0.1:6379> subscribe channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel"
3) (integer) 1
1) "message"
2) "channel"
3) "Hello,World"

釋出 channel 訊息:

127.0.0.1:6379> publish channel Hello,World
(integer) 1

接下來我們來看看在 spring-data-redis 中如何實現釋出訂閱功能。首先我們需要一個訊息監聽器,只要讓它實現 MessageListener 介面即可:

public class ChannelListener implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        System.out.println("channel is:" + new String(message.getChannel()));
        System.out.println("channel content:" + new String(message.getBody()));
    }
}

那麼,下面怎麼做呢?當然是把訊息監聽器和 channel 繫結在一起,讓訊息監聽器知道處理哪個 channel 的訊息:

    /**
     * redis 訊息監聽器容器, 繫結訊息監聽器和 channel
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //訂閱了一個叫 channel 的通道
        container.addMessageListener(channelAdapter(), new PatternTopic("channel"));
        //這個 container 可以新增多個 messageListener
        return container;
    }

    /**
     * 訊息監聽器介面卡
     */
    @Bean
    public MessageListenerAdapter channelAdapter() {
        return new MessageListenerAdapter(new ChannelListener());
    }

接下來,讓我們試著往這個 channel 釋出一個訊息吧!

    @Test
    public void publish() {
        redisTemplate.convertAndSend("channel", "Hello,World");
    }