1. 程式人生 > >基於redis叢集實現的分散式鎖,可用於秒殺商品的庫存數量管理,有測試程式碼(何志雄)

基於redis叢集實現的分散式鎖,可用於秒殺商品的庫存數量管理,有測試程式碼(何志雄)

轉載請標明出處。

在分散式系統中,經常會出現需要競爭同一資源的情況,本程式碼基於redis3.0.1+jedis2.7.1實現了分散式鎖。

可用於例如秒殺系統中的商品庫存管理。附完整程式碼及測試用例。

package com.gaojiasoft.gaojiaRedis;

import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.exceptions.JedisConnectionException;

/**
 * 分散式鎖管理器,支援對單個資源加鎖解鎖,或給一批資源的批量加鎖及解鎖
* @ClassName: DistributedLockManger 
* @Description:  
* @author 何志雄 001
* @company 南京高嘉軟體技術有限公司
* @date 2015年6月3日 下午5:44:06
 */
public class DistributedLockManger
{

    private static final Logger LOGGER = LoggerFactory.getLogger(DistributedLockManger.class);

    private static final int DEFAULT_SINGLE_EXPIRE_TIME = 3;

    //    private static final int DEFAULT_BATCH_EXPIRE_TIME = 6;

    //static的變數無法註解
    @Autowired
    JedisCluster jc;

    private static DistributedLockManger lockManger;

    public DistributedLockManger()
    {
    }

    @PostConstruct
    public void init()
    {
        lockManger = this;
        lockManger.jc = this.jc;
    }

    /**
     * 獲取鎖 如果鎖可用   立即返回true,  否則立即返回false,作為非阻塞式鎖使用 
     * @param key
     * @param value
     * @return
     */
    public static boolean tryLock(String key , String value)
    {
        try
        {
            return tryLock(key, value, 0L, null);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 鎖在給定的等待時間內空閒,則獲取鎖成功 返回true, 否則返回false,作為阻塞式鎖使用
     * @param key 鎖鍵
     * @param value 被誰鎖定
     * @param timeout 嘗試獲取鎖時長,建議傳遞500,結合實踐單位,則可表示500毫秒
     * @param unit,建議傳遞TimeUnit.MILLISECONDS
     * @return
     * @throws InterruptedException 
     */
    public static boolean tryLock(String key , String value , long timeout , TimeUnit unit) throws InterruptedException
    {
        //納秒
        long begin = System.nanoTime();
        do
        {
            //LOGGER.debug("{}嘗試獲得{}的鎖.", value, key);
            Long i = lockManger.jc.setnx(key, value);
            if (i == 1)
            {
                lockManger.jc.expire(key, DEFAULT_SINGLE_EXPIRE_TIME);
                LOGGER.debug("{}成功獲取{}的鎖,設定鎖過期時間為{}秒 ", value, key, DEFAULT_SINGLE_EXPIRE_TIME);
                return true;
            }
            else
            {
                // 存在鎖 ,但可能獲取不到,原因是獲取的一剎那間
//                String desc = lockManger.jc.get(key);
//                LOGGER.error("{}正被{}鎖定.", key, desc);
            }
            if (timeout == 0)
            {
                break;
            }
            //在其睡眠的期間,鎖可能被解,也可能又被他人佔用,但會嘗試繼續獲取鎖直到指定的時間
            Thread.sleep(100);
        }
        while ((System.nanoTime() - begin) < unit.toNanos(timeout));
        //因超時沒有獲得鎖
        return false;
    }

    /**
     * 釋放單個鎖
     * @param key 鎖鍵
     * @param value 被誰釋放
     */
    public static void unLock(String key , String value)
    {
        try
        {
            lockManger.jc.del(key);
            LOGGER.debug("{}鎖被{}釋放 .", key, value);
        }
        catch (JedisConnectionException je)
        {

        }
        catch (Exception e)
        {

        }
    }

public void test() throws InterruptedException
    {
        String productId = "18061249844";
        String userId;
        for (int i = 1; i <= 500; i++)
        {
            //隨機產生一個使用者
            userId = UUID.randomUUID().toString();
            //該使用者試圖鎖定(如果被鎖,嘗試等待300毫秒),在處理一些事情後,再釋放鎖
            testLock(productId, userId);
            Thread.sleep(20);
        }
    }

    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(150, 150, 30L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(), new BasicThreadFactory.Builder().daemon(true)
                    .namingPattern("mongo-oper-%d").build(), new ThreadPoolExecutor.CallerRunsPolicy());

    private static void testLock(final String key , final String value)
    {
        executor.execute(new Runnable()
        {
            @Override
            public void run()
            {
                try
                {
                    //獲取鎖,如果不能立即獲取,嘗試等待1000毫秒
                    boolean isLock = DistributedLockManger.tryLock(key, value, 500, TimeUnit.MILLISECONDS);
                    if (isLock)
                    {
                        //doSomething(佔用鎖20毫秒到4秒,模擬處理事務)
                        long doSomeThingTime = RandomUtils.nextLong(20, 4000);
                        LOGGER.debug("{}將持有鎖{}時長{}毫秒.", value, key, doSomeThingTime);
                        Thread.sleep(doSomeThingTime);
                        //然後釋放鎖
                        DistributedLockManger.unLock(key, value);
                    }
                }
                catch (Throwable th)
                {
                }
            }
        });
    }

    public JedisCluster getJc()
    {
        return jc;
    }

    public void setJc(JedisCluster jc)
    {
        this.jc = jc;
    }
}

Spring配置:

redis.properties

# Maximum number of idle connections kept in the pool
im.hs.server.redis.maxIdle=500
# Maximum total number of connections; per the original note, Redis operations fail once this limit is exceeded
im.hs.server.redis.maxTotal=5000
# Bound to the pool's maxWaitMillis property in spring-redis.xml
im.hs.server.redis.maxWaitTime=1000
# Bound to the pool's testOnBorrow property in spring-redis.xml
im.hs.server.redis.testOnBorrow=true
# Minimum number of idle connections.
# NOTE(review): the original note says this many connections are created automatically when Spring starts
# and more are requested on demand; confirm against the connection pool's documentation.
im.hs.server.redis.minIdle=300

spring-redis.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:aop="http://www.springframework.org/schema/aop"
	xsi:schemaLocation="
			http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
			http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

	<!-- Loads the im.hs.server.redis.* values referenced by the ${...} placeholders below. -->
	<context:property-placeholder location="classpath:conf/redis/redis.properties" />

	<!-- Cluster nodes handed to the JedisCluster bean below.
	     Original author's note: connecting to a single node is in fact sufficient. -->
	<bean id="culster7001" class="redis.clients.jedis.HostAndPort">
		<constructor-arg name="host" value="192.168.62.153" />
		<constructor-arg name="port" value="7001" />
	</bean>
	<bean id="culster7002" class="redis.clients.jedis.HostAndPort">
		<constructor-arg name="host" value="192.168.62.153" />
		<constructor-arg name="port" value="7002" />
	</bean>
	<bean id="culster7003" class="redis.clients.jedis.HostAndPort">
		<constructor-arg name="host" value="192.168.62.154" />
		<constructor-arg name="port" value="7003" />
	</bean>
	<bean id="culster7004" class="redis.clients.jedis.HostAndPort">
		<constructor-arg name="host" value="192.168.62.154" />
		<constructor-arg name="port" value="7004" />
	</bean>
	<bean id="culster7005" class="redis.clients.jedis.HostAndPort">
		<constructor-arg name="host" value="192.168.62.155" />
		<constructor-arg name="port" value="7005" />
	</bean>
	<bean id="culster7006" class="redis.clients.jedis.HostAndPort">
		<constructor-arg name="host" value="192.168.62.155" />
		<constructor-arg name="port" value="7006" />
	</bean>
	<bean id="culster7007" class="redis.clients.jedis.HostAndPort">
		<constructor-arg name="host" value="192.168.62.152" />
		<constructor-arg name="port" value="7007" />
	</bean>
	<bean id="culster7008" class="redis.clients.jedis.HostAndPort">
		<constructor-arg name="host" value="192.168.62.152" />
		<constructor-arg name="port" value="7008" />
	</bean>
	<!-- Connection-pool settings; the ${...} values come from redis.properties. -->
	<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
		<property name="maxTotal" value="${im.hs.server.redis.maxTotal}" />
		<property name="minIdle" value="${im.hs.server.redis.minIdle}" />
		<property name="maxWaitMillis" value="${im.hs.server.redis.maxWaitTime}" />
		<property name="maxIdle" value="${im.hs.server.redis.maxIdle}" />
		<property name="testOnBorrow" value="${im.hs.server.redis.testOnBorrow}" />
		<property name="testOnReturn" value="true" />
		<property name="testWhileIdle" value="true" />
	</bean>


	<!-- Cluster client built from the eight nodes and the pool configuration above. -->
	<bean id="jc" class="redis.clients.jedis.JedisCluster">
		<constructor-arg name="nodes">
			<set>
				<ref bean="culster7001" />
				<ref bean="culster7002" />
				<ref bean="culster7003" />
				<ref bean="culster7004" />
				<ref bean="culster7005" />
				<ref bean="culster7006" />
				<ref bean="culster7007" />
				<ref bean="culster7008" />
			</set>
		</constructor-arg>
		<constructor-arg name="poolConfig"> <ref bean="poolConfig" /> </constructor-arg>
	</bean>
	
	<!-- Lock manager bean; its JedisCluster field is annotated with @Autowired. -->
	<bean id="distributedLock" class="com.gaojiasoft.gaojiaRedis.DistributedLockManger" />

	<!-- Enables annotation processing, which the lock manager's @Autowired and @PostConstruct rely on. -->
	<context:annotation-config />
</beans>
測試程式碼:
package com.gaojiasoft.gaojiaRedis;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

import junit.framework.TestCase;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Integration test that drives the lock demo in {@link DistributedLockManger#test()}
 * against the Redis cluster configured in {@code conf/redis/spring-redis.xml}.
 */
public class ClusterJedisTester extends TestCase
{
    private Logger logger = LoggerFactory.getLogger(ClusterJedisTester.class);

    private static String[] list = new String[] { "classpath:conf/redis/spring-redis.xml" };

    private static ConfigurableApplicationContext context = new ClassPathXmlApplicationContext(list, true, null);

    DistributedLockManger  distributedLock = (DistributedLockManger)context.getBean("distributedLock");

    /**
     * How long to wait for in-flight demo tasks after the last one was submitted.
     * Each task waits at most 500 ms for the lock and holds it for under 4 s, so
     * 6 s leaves a margin.
     */
    private static final long DRAIN_WAIT_SECONDS = 6L;

    /**
     * Runs the demo and then waits a bounded time for the worker threads to finish.
     * The workers are daemon threads, so the test must stay alive while they run;
     * the original infinite loop did that but never let the test terminate.
     *
     * @throws InterruptedException if interrupted while the demo is running or draining
     */
    @Test
    public void testLock() throws InterruptedException
    {
        distributedLock.test();
        TimeUnit.SECONDS.sleep(DRAIN_WAIT_SECONDS);
        logger.info("Lock demo finished after waiting {} seconds for in-flight tasks", DRAIN_WAIT_SECONDS);
    }
}