1. 程式人生 > 實用技巧 >《Redis使用手冊》筆記 —— 程式碼均使用Python實現

《Redis使用手冊》筆記 —— 程式碼均使用Python實現

本書配套程式碼

本書配套程式碼

Redis的特色與請求響應

Python操作redis及相關彙總

Python操作redis
個人總結合集
Mac中redis的安裝配置及圖形化工具的下載與使用

字串

字串命令補充

1、GETSET

GETSET命令就像GET命令和SET命令的組合版本,GETSET首先獲取字串鍵目前已有的值,接著為鍵設定新值,最後把之前獲取到的舊值返回給使用者:

127.0.0.1:6379> set name1 whw
OK
127.0.0.1:6379> getset name1 www
"whw"
127.0.0.1:6379> get name1
"www"

2、MSETNX

MSETNX與MSET的主要區別在於,MSETNX只會在所有給定鍵都不存在的情況下對鍵進行設定,而不會像MSET那樣直接覆蓋鍵已有的值:如果在給定鍵當中,即使有一個鍵已經有值了,那麼MSETNX命令也會放棄對所有給定鍵的設定操作。MSETNX命令在成功執行設定操作時返回1,在放棄執行設定操作時則返回0。

鎖是一種同步機制,用於保證一項資源在任何時候只能被一個程序使用,如果有其他程序想要使用相同的資源,那麼就必須等待,直到正在使用資源的程序放棄使用權為止。

一個鎖的實現通常會有獲取(acquire)和釋放(release)這兩種操作:

  • 獲取操作用於取得資源的獨佔使用權。在任何時候,最多隻能有一個程序取得鎖,我們把成功取得鎖的這個程序稱為鎖的持有者。在鎖已經被持有的情況下,所有嘗試再次獲取鎖的操作都會失敗。
  • 釋放操作用於放棄資源的獨佔使用權,一般由鎖的持有者呼叫。在鎖被釋放之後,其他程序就可以再次嘗試獲取這個鎖了。

程式碼清單展示了一個使用字串鍵實現的鎖程式,這個程式會根據給定的字串鍵是否有值來判斷鎖是否已經被獲取,而針對鎖的獲取操作和釋放操作則是分別通過設定字串鍵和刪除字串鍵來完成的。

from redis import Redis


VALUE_OF_LOCK = "locking"

class Lock(object):

    def __init__(self,client,key):
        self.client = client
        self.key = key

    def acquire(self):
        """
        嘗試獲取鎖 成功返回True 失敗返回False
        """
        # nx選項的值確保了代表鎖的字串鍵只會在 沒有值 的情況下被設定
        result = self.client.set(self.key,VALUE_OF_LOCK,nx=True)
        ret = result is True
        return ret

    def release(self):
        """
        嘗試釋放鎖 刪除成功返回True 刪除
        失敗返回False
        """
        ret = self.client.delete(self.key)
        res = (ret == 1)
        return res
        
client = Redis(decode_response=True)
lock = Lock(client,"test_lock")

lock.acquire() # 成功獲取鎖  True
lock.acquire() # 鎖已被獲取 無法再次獲取 False

lock.release() # 釋放鎖
lock.acquire() # 鎖釋放後還能被獲取 True

NX選項的值確保了代表鎖的字串鍵只會在沒有值的情況下被設定:

result = self.client.set(self.key,VALUE_OF_LOCK,nx=True)
  • 如果給定的字串鍵沒有值,那麼說明鎖尚未被獲取,SET命令將執行設定操作,並將result變數的值設定為True。
  • 如果給定的字串鍵已經有值了,那麼說明鎖已經被獲取,SET命令將放棄執行設定操作,並將result變數的值設定為None。

acquire()方法最後會通過檢查ret變數的值是否為True來判斷自己是否成功取得了鎖。

因為Redis的DEL命令和Python的del關鍵字重名,所以在redis-py客戶端中,執行DEL命令實際上是通過呼叫delete()方法來完成的:

ret = self.client.delete(self.key)
res = (ret == 1)
return res

release()方法通過檢查delete()方法的返回值是否為1來判斷刪除操作是否執行成功:如果使用者嘗試對一個尚未被獲取的鎖執行release()方法,那麼方法將返回false,表示沒有鎖被釋放。

上述程式碼存在問題:

  • 因為這個鎖的釋放操作無法驗證程序的身份,所以無論執行釋放操作的程序是否為鎖的持有者,鎖都會被釋放。如果鎖被持有者以外的其他程序釋放,那麼系統中可能會同時出現多個鎖,導致鎖的唯一性被破壞。
  • 這個鎖的獲取操作不能設定最大加鎖時間,因而無法讓鎖在超過給定的時限之後自動釋放。因此,如果持有鎖的程序因為故障或者程式設計錯誤而沒有在退出之前主動釋放鎖,那麼鎖就會一直處於已被獲取的狀態,導致其他程序永遠無法取得鎖。

快取文章資訊

在構建應用程式的時候,我們經常會需要批量地設定和獲取多項資訊。以部落格程式為例:

  • 當用戶想要註冊部落格時,程式就需要把使用者的名字、賬號、密碼、註冊時間等多項資訊儲存起來,並在使用者登入的時候取出這些資訊。
  • 當用戶想在部落格中撰寫一篇新文章的時候,程式就需要把文章的標題、內容、作者、發表時間等多項資訊儲存起來,並在使用者閱讀文章的時候取出這些資訊。

通過使用MSET命令、MSETNX命令以及MGET命令,我們可以實現上面提到的這些批量設定操作和批量獲取操作。比如程式碼清單就展示了一個文章儲存程式,這個程式使用MSET命令和MSETNX命令將文章的標題、內容、作者、發表時間等多項資訊儲存到不同的字串鍵中,並通過MGET命令從這些鍵裡面獲取文章的各項資訊。

# -*- coding:utf-8 -*-
from time import time  # time() 函式用於獲取當前 Unix 時間戳

class Article:

    def __init__(self, client, article_id):
        self.client = client
        self.id = str(article_id)
        self.title_key = "article::" + self.id + "::title"
        self.content_key = "article::" + self.id + "::content"
        self.author_key = "article::" + self.id + "::author"
        self.create_at_key = "article::" + self.id + "::create_at"

    def create(self, title, content, author):
        """
        建立一篇新的文章,建立成功時返回 True ,
        因為文章已存在而導致建立失敗時返回 False 。
        """
        article_data = {
            self.title_key: title,
            self.content_key: content,
            self.author_key: author,
            self.create_at_key: time()
        }
        return self.client.msetnx(article_data)

    def get(self):
        """
        返回 ID 對應的文章資訊。
        """
        result = self.client.mget(self.title_key,
                                  self.content_key,
                                  self.author_key,
                                  self.create_at_key)
        return {"id": self.id, "title": result[0], "content": result[1],
                "author": result[2], "create_at": result[3]}

    def update(self, title=None, content=None, author=None):
        """
        對文章的各項資訊進行更新,
        更新成功時返回 True ,失敗時返回 False 。
        """
        article_data = {}
        if title is not None:
            article_data[self.title_key] = title
        if content is not None:
            article_data[self.content_key] = content
        if author is not None:
            article_data[self.author_key] = author
        return self.client.mset(article_data)

    def get_content_len(self):
        """
        返回文章內容的位元組長度。
        """
        return self.client.strlen(self.content_key)

    def get_content_preview(self, preview_len):
        """
        返回指定長度的文章預覽內容。
        """
        start_index = 0
        end_index = preview_len-1
        return self.client.getrange(self.content_key, start_index, end_index)

鍵的命名格式

Article程式使用了多個字串鍵去儲存文章資訊,並且每個字串鍵的名字都是以article::<id>::<attribute>格式命名的,這是一種Redis使用慣例:

Redis使用者通常會為邏輯上相關聯的鍵設定相同的字首,並通過分隔符來區分鍵名的各個部分,以此來構建一種鍵的命名格式。

比如對於article::10086::title、article::10086::author這些鍵來說,article字首表明這些鍵都儲存著與文章資訊相關的資料,而分隔符“::”則區分開了鍵名裡面的字首、ID以及具體的屬性。除了“::”符號之外,常用的鍵名分隔符還包括“.”符號,比如article.10086.title;或者“->”符號,比如article->10086->title;以及“|”符號,比如article|10086|title等。

分隔符的選擇通常只取決於個人喜好,而鍵名的具體格式也可以根據需要進行構造,比如,如果不喜歡article::<id>::<attribute>格式,那麼也可以考慮使用article::<attribute>::<id>格式,諸如此類。唯一需要注意的是,一個程式應該只使用一種鍵名分隔符,並且持續地使用同一種鍵名格式,以免造成混亂。

通過使用相同的格式去命名邏輯上相關聯的鍵,我們可以讓程式產生的資料結構變得更容易被理解,並且在需要的時候,還可以根據特定的鍵名格式在資料庫裡面以模式匹配的方式查詢指定的鍵。

給文章儲存程式加上文章長度計數功能和文章預覽功能

在前面的內容中,我們使用MSET、MGET等命令構建了一個儲存文章資訊的程式,在學習了STRLEN命令和GETRANGE命令之後,我們可以給這個文章儲存程式加上兩個新功能,其中一個是文章長度計數功能,另一個則是文章預覽功能。

文章長度計數功能用於顯示文章內容的長度,讀者可以通過這個長度值來了解一篇文章大概有多長,從而決定是否繼續閱讀。

文章預覽功能則用於顯示文章開頭的一部分內容,這些內容可以幫助讀者快速地瞭解文章大意,並吸引讀者進一步閱讀整篇文章。

程式碼清單2展示了這兩個功能的具體實現程式碼,其中文章長度計數功能是通過對文章內容執行STRLEN命令來實現的,文章預覽功能是通過對文章內容執行GETRANGE命令來實現的。

class Article(object):
    
    # 省略之前的 init create update等方法
    
    # 返回文章內容的位元組長度
    def get_content_len(self):
        return self.clieent.strlen(self.content_key)
    
    # 返回指定長度的文章預覽內容
    def get_content_preview(self,preview_len):
        start_index = 0
        end_index = preview_len - 1
        return self.client.getrange(self.content_key,start_index,end_index)

APPEND:追加新內容到值的末尾

通過呼叫APPEND命令,使用者可以將給定的內容追加到字串鍵已有值的末尾:

APPEND key sjffix

APPEND命令在執行追加操作之後,會返回字串值當前的長度作為命令的返回值。

redis> get name1
"whw"
redis> append name1 "666"
(integer) 6

處理不存在的key

如果使用者給定的鍵並不存在,那麼APPEND命令會先將鍵的值初始化為空字串"",然後再執行追加操作。

使用字串鍵儲存數字值

每當使用者將一個值儲存到字串鍵裡面的時候,Redis都會對這個值進行檢測,如果這個值能夠被解釋為以下兩種型別的其中一種,那麼Redis就會把這個值當作數字來處理:

  • 第一種型別是能夠使用C語言的long long int型別儲存的整數,在大多數系統中,這種型別儲存的都是64位長度的有符號整數,取值範圍介於-9223372036854775808和9223372036854775807之間。
  • 第二種型別是能夠使用C語言的long double型別儲存的浮點數,在大多數系統中,這種型別儲存的都是128位長度的有符號浮點數,取值範圍介於3.36210314311209350626e-4932和1.18973149535723176502e+4932L之間。

增減

INCRBYDECRBY:對整數值執行加法操作和減法操作

key不存在時

ID生成器

在構建應用程式的時候,我們經常會用到各式各樣的ID(identif ier,識別符號)。比如,儲存使用者資訊的程式在每次出現一個新使用者的時候就需要建立一個新的使用者ID,而部落格程式在作者每次發表一篇新文章的時候也需要建立一個新的文章ID。

ID通常會以數字形式出現,並且通過遞增的方式來創建出新的ID。比如,如果當前最新的ID值為10086,那麼下一個ID就應該是10087,再下一個ID則是10088,以此類推。

程式碼清單展示了一個使用字串鍵實現的ID生成器,這個生成器通過執行INCR命令來產生新的ID,並且可以通過執行SET命令來保留指定數字之前的ID,從而避免使用者為了得到某個指定的ID而生成大量無效ID。

class IdGenerator(object):

    def __init__(self,client,key):
        self.client = client
        self.key = key

    # 生成並返回下一個ID
    def produce(self):
        return self.client.incr(self.key)

    # 保留前n個ID 使得之後執行的produce方法產生的ID都大於n 
    # 為了避免produce方法產生重複ID 這個方法只能在produce方法和reserve方法沒有執行過的情況下使用
    # 這個方法在ID被成功保留時返回True 在produce方法或reserve方法已經執行過而保留失敗時返回False
    def reserve(self,n):
        ret = self.client.set(self.key,n,nx=True)
        return ret is True

*計數器 —— 沒有加鎖

除了ID生成器之外,計數器也是構建應用程式時必不可少的元件之一,如對於網站的訪客數量、使用者執行某個操作的次數、某首歌或者某個視訊的播放量、論壇帖子的回覆數量等,記錄這些資訊都需要用到計數器。實際上,計數器在網際網路中幾乎無處不在,因此如何簡單、高效地實現計數器一直都是構建應用程式時經常會遇到的一個問題。

程式碼清單展示了一個計數器實現,這個程式把計數器的值儲存在一個字串鍵裡面,並通過INCRBY命令和DECRBY命令對計數器的值執行加法操作和減法操作,在需要時,使用者還可以通過呼叫GETSET方法來清零計數器並取得清零之前的舊值。

# -*- coding:utf-8 -*-
class Counter:

    def __init__(self, client, key):
        self.client = client
        self.key = key

    def increase(self, n=1):
        """
        將計數器的值加上 n ,然後返回計數器當前的值。
        如果使用者沒有顯式地指定 n ,那麼將計數器的值加上一。
        """
        return self.client.incr(self.key, n)

    def decrease(self, n=1):
        """
        將計數器的值減去 n ,然後返回計數器當前的值。
        如果使用者沒有顯式地指定 n ,那麼將計數器的值減去一。
        """
        return self.client.decr(self.key, n)

    def get(self):
        """
        返回計數器當前的值。
        """
        # 嘗試獲取計數器當前的值
        value = self.client.get(self.key)
        # 如果計數器並不存在,那麼返回 0 作為計數器的預設值
        if value is None:
            return 0
        else:
            # 因為 redis-py 的 get() 方法返回的是字串值
            # 所以這裡需要使用 int() 函式,將字串格式的數字轉換為真正的數字型別
            # 比如將 "10" 轉換為 10
            return int(value)

    def reset(self):
        """
        清零計數器,並返回計數器在被清零之前的值。
        """
        old_value = self.client.getset(self.key, 0)
        # 如果計數器之前並不存在,那麼返回 0 作為它的舊值
        if old_value is None:
            return 0
        else:
            # 跟 redis-py 的 get() 方法一樣, getset() 方法返回的也是字串值
            # 所以程式在將計數器的舊值返回給呼叫者之前,需要先將它轉換成真正的數字
            return int(old_value)

*限速器

為了保障系統的安全性和效能,並保證系統的重要資源不被濫用,應用程式常常會對使用者的某些行為進行限制,比如:

  • 為了防止網站內容被網路爬蟲抓取,網站管理者通常會限制每個IP地址在固定時間段內能夠訪問的頁面數量,比如1min之內最多隻能訪問30個頁面,超過這一限制的使用者將被要求進行身份驗證,確認本人並非網路爬蟲,或者等到限制解除之後再進行訪問。

  • 為了防止使用者的賬號遭到暴力破解,網上銀行通常會對訪客的密碼試錯次數進行限制,如果一個訪客在嘗試登入某個賬號的過程中,連續好幾次輸入了錯誤的密碼,那麼這個賬號將被凍結,只能等到第二天再嘗試登入,有的銀行還會向賬號持有者的手機發送通知來彙報這一情況。

實現這些限制機制的其中一種方法是使用限速器,它可以限制使用者在指定時間段之內能夠執行某項操作的次數。

這個限速器程式會把操作的最大可執行次數儲存在一個字串鍵裡面,然後在使用者每次嘗試執行被限制的操作之前,使用DECR命令將操作的可執行次數減1,最後通過檢查可執行次數的值來判斷是否執行該操作。

import redis

# 連線池 實際中可以做成一個基於模組匯入的單例
POOL = redis.ConnectionPool(host="127.0.0.1",port=6379,max_connections=10000)
client = redis.Redis(connection_pool=POOL)


#coding:utf-8

class Limiter:

    def __init__(self, client, limiter_name):
        self.client = client
        self.max_execute_times_key = limiter_name + '::max_execute_times'
        self.current_execute_times_key = limiter_name + '::current_execute_times'

    def set_max_execute_times(self, n):
        """
        設定操作的最大可執行次數。
        """
        self.client.set(self.max_execute_times_key, n)
        # 初始化操作的已執行次數為 0
        self.client.set(self.current_execute_times_key, 0)

    def get_max_execute_times(self):
        """
        返回操作的最大可執行次數。
        """
        return int(self.client.get(self.max_execute_times_key))

    def get_current_execute_times(self):
        """
        返回操作的當前已執行次數。
        """
        current_execute_times = int(self.client.get(self.current_execute_times_key))
        max_execute_times = self.get_max_execute_times()

        if current_execute_times > max_execute_times:
            # 當用戶嘗試執行操作的次數超過最大可執行次數時
            # current_execute_times 的值就會比 max_execute_times 的值更大
            # 為了將已執行次數的值保持在 
            # 0 <= current_execute_times <= max_execute_times 這一區間
            # 如果已執行次數已經超過最大可執行次數
            # 那麼程式將返回最大可執行次數作為結果
            return max_execute_times
        else:
            # 否則的話,返回真正的當前已執行次數作為結果
            return current_execute_times

    def still_valid_to_execute(self):
        """
        檢查是否可以繼續執行被限制的操作,
        是的話返回 True ,不是的話返回 False 。
        """
        updated_current_execute_times = self.client.incr(self.current_execute_times_key)
        max_execute_times = self.get_max_execute_times()
        return (updated_current_execute_times <= max_execute_times)

    def remaining_execute_times(self):
        """
        返回操作的剩餘可執行次數。
        """
        current_execute_times = self.get_current_execute_times()
        max_execute_times = self.get_max_execute_times()
        return max_execute_times - current_execute_times

    def reset_current_execute_times(self):
        """
        清零操作的已執行次數。
        """
        self.client.set(self.current_execute_times_key, 0)

這個限速器的關鍵在於set_max_execute_times()方法和still_valid_to_execute()方法:前者用於將最大可執行次數儲存在一個字串鍵裡面,後者則會在每次被呼叫時對可執行次數執行減1操作,並檢查目前剩餘的可執行次數是否已經變為負數,如果為負數,則表示可執行次數已經耗盡,不為負數則表示操作可以繼續執行。

字串重點 ***

  • Redis的字串鍵可以把單獨的一個鍵和單獨的一個值在資料庫中關聯起來,並且這個鍵和值既可以儲存文字資料,又可以儲存二進位制資料。
  • SET命令在預設情況下會直接覆蓋字串鍵已有的值,如果我們只想在鍵不存在的情況下為它設定值,那麼可以使用帶有NX選項的SET命令;相反,如果我們只想在鍵已經存在的情況下為它設定新值,那麼可以使用帶有XX選項的SET命令。
  • 使用MSET、MSETNX以及MGET命令可以有效地減少程式的網路通訊次數從而提升程式的執行效率
  • Redis使用者可以通過制定命名格式來提升Redis資料的可讀性並避免鍵名衝突。
  • 字串值的正數索引以0為開始,從字串的開頭向結尾不斷遞增;字串值的負數索引以-1為開始,從字串的結尾向開頭不斷遞減。
  • GETRANGE key start end命令接受的是閉區間索引範圍,位於start索引和end索引上的值也會被包含在命令返回的內容當中。
  • SETRANGE命令在需要時會自動對字串值進行擴充套件,並使用空位元組填充新擴充套件空間中沒有內容的部分。
  • APPEND命令在鍵不存在時執行設定操作,在鍵存在時執行追加操作。
  • Redis會把能夠被表示為long long int型別的整數以及能夠被表示為longdouble型別的浮點數當作數字來處理。

雜湊

雜湊命令補充

HSETNX

HSETNX:只在欄位不存在的情況下為它設定值

HSETNX命令的作用和HSET命令的作用非常相似,它們之間的區別在於,HSETNX命令只會在指定欄位不存在的情況下執行設定操作:

HSETNX stu:1 name whw

HSETNX命令在欄位不存在並且成功為它設定值時返回1,在欄位已經存在並導致設定操作未能成功執行時返回0。

# 失敗
127.0.0.1:6379> hsetnx stu:1 name whw
(integer) 0
# 失敗
127.0.0.1:6379> hsetnx stu:1 age 22
(integer) 0
# 成功
127.0.0.1:6379> hsetnx stu:1 gender male
(integer) 1

實現短網址生成程式

為了給使用者提供更多發言空間,並記錄使用者在網站上的連結點選行為,大部分社交網站都會將使用者輸入的網址轉換為相應的短網址。比如,如果我們在新浪微博中發言時輸入網址http://redisdoc.com/geo/index.html,那麼微博將把這個網址轉換為相應的短網址http://t.cn/RqRRZ8n,當用戶訪問這個短網址時,微博在後臺就會對這次點選進行一些資料統計,然後再引導使用者的瀏覽器跳轉到http://redisdoc.com/geo/index.html上面。

建立短網址本質上就是要創建出短網址ID與目標網址之間的對映,並在使用者訪問短網址時,根據短網址的ID從對映記錄中找出與之相對應的目標網址。

因為Redis的雜湊非常適合用來儲存短網址ID與目標網址之間的對映,所以我們可以基於Redis的雜湊實現一個短網址程式,程式碼清單展示了一個這樣的例子。

### 新增快取功能

#coding:utf-8

from cache import Cache
from base36 import base10_to_base36

ID_COUNTER = "ShortyUrl::id_counter"
URL_HASH = "ShortyUrl::url_hash" 
URL_CACHE = "ShortyUrl::url_cache"

### 將10進位制轉換為36進位制
def base10_to_base36(number):
    alphabets = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    result = ""
    while number != 0 :
        number, i = divmod(number, 36)
        result = (alphabets[i] + result)
    return result or alphabets[0]

### 實現短網址的類
class ShortyUrl:

    def __init__(self, client):
        self.client = client
        self.cache = Cache(self.client, URL_CACHE)  # 建立快取物件

    def shorten(self, target_url):
        """
        為目標網址建立一個短網址 ID 。
        """
        # 嘗試在快取裡面尋找目標網址對應的短網址 ID
        cached_short_id = self.cache.get(target_url)
        if cached_short_id is not None:
            return cached_short_id

        ### 如果沒有的話就在之前的id的基礎上增1 新建一個新的ID!!!
        new_id = self.client.incr(ID_COUNTER)
        short_id = base10_to_base36(new_id)
        self.client.hset(URL_HASH, short_id, target_url)
        # 在快取裡面關聯起目標網址和短網址 ID
        # 這樣程式就可以在使用者下次輸入相同的目標網址時
        # 直接重用已有的短網址 ID
        self.cache.set(target_url, short_id)
        return short_id

    def restore(self, short_id):
        """
        根據給定的短網址 ID ,返回與之對應的目標網址。
        """
        return self.client.hget(URL_HASH, short_id)

ShortyUrl類的shorten()方法負責為輸入的網址生成短網址ID,它的工作包括以下4個步驟:

1)為每個給定的網址建立一個十進位制數字ID。

2)將十進位制數字ID轉換為三十六進位制,並將這個三十六進位制數字用作給定網址的短網址ID,這種方法在數字ID長度較大時可以有效地縮短數字ID的長度。程式碼清單展示了將數字從十進位制轉換成三十六進位制的base10_to_base36函式的具體實現。

3)將短網址ID和目標網址之間的對映關係儲存到雜湊中。

4)向呼叫者返回剛剛生成的短網址ID。

restore()方法要做的事情和shorten()方法正好相反,它會從儲存著對映關係的雜湊裡面取出與給定短網址ID相對應的目標網址,然後將其返回給呼叫者。

使用雜湊鍵重新實現計數器

class Counter:

    def __init__(self, client, hash_key, counter_name):
        self.client = client
        self.hash_key = hash_key
        self.counter_name = counter_name

    def increase(self, n=1):
        """
        將計數器的值加上 n ,然後返回計數器當前的值。
        如果使用者沒有顯式地指定 n ,那麼將計數器的值加上一。
        """
        return self.client.hincrby(self.hash_key, self.counter_name, n)

    def decrease(self, n=1):
        """
        將計數器的值減去 n ,然後返回計數器當前的值。
        如果使用者沒有顯式地指定 n ,那麼將計數器的值減去一。
        """
        return self.client.hincrby(self.hash_key, self.counter_name, -n)

    def get(self):
        """
        返回計數器的當前值。
        """
        value = self.client.hget(self.hash_key, self.counter_name)
        # 如果計數器並不存在,那麼返回 0 作為預設值。
        if value is None:
            return 0
        else:
            return int(value)

    def reset(self):
        """
        將計數器的值重置為 0 。
        """
        self.client.hset(self.hash_key, self.counter_name, 0)

這個計數器實現充分地發揮了雜湊的優勢:

  • 它允許使用者將多個相關聯的計數器儲存到同一個雜湊鍵中實行集中管理,而不必像字串計數器那樣,為每個計數器單獨設定一個字串鍵。
  • 與此同時,通過對雜湊中的不同欄位執行HINCRBY命令,程式可以對指定的計數器執行加法操作和減法操作,而不會影響到儲存在同一雜湊中的其他計數器。

實現使用者登陸會話 ***

為了方便使用者,網站一般都會為已登入的使用者生成一個加密令牌,然後把這個令牌分別儲存在伺服器端和客戶端,之後每當使用者再次訪問該網站的時候,網站就可以通過驗證客戶端提交的令牌來確認使用者的身份,從而使得使用者不必重複地執行登入操作。

另外,為了防止使用者因為長時間不輸入密碼而遺忘密碼,以及為了保證令牌的安全性,網站一般都會為令牌設定一個過期期限(比如一個月),當期限到達之後,使用者的會話就會過時,而網站則會要求使用者重新登入。

上面描述的這種使用令牌來避免重複登入的機制一般稱為登入會話(loginsession),通過使用Redis的雜湊,我們可以構建出程式碼清單所示的登入會話程式。

import random
from time import time  # 獲取浮點數格式的 unix 時間戳
from hashlib import sha256

# 會話的預設過期時間
DEFAULT_TIMEOUT = 3600*24*30    # 一個月

# 儲存會話令牌以及會話過期時間戳的雜湊
SESSION_TOKEN_HASH = "session::token"
SESSION_EXPIRE_TS_HASH = "session::expire_timestamp"

# 會話狀態
SESSION_NOT_LOGIN = "SESSION_NOT_LOGIN"
SESSION_EXPIRED = "SESSION_EXPIRED"
SESSION_TOKEN_CORRECT = "SESSION_TOKEN_CORRECT"
SESSION_TOKEN_INCORRECT = "SESSION_TOKEN_INCORRECT"

def generate_token():
    """
    生成一個隨機的會話令牌。
    """
    random_string = str(random.getrandbits(256)).encode('utf-8')
    return sha256(random_string).hexdigest()


class LoginSession:

    def __init__(self, client, user_id):
        self.client = client
        self.user_id = user_id

    def create(self, timeout=DEFAULT_TIMEOUT):
        """
        建立新的登入會話並返回會話令牌,
        可選的 timeout 引數用於指定會話的過期時間(以秒為單位)。
        """
        # 生成會話令牌
        user_token = generate_token()
        # 計算會話到期時間戳
        expire_timestamp = time()+timeout
        # 以使用者 ID 為欄位,將令牌和到期時間戳分別儲存到兩個雜湊裡面
        self.client.hset(SESSION_TOKEN_HASH, self.user_id, user_token)
        self.client.hset(SESSION_EXPIRE_TS_HASH, self.user_id, expire_timestamp)
        # 將會話令牌返回給使用者
        return user_token

    def validate(self, input_token):
        """
        根據給定的令牌驗證使用者身份。
        這個方法有四個可能的返回值,分別對應四種不同情況:
        1. SESSION_NOT_LOGIN —— 使用者尚未登入
        2. SESSION_EXPIRED —— 會話已過期
        3. SESSION_TOKEN_CORRECT —— 使用者已登入,並且給定令牌與使用者令牌相匹配
        4. SESSION_TOKEN_INCORRECT —— 使用者已登入,但給定令牌與使用者令牌不匹配
        """
        # 嘗試從兩個雜湊裡面取出使用者的會話令牌以及會話的過期時間戳
        user_token = self.client.hget(SESSION_TOKEN_HASH, self.user_id)
        expire_timestamp = self.client.hget(SESSION_EXPIRE_TS_HASH, self.user_id)

        # 如果會話令牌或者過期時間戳不存在,那麼說明使用者尚未登入
        if (user_token is None) or (expire_timestamp is None):
            return SESSION_NOT_LOGIN

        # 將當前時間戳與會話的過期時間戳進行對比,檢查會話是否已過期
        # 因為 HGET 命令返回的過期時間戳是字串格式的
        # 所以在進行對比之前要先將它轉換成原來的浮點數格式
        if time() > float(expire_timestamp):
            return SESSION_EXPIRED

        # 使用者令牌存在並且未過期,那麼檢查它與給定令牌是否一致
        if input_token == user_token:
            return SESSION_TOKEN_CORRECT
        else:
            return SESSION_TOKEN_INCORRECT

    def destroy(self):
        """
        銷燬會話。
        """
        # 從兩個雜湊裡面分別刪除使用者的會話令牌以及會話的過期時間戳
        self.client.hdel(SESSION_TOKEN_HASH, self.user_id)
        self.client.hdel(SESSION_EXPIRE_TS_HASH, self.user_id)

LoginSession的create()方法首先會計算出隨機的會話令牌以及會話的過期時間戳,然後使用使用者ID作為欄位,將令牌和過期時間戳分別儲存到兩個雜湊裡面。

在此之後,每當客戶端向伺服器傳送請求並提交令牌的時候,程式就會使用validate()方法驗證被提交令牌的正確性:validate()方法會根據使用者的ID,從兩個雜湊裡面分別取出使用者的會話令牌以及會話的過期時間戳,然後通過一系列檢查判斷令牌是否正確以及會話是否過期。

最後,destroy()方法可以在使用者手動退出(logout)時呼叫,它可以刪除使用者的會話令牌以及會話的過期時間戳,讓使用者重新回到未登入狀態。

儲存圖資料 ***

在構建地圖應用、設計電路圖、進行任務排程、分析網路流量等多種任務中,都需要對圖(graph)資料結構實施建模,並存儲相關的圖資料。對於不少資料庫來說,想要高效、直觀地儲存圖資料並不是一件容易的事情,但是Redis卻能夠以多種不同的方式表示圖資料結構,其中一種方式就是使用雜湊。

例如,假設我們想要儲存圖3-20所示的帶權重有向圖,那麼可以建立一個圖3-21所示的雜湊鍵,這個雜湊鍵會以start_vertex->end_vertex的形式將各個頂點之間的邊儲存到雜湊的欄位中,並將欄位的值設定成邊的權重。通過這種方法,我們可以將圖的相關資料全部儲存到雜湊中,程式碼清單3-5展示了使用這種方法實現的圖資料儲存程式。

def make_edge_name_from_vertexs(start, end):
    """
    使用邊的起點和終點組建邊的名字。
    例子:對於 start 為 "a" 、 end 為 "b" 的輸入,這個函式將返回 "a->b" 。
    """
    return str(start) + "->" + str(end)

def decompose_vertexs_from_edge_name(name):
    """
    從邊的名字中分解出邊的起點和終點。
    例子:對於輸入 "a->b" ,這個函式將返回結果 ["a", "b"] 。
    """
    return name.split("->")


class Graph:

    def __init__(self, client, hkey):
        self.client = client
        self.key = hkey

    def add_edge(self, start, end, weight):
        """
        新增一條從頂點 start 連線至頂點 end 的邊,並將邊的權重設定為 weight 。
        """
        edge = make_edge_name_from_vertexs(start, end)
        self.client.hset(self.key, edge, weight)

    def remove_edge(self, start, end):
        """
        移除從頂點 start 連線至頂點 end 的一條邊。
        這個方法在成功刪除邊時返回 True ,
        因為邊不存在而導致刪除失敗時返回 False 。
        """
        edge = make_edge_name_from_vertexs(start, end)
        return self.client.hdel(self.key, edge)

    def get_edge_weight(self, start, end):
        """
        獲取從頂點 start 連線至頂點 end 的邊的權重,
        如果給定的邊不存在,那麼返回 None 。
        """
        edge = make_edge_name_from_vertexs(start, end)
        return self.client.hget(self.key, edge)

    def has_edge(self, start, end):
        """
        檢查頂點 start 和頂點 end 之間是否有邊,
        是的話返回 True ,否則返回 False 。
        """
        edge = make_edge_name_from_vertexs(start, end)
        return self.client.hexists(self.key, edge)

    def add_multi_edges(self, *tuples):
        """
        一次向圖中新增多條邊。
        這個方法接受任意多個格式為 (start, end, weight) 的三元組作為引數。
        """
        # redis-py 客戶端的 hmset() 方法接受一個字典作為引數
        # 格式為 {field1: value1, field2: value2, ...}
        # 為了一次對圖中的多條邊進行設定
        # 我們要將待設定的各條邊以及它們的權重儲存在以下字典
        nodes_and_weights = {}

        # 遍歷輸入的每個三元組,從中取出邊的起點、終點和權重
        for start, end, weight in tuples:
            # 根據邊的起點和終點,創建出邊的名字
            edge = make_edge_name_from_vertexs(start, end)
            # 使用邊的名字作為欄位,邊的權重作為值,把邊及其權重儲存到字典裡面
            nodes_and_weights[edge] = weight

        # 根據字典中儲存的欄位和值,對雜湊進行設定
        self.client.hmset(self.key, nodes_and_weights)

    def get_multi_edge_weights(self, *tuples):
        """
        一次獲取多條邊的權重。
        這個方法接受任意多個格式為 (start, end) 的二元組作為引數,
        然後返回一個列表作為結果,列表中依次儲存著每條輸入邊的權重。
        """
        # hmget() 方法接受一個格式為 [field1, field2, ...] 的列表作為引數
        # 為了一次獲取圖中多條邊的權重
        # 我們需要把所有想要獲取權重的邊的名字依次放入到以下列表裡面
        edge_list = []

        # 遍歷輸入的每個二元組,從中獲取邊的起點和終點
        for start, end in tuples:
            # 根據邊的起點和終點,創建出邊的名字
            edge = make_edge_name_from_vertexs(start, end)
            # 把邊的名字放入到列表中
            edge_list.append(edge)

        # 根據列表中儲存的每條邊的名字,從雜湊裡面獲取它們的權重
        return self.client.hmget(self.key, edge_list)

    def get_all_edges(self):
        """
        以集合形式返回整個圖包含的所有邊,
        集合包含的每個元素都是一個 (start, end) 格式的二元組。
        """
        # hkeys() 方法將返回一個列表,列表中包含多條邊的名字
        # 例如 ["a->b", "b->c", "c->d"]
        edges = self.client.hkeys(self.key)

        # 建立一個集合,用於儲存二元組格式的邊
        result = set()
        # 遍歷每條邊的名字
        for edge in edges:
            # 根據邊的名字,分解出邊的起點和終點
            start, end = decompose_vertexs_from_edge_name(edge)
            # 使用起點和終點組成一個二元組,然後把它放入到結果集合裡面
            result.add((start, end))

        return result

    def get_all_edges_with_weight(self):
        """
        以集合形式返回整個圖包含的所有邊,以及這些邊的權重。
        集合包含的每個元素都是一個 (start, end, weight) 格式的三元組。
        """
        # hgetall() 方法將返回一個包含邊和權重的字典作為結果
        # 格式為 {edge1: weight1, edge2: weight2, ...}
        edges_and_weights = self.client.hgetall(self.key)

        # 建立一個集合,用於儲存三元組格式的邊和權重
        result = set()
        # 遍歷字典中的每個元素,獲取邊以及它的權重
        for edge, weight in edges_and_weights.items():
            # 根據邊的名字,分解出邊的起點和終點
            start, end = decompose_vertexs_from_edge_name(edge)
            # 使用起點、終點和權重構建一個三元組,然後把它新增到結果集合裡面
            result.add((start, end, weight))

        return result

這個圖資料儲存程式的核心概念就是把邊(edge)的起點和終點組合成一個欄位名,並把邊的權重(weight)用作欄位的值,然後使用HSET命令或者HMSET命令把它們儲存到雜湊中。比如,如果使用者輸入的邊起點為"a",終點為"b",權重為"30",那麼程式將執行命令HSET hash "a->b" 30,把"a"至"b"的這條邊及其權重30儲存到雜湊中。

在此之後,程式就可以使用HDEL命令刪除圖的某條邊,使用HGET命令或者HMGET命令獲取邊的權重,使用HEXISTS命令檢查邊是否存在,使用HKEYS命令和HGETALL命令獲取圖的所有邊以及權重。

使用雜湊鍵實現文章儲存程式 **

比起用多個字串鍵來儲存文章的各項資料,更好的做法是把每篇文章的所有資料都儲存到同一個雜湊中

from time import time

class Article:

    def __init__(self, client, article_id):
        self.client = client
        self.article_id = str(article_id)
        self.article_hash = "article::" + self.article_id

    def is_exists(self):
        """
        檢查給定 ID 對應的文章是否存在。
        """
        # 如果文章雜湊裡面已經設定了標題,那麼我們認為這篇文章存在
        return self.client.hexists(self.article_hash, "title")

    def create(self, title, content, author):
        """
        建立一篇新文章,建立成功時返回 True ,
        因為文章已經存在而導致建立失敗時返回 False 。
        """
        # 文章已存在,放棄執行建立操作
        if self.is_exists(): 
            return False

        # 把所有文章資料都放到字典裡面
        article_data = {
            "title": title,
            "content": content,
            "author": author,
            "create_at": time()
        }
        # redis-py 的 hmset() 方法接受一個字典作為引數,
        # 並根據字典內的鍵和值對雜湊的欄位和值進行設定。
        return self.client.hmset(self.article_hash, article_data)

    def get(self):
        """
        返回文章的各項資訊。
        """
        # hgetall() 方法會返回一個包含標題、內容、作者和建立日期的字典!!!
        article_data = self.client.hgetall(self.article_hash)
        # 把文章 ID 也放到字典裡面,以便使用者操作
        article_data["id"] = self.article_id
        return article_data

    def update(self, title=None, content=None, author=None):
        """
        對文章的各項資訊進行更新,
        更新成功時返回 True ,失敗時返回 False 。
        """
        # 如果文章並不存在,那麼放棄執行更新操作
        if not self.is_exists(): 
            return False

        article_data = {}
        if title is not None:
            article_data["title"] = title
        if content is not None:
            article_data["content"] = content
        if author is not None:
            article_data["author"] = author
        # 引數也是字典的形式
        return self.client.hmset(self.article_hash, article_data)

雖然Redis為字串提供了MSET命令和MSETNX命令,但是並沒有為雜湊提供HMSET命令對應的HMSETNX命令,所以這個程式在建立一篇新文章之前,需要先通過is_exists()方法檢查文章是否存在,然後再考慮是否使用HMSET命令進行設定。

在使用字串鍵儲存文章資料的時候,為了避免資料庫中出現鍵名衝突,程式必須為每篇文章的每個屬性都設定一個獨一無二的鍵,比如使用article::10086::title鍵儲存ID為10086的文章的標題,使用article::12345::title鍵儲存ID為12345的文章的標題,諸如此類。相反,因為新的文章儲存程式可以直接將一篇文章的所有相關資訊都儲存到同一個雜湊中,所以它可以直接在雜湊裡面使用title作為標題的欄位,而不必擔心出現命名衝突。

雜湊與字串 ***

對於表中列出的字串命令和雜湊命令來說,它們之間的最大區別就是前者處理的是字串鍵,而後者處理的則是雜湊鍵,除此之外,這些命令要做的事情幾乎都是相同的。

雜湊鍵的優勢

雜湊的最大優勢,就是它只需要在資料庫裡面建立一個鍵,就可以把任意多的欄位和值儲存到雜湊裡面。相反,因為每個字串鍵只能儲存一個鍵值對,所以如果使用者要使用字串鍵去儲存多個數據項,就只能在資料庫中建立多個字串鍵。

從上圖中可以看到,為了儲存4個數據項,程式需要用到4個字串鍵或者一個雜湊鍵。按此計算,如果我們需要儲存100萬篇文章,那麼在使用雜湊鍵的情況下,程式只需要在資料庫裡面建立100萬個雜湊鍵就可以了;但是如果使用字串鍵,那麼程式就需要在資料庫裡面建立400萬個字串鍵。

資料庫鍵數量增多帶來的問題主要和資源有關:

  • 為了對資料庫以及資料庫鍵的使用情況進行統計,Redis會為每個資料庫鍵儲存一些額外的資訊,並因此帶來一些額外的記憶體消耗。對於單個數據庫鍵來說,這些額外的記憶體消耗幾乎可以忽略不計,但是當資料庫鍵的數量達到上百萬、上千萬甚至更多的時候,這些額外的記憶體消耗就會變得比較可觀。
  • 當雜湊包含的欄位數量比較少的時候,Redis就會使用特殊的記憶體優化結構去儲存雜湊中的欄位和值。與字串鍵相比,這種記憶體優化結構儲存相同資料所需要的記憶體要少得多。使用記憶體優化結構的雜湊越多,記憶體優化結構的效果也就越明顯。在一定條件下,對於相同的資料,使用雜湊鍵進行儲存比使用字串鍵儲存要節約一半以上的記憶體,有時候甚至會更多。
  • 除了需要耗費更多記憶體之外,更多的資料庫鍵也需要佔用更多的CPU。每當Redis需要對資料庫中的鍵進行處理時,資料庫包含的鍵越多,進行處理所需的CPU資源就會越多,處理所耗費的時間也會越長。典型的情況包括:
    • 統計資料庫和資料庫鍵的使用情況。
    • 對資料庫執行持久化操作,或者根據持久化檔案還原資料庫。
    • 通過模式匹配在資料庫中查詢某個鍵,或者執行類似的查詢操作。

最後,除了資源方面的優勢之外,雜湊鍵還可以有效地組織起相關的多項資料,讓程式產生更容易理解的資料,使得針對資料的批量操作變得更方便。比如在上面展示的圖3-23中,使用雜湊鍵儲存文章資料就比使用字串鍵儲存文章資料更為清晰、易懂。

字串鍵的有點

雖然使用雜湊鍵可以有效地節約資源並更好地組織資料,但是字串鍵也有自己的優點:

  • 雖然雜湊鍵命令和字串鍵命令在部分功能上有重合的地方,但是字串鍵命令提供的操作比雜湊鍵命令更為豐富。比如,字串能夠使用SETRANGE命令和GETRANGE命令設定或者讀取字串值的其中一部分,或者使用APPEND命令將新內容追加到字串值的末尾,而雜湊鍵並不支援這些操作。
  • 第12章中將對Redis的鍵過期功能進行介紹,這一功能可以在指定時間到達時,自動刪除指定的鍵。因為鍵過期功能針對的是整個鍵,使用者無法為雜湊中的不同欄位設定不同的過期時間,所以當一個雜湊鍵過期的時候,它包含的所有欄位和值都將被刪除。與此相反,如果使用者使用字串鍵儲存資訊項,就不會遇到這樣的問題——使用者可以為每個字串鍵分別設定不同的過期時間,讓它們根據實際的需要自動被刪除。

字串鍵與雜湊鍵的選擇

從資源佔用、支援的操作以及過期時間3個方面對比了字串鍵和雜湊鍵的優缺點。

既然字串鍵和雜湊鍵各有優點,那麼我們在構建應用程式的時候,什麼時候應該使用字串鍵,什麼時候又該使用雜湊鍵呢?對於這個問題,以下總結了一些選擇的條件和方法:

  • 如果程式需要為每個資料項單獨設定過期時間,那麼使用字串鍵。
  • 如果程式需要對資料項執行諸如SETRANGE、GETRANGE或者APPEND等操作,那麼優先考慮使用字串鍵。當然,使用者也可以選擇把資料儲存在雜湊中,然後將類似SETRANGE、GETRANGE這樣的操作交給客戶端執行。
  • 如果程式需要儲存的資料項比較多,並且你希望儘可能地減少儲存資料所需的記憶體,就應該優先考慮使用雜湊鍵。
  • 如果多個數據項在邏輯上屬於同一組或者同一類,那麼應該優先考慮使用雜湊鍵。

雜湊鍵重點 ***

  • 雜湊鍵會將一個鍵和一個雜湊在資料庫中關聯起來,使用者可以在雜湊中為任意多個欄位設定值。與字串鍵一樣,雜湊的欄位和值既可以是文字資料,也可以是二進位制資料。
  • 使用者可以通過雜湊鍵把相關聯的多項資料儲存到同一個雜湊中,以便對其進行管理,或者針對它們執行批量操作。
  • 因為Redis並沒有為雜湊提供相應的減法操作命令,所以如果使用者想對欄位儲存的數字值執行減法操作,就需要將負數增量傳遞給HINCRBY命令或HINCRBYFLOAT命令。
  • Redis雜湊包含的欄位在底層是以無序方式儲存的,根據欄位插入的順序不同,包含相同欄位的雜湊在執行HKEYS、HVALS和HGETALL等命令時可能會得到不同的結果,因此使用者在使用這3個命令時,不應該對命令返回元素的排列順序作任何假設。
  • 字串鍵和雜湊鍵雖然在操作方式上非常相似,但是因為它們都擁有各自獨有的優點和缺點,所以在一些情況下,這兩種資料結構是沒有辦法完全代替對方的。因此使用者在構建應用程式的時候,應該根據實際需要來選擇相應的資料結構。

列表

LPUSHX、RPUSHX:只對已存在的列表執行推入操作

# 不存在不會lpush
127.0.0.1:6379> lpushx lst1 whw naruto sasuke
(integer) 0
127.0.0.1:6379> lpush lst1 whw
(integer) 1
# 存在的話才會lpush
127.0.0.1:6379> lpushx lst1  naruto sasuke
(integer) 3
127.0.0.1:6379> lrange lst1 0 -1
1) "sasuke"
2) "naruto"
3) "whw"

先進先出佇列

class FIFOqueue:

    def __init__(self, client, key):
        self.client = client
        self.key = key

    def enqueue(self, item):
        """
        將給定元素放入佇列,然後返回隊列當前包含的元素數量作為結果。
        """
        return self.client.rpush(self.key, item)

    def dequeue(self):
        """
        移除並返回佇列目前入隊時間最長的元素。
        """
        return self.client.lpop(self.key)

分頁

對於網際網路上每一個具有一定規模的網站來說,分頁程式都是必不可少的:新聞站點、部落格、論壇、搜尋引擎等,都會使用分頁程式將數量眾多的資訊分割為多個頁面,使得使用者可以以頁為單位瀏覽網站提供的資訊,並以此來控制網站每次取出的資訊數量。

程式碼清單展示了一個使用列表實現分頁程式的方法,這個程式可以將給定的元素有序地放入一個列表中,然後使用LRANGE命令從列表中取出指定數量的元素,從而實現分頁這一概念。

class Paging:

    def __init__(self, client, key):
        self.client = client
        self.key = key

    def add(self, item):
        """
        將給定元素新增到分頁列表中。
        """
        self.client.lpush(self.key, item)

    def get_page(self, page_number, item_per_page):
        """
        從指定頁數中取出指定數量的元素。
        """
        # 根據給定的 page_number (頁數)和 item_per_page (每頁包含的元素數量)
        # 計算出指定分頁元素在列表中所處的索引範圍
        # 例子:如果 page_number = 1 , item_per_page = 10
        # 那麼程式計算得出的起始索引就是 0 ,而結束索引則是 9
        start_index = (page_number - 1) * item_per_page
        end_index = page_number * item_per_page - 1
        # 根據索引範圍從列表中獲取分頁元素
        return self.client.lrange(self.key, start_index, end_index)

    def size(self):
        """
        返回列表目前包含的分頁元素數量。
        """
        return self.client.llen(self.key)

待辦事項列表

現在很多人都會使用待辦事項軟體(也就是通常說的TODO軟體)來管理日常工作,這些軟體通常會提供一些列表,使用者可以將要做的事情記錄在待辦事項列表中,並將已經完成的事項放入已完成事項列表中。

程式碼清單展示了一個使用列表實現的待辦事項程式,這個程式的核心概念是使用兩個列表來分別記錄待辦事項和已完成事項:

  • 當用戶新增一個新的待辦事項時,程式就把這個事項放入待辦事項列表中。
  • 當用戶完成待辦事項列表中的某個事項時,程式就把這個事項從待辦事項列表中移除,並放入已完成事項列表中。

totolist1

def make_todo_list_key(user_id):
    """
    儲存待辦事項的列表。
    """
    return user_id + "::todo_list"

def make_done_list_key(user_id):
    """
    儲存已完成事項的列表。
    """
    return user_id + "::done_list"


class TodoList:

    def __init__(self, client, user_id):
        self.client = client
        self.user_id = user_id
        self.todo_list = make_todo_list_key(self.user_id)
        self.done_list = make_done_list_key(self.user_id)

    def add(self, event):
        """
        將指定事項新增到待辦事項列表中。
        """
        self.client.lpush(self.todo_list, event)

    def remove(self, event):
        """
        從待辦事項列表中移除指定的事項。
        """
        self.client.lrem(self.todo_list, 0, event)

    def done(self, event):
        """
        將待辦事項列表中的指定事項移動到已完成事項列表,
        以此來表示該事項已完成。
        """
        # 從待辦事項列表中移除指定事項
        self.remove(event)
        # 並將它新增到已完成事項列表中
        self.client.lpush(self.done_list, event)

    def show_todo_list(self):
        """
        列出所有待辦事項。
        """
        return self.client.lrange(self.todo_list, 0, -1)

    def show_done_list(self):
        """
        列出所有已完成事項。
        """
        return self.client.lrange(self.done_list, 0, -1)

todolist2

#coding:utf-8

def make_event_key(event_id):
    return "TodoList::event::" + str(event_id)

def make_todolist_key(user_id):
    return "TodoList::todo_events::" + str(user_id)

def make_donelist_key(user_id):
    return "TodoList::done_events::" + str(user_id)


class Event:

    def __init__(self, client, id):
        self.client = client
        self.id = id
        self.key = make_event_key(id)

    def set(self, title, content="", category="", due_date=""):
        """
        設定待辦事項的各項資訊,並在設定成功時返回 True 。
        """
        data = {
            "title": title,
            "content": content,
            "category": category,
            "due_date": due_date
        }
        return self.client.hmset(self.key, data)

    def get(self):
        """
        獲取待辦事項的各項資訊,並以字典方式返回這些資訊。
        """
        # 獲取資訊
        event_data = self.client.hgetall(self.key)
        # 將待辦事項的 ID 也新增到被返回的資訊中,方便查詢
        event_data["id"] = self.id
        return event_data


class TodoList:

    def __init__(self, client, user_id):
        self.client = client
        # 根據使用者的 ID ,創建出代辦事項列表和已完成事項列表
        self.todolist = make_todolist_key(user_id)
        self.donelist = make_donelist_key(user_id)
        # 待辦事項的 ID 生成器
        self.event_id = "TodoList::event_id"

    def add(self, title, content="", category="", due_date=""):
        """
        新增新的待辦事項,並返回該事項的 ID 。
        """
        # 為新的待辦事項生成 ID
        new_event_id = self.client.incr(self.event_id)
        # 設定待辦事項的相關資訊
        new_event = Event(self.client, new_event_id)
        new_event.set(title, content, category, due_date)
        # 將待辦事項的 ID 新增到待辦事項列表中
        self.client.rpush(self.todolist, new_event_id)
        return new_event_id

    def remove(self, event_id):
        """
        移除指定的待辦事項,
        移除成功時返回 True ,失敗時返回 False 。
        """
        self.client.lrem(self.todolist, event_id, 0)

    def done(self, event_id):
        """
        將指定的待辦事項設定為已完成。
        """
        self.client.lrem(self.todolist, event_id, 0)
        self.client.lpush(self.donelist, event_id)

    def show_todo_list(self):
        """
        列出使用者的所有待辦事項的 ID 。
        """
        return self.client.lrange(self.todolist, 0, -1)

    def show_done_list(self):
        """
        列出使用者的所有已完成事項的 ID 。
        """
        return self.client.lrange(self.donelist, 0, -1)

BLPOP:阻塞式左端彈出操作

BLPOP命令是帶有阻塞功能的左端彈出操作,它接受任意多個列表以及一個秒級精度的超時時限作為引數:

BLPOP list [list ...] timeout

BLPOP命令會按照從左到右的順序依次檢查使用者給定的列表,並對最先遇到的非空列表執行左端元素彈出操作。如果BLPOP命令在檢查了使用者給定的所有列表之後都沒有發現可以執行彈出操作的非空列表,那麼它將阻塞執行該命令的客戶端並開始等待,直到某個給定列表變為非空,又或者等待時間超出給定時限為止。

當BLPOP命令成功對某個非空列表執行了彈出操作之後,它將向用戶返回一個包含兩個元素的陣列:陣列的第一個元素記錄了執行彈出操作的列表,即被彈出元素的來源列表,而陣列的第二個元素則是被彈出元素本身。

# 返回的第一個值是彈出元素的列表lst1。第二個元素sasuke源於lst1
127.0.0.1:6379> BLPOP lst1 lst2 5
1) "lst1"
2) "sasuke"

解除阻塞狀態

正如前面所說,當BLPOP命令發現使用者給定的所有列表都為空時,就會讓執行命令的客戶端進入阻塞狀態。如果在客戶端被阻塞的過程中,有另一個客戶端嚮導致阻塞的列表推入了新的元素,那麼該列表就會變為非空,而被阻塞的客戶端也會隨著BLPOP命令成功彈出列表元素而重新回到非阻塞狀態。

如果在同一時間,有多個客戶端因為同一個列表而被阻塞,那麼當導致阻塞的列表變為非空時,伺服器將按照“先阻塞先服務”的規則,依次為被阻塞的各個客戶端彈出列表元素。

最後,如果被推入列表的元素數量少於被阻塞的客戶端數量,那麼先被阻塞的客戶端將會先解除阻塞,而未能解除阻塞的客戶端則需要繼續等待下次推入操作。

比如,如果有5個客戶端因為列表為空而被阻塞,但是推入列表的元素只有3個,那麼最先被阻塞的3個客戶端將會解除阻塞狀態,而剩下的2個客戶端則會繼續阻塞。

處理空列表

如果使用者向BLPOP命令傳入的所有列表都是空列表,並且這些列表在給定的時限之內一直沒有變成非空列表,那麼BLPOP命令將在給定時限到達之後向客戶端返回一個空值,表示沒有任何元素被彈出:

127.0.0.1:6379> BLPOP lst3 lst4 3
(nil)
(3.02s)

帶有阻塞功能的訊息佇列

在構建應用程式的時候,有時會遇到一些非常耗時的操作,比如傳送郵件,將一條新微博同步給上百萬個使用者,對硬碟進行大量讀寫,執行龐大的計算等。因為這些操作非常耗時,所以如果我們直接在響應使用者請求的過程中執行它們,那麼使用者就需要等待非常長時間。

例如,為了驗證使用者身份的有效性,有些網站在註冊新使用者的時候,會向用戶給定的郵件地址傳送一封啟用郵件,使用者只有在點選了驗證郵件裡面的啟用連結之後,新註冊的賬號才能夠正常使用。

程式碼清單展示了一個使用Redis實現的訊息佇列,它使用RPUSH命令將訊息推入佇列,並使用BLPOP命令從佇列中取出待處理的訊息。

class MessageQueue:

    def __init__(self, client, queue_name):
        self.client = client
        self.queue_name = queue_name

    def add_message(self, message):
        """
        將一條訊息放入到佇列裡面。
        """
        self.client.rpush(self.queue_name, message)

    def get_message(self, timeout=0):
        """
        從佇列裡面獲取一條訊息,
        如果暫時沒有訊息可用,那麼就在 timeout 引數指定的時限內阻塞並等待可用訊息出現。

        timeout 引數的預設值為 0 ,表示一直等待直到訊息出現為止。
        """
        # blpop 的結果可以是 None ,也可以是一個包含兩個元素的元組
        # 元組的第一個元素是彈出元素的來源佇列,而第二個元素則是被彈出的元素!!!
        result = self.client.blpop(self.queue_name, timeout)
        if result is not None:
            source_queue, poped_item = result
            return poped_item

    def len(self):
        """
        返回佇列目前包含的訊息數量。
        """
        return self.client.llen(self.queue_name)

使用訊息佇列實現實時提醒

訊息佇列除了可以在應用程式的內部使用,還可以用於實現面向使用者的實時提醒系統。

比如,如果我們在構建一個社交網站,那麼可以使用JavaScript指令碼,讓客戶端以非同步的方式呼叫MessageQueue類的get_message()方法,然後程式就可以在使用者被關注的時候、收到了新回覆的時候或者收到新私信的時候,通過呼叫add_message()方法來向用戶傳送提醒資訊。

列表重點 ***

  • Redis的列表是一種線性的有序結構,可以按照元素推入列表中的順序來儲存元素,並且列表中的元素可以重複出現。
  • 使用者可以使用LPUSH、RPUSH、RPOP、LPOP等多個命令,從列表的兩端推入或者彈出元素,也可以通過LINSERT命令將新元素插入列表已有元素的前面或後面。
  • 使用者可以使用LREM命令從列表中移除指定的元素,或者直接使用LTRIM命令對列表進行修剪。
  • 當用戶傳給LRANGE命令的索引範圍超出了列表的有效索引範圍時,LRANGE命令將對傳入的索引範圍進行修正,並根據修正後的索引範圍來獲取列表元素。
  • BLPOP、BRPOP和BRPOPLPUSH是阻塞版本的彈出和推入命令,如果使用者給定的所有列表都為空,那麼執行命令的客戶端將被阻塞,直到給定的阻塞時限到達或者某個給定列表非空為止。

集合

集合實現唯一計數器 *

在前面對字串鍵以及雜湊鍵進行介紹的時候,曾經展示過如何使用這兩種鍵去實現計數器程式。我們當時實現的計數器都非常簡單:每當某個動作被執行時,程式就可以呼叫計數器的加法操作或者減法操作,對動作的執行次數進行記錄。

以上這種簡單的計數行為在大部分情況下都是有用的,但是在某些情況下,我們需要一種要求更為嚴格的計數器,這種計數器只會對特定的動作或者物件進行一次計數而不是多次計數。

舉個例子,一個網站的受歡迎程度通常可以用瀏覽量和使用者數量這兩個指標進行描述:

  • 瀏覽量記錄的是網站頁面被使用者訪問的總次數,網站的每個使用者都可以重複地對同一個頁面進行多次訪問,而這些訪問會被瀏覽量計數器一個不漏地記下來。
  • 使用者數量記錄的是訪問網站的IP地址數量,即使同一個IP地址多次訪問相同的頁面,使用者數量計數器也只會對這個IP地址進行一次計數。

對於網站的瀏覽量,我們可以繼續使用字串鍵或者雜湊鍵實現的計數器進行計數,但如果我們想要記錄網站的使用者數量,就需要構建一個新的計數器,這個計數器對於每個特定的IP地址只會進行一次計數,我們把這種對每個物件只進行一次計數的計數器稱為唯一計數器(unique counter)。

程式碼清單展示了一個使用集合實現的唯一計數器,這個計數器通過把被計數的物件新增到集合來保證每個物件只會被計數一次,然後通過獲取集合的大小來判斷計數器目前總共對多少個物件進行了計數。

class UniqueCounter:

    def __init__(self, client, key):
        self.client = client
        self.key = key

    def count_in(self, item):
        """
        嘗試將給定元素計入到計數器當中:
        如果給定元素之前沒有被計數過,那麼方法返回 True 表示此次計數有效;
        如果給定元素之前已經被計數過,那麼方法返回 False 表示此次計數無效。
        """
        return self.client.sadd(self.key, item) == 1

    def get_result(self):
        """
        返回計數器的值。
        """
        return self.client.scard(self.key)

集合示例:打標籤 *

為了對網站上的內容進行分類標識,很多網站都提供了打標籤(tagging)功能。比如論壇可能會允許使用者為帖子新增標籤,這些標籤既可以對帖子進行歸類,又可以讓其他使用者快速地瞭解到帖子要講述的內容。再比如,一個圖書分類網站可能會允許使用者為自己收藏的每一本書新增標籤,使得使用者可以快速地找到被添加了某個標籤的所有圖書,並且網站還可以根據使用者的這些標籤進行資料分析,從而幫助使用者找到他們可能感興趣的圖書,除此之外,購物網站也可以為自己的商品加上標籤,比如“新上架”“熱銷中”“原裝進口”等,方便顧客瞭解每件商品的不同特點和屬性。類似的例子還有很多。

程式碼清單展示了一個使用集合實現的打標籤程式,通過這個程式,我們可以為不同的物件新增任意多個標籤:同一個物件的所有標籤都會被放到同一個集合裡面,集合裡的每一個元素就是一個標籤。

def make_tag_key(item):
    return item + "::tags"

class Tagging:

    def __init__(self, client, item):
        self.client = client
        self.key = make_tag_key(item)

    def add(self, *tags):
        """
        為物件新增一個或多個標籤。
        """
        self.client.sadd(self.key, *tags)

    def remove(self, *tags):
        """
        移除物件的一個或多個標籤。
        """
        self.client.srem(self.key, *tags)

    def is_included(self, tag):
        """
        檢查物件是否帶有給定的標籤,
        是的話返回 True ,不是的話返回 False 。
        """
        return self.client.sismember(self.key, tag)

    def get_all_tags(self):
        """
        返回物件帶有的所有標籤。
        """
        return self.client.smembers(self.key)

    def count(self):
        """
        返回物件帶有的標籤數量。
        """
        return self.client.scard(self.key)

集合實現點贊功能 *

除了點贊之外,很多網站還有諸如“+1”“頂”“喜歡”等功能,這些功能的名字雖然各有不同,但它們在本質上和點贊功能是一樣的。

程式碼清單展示了一個使用集合實現的點贊程式,這個程式使用集合來儲存對內容進行了點讚的使用者,從而確保每個使用者只能對同一內容點贊一次,並通過使用不同的集合命令來實現檢視點贊數量、檢視所有點贊使用者以及取消點贊等功能。

class Like:

    def __init__(self, client, key):
        self.client = client
        self.key = key

    def cast(self, user):
        """
        使用者嘗試進行點贊。
        如果此次點贊執行成功,那麼返回 True ;
        如果使用者之前已經點過贊,那麼返回 False 表示此次點贊無效。
        """
        return self.client.sadd(self.key, user) == 1

    def undo(self, user):
        """
        取消使用者的點贊。
        """
        self.client.srem(self.key, user)

    def is_liked(self, user):
        """
        檢查使用者是否已經點過贊。
        是的話返回 True ,否則的話返回 False 。
        """
        return self.client.sismember(self.key, user)

    def get_all_liked_users(self):
        """
        返回所有已經點過讚的使用者。
        """
        return self.client.smembers(self.key)

    def count(self):
        """
        返回已點贊使用者的人數。
        """
        return self.client.scard(self.key)

集合實現投票示例 *

問答網站、文章推薦網站、論壇這類注重內容質量的網站上通常都會提供投票功能,使用者可以通過投票來支援一項內容或者反對一項內容:

  • 一項內容獲得的支援票數越多,就會被網站安排到越明顯的位置,使得網站的使用者可以更快速地瀏覽到高質量的內容。
  • 與此相反,一項內容獲得的反對票數越多,它就會被網站安排到越不明顯的位置,甚至被當作廣告或者無用內容隱藏起來,使得使用者可以忽略這些低質量的內容。

根據網站性質的不同,不同的網站可能會為投票功能設定不同的名稱,比如有些網站可能會把“支援”和“反對”叫作“推薦”和“不推薦”,而有些網站可能會使用“喜歡”和“不喜歡”來表示“支援”和“反對”,諸如此類,但這些網站的投票功能在本質上都是一樣的。

程式碼清單展示了一個使用集合實現的投票程式:對於每一項需要投票的內容,這個程式都會使用兩個集合來分別儲存投支援票的使用者以及投反對票的使用者,然後通過對這兩個集合執行命令來實現投票、取消投票、統計投票數量、獲取已投票使用者名稱單等功能。

def vote_up_key(vote_target):
    return vote_target + "::vote_up"

def vote_down_key(vote_target):
    return vote_target + "::vote_down"

class Vote:

    def __init__(self, client, vote_target):
        self.client = client
        self.vote_up_set = vote_up_key(vote_target)
        self.vote_down_set = vote_down_key(vote_target)

    def is_voted(self, user):
        """
        檢查使用者是否已經投過票(可以是贊成票也可以是反對票),
        是的話返回 True ,否則返回 False 。
        """
        return self.client.sismember(self.vote_up_set, user) or \
               self.client.sismember(self.vote_down_set, user)

    def vote_up(self, user):
        """
        讓使用者投贊成票,並在投票成功時返回 True ;
        如果使用者已經投過票,那麼返回 False 表示此次投票無效。
        """
        if self.is_voted(user): 
            return False

        self.client.sadd(self.vote_up_set, user)
        return True

    def vote_down(self, user):
        """
        讓使用者投反對票,並在投票成功時返回 True ;
        如果使用者已經投過票,那麼返回 False 表示此次投票無效。
        """
        if self.is_voted(user): 
            return False

        self.client.sadd(self.vote_down_set, user)
        return True

    def undo(self, user):
        """
        取消使用者的投票。
        """
        self.client.srem(self.vote_up_set, user)
        self.client.srem(self.vote_down_set, user)

    def vote_up_count(self):
        """
        返回投支援票的使用者數量。
        """
        return self.client.scard(self.vote_up_set)

    def get_all_vote_up_users(self):
        """
        返回所有投支援票的使用者。
        """
        return self.client.smembers(self.vote_up_set)

    def vote_down_count(self):
        """
        返回投反對票的使用者數量。
        """
        return self.client.scard(self.vote_down_set)

    def get_all_vote_down_users(self):
        """
        返回所有投反對票的使用者。
        """
        return self.client.smembers(self.vote_down_set)

集合示例:社交關係

微博、Twitter以及類似的社交網站都允許使用者通過加關注或者加好友的方式,構建一種社交關係。這些網站上的每個使用者都可以關注其他使用者,也可以被其他使用者關注。通過正在關注名單(following list),使用者可以檢視自己正在關注的使用者及其人數;通過關注者名單(follower list),使用者可以檢視有哪些人正在關注自己,以及有多少人正在關注自己。

程式碼清單展示了一個使用集合來記錄社交關係的方法:

  • 程式為每個使用者維護兩個集合,一個集合儲存使用者的正在關注名單,而另一個集合則儲存使用者的關注者名單。
  • 當一個使用者(關注者)關注另一個使用者(被關注者)的時候,程式會將被關注者新增到關注者的正在關注名單中,並將關注者新增到被關注者的關注者名單裡面。
  • 當關注者取消對被關注者的關注時,程式會將被關注者從關注者的正在關注名單中移除,並將關注者從被關注者的關注者名單中移除。
def following_key(user):
    return user + "::following"

def follower_key(user):
    return user + "::follower"

class Relationship:

    def __init__(self, client, user):
        self.client = client
        self.user = user

    def follow(self, target):
        """
        關注目標使用者。
        """
        # 把 target 新增到當前使用者的正在關注集合裡面
        user_following_set = following_key(self.user)
        self.client.sadd(user_following_set, target)
        # 把當前使用者新增到 target 的關注者集合裡面
        target_follower_set = follower_key(target)
        self.client.sadd(target_follower_set, self.user)

    def unfollow(self, target):
        """
        取消對目標使用者的關注。
        """
        # 從當前使用者的正在關注集合中移除 target
        user_following_set = following_key(self.user)
        self.client.srem(user_following_set, target)
        # 從 target 的關注者集合中移除當前使用者
        target_follower_set = follower_key(target)
        self.client.srem(target_follower_set, self.user)

    def is_following(self, target):
        """
        檢查當前使用者是否正在關注目標使用者,
        是的話返回 True ,否則返回 False 。
        """
        # 如果 target 存在於當前使用者的正在關注集合中
        # 那麼說明當前使用者正在關注 target
        user_following_set = following_key(self.user)
        return self.client.sismember(user_following_set, target)

    def get_all_following(self):
        """
        返回當前使用者正在關注的所有人。
        """
        user_following_set = following_key(self.user)
        return self.client.smembers(user_following_set)

    def get_all_follower(self):
        """
        返回當前使用者的所有關注者。
        """
        user_follower_set = follower_key(self.user)
        return self.client.smembers(user_follower_set)

    def count_following(self):
        """
        返回當前使用者正在關注的人數。
        """
        user_following_set = following_key(self.user)
        return self.client.scard(user_following_set)

    def count_follower(self):
        """
        返回當前使用者的關注者人數。
        """
        user_follower_set = follower_key(self.user)
        return self.client.scard(user_follower_set)

集合實現抽獎

為了推銷商品並回饋消費者,商家經常會舉辦一些抽獎活動,每個符合條件的消費者都可以參加這種抽獎,而商家則需要從所有參加抽獎的消費者中選出指定數量的獲獎者,並向他們贈送物品、金錢或者其他購物優惠。

程式碼清單展示了一個使用集合實現的抽獎程式,這個程式會把所有參與抽獎活動的玩家都新增到一個集合中,然後通過SRANDMEMBER命令隨機地選出獲獎者。

#coding:utf-8

class Lottery:

    def __init__(self, client, key):
        self.client = client
        self.key = key

    def add_player(self, user):
        """
        將使用者新增到抽獎活動當中。
        """
        self.client.sadd(self.key, user)

    def get_all_players(self):
        """
        返回參加抽獎活動的所有使用者。
        """
        return self.client.smembers(self.key)

    def player_count(self):
        """
        返回參加抽獎活動的使用者人數。
        """
        return self.client.scard(self.key)

    def draw(self, number):
        """
        抽取指定數量的獲獎者。
        """
        # 因為 redis-py 目前還不支援 SPOP 命令的 count 引數
        # 所以我們在這裡只能通過呼叫多次 SPOP  命令來獲得多個隨機元素
        winners = list()
        for i in range(number):
            winners.append(self.client.spop(self.key))
        return winners

集合示例:共同關注與推薦關注

在前面我們學習瞭如何使用集合儲存社交網站的好友關係,但是除了基本的關注和被關注之外,社交網站通常還會提供一些額外的功能,幫助使用者去發現一些自己可能會感興趣的人。

例如,當我們在微博上訪問某個使用者的個人頁面時,頁面上就會展示出我們和這個使用者都在關注的人

共同關注

def following_key(user):
    return user + "::following"

class CommonFollowing:

    def __init__(self, client):
        self.client = client

    def calculate(self, user, target):
        """
        計算並返回當前使用者和目標使用者共同關注的人。
        """
        user_following_set = following_key(user)
        target_following_set = following_key(target)
        return self.client.sinter(user_following_set, target_following_set)

    def calculate_and_store(self, user, target, store_key):
        """
        計算出當前使用者和目標使用者共同關注的人,
        並把結果儲存到 store_key 指定的鍵裡面,
        最後返回共同關注的人數作為返回值。
        """
        user_following_set = following_key(user)
        target_following_set = following_key(target)
        return self.client.sinterstore(store_key, user_following_set, target_following_set)

推薦關注

程式碼清單展示了一個推薦關注程式的實現程式碼,這個程式會從使用者的正在關注集合中隨機選出指定數量的使用者作為種子使用者,然後對這些種子使用者的正在關注集合執行並集計算,最後從這個並集中隨機地選出一些使用者作為推薦關注的物件。

def following_key(user):
    return user + "::following"

def recommend_follow_key(user):
    return user + "::recommend_follow"

class RecommendFollow:

    def __init__(self, client, user):
        self.client = client
        self.user = user

    def calculate(self, seed_size):
        """
        計算並儲存使用者的推薦關注資料。
        """
        # 1)從使用者關注的人中隨機選一些人作為種子使用者
        user_following_set = following_key(self.user)
        following_targets = self.client.srandmember(user_following_set, seed_size)
        # 2)收集種子使用者的正在關注集合鍵名
        target_sets = set()
        for target in following_targets:
            target_sets.add(following_key(target))
        # 3)對所有種子使用者的正在關注集合執行並集計算,並儲存結果
        return self.client.sunionstore(recommend_follow_key(self.user), *target_sets)

    def fetch_result(self, number):
        """
        從已有的推薦關注資料中隨機地獲取指定數量的推薦關注使用者。
        """
        return self.client.srandmember(recommend_follow_key(self.user), number)

    def delete_result(self):
        """
        刪除已計算出的推薦關注資料。
        """
        self.client.delete(recommend_follow_key(self.user))

在執行這段程式碼之前,使用者peter關注了tom、david、jack、mary和sam這5個使用者,而這5個使用者又分別關注瞭如圖5-14所示的一些使用者,從結果來看,推薦程式隨機選中了david、sam和mary作為種子使用者,然後又從這3個使用者的正在關注集合的並集中隨機選出了10個人作為peter的推薦關注物件。

需要注意的是,這裡使用的是非常簡單的推薦演算法,假設使用者會對自己正在關注的人的關注物件感興趣,但實際的情況可能並非如此。為了獲得更為精準的推薦效果,實際的社交網站通常會使用更為複雜的推薦演算法,有興趣的讀者可以自行查詢這方面的資料。

集合實現:使用反向索引構建商品篩選器

在訪問網店或者購物網站的時候,我們經常會看到類似圖5-15中顯示的商品篩選器,對於不同的篩選條件,這些篩選器會給出不同的選項,使用者可以通過選擇不同的選項來快速找到自己想要的商品。

實現商品篩選器的方法之一是使用反向索引,這種資料結構可以為每個物品新增多個關鍵字,然後根據關鍵字去反向獲取相應的物品。舉個例子,對於"X1Carbon"這檯筆記本電腦來說,我們可以為它新增"ThinkPad"、"14inch"、"Windows"等關鍵字,然後通過這些關鍵字來反向獲取"X1 Carbon"這檯筆記本電腦。

實現反向索引的關鍵是要在物品和關鍵字之間構建起雙向的對映關係,比如對於剛剛提到的"X1 Carbon"膝上型電腦來說,反向索引程式需要構建出圖5-16所示的兩種對映關係:

  • 第一種對映關係將"X1 Carbon"對映至它帶有的各個關鍵字。
  • 第二種對映關係將"ThinkPad"、"14inch"、"Windows"等多個關鍵字對映至"X1 Carbon"。

程式碼清單展示了一個使用集合實現的反向索引程式,對於使用者給定的每一件物品,這個程式都會使用一個集合去儲存物品帶有的多個關鍵字,與此同時,對於這件物品的每一個關鍵字,程式都會使用一個集合去儲存關鍵字與物品之間的對映。因為構建反向索引所需的這兩種對映都是一對多對映,所以使用集合來儲存這兩種對映關係的做法是可行的。

def make_item_key(item):
    return "InvertedIndex::" + item + "::keywords"

def make_keyword_key(keyword):
    return "InvertedIndex::" + keyword + "::items"

class InvertedIndex:

    def __init__(self, client):
        self.client = client

    def add_index(self, item, *keywords):
        """
        為物品新增關鍵字。
        """
        # 將給定關鍵字新增到物品集合中
        item_key = make_item_key(item)
        result = self.client.sadd(item_key, *keywords)
        # 遍歷每個關鍵字集合,把給定物品新增到這些集合當中
        for keyword in keywords:
            keyword_key = make_keyword_key(keyword)
            self.client.sadd(keyword_key, item)
        # 返回新新增關鍵字的數量作為結果
        return result

    def remove_index(self, item, *keywords):
        """
        移除物品的關鍵字。
        """
        # 將給定關鍵字從物品集合中移除
        item_key = make_item_key(item)
        result = self.client.srem(item_key, *keywords)
        # 遍歷每個關鍵字集合,把給定物品從這些集合中移除
        for keyword in keywords:
            keyword_key = make_keyword_key(keyword)
            self.client.srem(keyword_key, item)
        # 返回被移除關鍵字的數量作為結果
        return result

    def get_keywords(self, item):
        """
        獲取物品的所有關鍵字。
        """
        return self.client.smembers(make_item_key(item))

    def get_items(self, *keywords):
        """
        根據給定的關鍵字獲取物品。
        """
        # 根據給定的關鍵字,計算出與之對應的集合鍵名
        keyword_key_list = map(make_keyword_key, keywords)
        # 然後對這些儲存著各式物品的關鍵字集合執行並集計算
        # 從而查找出帶有給定關鍵字的物品
        return self.client.sinter(*keyword_key_list)

集合重點

  • 集合允許使用者儲存任意多個各不相同的元素。
  • 所有針對單個元素的集合操作,複雜度都為O(1)。
  • 在使用SADD命令向集合中新增元素時,已存在於集合中的元素會自動被忽略。
  • 因為集合以無序的方式儲存元素,所以兩個包含相同元素的集合在使用SMEMBERS命令時可能會得到不同的結果。
  • SRANDMEMBER命令不會移除被隨機選中的元素,而SPOP命令的做法正相反。
  • 因為集合計算需要使用大量的計算資源,所以我們應該儘量儲存並重用集合計算的結果,在有需要的情況下,還可以把集合計算放到從伺服器中進行。

有序集合

有序集合:排行榜

我們在網上常常會看到各式各樣的排行榜,比如,在音樂網站上可能會看到試聽排行榜、下載排行榜、華語歌曲排行榜和英語歌曲排行榜等,而在視訊網站上可能會看到觀看排行榜、購買排行榜、收藏排行榜等,甚至連專案託管網站GitHub都提供了各種不同的排行榜,以此來幫助使用者找到近期最受人矚目的新專案。

程式碼清單展示了一個使用有序集合實現的排行榜程式:

  • 這個程式使用ZADD命令向排行榜中新增被排序的元素及其分數,並使用ZREVRANK命令去獲取元素在排行榜中的排名,以及使用ZSCORE命令去獲取元素的分數。
  • 當用戶不再需要對某個元素進行排序的時候,可以呼叫由ZREM命令實現的remove()方法,從排行榜中移除該元素。
  • 如果使用者想要修改某個被排序元素的分數,那麼只需要呼叫由ZINCRBY命令實現的increase_score()方法或者decrease_score()方法即可。
  • 當用戶想要獲取排行榜前N位的元素及其分數時,只需要呼叫由ZREVRANGE命令實現的top()方法即可。
class RankingList:

    def __init__(self, client, key):
        self.client = client
        self.key = key

    def set_score(self, item, score):
        """
        為排行榜中的指定元素設定分數,不存在的元素會被新增到排行榜裡面。
        """
        self.client.zadd(self.key, {item:score})

    def get_score(self, item):
        """
        獲取排行榜中指定元素的分數。
        """
        return self.client.zscore(self.key, item)

    def remove(self, item):
        """
        從排行榜中移除指定的元素。
        """
        self.client.zrem(self.key, item)

    def increase_score(self, item, increment):
        """
        將給定元素的分數增加 increment 分。
        """
        self.client.zincrby(self.key, increment, item)

    def decrease_score(self, item, decrement):
        """
        將給定元素的分數減少 decrement 分。
        """
        # 因為 Redis 沒有直接提供能夠減少元素分值的命令
        # 所以這裡通過傳入一個負數減量來達到減少分值的目的
        self.client.zincrby(self.key, 0-decrement, item)

    def get_rank(self, item):
        """
        獲取給定元素在排行榜中的排名。
        """
        rank = self.client.zrevrank(self.key, item)
        # 因為 Redis 元素的排名是以 0 為開始的,
        # 而現實世界中的排名通常以 1 為開始,
        # 所以這裡在返回排名之前會執行加一操作。
        if rank is not None: 
            return rank+1

    def top(self, n, with_score=False):
        """
        獲取排行榜中得分最高的 n 個元素,
        如果可選的 with_score 引數的值為 True ,那麼將元素的分數(分值)也一併返回。
        """
        return self.client.zrevrange(self.key, 0, n-1, withscores=with_score)

有序集合:時間線

在網際網路上,有很多網站都會根據內容的釋出時間來對內容進行排序,比如:

  • 部落格系統會按照文章釋出時間的先後,把最近釋出的文章放在前面,而釋出時間較早的文章則放在後面,這樣訪客在瀏覽部落格的時候,就可以先閱讀最新的文章,然後再閱讀較早的文章。
  • 新聞網站會按照新聞的釋出時間,把最近發生的新聞放在網站的前面,而早前發生的新聞則放在網站的後面,這樣當用戶訪問該網站的時候,就可以第一時間檢視到最新的新聞報道。
  • 諸如微博和Twitter這樣的微部落格都會把使用者最新發布的訊息放在頁面的前面,而稍早之前釋出的訊息則放在頁面的後面,這樣使用者就可以通過向後滾動網頁,檢視最近一段時間自己關注的人都發表了哪些動態。

類似的情形還有很多。通過對這類行為進行抽象,我們可以創建出程式碼清單所示的時間執行緒序:

  • 這個程式會把被新增到時間線裡面的元素用作成員,與元素相關聯的時間戳用作分值,將元素和它的時間戳新增到有序集合中。
  • 因為時間線中的每個元素都有一個與之相關聯的時間戳,所以時間線中的元素將按照時間戳的大小進行排序。
  • 通過對時間線中的元素執行ZREVRANGE命令或者ZREVRANGEBYSCORE命令,使用者可以以分頁的方式按順序取出時間線中的元素,或者從時間線中取出指定時間區間內的元素。
class Timeline:

    def __init__(self, client, key):
        self.client = client
        self.key = key

    def add(self, item, time):
        """
        將元素新增到時間線裡面。
        """
        self.client.zadd(self.key, {item:time})

    def remove(self, item):
        """
        從時間線裡面移除指定元素。
        """
        self.client.zrem(self.key, item)

    def count(self):
        """
        返回時間線包含的元素數量。
        """
        return self.client.zcard(self.key)

    def pagging(self, number, count, with_time=False):
        """
        按照每頁 count 個元素計算,取出時間線第 number 頁上的所有元素,
        這些元素將根據時間戳逆序排列。
        如果可選引數 with_time 的值為 True ,那麼元素對應的時間戳也會一併被返回。
        注意:number 引數的起始值是 1 而不是 0 。
        """
        start_index = (number - 1)*count
        end_index = number*count-1
        return self.client.zrevrange(self.key, start_index, end_index, withscores=with_time) 

    def fetch_by_time_range(self, min_time, max_time, number, count, with_time=False):
        """
        按照每頁 count 個元素計算,獲取指定時間段第 number 頁上的所有元素,
        這些元素將根據時間戳逆序排列。
        如果可選引數 with_time 的值為 True ,那麼元素對應的時間戳也會一併被返回。
        注意:number 引數的起始值是 1 而不是 0 。
        """
        start_index = (number-1)*count
        return self.client.zrevrangebyscore(self.key, max_time, min_time, start_index, count, withscores=with_time)

有序集合:商品推薦 *

在瀏覽網上商城的時候,我們常常會看到類似“購買此商品的顧客也同時購買”這樣的商品推薦功能

從抽象的角度來講,這些推薦功能實際上都是通過記錄使用者的訪問路徑來實現的:如果使用者在對一個目標執行了類似瀏覽或者購買這樣的操作之後,也對另一個目標執行了相同的操作,那麼程式就會對這次操作的訪問路徑進行記錄和計數,然後程式就可以通過計數結果來知道使用者在對指定目標執行了某個操作之後,還會對哪些目標執行相同的操作。

程式碼清單展示了一個使用以上原理實現的路徑統計程式:

  • 每當使用者從起點origin對終點destination進行一次訪問,程式都會使用ZINCRBY命令對儲存著起點origin訪問記錄的有序集合的destination成員執行一次分值加1操作。
  • 在此之後,程式只需要對儲存著origin訪問記錄的有序集合執行ZREVRANGE命令,就可以知道使用者在訪問了起點origin之後,最經常訪問的目的地有哪些。
def make_record_key(origin):
    return "forward_to_record::{0}".format(origin)

class Path:

    def __init__(self, client):
        self.client = client

    def forward_to(self, origin, destination):
        """
        記錄一次從起點 origin 到目的地 destination 的訪問。
        """
        key = make_record_key(origin)
        self.client.zincrby(key, 1, destination)

    def pagging_record(self, origin, number, count, with_time=False):
        """
        按照每頁 count 個目的地計算,
        從起點 origin 的訪問記錄中取出位於第 number 頁的訪問記錄,
        其中所有訪問記錄均按照訪問次數從多到小進行排列。
        如果可選的 with_time 引數的值為 True ,那麼將具體的訪問次數也一併返回。
        """
        key = make_record_key(origin)
        start_index = (number-1)*count
        end_index = number*count-1
        return self.client.zrevrange(key, start_index, end_index, withscores=with_time, score_cast_func=int) # score_cast_func = int 用於將成員的分值從浮點數轉換為整數

zrangbylex/zrevrangebylex:返回指定字典範圍內的成員

正如本章開頭所說,對於擁有不同分值的有序集合成員來說,成員的大小將由分值決定,至於分值相同的成員,它們的大小則由該成員在字典序中的大小決定

這種排列規則的一個特例是,當有序集合的所有成員都擁有相同的分值時,有序集合的成員將不再根據分值進行排序,而是根據字典序進行排序。在這種情況下,本章前面介紹的根據分值對成員進行操作的命令,比如ZRANGEBYSCORE、ZCOUNT和ZREMRANGEBYSCORE等,都將不再適用

為了讓使用者可以對字典序排列的有序集合執行類似ZRANGEBYSCORE這樣的操作,Redis提供了相應的ZRANGEBYLEX、ZREVRANGEBYLEX、ZLEXCOUNT和ZREMRANGEBYLEX命令,這些命令可以分別對字典序排列的有序集合執行升序排列的範圍獲取操作、降序排列的範圍獲取操作、統計位於字典序指定範圍內的成員數量以及移除位於字典序指定範圍內的成員,本章接下來將分別對這些命令進行介紹。

(略)

ZPOPMAX、ZPOPMIN:彈出分值最高和最低的成員

ZPOPMAX和ZPOPMIN是Redis 5.0版本新新增的兩個命令,分別用於移除並返回有序集合中分值最大和最小的N個元素:

ZPOPMAX sorted_set [count]
ZPOPMIN sorted_set [count]

其中被移除元素的數量可以通過可選的count引數來指定。如果使用者沒有顯式地給定count引數,那麼命令預設只會移除一個元素。

BZPOPMAX、BZPOPMIN:阻塞式最大/最小元素彈出操作

BZPOPMAX命令和BZPOPMIN命令分別是ZPOPMAX命令以及ZPOPMIN命令的阻塞版本,這兩個阻塞命令都接受任意多個有序集合和一個秒級精度的超時時限作為引數:


BZPOPMAX sorted_set [sorted_set ...] timeout

接收到引數的BZPOPMAX命令和BZPOPMIN命令會依次檢查使用者給定的有序集合,並從它遇到的第一個非空有序集合中彈出指定的元素。如果命令在檢查了所有給定有序集合之後都沒有發現可彈出的元素,那麼它將阻塞執行命令的客戶端,並在給定的時限之內等待可彈出的元素出現,直到等待時間超過給定時限為止。使用者可以通過將超時時限設定為0來讓命令一直阻塞,直到可彈出的元素出現為止。

BZPOPMAX命令和BZPOPMIN命令在成功彈出元素時將返回一個包含3個項的列表,這3個項分別為被彈出元素所在的有序集合、被彈出元素的成員以及被彈出元素的分值。與此相反,如果這兩個命令因為等待超時而未能彈出任何元素,那麼它們將返回一個空值作為結果。

有序集合重點

  • 有序集合同時擁有“有序”和“集合”兩種性質,集合性質保證有序集合只會包含各不相同的成員,而有序性質則保證了有序集合中的所有成員都會按照特定的順序進行排列。
  • 在一般情況下,有序集合成員的大小由分值決定,而分值相同的成員的大小則由成員在字典序中的大小決定。
  • 成員的分值除了可以是數字之外,還可以是表示無窮大的"+inf"或者表示無窮小的"-inf"。
  • ZADD命令從Redis 3.0.2版本開始,可以通過給定可選項來決定執行新增操作或是執行更新操作。
  • 因為Redis只提供了對成員分值執行加法計算的ZINCRBY命令,而沒有提供相應的減法計算命令,所以我們只能通過向ZINCRBY命令傳入負數增量來對成員分值執行減法計算。
  • ZINTERSTORE命令和ZUNIONSTORE命令除了可以使用有序集合作為輸入之外,還可以使用集合作為輸入。在預設情況下,這兩個命令會把集合的成員看作分值為1的有序集合成員來計算。
  • 當有序集合的所有成員都擁有相同的分值時,使用者可以通過ZRANGEBYLEX、ZLEXCOUNT、ZREMRANGEBYLEX等命令,按照字典序對有序集合中的成員進行操作。

HyperLogLog

HyperLogLog簡介 @@

點陣圖(bitmap)

點陣圖(bitmap)@@

地理座標

地理座標 @@

流(stream)@@

Redis附加功能

Redis資料庫的操作 ***

Redis為資料庫提供了非常豐富的操作命令,通過這些命令,使用者可以:

  • ●指定自己想要使用的資料庫。
  • ●一次性獲取資料庫包含的所有鍵,迭代地獲取資料庫包含的所有鍵,或者隨機地獲取資料庫中的某個鍵。
  • ●根據給定鍵的值進行排序。
  • ●檢查給定的一個或多個鍵,看它們是否存在於資料庫當中。
  • ●檢視給定鍵的型別。
  • ●對給定鍵進行重新命名。
  • ●移除指定的鍵,或者將它從一個數據庫移動到另一個數據庫。
  • ●清空資料庫包含的所有鍵。
  • ●交換給定的兩個資料庫。

SELECT:切換至指定的資料庫

一個Redis伺服器可以包含多個數據庫。在預設情況下,Redis伺服器在啟動時將會建立16個數據庫:這些資料庫都使用號碼進行標識,其中第一個資料庫為0號資料庫,第二個資料庫為1號資料庫,而第三個資料庫則為2號資料庫,以此類推。

Redis雖然不允許在同一個資料庫中使用兩個同名的鍵,但是由於不同資料庫擁有不同的名稱空間,因此在不同資料庫中使用同名的鍵是完全沒有問題的,而使用者也可以通過使用不同資料庫來儲存不同的資料,以此來達到重用鍵名並且減少鍵衝突的目的。

比如,如果我們將使用者的個人資訊和會話資訊都存放在同一個資料庫中,那麼為了區分這兩種資訊,程式就需要使用user::::profile格式的鍵來儲存使用者資訊,並使用user::::session格式的鍵來儲存使用者會話;但如果將這兩種資訊分別儲存在0號資料庫和1號資料庫中,那麼程式就可以在0號資料庫中使用user::格式的鍵來儲存使用者資訊,並在1號資料庫中繼續使用user::格式的鍵來儲存使用者會話。

redis> SELECT 3
OK
redis[3]> set name whw

KEYS:獲取所有與給定匹配符相匹配的鍵

KEYS命令接受一個全域性匹配符作為引數,然後返回資料庫中所有與這個匹配符相匹配的鍵作為結果:

keys pattern

如果我們想要獲取所有以user::為字首的鍵,那麼可以執行以下命令:

keys user::*

全域性匹配符

TYPE:檢視鍵的型別 *

TYPE命令允許我們檢視給定鍵的型別:

127.0.0.1:6379> keys *
1) "msg"
2) "pv_counter::123,100"
3) "fruits"
127.0.0.1:6379> type msg
string
127.0.0.1:6379> type fruits
set
127.0.0.1:6379>

流水線與事物 *****

在執行這些命令的時候,我們總是單獨地執行每個命令,也就是說,先將一個命令傳送到伺服器,等伺服器執行完這個命令並將結果返回給客戶端之後,再執行下一個命令,以此類推,直到所有命令都執行完畢為止。

這種執行命令的方式雖然可行,但在效能方面卻不是最優的,並且在執行時可能還會出現一些非常隱蔽的錯誤。為了解決這些問題,本章將會介紹Redis的流水線特性以及事務特性,前者可以有效地提升Redis程式的效能,而後者則可以避免單獨執行命令時可能會出現的一些錯誤。

Redis的流水線特性 *

在一般情況下,使用者每執行一個Redis命令,Redis客戶端和Redis伺服器就需要執行以下步驟:

1)客戶端向伺服器傳送命令請求。

2)伺服器接收命令請求,並執行使用者指定的命令呼叫,然後產生相應的命令執行結果。

3)伺服器向客戶端返回命令的執行結果。

4)客戶端接收命令的執行結果,並向用戶進行展示。

與大多數網路程式一樣,執行Redis命令所消耗的大部分時間都用在了傳送命令請求和接收命令結果上面:Redis伺服器處理一個命令請求通常只需要很短的時間,但客戶端將命令請求傳送給伺服器以及伺服器向客戶端返回命令結果的過程卻需要花費不少時間。通常情況下,程式需要執行的Redis命令越多,它需要進行的網路通訊操作也會越多,程式的執行速度也會因此而變慢。

為了解決這個問題,我們可以使用Redis提供的流水線特性:這個特性允許客戶端把任意多條Redis命令請求打包在一起,然後一次性地將它們全部發送給伺服器,而伺服器則會在流水線包含的所有命令請求都處理完畢之後,一次性地將它們的執行結果全部返回給客戶端。

通過使用流水線特性,我們可以將執行多個命令所需的網路通訊次數從原來的N次降低為1次,這可以大幅度地減少程式在網路通訊方面耗費的時間,使得程式的執行效率得到顯著的提升。

作為例子,圖13-1展示了在沒有使用流水線的情況下,執行3個Redis命令產生的網路通訊示意圖,而圖13-2則展示了在使用流水線的情況下,執行相同Redis命令產生的網路通訊示意圖。可以看到,在使用了流水線之後,程式進行網路通訊的次數從原來的3次降低到了1次。

雖然Redis伺服器提供了流水線特性,但這個特性還需要客戶端支援才能使用。幸運的是,包括redis-py在內的絕大部分Redis客戶端都提供了對流水線特性的支援,因此Redis使用者在絕大部分情況下都能夠享受到流水線特性帶來的好處。

為了在redis-py客戶端中使用流水線特性,我們需要用到pipeline()方法,呼叫這個方法會返回一個流水線物件,使用者只需要像平時執行Redis命令那樣,使用流水線物件呼叫相應的命令方法,就可以把想要執行的Redis命令放入流水線中

作為例子,以下程式碼展示瞭如何以流水線方式執行SET、INCRBY和SADD命令:

import redis

POOL = redis.ConnectionPool(host="127.0.0.1",port=6379,max_connections=10000)
conn = redis.Redis(connection_pool=POOL,max_connections=1000)

pipe = conn.pipeline(transaction=False)
pipe.set("msg","hello_world")
pipe.incrby("pv_counter::123,100")
pipe.sadd("fruits","apple","banana","cherry")
# 呼叫流水線物件的execute()方法,將佇列中的3個命令呼叫打包傳送給伺服器
pipe.execute()

這段程式碼先使用pipeline()方法建立了一個流水線物件,並將這個物件儲存到了pipe變數中(pipeline()方法中的transaction=False引數表示不在流水線中使用事務,這個引數的具體意義將在本章後續內容中說明)。在此之後,程式通過流水線物件分別呼叫了set()方法、incrby()方法和sadd()方法,將這3個方法對應的命令呼叫放入了流水線佇列中。最後,程式呼叫流水線物件的execute()方法,將佇列中的3個命令呼叫打包傳送給伺服器,而伺服器會在執行完這些命令之後,把各個命令的執行結果依次放入一個列表中,然後將這個列表返回給客戶端。

看看效果:

# 一開始沒有key
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379>
# 執行完程式後有key了
127.0.0.1:6379> keys *
1) "msg"
2) "pv_counter::123,100"
3) "fruits"
127.0.0.1:6379>

流水線使用注意事項 *

雖然Redis伺服器並不會限制客戶端在流水線中包含的命令數量,但是卻會為客戶端的輸入緩衝區設定預設值為1GB的體積上限:當客戶端傳送的資料量超過這一限制時,Redis伺服器將強制關閉該客戶端。因此使用者在使用流水線特性時,最好不要一下把大量命令或者一些體積非常龐大的命令放到同一個流水線中執行,以免觸碰到Redis的這一限制。

除此之外,很多客戶端本身也帶有隱含的緩衝區大小限制,如果你在使用流水線特性的過程中,發現某些流水線命令沒有被執行,或者流水線返回的結果不完整,那麼很可能就是你的程式觸碰到了客戶端內建的緩衝區大小限制。在遇到這種情況時,請縮減流水線命令的數量及其體積,然後再進行嘗試。

使用流水線優化隨機鍵建立程式 *

每建立一個鍵,redis-py客戶端就需要與Redis伺服器進行一次網路通訊:考慮到這個程式執行的都是一些非常簡單的命令,每次網路通訊只執行一個命令的做法無疑是非常低效的。為了解決這個問題,我們可以使用流水線把程式生成的所有命令都包裹起來,這樣的話,建立多個隨機鍵所需要的網路通訊次數就會從原來的N次降低為1次。程式碼清單展示了修改之後的流水線版本隨機鍵建立程式。

import random

def create_random_type_keys(client, number):
    """
    在資料庫中建立指定數量的型別隨機鍵。
    """
    # 建立流水線物件 —— transaction為True表示事務 False表示流水線
    pipe = client.pipeline(transaction=False)
    for i in range(number):
        # 構建鍵名
        key = "key:{0}".format(i)
        # 從六個鍵建立函式中隨機選擇一個
        create_key_func = random.choice([
            create_string, 
            create_hash, 
            create_list, 
            create_set, 
            create_zset, 
            create_stream
        ])
        # 把待執行的 Redis 命令放入流水線佇列中
        create_key_func(pipe, key)
    # 執行流水線包裹的所有命令
    pipe.execute()

def create_string(client, key):
    client.set(key, "")

def create_hash(client, key):
    client.hset(key, "", "")

def create_list(client, key):
    client.rpush(key, "")

def create_set(client, key):
    client.sadd(key, "")

def create_zset(client, key):
    client.zadd(key, {"":0})

def create_stream(client, key):
    client.xadd(key, {"":""})

即使只在本地網路中進行測試,新版的隨機鍵建立程式也有5倍的效能提升。當客戶端與伺服器處於不同的網路之中,特別是它們之間的連線速度較慢時,流水線版本的效能提升還會更多。

Redis事務 ***

雖然Redis的LPUSH命令和RPUSH命令允許使用者一次向列表推入多個元素,但是列表的彈出命令LPOP和RPOP每次卻只能彈出一個元素!

因為Redis並沒有提供能夠一次彈出多個列表元素的命令,所以為了方便地執行這一任務,使用者可能會寫出程式碼清單所示的程式碼。

def mlpop(client, list_key, number):
    # 用於儲存被彈出元素的結果列表
    items = []  
    for i in range(number):
        # 執行 LPOP 命令,彈出一個元素
        poped_item = client.lpop(list_key)
        # 將被彈出的元素追加到結果列表末尾
        items.append(poped_item)
    # 返回結果列表
    return items

mlpop()函式通過將多條LPOP命令傳送至伺服器來達到彈出多個元素的目的。遺憾的是,這個函式並不能保證它傳送的所有LPOP命令都會被伺服器執行:如果伺服器在執行多個LPOP命令的過程中下線了,那麼mlpop()傳送的這些LPOP命令將只有一部分會被執行。

舉個例子,如果我們呼叫mlpop(client, "lst", 3),嘗試從"lst"列表中彈出3個元素,那麼mlpop()將向伺服器連續傳送3個LPOP命令,但如果伺服器在順利執行前兩個LPOP命令之後因為故障下線了,那麼"lst"列表將只有2個元素會被彈出。

需要注意的是,即使我們使用流水線特性,把多條LPOP命令打包在一起傳送,也不能保證所有命令都會被伺服器執行:這是因為流水線只能保證多條命令會一起被髮送至伺服器但它並不保證這些命令都會被伺服器執行。

為了實現一個正確且安全的mlpop()函式,我們需要一種能夠讓伺服器將多個命令打包起來一併執行的技術,而這正是本節將要介紹的事務特性:

  • 事務可以將多個命令打包成一個命令來執行,當事務成功執行時,事務中包含的所有命令都會被執行。
  • 相反,如果事務沒有成功執行,那麼它包含的所有命令都不會被執行。

通過使用事務,使用者可以保證自己想要執行的多個命令要麼全部被執行,要麼一個都不執行。以mlpop()函式為例,通過使用事務,我們可以保證被呼叫的多個LPOP命令要麼全部執行,要麼一個也不執行,從而杜絕只有其中一部分LPOP命令被執行的情況出現。

事務的安全性

在對資料庫的事務特性進行介紹時,人們一般都會根據資料庫對ACID性質的支援程度去判斷資料庫的事務是否安全。

具體來說,Redis的事務總是具有ACID性質中的A、C、I性質:

  • 原子性(Atomic):如果事務成功執行,那麼事務中包含的所有命令都會被執行;相反,如果事務執行失敗,那麼事務中包含的所有命令都不會被執行。
  • 一致性(Consistent):Redis伺服器會對事務及其包含的命令進行檢查,確保無論事務是否執行成功,事務本身都不會對資料庫造成破壞。
  • 隔離性(Isolate):每個Redis客戶端都擁有自己獨立的事務佇列,並且每個Redis事務都是獨立執行的,不同事務之間不會互相干擾。

除此之外,當Redis伺服器執行在特定的持久化模式之下時,Redis的事務也具有ACID性質中的D性質:

  • 永續性(Durable):當事務執行完畢時,它的結果將被儲存在硬碟中,即使伺服器在此之後停機,事務對資料庫所做的修改也不會丟失。

事務對伺服器的影響

因為事務在執行時會獨佔伺服器,所以使用者應該避免在事務中執行過多命令,更不要將一些需要進行大量計算的命令放入事務中,以免造成伺服器阻塞。

流水線與事物

正如前面所言,流水線與事務雖然在概念上有些相似,但是在作用上卻並不相同:流水線的作用是將多個命令打包,然後一併傳送至伺服器,而事務的作用則是將多個命令打包,然後讓伺服器一併執行它們。

因為Redis的事務在EXEC命令執行之前並不會產生實際效果,所以很多Redis客戶端都會使用流水線去包裹事務命令,並將入隊的命令快取在本地,等到使用者輸入EXEC命令之後,再將所有事務命令通過流水線一併傳送至伺服器,這樣客戶端在執行事務時就可以達到“打包傳送,打包執行”的最優效果。

本書使用的redis-py客戶端就是這樣處理事務命令的客戶端之一,當我們使用pipeline()方法開啟一個事務時,redis-py預設將使用流水線包裹事務佇列中的所有命令。

import redis


POOL = redis.ConnectionPool(host="127.0.0.1",port=6379,max_connections=10000)

conn = redis.Redis(connection_pool=POOL,max_connections=1000)
# 開啟事務
# transaction預設為True,表示執行事務!!!
transaction = conn.pipeline(transaction=True)
# 將命令放入事務佇列
transaction.set("title","Hand in Hand")
transaction.sadd("fruits","apple","banana")
transaction.rpush("numbers",123,222,666)
# 執行事務
transaction.execute()

在執行transaction.execute()呼叫時,redis-py將通過流水線向伺服器傳送以下命令:

MULTI
SET title "Hand in Hadn"
SADD fruits "apple" "banana"
RPUSH numbers 123 222 666
EXEC

這樣,無論事務包含了多少個命令,redis-py也只需要與伺服器進行一次網路通訊。

事務示例:實現mlpop()函式 ***

在瞭解了事務的使用方法之後,現在是時候用它來重新實現一個安全且正確的mlpop()函數了,為此,我們需要使用事務包裹被執行的所有LPOP命令,就像程式碼所示的那樣。

def mlpop(client, list_key, number):
    # 開啟事務
    transaction = client.pipeline()
    # 將多個 LPOP 命令放入事務佇列
    for i in range(number):
        transaction.lpop(list_key)
    # 執行事務
    return transaction.execute()

新版的mlpop()函式通過事務確保自己傳送的多個LPOP命令要麼全部執行,要麼全部不執行,以此來避免只有一部分LPOP命令被執行的情況出現。

帶有樂觀鎖的事務 ***

本書在第2章實現了具有基本獲取和釋放功能的鎖程式,並在第12章為該程式加上了自動釋放功能,但是這兩個鎖程式都有一個問題,那就是它們的釋放操作都是不安全的:

  • 無論某個客戶端是否是鎖的持有者,只要它呼叫release()方法,鎖就會被釋放。
  • 在鎖被佔用期間,如果某個不是持有者的客戶端錯誤地呼叫了release()方法,那麼鎖將在持有者不知情的情況下釋放,並導致系統中同時存在多個鎖。

為了解決這個問題,我們需要修改鎖實現,給它加上身份驗證功能:

  • 客戶端在嘗試獲取鎖的時候,除了需要輸入鎖的最大使用時限之外,還需要輸入一個代表身份的識別符號,當客戶端成功取得鎖時,程式將把這個識別符號儲存在代表鎖的字串鍵中。
  • 當客戶端呼叫release()方法時,它需要將自己的識別符號傳給release()方法,而release()方法則需要驗證客戶端傳入的識別符號與鎖鍵儲存的識別符號是否相同,以此來判斷呼叫release()方法的客戶端是否就是鎖的持有者,從而決定是否釋放鎖。

不安全的帶身份標識的鎖的實現

class IdentityLock:

    def __init__(self, client, key):
        self.client = client
        self.key = key

    def acquire(self, identity, timeout):
        """
        嘗試獲取一個帶有身份識別符號和最大使用時限的鎖,
        成功時返回 True ,失敗時返回 False 。
        """
        result = self.client.set(self.key, identity, ex=timeout, nx=True)
        return result is not None

    def release(self, input_identity):
        """
        根據給定的識別符號,嘗試釋放鎖。
        返回 True 表示釋放成功;
        返回 False 則表示給定的識別符號與鎖持有者的識別符號並不相同,釋放請求被拒絕。
        """
        # 獲取鎖鍵儲存的識別符號
        lock_identity = self.client.get(self.key)
        if lock_identity is None:
            # 如果鎖鍵的識別符號為空,那麼說明鎖已經被釋放
            return True
        elif input_identity == lock_identity:
            # 如果給定的識別符號與鎖鍵的識別符號相同,那麼釋放這個鎖
            self.client.delete(self.key)
            return True
        else:
            # 如果給定的識別符號與鎖鍵的識別符號並不相同
            # 那麼說明當前客戶端不是鎖的持有者
            # 拒絕本次釋放請求
            return False

這個鎖實現在絕大部分情況下都能夠正常執行,但它的release()方法包含了一個非常隱蔽的錯誤:在程式使用GET命令獲取鎖鍵的值以後,直到程式呼叫DEL命令刪除鎖鍵的這段時間裡面,鎖鍵的值有可能已經發生了變化,因此程式執行的DEL命令有可能會導致當前持有者的鎖被錯誤地釋放。

舉個例子,表就展示了一個鎖被錯誤釋放的例子:客戶端A是鎖原來的持有者,它呼叫release()方法嘗試釋放自己的鎖,但是當客戶端A執行完GET命令並確認自己就是鎖的持有者之後,鎖鍵卻因為過期而自動被移除了,緊接著客戶端B又通過執行acquire()方法成功取得了鎖,然而客戶端A並未察覺這一變化,它以為自己還是鎖的持有者,並呼叫DEL命令把屬於客戶端B的鎖給釋放了。

為了正確地實現release()方法,我們需要一種機制,它可以保證如果鎖鍵的值在GET命令執行之後發生了變化,那麼DEL命令將不會被執行。在Redis中,這種機制被稱為樂觀鎖。

WATCH:對鍵進行監視

詳見這裡

樂觀鎖實現:帶有身份驗證功能的鎖 ***

之前展示的鎖實現的問題在於,在GET命令執行之後,直到DEL命令執行之前的這段時間裡,鎖鍵的值有可能會發生變化,並出現誤刪鎖鍵的情況。為了解決這個問題,我們需要使用樂觀鎖去保證DEL命令只會在鎖鍵的值沒有發生任何變化的情況下執行,程式碼清單展示了修改之後的鎖實現。

from redis import WatchError

class IdentityLock:

    def __init__(self, client, key):
        self.client = client
        self.key = key

    def acquire(self, identity, timeout):
        """
        嘗試獲取一個帶有身份識別符號和最大使用時限的鎖,
        成功時返回 True ,失敗時返回 False 。
        """
        result = self.client.set(self.key, identity, ex=timeout, nx=True)
        return result is not None

    def release(self, input_identity):
        """
        根據給定的識別符號,嘗試釋放鎖。
        返回 True 表示釋放成功;
        返回 False 則表示給定的識別符號與鎖持有者的識別符號並不相同,釋放請求被拒絕。
        """
        # 開啟流水線
        pipe = self.client.pipeline()
        try:
            # 監視鎖鍵~~~!!!
            pipe.watch(self.key)
            # 獲取鎖鍵儲存的識別符號
            lock_identity = pipe.get(self.key)
            if lock_identity is None:
                # 如果鎖鍵的識別符號為空,那麼說明鎖已經被釋放
                return True
            elif input_identity == lock_identity:
                # 如果給定的識別符號與鎖鍵儲存的識別符號相同,那麼釋放這個鎖
                # 為了確保 DEL 命令在執行時的安全性,我們需要使用事務去包裹它!!!
                pipe.multi()
                pipe.delete(self.key)
                pipe.execute()
                return True
            else:
                # 如果給定的識別符號與鎖鍵儲存的識別符號並不相同
                # 那麼說明當前客戶端不是鎖的持有者
                # 拒絕本次釋放請求
                return False
        ### WatchError
        except WatchError:
            # 丟擲異常說明在 DEL 命令執行之前,已經有其他客戶端修改了鎖鍵
            return False
        finally:
            # 取消對鍵的監視~~~!!!
            pipe.unwatch()
            # 因為 redis-py 在執行 WATCH 命令期間,會將流水線與單個連線進行繫結
            # 所以在執行完 WATCH 命令之後,必須呼叫 reset() 方法將連線歸還給連線池
            pipe.reset()

帶有身份驗證功能的計數訊號量(允許多個客戶端同時使用資源) ***

本書前面介紹瞭如何使用鎖去獲得一項資源的獨佔使用權,並給出了幾個不同的鎖實現,但是除了獨佔一項資源之外,有時候我們也會想讓多個使用者共享一項資源,只要共享者的數量不超過我們限制的數量即可。

舉個例子,假設我們的系統有一項需要進行大量計算的操作,如果很多使用者同時執行這項操作,那麼系統的計算資源將會被耗盡。為了保證系統的正常運作,我們可以使用計數訊號量來限制在同一時間內能夠執行該操作的最大使用者數量。

計數訊號量(counter semaphore)與鎖非常相似,它們都可以限制資源的使用權,但是與鎖只允許單個客戶端使用資源的做法不同,計數訊號量允許多個客戶端同時使用資源,只要這些客戶端的數量不超過指定的限制即可。

程式碼清單展示了一個帶有身份驗證功能的計數訊號量實現:

  • 這個程式會把所有成功取得訊號量的客戶端的識別符號儲存在格式為semaphore::::holders的集合鍵中,至於訊號量的最大可獲取數量則儲存在格式為semaphore::::max_size的字串鍵中。
  • 在使用計數訊號量之前,使用者需要先通過set_max_size()方法設定計數訊號量的最大可獲取數量。
  • get_max_size()方法和get_current_size()方法可以分別獲取計數訊號量的最大可獲取數量以及當前已獲取數量。
  • 獲取訊號量的acquire()方法是程式的核心:在獲取訊號量之前,程式會先使用兩個GET命令分別獲取訊號量的當前已獲取數量以及最大可獲取數量,如果訊號量的當前已獲取數量並未超過最大可獲取數量,那麼程式將執行SADD命令,將客戶端給定的識別符號新增到holders集合中。
  • 由於GET命令執行之後直到SADD命令執行之前的這段時間裡,可能會有其他客戶端搶先取得了訊號量,並導致可用訊號量數量發生變化,因此程式需要使用WATCH命令監視holders鍵,並使用事務包裹SADD命令,以此通過樂觀鎖機制確保訊號量獲取操作的安全性。
  • 因為max_size鍵的值也會影響訊號量獲取操作的執行結果,並且這個鍵的值在SADD命令執行之前也可能會被其他客戶端修改,所以程式在監視holders鍵的同時,也需要監視max_size鍵。
  • 當客戶端想要釋放自己持有的訊號量時,只需要把自己的識別符號傳給release()方法即可,release()方法將呼叫SREM命令,從holders集合中查詢並移除客戶端給定的識別符號。
from redis import WatchError

class Semaphore:

    def __init__(self, client, name):
        self.client = client
        self.name = name
        # 用於儲存訊號量持有者識別符號的集合
        self.holder_key = "semaphore::{0}::holders".format(name)
        # 用於記錄訊號量最大可獲取數量的字串
        self.size_key = "semaphore::{0}::max_size".format(name)

    def set_max_size(self, size):
        """
        設定訊號量的最大可獲取數量。
        """
        self.client.set(self.size_key, size)

    def get_max_size(self):
        """
        返回訊號量的最大可獲取數量。
        """
        result = self.client.get(self.size_key)
        if result is None:
            return 0
        else:
            return int(result)

    def get_current_size(self):
        """
        返回目前已被獲取的訊號量數量。
        """
        return self.client.scard(self.holder_key)

    def acquire(self, identity):
        """
        嘗試獲取一個訊號量,成功時返回 True ,失敗時返回 False 。
        傳入的 identity 引數將被用於標識客戶端的身份。

        如果呼叫該方法時訊號量的最大可獲取數量尚未被設定,那麼引發一個 TypeError 。
        """
        # 開啟流水線
        pipe = self.client.pipeline()
        try:
            # 監視與訊號量有關的兩個鍵
            pipe.watch(self.size_key, self.holder_key)

            # 取得當前已被獲取的訊號量數量,以及最大可獲取的訊號量數量
            current_size = pipe.scard(self.holder_key)
            max_size_in_str = pipe.get(self.size_key)
            if max_size_in_str is None:
                raise TypeError("Semaphore max size not set")
            else:
                max_size = int(max_size_in_str)

            if current_size < max_size:
                # 如果還有剩餘的訊號量可用
                # 那麼將給定的識別符號放入到持有者集合中
                pipe.multi()
                pipe.sadd(self.holder_key, identity)
                pipe.execute()
                return True
            else:
                # 沒有訊號量可用,獲取失敗
                return False
        except WatchError:
            # 獲取過程中有其他客戶端修改了 size_key 或者 holder_key ,獲取失敗
            return False
        finally:
            # 取消監視
            pipe.unwatch()
            # 將連線歸還給連線池
            pipe.reset()

    def release(self, identity):
        """
        根據給定的識別符號,嘗試釋放當前客戶端持有的訊號量。
        返回 True 表示釋放成功,返回 False 表示由於識別符號不匹配而導致釋放失敗。
        """
        # 嘗試從持有者集合中移除給定的識別符號
        result = self.client.srem(self.holder_key, identity)
        # 移除成功則說明訊號量釋放成功
        return result == 1

Redis流水線與事務重點 ***

  • 在通常情況下,程式需要執行的Redis命令越多,需要進行的網路通訊次數也會越多,程式的執行速度也會變得越慢。通過使用Redis的流水線特性,程式可以一次把多個命令傳送給Redis伺服器,這可以將執行多個命令所需的網路通訊次數從原來的N次降低為1次,從而使得程式的執行效率得到顯著提升。
  • 通過使用Redis的事務特性,使用者可能將多個命令打包成一個命令執行:當事務成功執行時,事務中包含的所有命令都會被執行;相反,如果事務執行失敗,那麼它包含的所有命令都不會被執行。
  • Redis事務總是具有ACID性質中的原子性、一致性和隔離性,至於是否具有耐久性則取決於Redis使用的持久化模式。
  • Redis事務總是具有ACID性質中的原子性、一致性和隔離性,至於是否具有耐久性則取決於Redis使用的持久化模式。
  • 流水線與事務雖然在概念上有相似之處,但它們並不相等:流水線的作用是打包傳送多條命令,而事務的作用則是打包執行多條命令。
  • 為了優化事務的執行效率,很多Redis客戶端都會把待執行的事務命令快取在本地,然後在使用者執行EXEC命令時,通過流水線一次把所有事務命令傳送至Redis伺服器。
  • 通過同時使用WATCH命令和事務,使用者可以構建一種樂觀鎖機制,這種機制可以確保事務只會在指定鍵沒有發生任何變化的情況下執行。