1. 程式人生 > 資料庫 >python實現redis三種cas事務操作

python實現redis三種cas事務操作

cas全稱是compare and set,是一種典型的事務操作。

簡單的說,事務就是為了存取資料庫中同一資料時不破壞操作的隔離性和原子性,從而保證資料的一致性。

一般資料庫,比如MySql是如何保證資料一致性的呢,主要是加鎖,悲觀鎖。比如在訪問資料庫某條資料的時候,會用SELECT FOR UPDATE ,這MySql就會對這條資料進行加鎖,直到事務被提交(COMMIT),或者回滾(ROLLBACK)。如果此時,有其他事務對被加鎖的資料進行寫入,那麼該事務將會被阻塞,直到第一個事務完成為止。它的缺點在於:持有鎖的事務執行越慢,等待解鎖的事務阻塞時間就越長。並且容易產生死鎖(前面有篇文章有講解死鎖)!

本文會介紹三種redis實現cas事務的方法,並會解決下面的虛擬問題:
維護一個值,如果這個值小於當前時間,則設定為當前時間;如果這個值大於當前時間,則設定為當前時間+30。簡單的單執行緒環境下程式碼如下:

# 初始化
r = redis.Redis()
if not r.exists("key_test"):
  r.set("key_test",0)

def inc():
  count = int(r.get('key_test')) + 30 #1
  # 如果值比當前時間小,則設定為當前時間
  count = max(count,int(time.time())) #2
  r.set('key_test',count) #3
  return count

很簡單的一段程式碼,在單執行緒環境下可以跑的很歡,但顯然,是無法移植到多執行緒或者是多程序環境的(程序A和B同時執行到#1,獲取了相同的count值,然後執行#2#3,會導致count值總共只增加了30)。而為了能在多程序環境下執行,我們需要引入一些其他的東西。

py-redis本身自帶的事務操作

redis有這麼幾個和事務相關的命令,multi,exec,watch。通過這幾個命令,可以實現‘將多個命令打包,然後一次性、按順序執行,且不會被終端'。事務會從MULTI開始,執行EXEC後觸發事件。另外,我們還需要WATCH,watch可以監視任意數量的鍵,當在呼叫EXEC執行事務時,如果任意一個鍵被修改了,整個事務不會執行。

下邊是使用redis本身的事務解決cas問題的程式碼。

class CasNormal(object):
  def __init__(self,host,key):
    self.r = redis.Redis(host)
    self.key = key
    if not self.r.exists(self.key):
      self.r.set(self.key,0)

  def inc(self):
    with self.r.pipeline() as pipe:
      while True:
        try:
          #監視一個key,如果在執行期間被修改了,會丟擲WatchError
          pipe.watch(self.key)
          next_count = 30 + int(pipe.get(self.key))
          pipe.multi()
          if next_count < int(time.time()):
            next_count = int(time.time())
          pipe.set(self.key,next_count)
          pipe.execute()
          return next_count
        except WatchError:
          continue
        finally:
          pipe.reset()

程式碼也不復雜,引入了之前說到的multi,watch,如果對事務操作比較熟悉的同學,可以很容易看出來,這是一個樂觀鎖的操作(咱們假設沒人競爭來著,每次去拿資料的時候都不會上鎖,真有人來改了再說。)樂觀鎖在高併發的情況下會顯得很無力,文末的效能對比會顯示這個問題。

使用基於redis的悲觀鎖

悲觀鎖,就是很悲觀的鎖,每次拿資料都會假設別人也要拿,先給鎖起來,用完再把鎖釋放掉。redis本身沒有實現悲觀鎖,但我們可以先用redis實現一個悲觀鎖。

ok,咱們現在有悲觀鎖了,做起事來也有底氣了,根據上邊的程式碼,咱們只要加上@ synchronized註釋就能保證同一時間只有一個程序在執行。下邊是基於悲觀鎖的解決方案。

lock_conn = redis.Redis("localhost")

class CasLock(object):
  def __init__(self,0)

  @synchronized(lock_conn,"lock",10)
  def inc(self):
    next_count = 30 + int(self.r.get(self.key))
    if next_count < int(time.time()):
      next_count = int(time.time())
    self.r.set(self.key,next_count)
    return next_count

程式碼看上去少多了(因為引入了synchronized...)

基於lua指令碼實現

上邊兩種方法都是用鎖來實現的,鎖的實現總會出現競爭的問題,區別無非是出現競爭了咋辦的問題。使用redis lua指令碼的實現,可以直接把這個cas操作當成一個原子操作。

我們知道,redis本身的一系列操作,都是原子操作,且redis會按順序執行所有收到的命令。先看程式碼

class CasLua(object):
  def __init__(self,0)
    self._lua = self.r.register_script("""
    local next_count = redis.call('get',KEYS[1]) + ARGV[1]
    ARGV[2] = tonumber(ARGV[2])
    if next_count < ARGV[2] then
      next_count = ARGV[2]
    end
    redis.call('set',KEYS[1],next_count)
    return tostring(next_count)
        """)

  def inc(self):
    return int(self._lua([self.key],[30,int(time.time())]))

這裡先註冊了這個指令碼,後邊可以直接去使用他。關於redis lua指令碼的文章有不少,感興趣的可以去搜搜看,這邊就不贅述了。

效能對比

這邊的測試只是一個非常簡單的測試(不過還是能看出效果來的),測試換機就是自己的開發機,數字看個大小就行了。

分別測了三種操作在單執行緒,五個執行緒,十個執行緒,五十個執行緒情況下,進行1000次操作各自的表現,時間如下

     optimistic Lock pessimistic lock  lua
1thread       0.43       0.71 0.35
5thread       5.80       3.10 0.62
10thread      17.80       5.60 1.30
50thread      245.00       29.60 6.50

依次是redis本身事務實現的樂觀鎖,基於redis實現的悲觀鎖以及lua實現。

在比較悲觀鎖和樂觀鎖之前,需要先說明一點,這邊的測試對樂觀鎖不是很公平,樂觀鎖本身就是假設不會有很多的併發的。在單執行緒情況下,悲觀鎖要差一些。單執行緒下,不存在競爭關係,悲觀鎖耗時長僅因為是多了一次redis的網路互動。隨著執行緒的增加,悲觀鎖的效能逐漸變好,畢竟悲觀鎖本身就是為了解決這種高併發高競爭的環境而誕生的。在50執行緒的時候,樂觀鎖的實現單次操作的時間要0.245秒,非常恐怖,如果是生產環境,幾乎都不能用了。

至於lua的效能,快的不可思議,幾乎就是線性增加。(50執行緒的情況下,平均的1000次完成時間是6.5s,換言之,6.5秒內執行了50 * 1000次cas操作)。

以上測試都是本地redis,本地測試,如果redis是遠端的,網路互動時間會增加,lua優勢會更加明顯。希望對大家的學習有所幫助,也希望大家多多支援我們。