1. 程式人生 > >Redis 事務和Pipeline--JAVA(系列文章三)

Redis 事務和Pipeline--JAVA(系列文章三)

在傳統的關係型資料當中,使用事務是我們最常見的操作。來到Redis當中這裡有事務嗎,Redis是支援事務的。但是這個事務跟關係型資料庫的傳統事務不一樣,在關係型資料庫當中我們可以對出現錯誤的sql進行回滾,但是在redis是沒有這一說的。

在Redis事務當中,所有操作都是在提交的統一執行的,所以並沒有迴歸操作,其實這個事務更像是批處理的感覺。以下就是事務常用的命令:

1、watch

2、unwatch

3、multi

4、exec

5、discard

以下就是使用Jedis使用事務的程式碼:

public static void main(String args[]){

    GenericObjectPoolConfig config = new 
GenericObjectPoolConfig(); config.setMaxIdle(8); config.setMaxTotal(10); config.setMinIdle(2); config.setMaxWaitMillis(3000); jedisPool = new JedisPool(config, "localhost"); Jedis conn = jedisPool.getResource(); Transaction transaction = conn.multi(); Response<Long> newListPushResult = transaction.rpush("newList"
,"A","B","C"); Response<List<String>> newListResponse = transaction.lrange("newList",0,-1); transaction.exec(); System.out.println("newListPushResult : " + newListPushResult.get()); for (String item : newListResponse.get()) { System.err.println( item + " "); } conn.close();
}

通過程式碼我們可以看到我們的執行結果是在exec之後才統一返回,所以Jedis會用一個Response物件最為事務物件transaction的執行放回值。如果我們在transaction執行exec方法之前呼叫response物件的get方法會出現異常:

Exception in thread "main" redis.clients.jedis.exceptions.JedisDataException: Please close pipeline or multi block before calling this method.

事實上我們還可以使用exec方法的返回值獲得事務執行過程的結果,但是這個顯然是不夠方便的:

public static void main(String args[]){

    GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxIdle(8);
config.setMaxTotal(10);
config.setMinIdle(2);
config.setMaxWaitMillis(3000);
jedisPool = new JedisPool(config, "localhost");
Jedis conn = jedisPool.getResource();
Transaction transaction = conn.multi();
transaction.rpush("newList","A","B","C");
transaction.lrange("newList",0,-1);
List<Object> result = transaction.exec();
System.out.println("newListPushResult : " + result.get(0));
List<String> newListValues = (List<String>) result.get(1);
    for (String item : newListValues) {
        System.err.println( item + " ");
}

    conn.close();
}

事務其中最為只要的功能是實現鎖,redis提供一個watch命令。watch是一種樂觀鎖,watch命令的引數為key,當我們watch了一個key的時候,在事務執行之前被修改了,事務是不會執行成功的。只要我們在watch到事務執這段時間未被修改事務才會執行成功。當然如果我們在業務的中途不需要監控這個key的變化了也可以使用unwatch命令進行取消watch。以下是watch在jedis事務程式碼上的使用(下面會模擬一段購買商品的業務):

public static void main(String args[]){

    GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxIdle(8);
config.setMaxTotal(10);
config.setMinIdle(2);
config.setMaxWaitMillis(3000);
jedisPool = new JedisPool(config, "localhost");
Jedis conn = jedisPool.getResource();
//market 有序集合 score 商品價格  member 商品名稱
conn.zadd("market",100,"ProductA");
conn.zadd("market",33,"ProductB");
//使用者物件
conn.hset("user:1","funds","900");
//監控market和user:1兩個KEY的變化
conn.watch("user:1","market");
Double price = conn.zscore("market","ProductA");
String funds_str = conn.hget("user:1","funds");
    if(price == null || funds_str == null){
        System.out.println("商品或使用者資訊不存在");
conn.unwatch();
        return;
}

    double funds = Double.valueOf(funds_str);
    if(price > funds){
        System.out.println("餘額不足");
conn.unwatch();
        return ;
}

    //模擬中圖餘額被修改了,導致不夠錢買單
conn.hset("user:1","funds","20");
Transaction transaction = conn.multi();
transaction.hincrByFloat("user:1","funds",-price);
transaction.zrem("market","ProductA");
transaction.zadd("user_package:1",price,"ProductA");
List<Object> result = transaction.exec();
    if(result == null || result.size() == 0 ){
        System.out.println(">>>執行失敗,中途被watch的key出現修改");
}else{
        System.out.println(">>>>執行成功");
}

    conn.close();
}

輸出如下:

>>>執行失敗,中途被watch的key出現修改。

我們可以使用redis-cli檢視是否有做修改:

127.0.0.1:6379> ZRANGE market 0 -1

1) "ProductB"

2) "ProductA"

可以清楚看到是沒有再market中刪除ProductA的,如果在測試過程中都可以使用MONITOR監控命令,但是在正式環境需要慎重使用MONITOR命令,因為這樣會導致輸出快取暴增問題。

雖然watch能解決我們對鎖的需求,但是watch也不是一個完美的解決方案。因為watch是屬於樂觀鎖,如果對於大併發的業務系統中顯然是不合適的,因為redis的處理速度相當的快,所以導致很多時候會出現執行失敗,每次失敗我們都會在一段時間中嘗試重試。這樣會導致大量的頻寬和資源損耗,重複執行也導致很多時候執行效率低下的問題。而且我們發現watch是針對key進行樂觀鎖的,其實在market這個集合當中,我只需要market中的ProductA不被修改和User的funds不被修改即可,並不需要將整個key進行監控。

為了彌補這個問題我們會使用setnx命令進行鎖操作,其實setnx並不是什麼鎖,以下是redis中文社群對setnx命令的解釋:

SETNX key value

將 key 的值設為 value ,當且僅當 key 不存在。

若給定的 key 已經存在,則 SETNX 不做任何動作。

SETNX 是『SET if Not eXists』(如果不存在,則 SET)的簡寫。

可用版本:
>= 1.0.0
時間複雜度:
O(1)
返回值:
設定成功,返回 1 。 設定失敗,返回 0 。
那我們應該怎麼去利用setnx如果存在就無法set的這個特性去實現鎖呢?看下面的程式碼就清楚了:
public static void main(String args[]) throws InterruptedException {

    GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxIdle(8);
config.setMaxTotal(10);
config.setMinIdle(2);
config.setMaxWaitMillis(3000);
jedisPool = new JedisPool(config, "localhost");
Jedis conn = jedisPool.getResource();
//market 有序集合 score 商品價格  member 商品名稱
conn.zadd("market", 100, "ProductA");
conn.zadd("market", 33, "ProductB");
//使用者物件
conn.hset("user:1", "funds", "900");
    long startTime = System.currentTimeMillis();
    boolean isMarketProductLocked = false;
    boolean isUserFundsLocked = false;
String locked_id = UUID.randomUUID().toString();
    while (System.currentTimeMillis() - startTime < 3000) {
        if (!isMarketProductLocked) {
            if (conn.setnx("lock:market:productA", locked_id) == 1) {
                isMarketProductLocked = true;
}
        }
        if (!isUserFundsLocked) {
            if (conn.setnx("lock:user:1:funds", locked_id) == 1) {
                isUserFundsLocked = true;
}
        }
        if(isUserFundsLocked && isMarketProductLocked){
            break;
}
        Thread.sleep(80);
}

    if (!(isMarketProductLocked && isUserFundsLocked)) {
        System.out.println("請求鎖失敗!");
        return;
}


    Double price = conn.zscore("market", "ProductA");
String funds_str = conn.hget("user:1", "funds");
    if (price == null || funds_str == null) {
        System.out.println("商品或使用者資訊不存在");
conn.unwatch();
        return;
}

    double funds = Double.valueOf(funds_str);
    if (price > funds) {
        System.out.println("餘額不足");
conn.unwatch();
        return;
}


    Transaction transaction = conn.multi();
transaction.hincrByFloat("user:1", "funds", -price);
transaction.zrem("market", "ProductA");
transaction.zadd("user_package:1", price, "ProductA");
List<Object> result = transaction.exec();
    if (result == null || result.size() == 0) {
        System.out.println(">>>執行失敗,中途被watch的key出現修改");
} else {
        System.out.println(">>>>執行成功");
}

    conn.del("lock:market:productA");
conn.del("lock:user:1:funds");
conn.close();
}
其實這是利用setnx的特性在業務中實現的元素級別悲觀鎖。以下是我寫的一個業務測試鎖的效能(模擬一堆使用者對商城上購買商品的測試):
public class App {

    volatile public static long successCounter = 0;
    volatile public static long failCounter = 0;
    public static long time = 0;
    volatile public static JedisPool jedisPool;
    public static void main(String[] args) {

        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxIdle(8);
config.setMaxTotal(12);
config.setMinIdle(5);
config.setMaxTotal(30);
config.setMaxWaitMillis(3000);
jedisPool = new JedisPool(config, "localhost");
Jedis conn = jedisPool.getResource();
conn.flushAll();
List<User> userList = init(jedisPool);
time = System.currentTimeMillis();
        for (User item : userList) {
            try {
                new Thread(new BuyItem(jedisPool.getResource(), item)).start();
} catch (Exception e) {
                e.printStackTrace();
}
        }

    }

    static List<User> init(JedisPool pool) {
        List<User> users = new ArrayList<User>();
users.add(new User(1, "A", 9999999999d));
users.add(new User(2, "B", 9999999999d));
users.add(new User(3, "C", 9999999999d));
users.add(new User(4, "D", 9999999999d));
users.add(new User(5, "E", 9999999999d));
users.add(new User(6, "F", 9999999999d));
users.add(new User(7, "G", 9999999999d));
users.add(new User(8, "H", 9999999999d));
users.add(new User(9, "I", 9999999999d));
users.add(new User(10, "J", 9999999999d));
List<Grocery> groceries = new LinkedList<Grocery>();
        for (int i = 0; i < 1000000; i++) {
            double price = new Random().nextInt(10) + 10;
groceries.add(new Grocery("grocery_" + i, price));
}

        initUser(pool.getResource(), users);
initMarket(pool.getResource(), groceries);
        return users;
}

    static void initUser(Jedis conn, List<User> users) {
        Pipeline pipeline = conn.pipelined();
pipeline.multi();
        for (User item : users) {
            String hashKey = "User:" + item.getId();
pipeline.hset(hashKey, "username", item.getUsername());
pipeline.hset(hashKey, "funds", String.valueOf(item.getFunds()));
}
        pipeline.exec();
conn.close();
}

    static void initMarket(Jedis conn, List<Grocery> groceries) {
        Pipeline pipeline = conn.pipelined();
pipeline.multi();
        int index = 0;
        for (Grocery item : groceries) {
            String zSetKey = "market";
pipeline.zadd(zSetKey, item.getPrice(), item.getName());
            if (index++ >= 1000) {
                index = 0;
pipeline.exec();
pipeline.multi();
}
        }
        pipeline.exec();
conn.close();
}


    static class BuyItem implements Runnable {

        private Jedis conn;
        private User user;
        public BuyItem(Jedis conn, User user) {
            this.conn = conn;
            this.user = user;
}

        public void run() {

            String buyerKey = "User:" + user.getId();
String markterKey = "market";
String inventoryOfbuyerKey = "inventory:" + user.getId();
String salerKey = null;
            try {
                for (int i = 0; i < 5000; i++) {

                    try {
                        String id = UUID.randomUUID().toString();
                        long randmon = (long) (Math.random() * conn.zcard(markterKey));
String groceryKey = (String) conn.zrange(markterKey, randmon, randmon).toArray()[0];
                        boolean hasLock = false;
String lockName = "lock:" + markterKey + ":" + groceryKey;
                        long startTime = System.currentTimeMillis();
                        while (System.currentTimeMillis() - startTime <= 5000) {
                            if (conn.setnx(lockName, id) == 1) {
                                hasLock = true;
                                break;
}
                        }
                        if (!hasLock) {
                            System.err.println("can not got a lock!");
failCounter++;
                            continue;
}

                        double price = conn.zscore(markterKey, groceryKey);
                        if (Double.valueOf(conn.hget(buyerKey, "funds")) < price) {
                            return;
}

                        Transaction transaction = conn.multi();
transaction.hincrByFloat(buyerKey, "funds", -price);
transaction.zadd(inventoryOfbuyerKey, price, groceryKey);
transaction.zrem(markterKey, groceryKey);
List<Object> result = transaction.exec();
transaction.close();
                        if (result == null || result.size() <= 0) {
                            System.out.println("can not exec ! ");
failCounter++;
} else {
                            successCounter++;
}

                        while (true) {
                            conn.watch(lockName);
                            if (id.equals(conn.get(lockName))) {
                                conn.del(lockName);
conn.unwatch();
                                break;
} else {
                                conn.unwatch();
                                break;
}
                        }

                    } catch (Exception e) {
                        e.printStackTrace();
App.failCounter++;
}
                }
            } finally {
                System.out.println(">>>>>failCounter:" + App.failCounter);
System.out.println(">>>>>successCounter:" + App.successCounter);
System.out.println(">>>>>time:" + (System.currentTimeMillis() - time));
conn.close();
}
        }


    }
}

輸出如下:

>>>>>failCounter:0  失敗為0
>>>>>successCounter:49820  成功交易49820次
>>>>>time:11948 耗時為11948毫秒

其實如果稍微做優化這個速度可以更快,不過我們這裡是提現了setnx鎖的作用而已。

Pipeline

其實Pipeline跟事務一樣都是做批量操作的,如果不必要使用事務可以使用Pipeline,Pipeline比事務的速度更快,因為事務處理批量提交還需要其他的一些業務邏輯的。使用pipeline。

Pipeline pipeline = conn.pipelined();
Response<Long> rpushResult = pipeline.rpush("testList", "A", "B", "C");
Response<List<String>> testListResponse = pipeline.lrange("testList", 0, -1);
pipeline.sync();
System.out.println("rpushResult:" + rpushResult.get());
for (String item : testListResponse.get()) {
    System.out.print( item + " ");
}

當然可以使用pipeline.syncAndReturnAll(),這個跟transaction.exec的返回方式一樣。

但是需要注意不要一次性搞一堆資料發出去,這樣很可能會阻塞redis,因為人redis是單執行緒的去執行命令的,所以如果一個命令太慢可能會阻塞其他客戶端所以使用flushall,keys * 這些操作也可能會阻塞客戶端。pipline和transaction也同理,如果exec和sync一次性提交的命令非常多就非常有可能會阻塞redis,同時jedis讀寫也是會超時的,如果一次性pipeline提交的資料太多也可能會導致讀寫超時異常,所以我上面一次性寫入market的測試資料都是通過分批次去提交的:

static void initMarket(Jedis conn, List<Grocery> groceries) {
    Pipeline pipeline = conn.pipelined();
pipeline.multi();
    int index = 0;
    for (Grocery item : groceries) {
        String zSetKey = "market";
pipeline.zadd(zSetKey, item.getPrice(), item.getName());
        if (index++ >= 1000) {
            index = 0;
pipeline.exec();
pipeline.multi();
}
    }
    pipeline.exec();
conn.close();
}