1. 程式人生 > >如何提高鎖的性能

如何提高鎖的性能

讀寫分離 temp long incr 只有一個 進入 tel 資源競爭 deque

1.減小鎖持有的時間

比如100個人去銀行辦理業務,要填一百張表,但是只有一支筆,那麽很顯然,每個人用筆的時間越短,效率也就月高:看代碼:

/*
othercode1和othercode2很耗時間,裏面沒有涉及資源同步,只有mutexMethod方法要對資源同步,
所有優化代碼讓持有鎖時間盡量短
*/

public synchronized void syncMethod(){
        othercode1();
        mutexMethod();
        othercode2();
}

public  void syncMethod(){
        othercode1();
        
synchronized(this){ mutexMethod(); } othercode2(); } //在jdk源碼裏面也很容易找到這種手段,比如處理正則表達式的Pattern類 public Matcher matcher(CharSequence input) { if (!compiled) { synchronized(this) { if (!compiled) compile(); } } Matcher m
= new Matcher(this, input); return m; } //只有在表達式未編譯的時候進行局部加鎖,這種方法大大提高了matcher的執行效率和可靠性

註意:減少鎖的持有時間有助於降低鎖沖突的可能性,進而提升系統的並發能力

2.減小鎖的力度

concurrentHashMap的實現,他的內部被分為了若幹個曉得hashmap,稱之為段(SEGMENT),默認是16段

減小鎖粒度會引入一個新的問題,當需要獲取全局鎖的時候,其消耗的資源會較多,不如concurrenthashMap的size()方法.可以看到計算size的時候需要計算全部有效的段的鎖

public
int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }

事實上計算size的時候會先使用無鎖的方式計算,如果失敗會采用這個方法,但是在高並發的場合concurrenthashmap的size依然要差於同步的hashmap.因此在類似於size獲取全局信息方法調用不頻繁的情況下,這種減小粒度的的方法才是真正意義上的提高系統並發量

註意:所謂減小鎖粒度,就是指縮小鎖定對象的範圍,從而減小鎖沖突的可能性,進而提高系統性能

3.讀寫分離來替換獨占鎖

在讀多寫少的情況下,使用讀寫鎖可以有效的提高系統性能 ReadWriteLock可以提高系統性能

package com.high.concurrency;

import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @author: zhangzeli
 * @date 8:43 2018/4/10
 * <P></P>
 */
public class ReadWriteLockDemo {
    private static Lock lock = new ReentrantLock();
    private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private static Lock readLock =readWriteLock.readLock();
    private static Lock writeLock = readWriteLock.writeLock();
    private int value;

    public Object handleRead(Lock lock) throws InterruptedException{
        try {
            lock.lock();
            Thread.sleep(1000);
            return value;
        }finally {
            lock.unlock();
        }
    }

    public void handleWrite(Lock lock,int index) throws InterruptedException{
        try {
            lock.lock();
            Thread.sleep(1000);
            value =index;
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        final ReadWriteLockDemo demo = new ReadWriteLockDemo();
        Runnable readRunnale = new Runnable() {
            @Override
            public void run() {
                try {
                    demo.handleRead(lock);
                    //demo.handleRead(readLock);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        Runnable write = new Runnable() {
            @Override
            public void run() {
                try {
                    //demo.handleWrite(writeLock,new Random().nextInt());
                    demo.handleWrite(lock,new Random().nextInt());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        for(int i=0;i<18;i++){
            new Thread(readRunnale).start();
        }
        for(int i=18;i<20;i++){
            new Thread(write).start();
        }
    }
}

差異很明顯.

3.鎖分離

已LinkedBlockingQueue為例,take函數和put函數分別實現了沖隊列取和往隊列加數據,雖然兩個方法都對隊列進項了修改,但是LinkedBlockingQueue是基於鏈表的所以一個操作的是頭,一個是隊列尾端,從理論情況下將並不沖突

如果使用獨占鎖則take和put就不能完成真正的並發,所以jdk並沒有才用這種方式取而代之的是兩把不同的鎖分離了put和take的操作,下面看源碼

/** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();//take函數需要持有takeLock

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();//put函數需要持有putLock

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly(); //不能有兩個線程同時取數據
        try {
            while (count.get() == 0) { //如果當前沒有可用數據,一直等待
                notEmpty.await();      //等待,put操作的通知
            }
            x = dequeue();         //取得第一個數據
            c = count.getAndDecrement();//數量減一,原子操作因為回合put同時訪問count.註意變量c是count減一
            if (c > 1)
                notEmpty.signal();  //通知其他take操作
        } finally {
            takeLock.unlock(); //釋放鎖
        }
        if (c == capacity)
            signalNotFull(); //通知put,已有空余空間
        return x;
    }


public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();//不能有兩個線程同時進行put
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) { //如果隊列已滿
                notFull.await();   //等待
            }
            enqueue(node);  //插入數據
            c = count.getAndIncrement(); //更新總數,變量c是count加1前的值
            if (c + 1 < capacity)
                notFull.signal();   //有足夠的空間,通知其他線程
        } finally {
            putLock.unlock();  //釋放鎖
        }
        if (c == 0)
            signalNotEmpty();  //插入成功後,通知take操作
    }

4.鎖粗化

虛擬機在遇到一連串地對同一鎖不斷進行請求和釋放的操作時,便會把所有的鎖操作整合成對鎖的一次請求,從而減小對鎖的請求同步次數,這個操作叫鎖粗化,比如

for (int i=0;i<20;i++){
     synchronized (lock){
                
     }
}

//優化後
synchronized (lock){
    for (int i=0;i<20;i++){
     
                
     }
}

註意:性能優化就是根據運行時的真實情況對各個資源點進行權衡折中的過程,鎖粗話的思想和減少鎖持有時間是相反的,但是在不同的場合,他們的效果並不相同,所以大家要根據實際情況,進行權衡

5.java虛擬機對鎖優化所做的努力

5.1鎖偏向

偏向鎖,簡單的講,就是在鎖對象的對象頭中有個ThreaddId字段,這個字段如果是空的,第一次獲取鎖的時候,就將自身的ThreadId寫入到鎖的ThreadId字段內,將鎖頭內的是否偏向鎖的狀態位置1.這樣下次獲取鎖的時候,直接檢查ThreadId是否和自身線程Id一致,如果一致,則認為當前線程已經獲取了鎖,因此不需再次獲取鎖,略過了輕量級鎖和重量級鎖的加鎖階段。提高了效率。

但是偏向鎖也有一個問題,就是當鎖有競爭關系的時候,需要解除偏向鎖,使鎖進入競爭的狀態

參數-XX:+UseBiasedLocking

Java偏向鎖(Biased Locking)是Java6引入的一項多線程優化。它通過消除資源無競爭情況下的同步原語,
進一步提高了程序的運行性能。偏向鎖,顧名思義,它會偏向於第一個訪問鎖的線程,如果在接下來的運行過程中,
該鎖沒有被其他的線程訪問,則持有偏向鎖的線程將永遠不需要觸發同步。如果在運行過程中,遇到了其他線程搶占鎖,
則持有偏向鎖的線程會被掛起,JVM會嘗試消除它身上的偏向鎖,將鎖恢復到標準的輕量級鎖。(偏向鎖只能在單線程下起作用)
因此 流程是這樣的 偏向鎖->輕量級鎖->重量級鎖

5.2輕量級鎖

輕量級鎖加鎖:線程在執行同步塊之前,JVM會先在當前線程的棧楨中創建用於存儲鎖記錄的空間,並將對象頭中的Mark Word復制到鎖記錄中,官方稱為Displaced Mark Word。

然後線程嘗試使用CAS將對象頭中的Mark Word替換為指向鎖記錄的指針。如果成功,當前線程獲得鎖,如果失敗,表示其他線程競爭鎖,當前線程便嘗試使用自旋來獲取鎖。

輕量級鎖解鎖:輕量級解鎖時,會使用原子的CAS操作來將Displaced Mark Word替換回到對象頭,如果成功,則表示沒有競爭發生。

如果失敗,表示當前鎖存在競爭,鎖就會膨脹成重量級鎖。

註:輕量級鎖會一直保持,喚醒總是發生在輕量級鎖解鎖的時候,因為加鎖的時候已經成功CAS操作;而CAS失敗的線程,會立即鎖膨脹,並阻塞等待喚醒。(詳見下圖)

下圖是兩個線程同時爭奪鎖,導致鎖膨脹的流程圖。

技術分享圖片

鎖不會降級

自旋其實就是虛擬機為了避免線程真實的在操作系統層掛起,虛擬機讓當前線程做空輪詢或許是幾個cpu時間周期,如果還沒辦法獲取鎖則在掛起.

因為自旋會消耗CPU,為了避免無用的自旋(比如獲得鎖的線程被阻塞住了),一旦鎖升級成重量級鎖,
就不會再恢復到輕量級鎖狀態。
當鎖處於這個狀態下,其他線程試圖獲取鎖時,都會被阻塞住,當持有鎖的線程釋放鎖之後會喚醒這些線程,
被喚醒的線程就會進行新一輪的奪鎖之爭。

5.3鎖消除

鎖消除是Java虛擬機在JIT編譯是,通過對運行上下文的掃描,去除不可能存在共享資源競爭的鎖,通過鎖消除,可以節省毫無意義的請求鎖時間

public class TestLockEliminate {
    public static String getString(String s1, String s2) {
        StringBuffer sb = new StringBuffer();
        sb.append(s1);
        sb.append(s2);
        return sb.toString();
    }

    public static void main(String[] args) {
        long tsStart = System.currentTimeMillis();
        for (int i = 0; i < 1000000; i++) {
            getString("TestLockEliminate ", "Suffix");
        }
        System.out.println("一共耗費:" + (System.currentTimeMillis() - tsStart) + " ms");
    }
}

getString()方法中的StringBuffer數以函數內部的局部變量,進作用於方法內部,不可能逃逸出該方法,因此他就不可能被多個線程同時訪問,也就沒有資源的競爭,但是StringBuffer的append操作卻需要執行同步操作:

@Override
public synchronized StringBuffer append(String str) {
        toStringCache = null;
        super.append(str);
        return this;
}

逃逸分析和鎖消除分別可以使用參數-XX:+DoEscapeAnalysis和-XX:+EliminateLocks(鎖消除必須在-server模式下)開啟。使用如下參數運行上面的程序:

技術分享圖片

使用如下命令運行程序:-XX:+DoEscapeAnalysis -XX:+EliminateLocks

技術分享圖片

鎖的優缺點對比

優點

缺點

適用場景

偏向鎖

加鎖和解鎖不需要額外的消耗,和執行非同步方法比僅存在納秒級的差距。

如果線程間存在鎖競爭,會帶來額外的鎖撤銷的消耗。

適用於只有一個線程訪問同步塊場景。

輕量級鎖

競爭的線程不會阻塞,提高了程序的響應速度。

如果始終得不到鎖競爭的線程使用自旋會消耗CPU。

追求響應時間。

同步塊執行速度非常快。

重量級鎖

線程競爭不使用自旋,不會消耗CPU。

線程阻塞,響應時間緩慢。

追求吞吐量。

同步塊執行速度較長。

如何提高鎖的性能