1. 程式人生 > >Redis整合Spring實現分布式鎖

Redis整合Spring實現分布式鎖

gte 展示 是否 alt cep 狀態 wait 封裝 RR

spring把專門的數據操作獨立封裝在spring-data系列中,spring-data-redis是對Redis的封裝

技術分享圖片
<dependencies>
  <!-- 添加spring-data的支持 -->
      <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-redis</artifactId>
        <version>1.4.2.RELEASE</version>
      </dependency>
      
      <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
          <version>2.6.2</version>
      </dependency>
    
     <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <version>2.4.2</version>
     </dependency>
     
     
    
     <dependency>
        <groupId>javax.persistence</groupId>
        <artifactId>persistence-api</artifactId>
        <version>1.0.2</version>
     </dependency>
     
      <!-- 添加junit支持 -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>4.3.2.RELEASE</version>
    </dependency>


    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging</artifactId>
        <version>1.2</version>
    </dependency>
    
  </dependencies>
技術分享圖片

Spring 配置文件applicationContext.xml

技術分享圖片
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util"
    xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jaxws="http://cxf.apache.org/jaxws"
    xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
       http://cxf.apache.org/jaxws http://cxf.apache.org/schema/jaxws.xsd
       http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
       http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 命令空間加入上面這行  -->

 <context:property-placeholder location="classpath:redis.properties"/>
 
 <bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
   <property name="maxIdle" value="${redis.maxIdle}" />
   <property name="maxTotal" value="${redis.maxTotal}" />
   <property name="MaxWaitMillis" value="${redis.MaxWaitMillis}" />
   <property name="testOnBorrow" value="${redis.testOnBorrow}" />
 </bean>
 
 
 <bean id="connectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" p:host-name="${redis.host}" p:port="${redis.port}" p:password="${redis.pass}" p:pool-config-ref="poolConfig" />

 <bean id="redisTemplate" class="org.spring.framework.data.redis.core.RedisTemplate">
   <property name="connectionFactory" ref="connectionFactory" />
 </bean>
</beans>
技術分享圖片

註意新版的maxTotal,MaxWaitMillis這兩個字段與舊版的不同。

redis連接池配置文件redis.properties

技術分享圖片
redis.host=192.168.2.129
redis.port=6379 
redis.pass=redis129 
 
redis.maxIdle=300 
redis.maxTotal=600 
redis.MaxWaitMillis=1000 
redis.testOnBorrow=true
技術分享圖片

好了,配置完成,下面寫上代碼

測試代碼

技術分享圖片
import javax.persistence.Entity;
import javax.persistence.Table;

/**
 * @author 
 * @date 創建時間:2017年2月9日 上午8:51:02
 * @parameter
 */
@Entity
@Table(name= "t_user")
public class User {
    //主鍵
    private String id;
    //用戶名
    private String userName;
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getUserName() {
        return userName;
    }
    public void setUserName(String userName) {
        this.userName = userName;
    }
    
}
技術分享圖片

BaseRedisDao

技術分享圖片
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;

/**
 * @author 
 * @date 創建時間:2017年2月9日 上午9:02:16
 * @parameter
 */
public class BaseRedisDao<K,V> {
    
    @Autowired(required=true)
    protected RedisTemplate<K,V> redisTemplate;

}
技術分享圖片

IUserDao

技術分享圖片
public interface IUserDao {
     
    public boolean save(User user);
     
    public boolean update(User user);
 
    public boolean delete(String userIds);
     
    public User find(String userId);
     
}
技術分享圖片

UserDao

技術分享圖片
package com.shanheyongmu.dao;

import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;

import com.shanheyongmu.entity.User;

/**
 * @author 
 * @date 創建時間:2017年2月9日 上午9:08:28
 * @parameter
 */
public class UserDao extends BaseRedisDao<String, User> implements IUserDao  {

    public boolean save(final User user) {
        boolean res = redisTemplate.execute(new RedisCallback<Boolean>() {

            public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
                byte[] key = serializer.serialize(user.getId());
                byte[] value = serializer.serialize(user.getUserName());
                //set not exits
                return connection.setNX(key, value);
            }
            
        });
        return res;
    }

    public boolean update(final User user) {
        boolean result = redisTemplate.execute(new RedisCallback<Boolean>() {

            public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
                byte[] key = serializer.serialize(user.getId());
                byte[] name = serializer.serialize(user.getUserName());
                //set
                connection.set(key, name);
                return true;
            }
        });
        return result;
    }

    public boolean delete(final String userId) {
        boolean result = redisTemplate.execute(new RedisCallback<Boolean>() {

            public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
                byte[] key = serializer.serialize(userId);
                //delete
                connection.del(key);
                return true;
            }
        });
        return result;
    }

    public User find(final String userId) {
        User result = redisTemplate.execute(new RedisCallback<User>() {

            public User doInRedis(RedisConnection connection) throws DataAccessException {
                RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
                byte[] key = serializer.serialize(userId);
                //get
                byte[] value = connection.get(key);
                if(value == null) {
                    return null;
                }
                String name = serializer.deserialize(value);
                User resUser = new User();
                resUser.setId(userId);
                resUser.setUserName(name);
                return resUser;
            }
        });
        return result;
    }

}
技術分享圖片

Test

寫這個類的時候需要引入junit包和spring-test.jar

技術分享圖片
package com.shanheyongmu.test;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;


import com.shanheyongmu.dao.IUserDao;
import com.shanheyongmu.entity.User;



/**
 * @author 
 * @date 創建時間:2017年2月9日 上午10:42:55
 * @parameter
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath*:applicationContext.xml"})
public class RedisTest extends AbstractJUnit4SpringContextTests {
    
    @Autowired
    private IUserDao userDao;
    @Test
    public void testSaveUser() {
        User user = new User();
        user.setId("402891815170e8de015170f6520b0000");
        user.setUserName("zhangsan");
    }
    
    @Test
    public void testGetUser() {
        User user = new User();
        user = userDao.find("402891815170e8de015170f6520b0000");
        System.out.println(user.getId() + "-" +user.getUserName());
    }
    
    @Test
    public void testUpdateUser() {
        User user = new User();
        user.setId("402891815170e8de015170f6520b0000");
        user.setUserName("lisi");
        boolean res = userDao.update(user);
        Assert.assertTrue(res);
        
    }
    @Test
    public void testDeleteUser() {
        boolean res = userDao.delete("402891815170e8de015170f6520b0000");
        Assert.assertTrue(res);
    }
    
    
}
技術分享圖片

String類型的增刪該查已完成,Hash,List,Set數據類型的操作就不舉例了,和使用命令的方式差不多。如下

技術分享圖片
connection.hSetNX(key, field, value);
connection.hDel(key, fields);
connection.hGet(key, field);
 
connection.lPop(key);
connection.lPush(key, value);
connection.rPop(key);
connection.rPush(key, values);
 
connection.sAdd(key, values);
connection.sMembers(key);
connection.sDiff(keys);
connection.sPop(key);
技術分享圖片

整合可能遇到的問題

1.NoSuchMethodError

java.lang.NoSuchMethodError: org.springframework.core.serializer.support.DeserializingConverter.<init>(Ljava/lang/ClassLoader;)V
 
Caused by: java.lang.NoSuchMethodError: redis.clients.jedis.JedisShardInfo.setTimeout(I)V

類似找不到類,找不到方法的問題,當確定依賴的jar已經引入之後,此類問題多事spring-data-redis以及jedis版本問題,多換個版本試試,本文上面提到的版本可以使用。

1.No qualifying bean

No qualifying bean of type [org.springframework.data.redis.core.RedisTemplate] found for dependency

找不到bean,考慮applicationContext.xml中配置redisTemplate bean時實現類是否寫錯。例如,BaseRedisDao註入的是RedisTemplate類型的對象,applicationContext.xml中配置的實現類卻是RedisTemplate的子類StringRedisTemplate,那肯定報錯。整合好後,下面我們著重學習基於redis的分布式鎖的實現。

基於redis實現的分布式鎖

我們知道,在多線程環境中,鎖是實現共享資源互斥訪問的重要機制,以保證任何時刻只有一個線程在訪問共享資源。鎖的基本原理是:用一個狀態值表示鎖,對鎖的占用和釋放通過狀態值來標識,因此基於redis實現的分布式鎖主要依賴redis的SETNX命令和DEL命令,SETNX相當於上鎖,DEL相當於釋放鎖,當然,在下面的具體實現中會更復雜些。之所以稱為分布式鎖,是因為客戶端可以在redis集群環境中向集群中任一個可用Master節點請求上鎖(即SETNX命令存儲key到redis緩存中是隨機的)。

現在相信你已經對在基於redis實現的分布式鎖的基本概念有了解,需要註意的是,這個和前面文章提到的使用WATCH 命令對key值進行鎖操作沒有直接的關系。java中synchronized和Lock對象都能對共享資源進行加鎖,下面我們將學習用java實現的redis分布式鎖。

java中的鎖技術

在分析java實現的redis分布式鎖之前,我們先來回顧下java中的鎖技術,為了直觀的展示,我們采用“多個線程共享輸出設備”來舉例。

不加鎖共享輸出設備

技術分享圖片

package com.shanheyongmu.lock;


import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


import com.shanheyongmu.redislock.RedisLock;



public class LockTest { //不加鎖 static class Outputer { public void output(String name) { for(int i=0; i<name.length(); i++) { System.out.print(name.charAt(i)); } System.out.println(); } } public static void main(String[] args) { final Outputer output = new Outputer(); //線程1打印zhangsan new Thread(new Runnable(){ @Override public void run() { while(true) { try{ Thread.sleep(1000); }catch(InterruptedException e) { e.printStackTrace(); } output.output("zhangsan"); } } }).start(); //線程2打印lingsi new Thread(new Runnable(){ @Override public void run() { while(true) { try{ Thread.sleep(1000); }catch(InterruptedException e) { e.printStackTrace(); } output.output("lingsi"); } } }).start(); //線程3打印wangwu new Thread(new Runnable(){ @Override public void run() { while(true) { try{ Thread.sleep(1000); }catch(InterruptedException e) { e.printStackTrace(); } output.output("huangwu"); } } }).start(); } }
技術分享圖片

上面例子中,三個線程同時共享輸出設備output,線程1需要打印zhangsan,線程2需要打印lingsi,線程3需要打印wangwu。在不加鎖的情況,這三個線程會不會因為得不到輸出設備output打架呢,我們來看看運行結果:

技術分享圖片
huangwu
zhangslingsi
an
huangwu
zlingsi
hangsan
huangwu
lzhangsan
ingsi
huangwu
lingsi
技術分享圖片

從運行結果可以看出,三個線程打架了,線程1沒打印完zhangsan,線程2就來搶輸出設備......可見,這不是我們想要的,我們想要的是線程之間能有序的工作,各個線程之間互斥的使用輸出設備output。

使用java5中的Lock對輸出設備加鎖

現在我們對Outputer進行改進,給它加上鎖,加鎖之後每次只有一個線程能訪問它。

技術分享圖片
//使用java5中的鎖
static class Outputer{
    Lock lock = new ReentrantLock();
    public void output(String name) {
        //傳統java加鎖
        //synchronized (Outputer.class){
        lock.lock();
        try {
            for(int i=0; i<name.length(); i++) {
                System.out.print(name.charAt(i));
            }
            System.out.println();
        }finally{
            //任何情況下都有釋放鎖
            lock.unlock();
        }  
        //}
    }
}
技術分享圖片

看看加鎖後的輸出結果:

技術分享圖片
zhangsan
lingsi
huangwu
zhangsan
lingsi
huangwu
zhangsan
lingsi
huangwu
zhangsan
lingsi
huangwu
zhangsan
lingsi
huangwu
......
技術分享圖片

從運行結果中可以看出,三個線程之間不打架了,線程之間的打印變得有序。有個這個基礎,下面我們來學習基於Redis實現的分布式鎖就更容易了。

Redis分布式鎖

實現分析

從上面java鎖的使用中可以看出,鎖對象主要有lock與unlock方法,在lock與unlock方法之間的代碼(臨界區)能保證線程互斥訪問。基於redis實現的Java分布式鎖主要依賴redis的SETNX命令和DEL命令,SETNX相當於上鎖(lock),DEL相當於釋放鎖(unlock)。我們只要實現Lock接口重寫lock()和unlock()即可。但是這還不夠,安全可靠的分布式鎖應該滿足滿足下面三個條件:

l 互斥,不管任何時候,只有一個客戶端能持有同一個鎖。

l 不會死鎖,最終一定會得到鎖,即使持有鎖的客戶端對應的master節點宕掉。

l 容錯,只要大多數Redis節點正常工作,客戶端應該都能獲取和釋放鎖。

那麽什麽情況下回不滿足上面三個條件呢。多個線程(客戶端)同時競爭鎖可能會導致多個客戶端同時擁有鎖。比如,

(1)線程1在master節點拿到了鎖(存入key)

(2)master節點在把線程1創建的key寫入slave之前宕機了,此時集群中的節點已經沒有鎖(key)了,包括master節點的slaver節點

(3)slaver節點升級為master節點

(4)線程2向新的master節點發起鎖(存入key)請求,很明顯,能請求成功。

可見,線程1和線程2同時獲得了鎖。如果在更高並發的情況,可能會有更多線程(客戶端)獲取鎖,這種情況就會導致上文所說的線程“打架”問題,線程之間的執行雜亂無章。

那什麽情況下又會發生死鎖的情況呢。如果擁有鎖的線程(客戶端)長時間的執行或者因為某種原因造成阻塞,就會導致鎖無法釋放(unlock沒有調用),其它線程就不能獲取鎖而而產生無限期死鎖的情況。其它線程在執行lock失敗後即使粗暴的執行unlock刪除key之後也不能正常釋放鎖,因為鎖就只能由獲得鎖的線程釋放,鎖不能正常釋放其它線程仍然獲取不到鎖。解決死鎖的最好方式是設置鎖的有效時間(redis的expire命令),不管是什麽原因導致的死鎖,有效時間過後,鎖將會被自動釋放。

為了保障容錯功能,即只要有Redis節點正常工作,客戶端應該都能獲取和釋放鎖,我們必須用相同的key不斷循環向Master節點請求鎖,當請求時間超過設定的超時時間則放棄請求鎖,這個可以防止一個客戶端在某個宕掉的master節點上阻塞過長時間,如果一個master節點不可用了,應該盡快嘗試下一個master節點。釋放鎖比較簡單,因為只需要在所有節點都釋放鎖就行,不管之前有沒有在該節點獲取鎖成功。

Redlock算法

根據上面的分析,官方提出了一種用Redis實現分布式鎖的算法,這個算法稱為RedLock。RedLock算法的主要流程如下:

技術分享圖片

RedLock算法主要流程

 

Java實現

結合上面的流程圖,加上下面的代碼解釋,相信你一定能理解redis分布式鎖的實現原理

技術分享圖片
public class RedisLock implements Lock{
 
    protected StringRedisTemplate redisStringTemplate;
 
    // 存儲到redis中的鎖標誌
    private static final String LOCKED = "LOCKED";
 
    // 請求鎖的超時時間(ms)
    private static final long TIME_OUT = 30000;
 
    // 鎖的有效時間(s)
    public static final int EXPIRE = 60;
 
    // 鎖標誌對應的key;
    private String key;
 
    // state flag
    private volatile boolean isLocked = false;
 
    public RedisLock(String key) {
        this.key = key;
        @SuppressWarnings("resource")
        ApplicationContext  ctx =  new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
        redisStringTemplate = (StringRedisTemplate)ctx.getBean("redisStringTemplate");
    }
 
    @Override
    public void lock() {
        //系統當前時間,毫秒
        long nowTime = System.nanoTime();
        //請求鎖超時時間,毫秒
        long timeout = TIME_OUT*1000000;
        final Random r = new Random();
        try {
            //不斷循環向Master節點請求鎖,當請求時間(System.nanoTime() - nano)超過設定的超時時間則放棄請求鎖
            //這個可以防止一個客戶端在某個宕掉的master節點上阻塞過長時間
            //如果一個master節點不可用了,應該盡快嘗試下一個master節點
            while ((System.nanoTime() - nowTime) < timeout) {
                //將鎖作為key存儲到redis緩存中,存儲成功則獲得鎖
                if (redisStringTemplate.getConnectionFactory().getConnection().setNX(key.getBytes(),
                        LOCKED.getBytes())) {
                    //設置鎖的有效期,也是鎖的自動釋放時間,也是一個客戶端在其他客戶端能搶占鎖之前可以執行任務的時間
                    //可以防止因異常情況無法釋放鎖而造成死鎖情況的發生
                    redisStringTemplate.expire(key, EXPIRE, TimeUnit.SECONDS);
                    isLocked = true;
                    //上鎖成功結束請求
                    break;
                }
                //獲取鎖失敗時,應該在隨機延時後進行重試,避免不同客戶端同時重試導致誰都無法拿到鎖的情況出現
                //睡眠3毫秒後繼續請求鎖
                Thread.sleep(3, r.nextInt(500));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
    @Override
    public void unlock() {
        //釋放鎖
        //不管請求鎖是否成功,只要已經上鎖,客戶端都會進行釋放鎖的操作
        if (isLocked) {
            redisStringTemplate.delete(key);
        }
    }
 
    @Override
    public void lockInterruptibly() throws InterruptedException {
        // TODO Auto-generated method stub
         
    }
 
    @Override
    public boolean tryLock() {
        // TODO Auto-generated method stub
        return false;
    }
 
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        // TODO Auto-generated method stub
        return false;
    }
 
    @Override
    public Condition newCondition() {
        // TODO Auto-generated method stub
        return null;
    }
}
技術分享圖片

好了,RedisLock已經實現,我們對Outputer使用RedisLock進行修改

技術分享圖片
/使用RedisLock
static class Outputer {
    //創建一個名為redisLock的RedisLock類型的鎖
    RedisLock redisLock = new RedisLock("redisLock");
    public void output(String name) {
        //上鎖
        redisLock.lock();
        try {
            for(int i=0; i<name.length(); i++) {
                System.out.print(name.charAt(i));
            }
            System.out.println();
        }finally{
            //任何情況下都要釋放鎖
            redisLock.unlock();
        }  
    }
}
技術分享圖片

看看使用RedisLock加鎖後的的運行結果

技術分享圖片
lingsi
zhangsan
huangwu
lingsi
zhangsan
huangwu
lingsi
zhangsan
huangwu
lingsi
zhangsan
huangwu
lingsi
zhangsan
huangwu
......
技術分享圖片

可見,使用RedisLock加鎖後線程之間不再“打架”,三個線程互斥的訪問output。

問題

現在我無法論證RedLock算法在分布式、高並發環境下的可靠性,但從本例三個線程的運行結果看,RedLock算法確實保證了三個線程互斥的訪問output(redis.maxIdle=300 redis.maxTotal=600,運行到Timeout waiting for idle object都沒有出現線程“打架”的問題)。我認為RedLock算法仍有些問題沒說清楚,比如,如何防止宕機時多個線程同時獲得鎖;RedLock算法在釋放鎖的處理上,不管線程是否獲取鎖成功,只要上了鎖,就會到每個master節點上釋放鎖,這就會導致一個線程上的鎖可能會被其他線程釋放掉,這就和每個鎖只能被獲得鎖的線程釋放相互矛盾。這些有待後續進一步交流學習研究。

參考文檔

http://redis.io/topics/distlock

http://ifeve.com/redis-lock/

全部系列到此告一段落了 ,可能很多人覺得還不夠 在此我在搜索問題的過程中發現了比較好的 博客spring data+redis

http://www.cnblogs.com/tankaixiong/p/3660075.html

http://www.cnblogs.com/xiohao/p/9063135.html

Redis整合Spring實現分布式鎖