1. 程式人生 > >分散式鎖實現生成唯一訂單編號

分散式鎖實現生成唯一訂單編號

https://blog.csdn.net/qq_36801998/article/details/85001472

前言
一:發展由來
大多數網際網路系統都是分散式部署的,分散式部署確實能帶來效能和效率上的提升,但為此,我們就需要多解決一個分散式環境下,資料一致性的問題。

當某個資源在多系統之間,具有共享性的時候,為了保證大家訪問這個資源資料是一致的,那麼就必須要求在同一時刻只能被一個客戶端處理,不能併發的執行,否者就會出現同一時刻有人寫有人讀,大家訪問到的資料就不一致了。

二:我們為什麼需要分散式鎖?
在單機時代,雖然不需要分散式鎖,但也面臨過類似的問題,只不過在單機的情況下,如果有多個執行緒要同時訪問某個共享資源的時候,我們可以採用執行緒間加鎖的機制,即當某個執行緒獲取到這個資源後,就立即對這個資源進行加鎖,當使用完資源之後,再解鎖,其它執行緒就可以接著使用了。 在多使用者環境中,在同一時間可能會有多個使用者更新相同的記錄,這會產生衝突。這就是著名的併發性問題。
典型的衝突有:
1.丟失更新:一個事務的更新覆蓋了其它事務的更新結果,就是所謂的更新丟失。例如:使用者A把值從6改為2,使用者B把值從2改為6,則使用者A丟失了他的更新。
2.髒讀:當一個事務讀取其它完成一半事務的記錄時,就會發生髒讀取。例如:使用者A,B看到的值都是6,使用者B把值改為2,使用者A讀到的值仍為6。

例如,在JAVA中,甚至專門提供了一些處理鎖機制的一些API(synchronize/Lock等), 在同一個jvm程序中時,可以使用JUC提供的一些鎖來解決多個執行緒競爭同一個共享資源時候的執行緒安全問題,但是當多個不同機器上的不同jvm程序共同競爭同一個共享資源時候,juc包的鎖就無能無力了,這時候就需要分散式鎖了。

但是到了分散式系統的時代,這種執行緒之間的鎖機制,就沒作用了,系統可能會有多份並且部署在不同的機器上,這些資源已經不是線上程之間共享了,而是屬於程序之間共享的資源。

因此,為了解決這個問題,我們就必須引入「分散式鎖」。

分散式鎖,是指在分散式的部署環境下,通過鎖機制來讓多客戶端互斥的對共享資源進行訪問。

分散式鎖要滿足哪些要求呢?

排他性:在同一時間只會有一個客戶端能獲取到鎖,其它客戶端無法同時獲取

避免死鎖:這把鎖在一段有限的時間之後,一定會被釋放(正常釋放或異常釋放)

常見的有使用redis的setNX函式,樂觀鎖資料庫version版本來實現,ZooKeeper鎖,本節我們談談使用zookeeper的臨時序列節點機制來實現一個分散式鎖。

三:各種鎖的對比
1Synchronized 修身的Java方法,其實就是Synchronized對 this或類(靜態類) 的鎖定,解決資源競爭問題 效能最低,儘量少用
臨界區:通過對多執行緒的序列化來訪問公共資源或一段程式碼

2互斥量:採用互斥物件機制。只有擁有互斥物件的執行緒才能夠訪問公共資源的許可權
Synchronized 修身的程式碼程式碼塊 ,單臺伺服器可使用。

3分散式鎖的實現技術資料庫version版本號
基於資料庫實現分散式鎖 多采用樂觀鎖實現。儘量少用
效能較差,容易出現單點故障
鎖沒有失效的時間,容易死鎖
非阻塞式的

4基於快取實現分散式鎖 小系統多用,Redis實現原理:儲存Redis資料key表示添加了鎖,設定失效時間,刪除key表示解鎖。
非阻塞式的

5基於Zookeeper實現分散式鎖
實現比較簡單、可靠性高、效能較好

四:開始實踐zookeeper的臨時序列節點機制實現
啟動zk服務端

pom依賴
    <!--zookeeper-->
    <dependency>
             <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
     </dependency>
1
2
3
4
5
6
自定義鎖介面
/**
 * 我的分散式鎖介面
 */
public interface ILock {

     //獲取鎖
     void Lock();

     //釋放鎖
     void UnLock();
}

1
2
3
4
5
6
7
8
9
10
11
12
抽象類實現該鎖介面
import com.al.lock.service.ILock;
import org.I0Itec.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/***
 * 實現Ilock鎖 連線zk 建立臨時節點
 */
public abstract class ZKAbstractLock implements ILock {
    private static Logger log=LoggerFactory.getLogger(ZKAbstractLock.class);
    private static String host="localhost";
    private static String port="2181";

    //分散式鎖,通過建立統一的臨時節點 建立成功則表示獲取鎖成功,否則失敗
    protected static String node="/zklock";

    protected ZkClient client=new ZkClient(host+":"+port);

    /**
     * 1:先試著建立臨時節點,建立成功則獲取鎖
     * 2:如果建立失敗,表示已被其他執行緒上鎖了;需要監視這個節點刪除(其他執行緒釋放鎖),並且使用CountDownLatch 休眠當前執行緒
     * 3:當其他執行緒釋放鎖後,喚醒當前執行緒,重新獲取鎖。
     * 缺點:有多個執行緒等待這個鎖時,一個執行緒釋放鎖後,其他執行緒都會被喚醒進行鎖的獲取(只有一個會成功獲取),
     * 這樣導致競爭激烈,資源浪費。
     * 解決思路,使用臨時順序節點,當有鎖後只有一個執行緒監視這個節點,其他執行緒不監視。這個執行緒釋放鎖後通知下一個執行緒獲取鎖
     */
    public void Lock() {
        if(tryLock()){
            log.info(Thread.currentThread().getName()+ " Get Lock Success!!!");
        }else{
            //等待之後重新獲取鎖
            waitforLock();
            Lock();
        }
    }
    //釋放當前鎖,由於建立的是臨時節點,則關閉zk的連線,鎖自動釋放
    public void UnLock() {
        client.close();
    }
    //試著去獲取鎖
    protected abstract boolean tryLock();
    //等待獲取鎖
    protected abstract void waitforLock();
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
繼承抽象類完成具體的實現
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.exception.ZkException;

import java.util.concurrent.CountDownLatch;

public class ZKLockImp extends ZKAbstractLock {
    private CountDownLatch cld = null;
    /**
     * 建立臨時節點,建立成功則說明獲取鎖,否則表示獲取鎖失敗
     */
    @Override
    protected boolean tryLock() {
        try {
            client.createEphemeral(node);
            return true;
        }catch (ZkException e) {
            return false;
        }
    }
    /**
     * 獲取鎖失敗後,需要在這裡讓執行緒休眠等待
     */
    @Override
    protected void waitforLock() {
        //對ZK建立一個節點監視器 watcher
        IZkDataListener listener = new IZkDataListener() {
            //當zk臨時節點刪除後觸發。當其他執行緒釋放鎖後,這個臨時節點會被刪除,從而觸發
            //讓CountDownLatch 減一,從而喚醒執行緒
            public void handleDataDeleted(String dataPath) throws Exception {
                if (cld != null) {
                    cld.countDown();
                }
            }
            //當節點值改變後觸發
            public void handleDataChange(String dataPath, Object data)
                    throws Exception {
            }
        };
        //對節點新增監視器
        client.subscribeDataChanges(node, listener);
        //節點存在表示之前有鎖已經被佔用,讓執行緒等待這裡
        if (client.exists(node)) {
            cld = new CountDownLatch(1);
            try {
                cld.await();//(非同步等待,定減為0)執行緒會在這裡堵塞,指定門閂指數為0
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //對節點移除監視器
        client.unsubscribeDataChanges(node, listener);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
分散式鎖工廠類
   
/***
 * 分散式鎖生產工廠
 */
public class OrderFactory {
    private static Integer i=0;
    public  static String GetOrder(){
        //JDK 鎖
//        synchronized (i) {
//            i++;
//            return "NewOrder"+new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-"+i).format(new Date());
//        }
        //分散式鎖
        i++;
        String ss= "NewOrder"+new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-"+i).format(new Date());
        return ss;
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
使用CountDownLatch測試生產分散式鎖唯一訂單編號
   
import com.al.lock.service.ILock;
import com.al.lock.service.impl.OrderFactory;
import com.al.lock.service.impl.ZKLockImp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
/***
 *  持久節點(PERSISTENT ):節點建立後,一直存在,直到主動刪除了該節點。
 * - 臨時節點(EPHEMERAL) :生命週期和客戶端會話繫結,一旦客戶端會話失效,這個節點就會自動刪除。
 * - 序列節點(SEQUENTIAL ):多個執行緒建立同一個順序節點時候,每個執行緒會得到一個帶有編號的節點,節點編號是遞增不重複的
 */
/***
 * 使用了zk節點唯一性來分散式保證高併發鎖
 * 缺點:如果使用的臨時節點,那麼如果一旦當前節點釋放鎖後,其餘執行緒都會同時類訪問當前鎖,就會導致記憶體消耗,效能下降
 * 解決:使用zk臨時順序節點,當前節點獲取了鎖,只有後面一個執行緒進行等待,其餘執行緒無需等待,這樣就大大提高了程式的效能。
 */
public class OrderIDTest implements Runnable  {

    private static int count = 100;//併發執行緒數量
    private static CountDownLatch cdl = new CountDownLatch(count);
    ILock lock = new ZKLockImp();

    //啟動執行緒
    public void run() {
        createOrderNum();
    }

    //建立訂單
    public void createOrderNum() {
        lock.Lock();//建立臨時節點上鎖,其他執行緒等待
        String orderNum = OrderFactory.GetOrder();//開始生產訂單id
        System.out.println(Thread.currentThread().getName() + "建立了訂單號:【" + orderNum+ "】!");
        lock.UnLock();//生產完畢釋放當前鎖
    }

    //1000個併發模擬生成唯一訂單ID
    public static void main(String[] args) {
        for (int i = 0; i < count; i++) {
            new Thread(new OrderIDTest()).start();
            //發令槍裡面的數字減一
            cdl.countDown();
        }
    }
}


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48


使用壓測工具 jmeter測試生產分散式鎖唯一訂單編號

五:基於樂觀鎖資料庫version版本號控制
簡介:
我們先來看一下如何基於「樂觀鎖」來實現:

樂觀鎖機制其實就是在資料庫表中引入一個版本號(version)欄位來實現的。

當我們要從資料庫中讀取資料的時候,同時把這個version欄位也讀出來,如果要對讀出來的資料進行更新後寫回資料庫,則需要將version加1,同時將新的資料與新的version更新到資料表中,且必須在更新的時候同時檢查目前資料庫裡version值是不是之前的那個version,如果是,則正常更新。如果不是,則更新失敗,說明在這個過程中有其它的程序去更新過資料了。

如圖,假設同一個賬戶,使用者A和使用者B都要去進行取款操作,賬戶的原始餘額是2000,使用者A要去取1500,使用者B要去取1000,如果沒有鎖機制的話,在併發的情況下,可能會出現餘額同時被扣1500和1000,導致最終餘額的不正確甚至是負數。但如果這裡用到樂觀鎖機制,當兩個使用者去資料庫中讀取餘額的時候,除了讀取到2000餘額以外,還讀取了當前的版本號version=1,等使用者A或使用者B去修改資料庫餘額的時候,無論誰先操作,都會將版本號加1,即version=2,那麼另外一個使用者去更新的時候就發現版本號不對,已經變成2了,不是當初讀出來時候的1,那麼本次更新失敗,就得重新去讀取最新的資料庫餘額。

通過上面這個例子可以看出來,使用「樂觀鎖」機制,必須得滿足:

(1)鎖服務要有遞增的版本號version

(2)每次更新資料的時候都必須先判斷版本號對不對,然後再寫入新的版本號

   
update task 

set key =value,version=version+1

where id=#{id} and version=#{version}; //這條語句,就可以更新資料成功啦~


1
2
3
4
5
6
7
8
六:基於redis快取來實現分散式鎖
簡介:
Redis為單程序單執行緒模式,採用佇列模式將併發訪問變成序列訪問,且多客戶端對Redis的連線並不存在競爭關係。redis的SETNX命令可以方便的實現分散式鎖。

SETNX命令簡介

將 key 的值設為 value,當且僅當 key 不存在。
若給定的 key 已經存在,則 SETNX 不做任何動作。
SETNX 是SET if Not eXists的簡寫。

返回整數,具體為

1,當 key 的值被設定
0,當 key 的值沒被設定
expire命令簡介

expire key timeout
為key設定一個超時時間,單位為second,超過這個時間鎖會自動釋放,避免死鎖。

delete命令簡介

delete key
刪除key

在使用Redis實現分散式鎖的時候,主要就會使用到這三個命令。

實踐:使用的是jedis來連線Redis
獲取鎖的時候,使用setnx加鎖,並使用expire命令為鎖新增一個超時時間,超過該時間則自動釋放鎖,鎖的value值為一個隨機生成的UUID,通過此在釋放鎖的時候進行判斷。
獲取鎖的時候還設定一個獲取的超時時間,若超過這個時間則放棄獲取鎖。
釋放鎖的時候,通過UUID判斷是不是該鎖,若是該鎖,則執行delete進行鎖釋放。
pom.xml
        <!-- jedis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.8.2</version>
        </dependency>
1
2
3
4
5
6
redis.properties
server.port=8098

#redis jedis配置

# Redis資料庫索引(預設為0)
spring.redis.database=0
# Redis伺服器地址
spring.redis.host=127.0.0.1
# Redis伺服器連線埠
spring.redis.port=6379
# Redis伺服器連線密碼(預設為空)
#spring.redis.password=
# 連線池最大連線數(使用負值表示沒有限制)
spring.redis.pool.max-active=-1
# 連線池最大阻塞等待時間(使用負值表示沒有限制)
spring.redis.pool.max-wait=-1
# 連線池中的最大空閒連線
spring.redis.pool.max-idle=8
# 連線池中的最小空閒連線
spring.redis.pool.min-idle=0
# 連線超時時間(毫秒)
spring.redis.timeout=0
#spring-session 使用
spring.session.store-type=none


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
RedisConfig初始化Jedis連線
        
@Slf4j
@Configuration
@PropertySource("classpath:redis.properties")
public class RedisConfig extends CachingConfigurerSupport {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private int port;

    @Value("${spring.redis.timeout}")
    private int timeout;

    @Value("${spring.redis.pool.max-active}")
    private int maxActive;

    @Value("${spring.redis.pool.max-idle}")
    private int maxIdle;

    @Value("${spring.redis.pool.min-idle}")
    private int minIdle;

    @Value("${spring.redis.pool.max-wait}")
    private long maxWaitMillis;

    @Value("${spring.redis.password}")
    private String password;

    @Bean
    public JedisPool redisPoolFactory(){
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxIdle(maxIdle);
        jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
        jedisPoolConfig.setMaxTotal(maxActive);
        jedisPoolConfig.setMinIdle(minIdle);
        JedisPool jedisPool = new JedisPool(jedisPoolConfig,host,port,timeout,password);

        log.info("JedisPool注入成功!");
        log.info("redis地址:" + host + ":" + port);
        return  jedisPool;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
定義鎖的介面
      
/***
 * 我的鎖實現
 */
public interface LockService {
    /***
     *  獲得鎖並且設定鎖失效時間
     * @param locaName  鎖的key
     * @param acquireTimeout 獲取超時時間
     * @param timeout 鎖的超時時間
     * @return 鎖標識
     */
    String lockWithTimeout(String locaName, long acquireTimeout, long timeout);

    /***
     *  根據鎖的名稱刪除這個鎖
     * @param lockName 鎖的key
     * @param identifier  釋放鎖的標識
     * @return 是否成功
     */
    boolean releaseLock(String lockName, String identifier) ;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
實現該介面進行
import org.springframework.util.StringUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.exceptions.JedisException;

import java.util.List;
import java.util.UUID;

public class LockServiceImpl implements LockService{

    private final JedisPool jedisPool;

    public LockServiceImpl(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    /**
     * 加鎖
     * @param locaName  鎖的key
     * @param acquireTimeout  獲取超時時間
     * @param timeout   鎖的超時時間
     * @return 鎖標識
     */
    public String lockWithTimeout(String locaName, long acquireTimeout, long timeout) {
        Jedis conn = null;
        String retIdentifier = null;
        try {
            // 獲取連線
            conn = jedisPool.getResource();
            // 隨機生成一個value
            String identifier = UUID.randomUUID().toString();
            // 鎖名,即key值
            String lockKey = "lock:" + locaName;
            // 超時時間,上鎖後超過此時間則自動釋放鎖
            int lockExpire = (int)(timeout / 1000);

            // 獲取鎖的超時時間,超過這個時間則放棄獲取鎖
            long end = System.currentTimeMillis() + acquireTimeout;
            while (System.currentTimeMillis() < end) {
                if (conn.setnx(lockKey, identifier) == 1) {
                    conn.expire(lockKey, lockExpire);
                    // 返回value值,用於釋放鎖時間確認
                    retIdentifier = identifier;
                    return retIdentifier;
                }
                // 返回-1代表key沒有設定超時時間,為key設定一個超時時間
                if (conn.ttl(lockKey) == -1) {
                    conn.expire(lockKey, lockExpire);
                }

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        } catch (JedisException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
        return retIdentifier;
    }

    /**
     * 釋放鎖
     * @param lockName 鎖的key
     * @param identifier    釋放鎖的標識
     * @return
     */
    public boolean releaseLock(String lockName, String identifier) {
        Jedis conn = null;
        String lockKey = "lock:" + lockName;
        boolean retFlag = false;
        try {
            conn = jedisPool.getResource();
            while (true) {
                // 監視lock,準備開始事務
                conn.watch(lockKey);
                // 通過前面返回的value值判斷是不是該鎖,若是該鎖,則刪除,釋放鎖
                if (!StringUtils.isEmpty(conn.get(lockKey)) && identifier.equals(conn.get(lockKey))) {
                    Transaction transaction = conn.multi();
                    transaction.del(lockKey);
                    List<Object> results = transaction.exec();
                    if (results == null) {
                        continue;
                    }
                    retFlag = true;
                }
                conn.unwatch();
                break;
            }
        } catch (JedisException e) {
            //e.printStackTrace();
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
        return retFlag;
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
分散式鎖工廠類
   
/***
 * 分散式鎖生產工廠
 */
public class OrderFactory {
    private static Integer i=0;
    public  static String GetOrder(){
        //JDK 鎖
//        synchronized (i) {
//            i++;
//            return "NewOrder"+new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-"+i).format(new Date());
//        }
        //分散式鎖
        i++;
        String ss= "NewOrder"+new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-"+i).format(new Date());
        return ss;
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
LockOrderController測試
import com.al.common.OrderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import redis.clients.jedis.JedisPool;

@Controller
public class LockOrderController {

    @Autowired
    private JedisPool jedisPool;

    //建立訂單
    @RequestMapping("/createOrderNum")
    @ResponseBody
    public String createOrderNum() {

        LockServiceImpl lock = new LockServiceImpl(jedisPool);
        // 返回鎖的value值,供釋放鎖時候進行判斷
        String indentifier = lock.lockWithTimeout("resource", 5000, 100);//設定鎖,加時間
        String orderNum = OrderFactory.GetOrder();//開始生產訂單id
        System.out.println(Thread.currentThread().getName() + "建立了訂單號:【" + orderNum+ "】!");
        lock.releaseLock("resource", indentifier);//生產完畢釋放當前鎖
        return orderNum;

    }
}


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
jmeter 模擬測試

需要原始碼的聯絡博主,或者留言私發…

以上總結全部是個人和網上經驗總結,如有雷同,請諒解,歡迎大家研討技術,點關注,後續繼續更新…
--------------------- 
作者:努力的小_Time_小艾 
來源:CSDN 
原文:https://blog.csdn.net/qq_36801998/article/details/85001472 
版權宣告:本文為博主原創文章,轉載請附上博文連結!