hbase與redis的區別
Java中的幾種鎖解決方案
樂觀鎖與悲觀鎖
樂觀鎖
樂觀鎖呢,它是假設一個執行緒在取資料的時候不會被其他執行緒更改資料,但是在更新資料的時候會校驗資料有沒有被修改過。它是一種比較交換的機制,簡稱CAS(Compare And Swap)機制。一旦檢測到有衝突產生,比如說資料版本號或者最後更新時間不一致,它就會進行重試,直到沒有衝突為止。
在JAVA中樂觀鎖並沒有確定的方法,或者關鍵字,它只是一個處理的流程、策略。
下面以修改資料庫資料為例,使用樂觀鎖:
- 1、在檢索資料,將資料的版本號(version)或者最後更新時間一併檢索出來;
- 2、操作員更改資料以後,點選儲存,在資料庫執行update操作;
- 3、執行update操作時,用步驟1檢索出的版本號或者最後更新時間與資料庫中的記錄作比較;
- 4、如果版本號或最後更新時間一致,則可以更新;
- 5、如果不一致,就不執行更新;
悲觀鎖
悲觀鎖與樂觀鎖恰恰相反,悲觀鎖從讀取資料的時候就顯示的加鎖,直到資料更新完成,釋放鎖為止。在這期間只能有一個執行緒去操作,其他的執行緒只能等待。在JAVA中,悲觀鎖可以使用 synchronized
ReentrantLock
類來實現。
synchronized 關鍵字
synchronized 鎖有兩種使用方式, 一種是加在方法上,它表示這個方法是加了鎖的,當多個執行緒同時呼叫這個方法時,只有獲得鎖的執行緒才可以執行:
public synchronized void method(){
// 方法體。。。
}
另一種是加在程式碼塊上:
synchronized (物件鎖){
……
}
我們將需要加鎖的語句都寫在 synchronized 塊內,而在物件鎖的位置,需要填寫加鎖的物件,它的含義是,當多個執行緒併發執行時,只有獲得你寫的這個物件的鎖,才能執行後面的語句,其他的執行緒只能等待。
synchronized塊通常的寫法是synchronized(this)
,這個this是當前類的例項,也就是說獲得當前這個類的物件的鎖,才能執行這個方法,這樣寫的效果和synchronized方法是一樣的。
ReentrantLock 類
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Test{
private Lock lock = new ReentrantLock();
public void method(){
lock.lock();
// 業務程式碼
lock.unlock();
}
}
使用lock.lock()
進行加鎖,使用lock.unlock()
進行解鎖,加鎖後只會有一個執行緒能夠執行加鎖後的業務程式碼。
公平鎖與非公平鎖
公平鎖與非公平鎖是從另一個維度對鎖的理解。
公平鎖
公平鎖故名思意,在多執行緒的情況下,對待每一個執行緒都是公平的。多個執行緒同時執行方法,執行緒A搶到了鎖,A可以執行方法。其他執行緒則在佇列裡進行排隊,A執行完方法後,會從佇列裡取出下一個執行緒B,再去執行方法。以此類推,對於每一個執行緒來說都是公平的,不會存在後加入的執行緒先執行的情況。
非公平鎖
非公平鎖與之相反,多個執行緒同時執行方法,執行緒A搶到了鎖,A可以執行方法。其他的執行緒並沒有排隊,A執行完方法,釋放鎖後,其他的執行緒誰搶到了鎖,誰去執行方法。會存在後加入的執行緒,反而先搶到鎖的情況。
ReentrantLock 類的實現
公平鎖與非公平鎖都在ReentrantLock類裡給出了實現,我們看一下ReentrantLock的原始碼。
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock有兩個構造方法,預設的構造方法中,sync = new NonfairSync();我們可以從字面意思看出它是一個非公平鎖。再看看第二個構造方法,它需要傳入一個引數,引數是一個布林型,true是公平鎖,false是非公平鎖。
分散式鎖
尚文提到的鎖,都是有JDK官方提供的鎖的解決方案,也就是說這些鎖只能在一個JVM程序內有效,我們把這種鎖叫做單體應用鎖。然而在我們的分散式系統中,這種單體應用鎖就無法滿足我們需求了,因為分散式系統是由多應用組成,會有多個JVM程序,單應用鎖無法跨JVM、跨程序。那麼分散式鎖的定義就出來了,分散式鎖就是可以跨越多個JVM、跨越多個程序的鎖,這種鎖就叫做分散式鎖。
目前存在的分散式鎖的方案
通過第三方的元件實現跨JVM、跨程序的分散式鎖。這就是分散式鎖的解決思路,找到所有JVM可以共同訪問的第三方元件,通過第三方元件實現分散式鎖。
目前比較流行的分散式鎖的解決方案有:
- 資料庫,通過資料庫可以實現分散式鎖,但是在高併發的情況下對資料庫壓力較大,所以很少使用;
- Redis,藉助Redis也可以實現分散式鎖,而且Redis的Java客戶端種類很多,使用的方法也不盡相同;
- Zookeeper,Zookeeper也可以實現分散式鎖,同樣Zookeeper也存在多個Java客戶端,使用方法也不相同。
1、基於資料庫悲觀鎖的分散式鎖
通過for update語句實現鎖:
select .... for update
for update是在資料庫中上鎖用的,可以為資料庫中的行上一個排它鎖。當一個事務的操作未完成時候,其他事務可以讀取但是不能寫入或更新。
2、基於Redis的Setnx實現分散式鎖
獲取鎖的redis命令
SET resource_name my_random_value NX PX 30000
resource_name(key):資源名稱,可根據不同的業務區分不同的鎖。
my_random_value(value):隨機值,每個執行緒的隨機值都不相同,用於釋放鎖時的校驗。
NX:表示 key不存在時設定成功,key存在則設定不成功。是一個原子性的操作,多執行緒併發時只有一個執行緒可以設定成功。設定成功才可以執行業務程式碼,不成功就不給執行。
PX:自動失效時間,即使出現異常情況,鎖也可以通過過期來失效。
釋放鎖
釋放鎖時要校驗之前設定的隨機值(my_random_value),相同才能釋放。因為需要校驗,所以普通的redis的delete key命令不能滿足,我們需要使用LUA指令碼:
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
程式碼實現
封裝RedisLock
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.core.types.Expiration;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
public class RedisLock implements AutoCloseable {
private RedisTemplate redisTemplate;
private String key;
private String value;
//單位:秒
private int expireTime;
public RedisLock(RedisTemplate redisTemplate,String key,int expireTime){
this.redisTemplate = redisTemplate;
this.key = key;
this.expireTime=expireTime;
this.value = UUID.randomUUID().toString();
}
/**
* 獲取分散式鎖
* @return
*/
public boolean getLock(){
RedisCallback<Boolean> redisCallback = connection -> {
//設定NX
RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();
//設定過期時間
Expiration expiration = Expiration.seconds(expireTime);
//序列化key
byte[] redisKey = redisTemplate.getKeySerializer().serialize(key);
//序列化value
byte[] redisValue = redisTemplate.getValueSerializer().serialize(value);
//執行setnx操作
Boolean result = connection.set(redisKey, redisValue, expiration, setOption);
return result;
};
//獲取分散式鎖
Boolean lock = (Boolean)redisTemplate.execute(redisCallback);
return lock;
}
public boolean unLock() {
String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
RedisScript<Boolean> redisScript = RedisScript.of(script,Boolean.class);
List<String> keys = Arrays.asList(key);
Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value);
return result;
}
@Override
public void close() throws Exception {
unLock();
}
}
使用RedisLock
import com.distribute.lock.lock.RedisLock;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
public class RedisLockController {
@Autowired
private RedisTemplate redisTemplate;
@RequestMapping("redisLock")
public String redisLock(){
log.info("我進入了方法!");
try (RedisLock redisLock = new RedisLock(redisTemplate,"redisKey",30)){
if (redisLock.getLock()) {
log.info("我進入了鎖!!");
Thread.sleep(15000);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
log.info("方法執行完成");
return "方法執行完成";
}
}
3、基於Zookeeper的分散式鎖
原理
利用了zookeeper的瞬時有序節點的特性,多執行緒併發建立順勢節點時,得到有序的序列,我們規定序號最小的執行緒獲得鎖。其他執行緒則監聽自己序號的前一個序號,前一個執行緒執行完成,刪除自己的序號的節點,下一個節點的執行緒得到通知,繼續執行,以此類推。
程式碼實現
封裝鎖
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@Slf4j
public class ZkLock implements Watcher,AutoCloseable {
private ZooKeeper zooKeeper;
private String businessName;
private String znode;
public ZkLock(String connectString,String businessName) throws IOException {
this.zooKeeper = new ZooKeeper(connectString,30000,this);
this.businessName = businessName;
}
public boolean getLock() throws KeeperException, InterruptedException {
//建立業務 根節點
Stat existsNode = zooKeeper.exists("/" + businessName, false);
if (existsNode == null){
zooKeeper.create("/" + businessName,businessName.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
//建立瞬時有序節點 /order/order_00000001
znode = zooKeeper.create("/" + businessName + "/" + businessName + "_", businessName.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
znode = znode.substring(znode.lastIndexOf("/")+1);
//獲取業務節點下 所有的子節點
List<String> childrenNodes = zooKeeper.getChildren("/" + businessName, false);
//子節點排序
Collections.sort(childrenNodes);
//獲取序號最小的(第一個)子節點
String firstNode = childrenNodes.get(0);
//如果建立的節點是第一個子節點,則獲得鎖
//不是第一個子節點,則監聽前一個節點
if (!firstNode.equals(znode)){
String lastNode = firstNode;
for (String node:childrenNodes){
if (!znode.equals(node)){
lastNode = node;
}else {
zooKeeper.exists("/"+businessName+"/"+lastNode,true);
break;
}
}
synchronized (this){
wait();
}
}
return true;
}
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType() == Event.EventType.NodeDeleted){
synchronized (this){
notify();
}
}
}
@Override
public void close() throws Exception {
zooKeeper.delete("/"+businessName+"/"+znode,-1);
zooKeeper.close();
log.info("我釋放了鎖");
}
}
呼叫
import com.example.distributelock.lock.ZkLock;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
public class ZkLockController {
@RequestMapping("zkLock")
public String zkLock(){
log.info("我進入了方法!");
try (ZkLock zkLock = new ZkLock("localhost:2181","order")){
if (zkLock.getLock()) {
log.info("我進入了鎖!!");
Thread.sleep(15000);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
log.info("方法執行完成");
return "方法執行完成";
}
}
Redisson 實現分散式鎖
Redisson是一個具有記憶體資料網格功能的Redis Java客戶端,功能很多很強大,這裡只介紹下它的分散式鎖功能使用,在spring boot專案下。
我們日常開發中,推薦使用Redisson實現分散式鎖。
https://github.com/redisson/redisson/tree/master/redisson-spring-boot-starter
pom
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.2</version>
</dependency>
yml 連線redis
spring:
redis:
database: 0
host: 192.168.10.125
port: 6379
password: 123456
使用分散式鎖
@Autowired
private RedissonClient redisson;
public void redissonLock() {
// 設定鎖的業務分類
RLock rLock = redisson.getLock("xxx");
// 設定過期時間
rLock.lock(30, TimeUnit.SECONDS);
// 業務程式碼
rLock.unlock();
}