Redis 事務和Pipeline--JAVA(系列文章三)
在傳統的關係型資料當中,使用事務是我們最常見的操作。來到Redis當中這裡有事務嗎,Redis是支援事務的。但是這個事務跟關係型資料庫的傳統事務不一樣,在關係型資料庫當中我們可以對出現錯誤的sql進行回滾,但是在redis是沒有這一說的。
在Redis事務當中,所有操作都是在提交的統一執行的,所以並沒有迴歸操作,其實這個事務更像是批處理的感覺。以下就是事務常用的命令:
1、watch
2、unwatch
3、multi
4、exec
5、discard
以下就是使用Jedis使用事務的程式碼:
public static void main(String args[]){ GenericObjectPoolConfig config = newGenericObjectPoolConfig(); config.setMaxIdle(8); config.setMaxTotal(10); config.setMinIdle(2); config.setMaxWaitMillis(3000); jedisPool = new JedisPool(config, "localhost"); Jedis conn = jedisPool.getResource(); Transaction transaction = conn.multi(); Response<Long> newListPushResult = transaction.rpush("newList","A","B","C"); Response<List<String>> newListResponse = transaction.lrange("newList",0,-1); transaction.exec(); System.out.println("newListPushResult : " + newListPushResult.get()); for (String item : newListResponse.get()) { System.err.println( item + " "); } conn.close();}
通過程式碼我們可以看到我們的執行結果是在exec之後才統一返回,所以Jedis會用一個Response物件最為事務物件transaction的執行放回值。如果我們在transaction執行exec方法之前呼叫response物件的get方法會出現異常:
Exception in thread "main" redis.clients.jedis.exceptions.JedisDataException: Please close pipeline or multi block before calling this method.
事實上我們還可以使用exec方法的返回值獲得事務執行過程的結果,但是這個顯然是不夠方便的:
public static void main(String args[]){ GenericObjectPoolConfig config = new GenericObjectPoolConfig(); config.setMaxIdle(8); config.setMaxTotal(10); config.setMinIdle(2); config.setMaxWaitMillis(3000); jedisPool = new JedisPool(config, "localhost"); Jedis conn = jedisPool.getResource(); Transaction transaction = conn.multi(); transaction.rpush("newList","A","B","C"); transaction.lrange("newList",0,-1); List<Object> result = transaction.exec(); System.out.println("newListPushResult : " + result.get(0)); List<String> newListValues = (List<String>) result.get(1); for (String item : newListValues) { System.err.println( item + " "); } conn.close(); }
事務其中最為只要的功能是實現鎖,redis提供一個watch命令。watch是一種樂觀鎖,watch命令的引數為key,當我們watch了一個key的時候,在事務執行之前被修改了,事務是不會執行成功的。只要我們在watch到事務執這段時間未被修改事務才會執行成功。當然如果我們在業務的中途不需要監控這個key的變化了也可以使用unwatch命令進行取消watch。以下是watch在jedis事務程式碼上的使用(下面會模擬一段購買商品的業務):
public static void main(String args[]){ GenericObjectPoolConfig config = new GenericObjectPoolConfig(); config.setMaxIdle(8); config.setMaxTotal(10); config.setMinIdle(2); config.setMaxWaitMillis(3000); jedisPool = new JedisPool(config, "localhost"); Jedis conn = jedisPool.getResource(); //market 有序集合 score 商品價格 member 商品名稱 conn.zadd("market",100,"ProductA"); conn.zadd("market",33,"ProductB"); //使用者物件 conn.hset("user:1","funds","900"); //監控market和user:1兩個KEY的變化 conn.watch("user:1","market"); Double price = conn.zscore("market","ProductA"); String funds_str = conn.hget("user:1","funds"); if(price == null || funds_str == null){ System.out.println("商品或使用者資訊不存在"); conn.unwatch(); return; } double funds = Double.valueOf(funds_str); if(price > funds){ System.out.println("餘額不足"); conn.unwatch(); return ; } //模擬中圖餘額被修改了,導致不夠錢買單 conn.hset("user:1","funds","20"); Transaction transaction = conn.multi(); transaction.hincrByFloat("user:1","funds",-price); transaction.zrem("market","ProductA"); transaction.zadd("user_package:1",price,"ProductA"); List<Object> result = transaction.exec(); if(result == null || result.size() == 0 ){ System.out.println(">>>執行失敗,中途被watch的key出現修改"); }else{ System.out.println(">>>>執行成功"); } conn.close(); }
輸出如下:
>>>執行失敗,中途被watch的key出現修改。
我們可以使用redis-cli檢視是否有做修改:
127.0.0.1:6379> ZRANGE market 0 -1
1) "ProductB"
2) "ProductA"
可以清楚看到是沒有再market中刪除ProductA的,如果在測試過程中都可以使用MONITOR監控命令,但是在正式環境需要慎重使用MONITOR命令,因為這樣會導致輸出快取暴增問題。雖然watch能解決我們對鎖的需求,但是watch也不是一個完美的解決方案。因為watch是屬於樂觀鎖,如果對於大併發的業務系統中顯然是不合適的,因為redis的處理速度相當的快,所以導致很多時候會出現執行失敗,每次失敗我們都會在一段時間中嘗試重試。這樣會導致大量的頻寬和資源損耗,重複執行也導致很多時候執行效率低下的問題。而且我們發現watch是針對key進行樂觀鎖的,其實在market這個集合當中,我只需要market中的ProductA不被修改和User的funds不被修改即可,並不需要將整個key進行監控。
為了彌補這個問題我們會使用setnx命令進行鎖操作,其實setnx並不是什麼鎖,以下是redis中文社群對setnx命令的解釋:
SETNX key value
將 key
的值設為 value
,當且僅當 key
不存在。
若給定的 key
已經存在,則 SETNX 不做任何動作。
SETNX 是『SET if Not eXists』(如果不存在,則 SET)的簡寫。
- 可用版本:
- >= 1.0.0
- 時間複雜度:
- O(1)
- 返回值:
-
設定成功,返回
1
。 設定失敗,返回0
。
public static void main(String args[]) throws InterruptedException { GenericObjectPoolConfig config = new GenericObjectPoolConfig(); config.setMaxIdle(8); config.setMaxTotal(10); config.setMinIdle(2); config.setMaxWaitMillis(3000); jedisPool = new JedisPool(config, "localhost"); Jedis conn = jedisPool.getResource(); //market 有序集合 score 商品價格 member 商品名稱 conn.zadd("market", 100, "ProductA"); conn.zadd("market", 33, "ProductB"); //使用者物件 conn.hset("user:1", "funds", "900"); long startTime = System.currentTimeMillis(); boolean isMarketProductLocked = false; boolean isUserFundsLocked = false; String locked_id = UUID.randomUUID().toString(); while (System.currentTimeMillis() - startTime < 3000) { if (!isMarketProductLocked) { if (conn.setnx("lock:market:productA", locked_id) == 1) { isMarketProductLocked = true; } } if (!isUserFundsLocked) { if (conn.setnx("lock:user:1:funds", locked_id) == 1) { isUserFundsLocked = true; } } if(isUserFundsLocked && isMarketProductLocked){ break; } Thread.sleep(80); } if (!(isMarketProductLocked && isUserFundsLocked)) { System.out.println("請求鎖失敗!"); return; } Double price = conn.zscore("market", "ProductA"); String funds_str = conn.hget("user:1", "funds"); if (price == null || funds_str == null) { System.out.println("商品或使用者資訊不存在"); conn.unwatch(); return; } double funds = Double.valueOf(funds_str); if (price > funds) { System.out.println("餘額不足"); conn.unwatch(); return; } Transaction transaction = conn.multi(); transaction.hincrByFloat("user:1", "funds", -price); transaction.zrem("market", "ProductA"); transaction.zadd("user_package:1", price, "ProductA"); List<Object> result = transaction.exec(); if (result == null || result.size() == 0) { System.out.println(">>>執行失敗,中途被watch的key出現修改"); } else { System.out.println(">>>>執行成功"); } conn.del("lock:market:productA"); conn.del("lock:user:1:funds"); conn.close(); }其實這是利用setnx的特性在業務中實現的元素級別悲觀鎖。以下是我寫的一個業務測試鎖的效能(模擬一堆使用者對商城上購買商品的測試):
public class App { volatile public static long successCounter = 0; volatile public static long failCounter = 0; public static long time = 0; volatile public static JedisPool jedisPool; public static void main(String[] args) { GenericObjectPoolConfig config = new GenericObjectPoolConfig(); config.setMaxIdle(8); config.setMaxTotal(12); config.setMinIdle(5); config.setMaxTotal(30); config.setMaxWaitMillis(3000); jedisPool = new JedisPool(config, "localhost"); Jedis conn = jedisPool.getResource(); conn.flushAll(); List<User> userList = init(jedisPool); time = System.currentTimeMillis(); for (User item : userList) { try { new Thread(new BuyItem(jedisPool.getResource(), item)).start(); } catch (Exception e) { e.printStackTrace(); } } } static List<User> init(JedisPool pool) { List<User> users = new ArrayList<User>(); users.add(new User(1, "A", 9999999999d)); users.add(new User(2, "B", 9999999999d)); users.add(new User(3, "C", 9999999999d)); users.add(new User(4, "D", 9999999999d)); users.add(new User(5, "E", 9999999999d)); users.add(new User(6, "F", 9999999999d)); users.add(new User(7, "G", 9999999999d)); users.add(new User(8, "H", 9999999999d)); users.add(new User(9, "I", 9999999999d)); users.add(new User(10, "J", 9999999999d)); List<Grocery> groceries = new LinkedList<Grocery>(); for (int i = 0; i < 1000000; i++) { double price = new Random().nextInt(10) + 10; groceries.add(new Grocery("grocery_" + i, price)); } initUser(pool.getResource(), users); initMarket(pool.getResource(), groceries); return users; } static void initUser(Jedis conn, List<User> users) { Pipeline pipeline = conn.pipelined(); pipeline.multi(); for (User item : users) { String hashKey = "User:" + item.getId(); pipeline.hset(hashKey, "username", item.getUsername()); pipeline.hset(hashKey, "funds", String.valueOf(item.getFunds())); } pipeline.exec(); conn.close(); } static void initMarket(Jedis conn, List<Grocery> groceries) { Pipeline pipeline = conn.pipelined(); pipeline.multi(); int index = 0; for (Grocery item : groceries) { String zSetKey = "market"; pipeline.zadd(zSetKey, item.getPrice(), item.getName()); if (index++ >= 1000) { index = 0; pipeline.exec(); pipeline.multi(); } } pipeline.exec(); conn.close(); } static class BuyItem implements Runnable { private Jedis conn; private User user; public BuyItem(Jedis conn, User user) { this.conn = conn; this.user = user; } public void run() { String buyerKey = "User:" + user.getId(); String markterKey = "market"; String inventoryOfbuyerKey = "inventory:" + user.getId(); String salerKey = null; try { for (int i = 0; i < 5000; i++) { try { String id = UUID.randomUUID().toString(); long randmon = (long) (Math.random() * conn.zcard(markterKey)); String groceryKey = (String) conn.zrange(markterKey, randmon, randmon).toArray()[0]; boolean hasLock = false; String lockName = "lock:" + markterKey + ":" + groceryKey; long startTime = System.currentTimeMillis(); while (System.currentTimeMillis() - startTime <= 5000) { if (conn.setnx(lockName, id) == 1) { hasLock = true; break; } } if (!hasLock) { System.err.println("can not got a lock!"); failCounter++; continue; } double price = conn.zscore(markterKey, groceryKey); if (Double.valueOf(conn.hget(buyerKey, "funds")) < price) { return; } Transaction transaction = conn.multi(); transaction.hincrByFloat(buyerKey, "funds", -price); transaction.zadd(inventoryOfbuyerKey, price, groceryKey); transaction.zrem(markterKey, groceryKey); List<Object> result = transaction.exec(); transaction.close(); if (result == null || result.size() <= 0) { System.out.println("can not exec ! "); failCounter++; } else { successCounter++; } while (true) { conn.watch(lockName); if (id.equals(conn.get(lockName))) { conn.del(lockName); conn.unwatch(); break; } else { conn.unwatch(); break; } } } catch (Exception e) { e.printStackTrace(); App.failCounter++; } } } finally { System.out.println(">>>>>failCounter:" + App.failCounter); System.out.println(">>>>>successCounter:" + App.successCounter); System.out.println(">>>>>time:" + (System.currentTimeMillis() - time)); conn.close(); } } } }
輸出如下:
>>>>>failCounter:0 失敗為0
>>>>>successCounter:49820 成功交易49820次
>>>>>time:11948 耗時為11948毫秒
其實如果稍微做優化這個速度可以更快,不過我們這裡是提現了setnx鎖的作用而已。
Pipeline
其實Pipeline跟事務一樣都是做批量操作的,如果不必要使用事務可以使用Pipeline,Pipeline比事務的速度更快,因為事務處理批量提交還需要其他的一些業務邏輯的。使用pipeline。
Pipeline pipeline = conn.pipelined(); Response<Long> rpushResult = pipeline.rpush("testList", "A", "B", "C"); Response<List<String>> testListResponse = pipeline.lrange("testList", 0, -1); pipeline.sync(); System.out.println("rpushResult:" + rpushResult.get()); for (String item : testListResponse.get()) { System.out.print( item + " "); }
當然可以使用pipeline.syncAndReturnAll(),這個跟transaction.exec的返回方式一樣。
但是需要注意不要一次性搞一堆資料發出去,這樣很可能會阻塞redis,因為人redis是單執行緒的去執行命令的,所以如果一個命令太慢可能會阻塞其他客戶端所以使用flushall,keys * 這些操作也可能會阻塞客戶端。pipline和transaction也同理,如果exec和sync一次性提交的命令非常多就非常有可能會阻塞redis,同時jedis讀寫也是會超時的,如果一次性pipeline提交的資料太多也可能會導致讀寫超時異常,所以我上面一次性寫入market的測試資料都是通過分批次去提交的:
static void initMarket(Jedis conn, List<Grocery> groceries) { Pipeline pipeline = conn.pipelined(); pipeline.multi(); int index = 0; for (Grocery item : groceries) { String zSetKey = "market"; pipeline.zadd(zSetKey, item.getPrice(), item.getName()); if (index++ >= 1000) { index = 0; pipeline.exec(); pipeline.multi(); } } pipeline.exec(); conn.close(); }