樂觀鎖-無鎖併發
阿新 • • 發佈:2021-01-28
文章目錄
無鎖
問題提出
一個賬戶內有10000,1000個執行緒每個執行緒減去10,最後正確的結果應該是0。
public interface Account {
Integer getAmount();
void withdraw(Integer amount);
public static void demo(Account account) {
ArrayList<Thread> threadList = new ArrayList<>();
long start = System.nanoTime();
for(int i = 0;i < 1000; i++){
Thread t = new Thread(()->{
account.withdraw(10);
});
threadList.add(t);
}
threadList. forEach(Thread ::start); //遍歷1000個執行緒,並呼叫start方法啟動
threadList.forEach(t->{
try {
t.join(); //保證1000個執行緒在主執行緒之前執行完畢以確保執行時間的正確
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime ();
System.out.println(account.getAmount()+"cost:"+TimeUnit.NANOSECONDS.toMillis(end-start)+"ms");
}
}
為什麼不安全
class SafeAccount implements Account{
private Integer amount;
public SafeAccount(Integer amount) {
this.amount = amount;
}
@Override
public Integer getAmount() {
return amount;
}
@Override
public void withdraw(Integer amount) {
this.amount -= amount;
}
public static void main(String[] args) {
Account account = new SafeAccount(10000);
Account.demo(account);
}
}
最後結果並不是0,發生錯誤的原因是指令交錯的原因造成的,比如執行緒一獲得amout開始-10操作時另一個執行緒也開始同樣的操作,使得結果發生錯誤。
解決思路-鎖
使用synchronized
關鍵字保護共享變數amount
,當一個執行緒完成了-10操作並且釋放了鎖其他執行緒才能執行,讓執行緒之間序列執行。這樣不會造成指令交錯,確保了操作的原子性。
public synchronized void withdraw(Integer amount) {
this.amount -= amount;
}
解決思路-無鎖
class UnlockAccount implements Account{
private AtomicInteger balance; //使用原子整數
publicnlockAccount(Integer amount) {
balance = new AtomicInteger(amount);
}
@Override
public Integer getAmount() {
return balance.get();
}
@Override
public void withdraw(Integer amount) {
while (true){
int prev = balance.get(); //獲取舊值
int next = except - amount; //操作後的新值
//將舊值prev與此時的最新值比較如果相等則將最新設定為next返回true並結束迴圈,如果不相等就返回false繼續嘗試此操作直到成功。
if (balance.compareAndSet(prev,next))
break;
}
}
public static void main(String[] args) {
Account account = new unlockAccount(10000);
Account.demo(account);
}
}
無鎖方式可以允許指令的交錯,但是當發生指令交錯後會讓這次操作失效,繼續嘗試直到成功。
CAS與volatile
CAS
使用原子整數在不加鎖的情況下可以實現對共享變數安全的操作。
以下是AtomicInteger
的部分原始碼
public class AtomicInteger extends Number implements java.io.Serializable {
public AtomicInteger(int initialValue) {
value = initialValue; //給value賦初始值
}
//獲取Unsafe物件,提供了非常底層的,操作執行緒,記憶體的方法。
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
//獲得value屬性在AtomicInteger中的偏移量,用來定位value的地址,直接操作它。
valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
//將value設定為volatile確保執行緒間的可見性
private volatile int value;
public final boolean compareAndSet(int expect, int update) {
/*通過vaueOffet定位到value並且比較和舊值expect是否相等。
如果相等將value值設定為update
如果不等表示這次操作失敗,返回false。
*/
return unsafe.compareAndSwapInt(this, valueOffset, expect, update); //這個操作是原子性的,不會被其他執行緒干擾
}
}
其中關鍵的就是compareAndSet
,它的簡稱就是CAS,它屬於原子操作。CAS的底層是lock cmpxchg
指令(X86架構),在單核和多核CPU下都能保證【比較-交換】的原子性。
volatile
獲取共享變數時為了保證變數的可見性需要使用volatile
修飾,保證一個執行緒對變數的修改另一個執行緒可見。CAS操作必須藉助volatile才能讀取共享變數的最新值保證【比較-交換】的有效性。
為什麼無鎖效率高
- 當cas操作不成功時使用while(true)迴圈不斷嘗試,該執行緒一直處於running狀態不會停歇。而synchronized會讓沒獲得鎖的執行緒陷入等待,發生上下文切換。
- 無鎖情況下要保持執行緒不斷的執行需要多個cpu的支援,如果cpu數小於執行緒數,雖然不會陷入阻塞,但由於沒有分到時間片,仍然會進入Runnable狀態,還是會導致上下文切換。所以在多核cpu下才由效果(執行緒數<cpu數)。
樂觀鎖和悲觀鎖
- CAS基於樂觀鎖的思想:最客觀的估計,在當前執行緒執行時不怕別的執行緒來修改共享變數,即使修該了也沒關係,可以再繼續重試
synchronized
基於悲觀鎖的思想:最悲觀的估計,在我執行時就上鎖不給其他執行緒修改共享變數的機會,只有我執行完解鎖,其他執行緒才可以修改共享變數。- CAS體現的是無鎖併發,無阻塞併發
- 沒有使用
synchronized
,執行緒不會陷入阻塞,這時效率提升的因素之一。 - 但是如果競爭激烈,重試頻繁發生,反而影響效率。
- 沒有使用
原子類
1、原子引用類
AtomicReference
AtomicMarkableReference
給共享變數一個標記,記錄是否被修改過AtomicStampedReference
給共享變數附加一個版本號,記錄被修改過的次數
2、ABA
多個執行緒將共享變數的值由A–>B,再由B–>A。其他執行緒在操作時不會知道共享變數是否被修改過。
static AtomicReference<String> ref = new AtomicReference<>("A");
public static void main(String[] args) throws InterruptedException {
String prev = ref.get();
other();
Thread.sleep(2000);
log.debug("change A->B{}",ref.compareAndSet(prev,"B"));
}
public static void other(){
new Thread(()->{
log.debug("change A->B {}",ref.compareAndSet(ref.get(),"B"));
},"t1").start();
new Thread(()->{
log.debug("change B->A {}",ref.compareAndSet(ref.get(),"A"));
},"t2").start();
}
如果主執行緒希望只要其他執行緒修改了共享變數,那麼就算自己CAS失敗,這樣僅比較值是不夠的,需要再加一個版本號。
AtomicStampedReference
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A",0);
public static void main(String[] args) throws InterruptedException {
String prev = ref.getReference();
int stamp = ref.getStamp();
log.debug("start:{},stamp:{}",prev,stamp);
other();
Thread.sleep(2000);
log.debug("change A->B{}",ref.compareAndSet(prev,"B",stamp,ref.getStamp()+1));
}
public static void other(){
new Thread(()->{
log.debug("change A->B {},stamp:{}",ref.compareAndSet(ref.getReference(),"B",ref.getStamp(),ref.getStamp()+1),ref.getStamp());
},"t1").start();
new Thread(()->{
log.debug("change B->A {},stamp:{}",ref.compareAndSet(ref.getReference(),"A",ref.getStamp(),ref.getStamp()+1),ref.getStamp());
},"t2").start();
}
如果只關心是否被修改過,並不關係修改過幾次,可以使用
AtomicMarkableReference
static AtomicMarkableReference<String> ref = new AtomicMarkableReference<>("A",false); //初始值設為false,即沒有被修改過。
public static void main(String[] args) throws InterruptedException {
String prev = ref.getReference();
other();
Thread.sleep(2000);
log.debug("change A->B {}",ref.compareAndSet(prev,"B",false,true));
}
public static void other(){
new Thread(()->{
log.debug("change A->B {}",ref.compareAndSet(ref.getReference(),"B",false,true));
},"t1").start();
new Thread(()->{
log.debug("change B->A {}",ref.compareAndSet(ref.getReference(),"A",true,true));
},"t2").start();
}
3、原子陣列
/*10個執行緒對長度為10的陣列的每個元素分別進行1000次累加操作,執行緒安全的情況下陣列每個元素值是1000
引數1,提供陣列。原子陣列或普通陣列
引數2,獲得陣列長度的方法
引數3,自增方法,回傳array,index
引數4,列印陣列的方法
*/
public class SafeArray {
public static <T> void demo(
Supplier<T> arraySupplier,
Function<T, Integer> lenFun,
BiConsumer<T, Integer> putConsumer,
Consumer<T> printConsumer
) {
ArrayList<Thread> threadList = new ArrayList<>();
T array = arraySupplier.get();
int len = lenFun.apply(array);
for(int i = 0; i < len; i++){
threadList.add(new Thread(()->{
for(int j = 0; j < 1000; j++) {
putConsumer.accept(array,j%len);
}
}));
}
threadList.forEach(Thread::start);//啟動所有的執行緒
threadList.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
printConsumer.accept(array);
}
}
public static void main(String[] args) {
SafeArray.demo(
()-> new AtomicIntegerArray(10),
(array)->array.length(),
(array,index)->array.getAndIncrement(index), //累加操作
(array) -> System.out.println(array)
);
}
Unsafe
Unsafe物件提供了非常底層的,操作記憶體,執行緒的方法,不能直接呼叫,只能通過反射獲取。
自定義一個原子類:
class AtomicData{
//使用volatile來保證可見性
private volatile Integer data;
private static Unsafe unsafe;
//data屬性的偏移量,Unsafe可以通過偏移地址直接操作data
final static long OFFSET;
public AtomicData(Integer data) {
this.data = data;
}
static {
unsafe = UnsafeAccessor.getUnsafe();
try {
OFFSET = unsafe.objectFieldOffset(AtomicData.class.getDeclaredField("data"));
} catch (NoSuchFieldException e) {
throw new Error(e);
}
}
public int getAndUpdate(ResultOperator resultOperator){
int prev,update;
do{
prev = get();
update = resultOperator.applyAsInt(prev);
System.out.println(update);
}while (!(unsafe.compareAndSwapInt(this,OFFSET,prev,update)));
return prev;
}
public final int get(){
return data;
}
}