1. 程式人生 > >有助於提高"鎖"效能的幾點建議

有助於提高"鎖"效能的幾點建議

from:https://my.oschina.net/u/3703858/blog/1791973?p=1

最近閱讀《java高併發程式設計一書》大概總結幾條,也是書中的內容

1.減小鎖持有的時間

比如100個人去銀行辦理業務,要填一百張表,但是隻有一支筆,那麼很顯然,每個人用筆的時間越短,效率也就月高:看程式碼:

  1. /*

  2. othercode1和othercode2很耗時間,裡面沒有涉及資源同步,只有mutexMethod方法要對資源同步,

  3. 所有優化程式碼讓持有鎖時間儘量短

  4. */

  5. public synchronized void syncMethod(){

  6. othercode1();

  7. mutexMethod();

  8. othercode2();

  9. }

  10. public void syncMethod(){

  11. othercode1();

  12. synchronized(this){

  13. mutexMethod();

  14. }

  15. othercode2();

  16. }

  17. //在jdk原始碼裡面也很容易找到這種手段,比如處理正則表示式的Pattern類

  18. public Matcher matcher(CharSequence input) {

  19. if (!compiled) {

  20. synchronized(this) {

  21. if (!compiled)

  22. compile();

  23. }

  24. }

  25. Matcher m = new Matcher(this, input);

  26. return m;

  27. }

  28. //只有在表示式未編譯的時候進行區域性加鎖,這種方法大大提高了matcher的執行效率和可靠性

注意:減少鎖的持有時間有助於降低鎖衝突的可能性,進而提升系統的併發能力

2.減小鎖的力度

concurrentHashMap的實現,他的內部被分為了若干個曉得hashmap,稱之為段(SEGMENT),預設是16段

減小鎖粒度會引入一個新的問題,當需要獲取全域性鎖的時候,其消耗的資源會較多,不如concurrenthashMap的size()方法.可以看到計算size的時候需要計算全部有效的段的鎖

  1. public int size() {

  2. long n = sumCount();

  3. return ((n < 0L) ? 0 :

  4. (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :

  5. (int)n);

  6. }

  7. final long sumCount() {

  8. CounterCell[] as = counterCells; CounterCell a;

  9. long sum = baseCount;

  10. if (as != null) {

  11. for (int i = 0; i < as.length; ++i) {

  12. if ((a = as[i]) != null)

  13. sum += a.value;

  14. }

  15. }

  16. return sum;

  17. }

事實上計算size的時候會先使用無鎖的方式計算,如果失敗會採用這個方法,但是在高併發的場合concurrenthashmap的size依然要差於同步的hashmap.因此在類似於size獲取全域性資訊方法呼叫不頻繁的情況下,這種減小粒度的的方法才是真正意義上的提高系統併發量

注意:所謂減小鎖粒度,就是指縮小鎖定物件的範圍,從而減小鎖衝突的可能性,進而提高系統性能

3.讀寫分離來替換獨佔鎖

在讀多寫少的情況下,使用讀寫鎖可以有效的提高系統性能 ReadWriteLock可以提高系統性能

  1. package com.high.concurrency;

  2. import java.util.Random;

  3. import java.util.concurrent.locks.Lock;

  4. import java.util.concurrent.locks.ReentrantLock;

  5. import java.util.concurrent.locks.ReentrantReadWriteLock;

  6. /**

  7. * @author: zhangzeli

  8. * @date 8:43 2018/4/10

  9. * <P></P>

  10. */

  11. public class ReadWriteLockDemo {

  12. private static Lock lock = new ReentrantLock();

  13. private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

  14. private static Lock readLock =readWriteLock.readLock();

  15. private static Lock writeLock = readWriteLock.writeLock();

  16. private int value;

  17. public Object handleRead(Lock lock) throws InterruptedException{

  18. try {

  19. lock.lock();

  20. Thread.sleep(1000);

  21. return value;

  22. }finally {

  23. lock.unlock();

  24. }

  25. }

  26. public void handleWrite(Lock lock,int index) throws InterruptedException{

  27. try {

  28. lock.lock();

  29. Thread.sleep(1000);

  30. value =index;

  31. }finally {

  32. lock.unlock();

  33. }

  34. }

  35. public static void main(String[] args) {

  36. final ReadWriteLockDemo demo = new ReadWriteLockDemo();

  37. Runnable readRunnale = new Runnable() {

  38. @Override

  39. public void run() {

  40. try {

  41. demo.handleRead(lock);

  42. //demo.handleRead(readLock);

  43. } catch (InterruptedException e) {

  44. e.printStackTrace();

  45. }

  46. }

  47. };

  48. Runnable write = new Runnable() {

  49. @Override

  50. public void run() {

  51. try {

  52. //demo.handleWrite(writeLock,new Random().nextInt());

  53. demo.handleWrite(lock,new Random().nextInt());

  54. } catch (InterruptedException e) {

  55. e.printStackTrace();

  56. }

  57. }

  58. };

  59. for(int i=0;i<18;i++){

  60. new Thread(readRunnale).start();

  61. }

  62. for(int i=18;i<20;i++){

  63. new Thread(write).start();

  64. }

  65. }

  66. }

差異很明顯.

3.鎖分離

    已LinkedBlockingQueue為例,take函式和put函式分別實現了衝佇列取和往佇列加資料,雖然兩個方法都對佇列進項了修改,但是LinkedBlockingQueue是基於連結串列的所以一個操作的是頭,一個是佇列尾端,從理論情況下將並不衝突

    如果使用獨佔鎖則take和put就不能完成真正的併發,所以jdk並沒有才用這種方式取而代之的是兩把不同的鎖分離了put和take的操作,下面看原始碼

  1. /** Lock held by take, poll, etc */

  2. private final ReentrantLock takeLock = new ReentrantLock();//take函式需要持有takeLock

  3. /** Wait queue for waiting takes */

  4. private final Condition notEmpty = takeLock.newCondition();

  5. /** Lock held by put, offer, etc */

  6. private final ReentrantLock putLock = new ReentrantLock();//put函式需要持有putLock

  7. /** Wait queue for waiting puts */

  8. private final Condition notFull = putLock.newCondition();

  1. public E take() throws InterruptedException {

  2. E x;

  3. int c = -1;

  4. final AtomicInteger count = this.count;

  5. final ReentrantLock takeLock = this.takeLock;

  6. takeLock.lockInterruptibly(); //不能有兩個執行緒同時取資料

  7. try {

  8. while (count.get() == 0) { //如果當前沒有可用資料,一直等待

  9. notEmpty.await(); //等待,put操作的通知

  10. }

  11. x = dequeue(); //取得第一個資料

  12. c = count.getAndDecrement();//數量減一,原子操作因為回合put同時訪問count.注意變數c是count減一

  13. if (c > 1)

  14. notEmpty.signal(); //通知其他take操作

  15. } finally {

  16. takeLock.unlock(); //釋放鎖

  17. }

  18. if (c == capacity)

  19. signalNotFull(); //通知put,已有空餘空間

  20. return x;

  21. }

  22. public void put(E e) throws InterruptedException {

  23. if (e == null) throw new NullPointerException();

  24. // Note: convention in all put/take/etc is to preset local var

  25. // holding count negative to indicate failure unless set.

  26. int c = -1;

  27. Node<E> node = new Node<E>(e);

  28. final ReentrantLock putLock = this.putLock;

  29. final AtomicInteger count = this.count;

  30. putLock.lockInterruptibly();//不能有兩個執行緒同時進行put

  31. try {

  32. /*

  33. * Note that count is used in wait guard even though it is

  34. * not protected by lock. This works because count can

  35. * only decrease at this point (all other puts are shut

  36. * out by lock), and we (or some other waiting put) are

  37. * signalled if it ever changes from capacity. Similarly

  38. * for all other uses of count in other wait guards.

  39. */

  40. while (count.get() == capacity) { //如果佇列已滿

  41. notFull.await(); //等待

  42. }

  43. enqueue(node); //插入資料

  44. c = count.getAndIncrement(); //更新總數,變數c是count加1前的值

  45. if (c + 1 < capacity)

  46. notFull.signal(); //有足夠的空間,通知其他執行緒

  47. } finally {

  48. putLock.unlock(); //釋放鎖

  49. }

  50. if (c == 0)

  51. signalNotEmpty(); //插入成功後,通知take操作

  52. }

4.鎖粗化

虛擬機器在遇到一連串地對同一鎖不斷進行請求和釋放的操作時,便會把所有的鎖操作整合成對鎖的一次請求,從而減小對鎖的請求同步次數,這個操作叫鎖粗話,比如

  1. for (int i=0;i<20;i++){

  2. synchronized (lock){

  3. }

  4. }

  5. //優化後

  6. synchronized (lock){

  7. for (int i=0;i<20;i++){

  8. }

  9. }

注意:效能優化就是根據執行時的真實情況對各個資源點進行權衡折中的過程,鎖粗話的思想和減少鎖持有時間是相反的,但是在不同的場合,他們的效果並不相同,所以大家要根據實際情況,進行權衡

5.java虛擬機器對鎖優化所做的努力

5.1鎖偏向

偏向鎖,簡單的講,就是在鎖物件的物件頭中有個ThreaddId欄位,這個欄位如果是空的,第一次獲取鎖的時候,就將自身的ThreadId寫入到鎖的ThreadId欄位內,將鎖頭內的是否偏向鎖的狀態位置1.這樣下次獲取鎖的時候,直接檢查ThreadId是否和自身執行緒Id一致,如果一致,則認為當前執行緒已經獲取了鎖,因此不需再次獲取鎖,略過了輕量級鎖和重量級鎖的加鎖階段。提高了效率。

但是偏向鎖也有一個問題,就是當鎖有競爭關係的時候,需要解除偏向鎖,使鎖進入競爭的狀態

引數-XX:+UseBiasedLocking

Java偏向鎖(Biased Locking)是Java6引入的一項多執行緒優化。它通過消除資源無競爭情況下的同步原語,
進一步提高了程式的執行效能。偏向鎖,顧名思義,它會偏向於第一個訪問鎖的執行緒,如果在接下來的執行過程中,
該鎖沒有被其他的執行緒訪問,則持有偏向鎖的執行緒將永遠不需要觸發同步。如果在執行過程中,遇到了其他執行緒搶佔鎖,
則持有偏向鎖的執行緒會被掛起,JVM會嘗試消除它身上的偏向鎖,將鎖恢復到標準的輕量級鎖。(偏向鎖只能在單執行緒下起作用)
因此 流程是這樣的 偏向鎖->輕量級鎖->重量級鎖

5.2輕量級鎖

輕量級鎖加鎖:執行緒在執行同步塊之前,JVM會先在當前執行緒的棧楨中建立用於儲存鎖記錄的空間,並將物件頭中的Mark Word複製到鎖記錄中,官方稱為Displaced Mark Word。

然後執行緒嘗試使用CAS將物件頭中的Mark Word替換為指向鎖記錄的指標。如果成功,當前執行緒獲得鎖,如果失敗,表示其他執行緒競爭鎖,當前執行緒便嘗試使用自旋來獲取鎖。

輕量級鎖解鎖:輕量級解鎖時,會使用原子的CAS操作來將Displaced Mark Word替換回到物件頭,如果成功,則表示沒有競爭發生。

如果失敗,表示當前鎖存在競爭,鎖就會膨脹成重量級鎖。

注:輕量級鎖會一直保持,喚醒總是發生在輕量級鎖解鎖的時候,因為加鎖的時候已經成功CAS操作;而CAS失敗的執行緒,會立即鎖膨脹,並阻塞等待喚醒。(詳見下圖)

下圖是兩個執行緒同時爭奪鎖,導致鎖膨脹的流程圖。

鎖不會降級

自旋其實就是虛擬機器為了避免執行緒真實的在作業系統層掛起,虛擬機器讓當前執行緒做空輪詢或許是幾個cpu時間週期,如果還沒辦法獲取鎖則在掛起.

因為自旋會消耗CPU,為了避免無用的自旋(比如獲得鎖的執行緒被阻塞住了),一旦鎖升級成重量級鎖,
就不會再恢復到輕量級鎖狀態。
當鎖處於這個狀態下,其他執行緒試圖獲取鎖時,都會被阻塞住,當持有鎖的執行緒釋放鎖之後會喚醒這些執行緒,
被喚醒的執行緒就會進行新一輪的奪鎖之爭。

5.3鎖消除

鎖消除是Java虛擬機器在JIT編譯是,通過對執行上下文的掃描,去除不可能存在共享資源競爭的鎖,通過鎖消除,可以節省毫無意義的請求鎖時間

  1. public class TestLockEliminate {

  2. public static String getString(String s1, String s2) {

  3. StringBuffer sb = new StringBuffer();

  4. sb.append(s1);

  5. sb.append(s2);

  6. return sb.toString();

  7. }

  8. public static void main(String[] args) {

  9. long tsStart = System.currentTimeMillis();

  10. for (int i = 0; i < 1000000; i++) {

  11. getString("TestLockEliminate ", "Suffix");

  12. }

  13. System.out.println("一共耗費:" + (System.currentTimeMillis() - tsStart) + " ms");

  14. }

  15. }

getString()方法中的StringBuffer數以函式內部的區域性變數,進作用於方法內部,不可能逃逸出該方法,因此他就不可能被多個執行緒同時訪問,也就沒有資源的競爭,但是StringBuffer的append操作卻需要執行同步操作:

  1. @Override

  2. public synchronized StringBuffer append(String str) {

  3. toStringCache = null;

  4. super.append(str);

  5. return this;

  6. }

逃逸分析和鎖消除分別可以使用引數-XX:+DoEscapeAnalysis和-XX:+EliminateLocks(鎖消除必須在-server模式下)開啟。使用如下引數執行上面的程式:

這裡寫圖片描述

使用如下命令執行程式:-XX:+DoEscapeAnalysis -XX:+EliminateLocks

這裡寫圖片描述

鎖的優缺點對比 

優點

缺點

適用場景

偏向鎖

加鎖和解鎖不需要額外的消耗,和執行非同步方法比僅存在納秒級的差距。

如果執行緒間存在鎖競爭,會帶來額外的鎖撤銷的消耗。

適用於只有一個執行緒訪問同步塊場景。

輕量級鎖

競爭的執行緒不會阻塞,提高了程式的響應速度。

如果始終得不到鎖競爭的執行緒使用自旋會消耗CPU。

追求響應時間。

同步塊執行速度非常快。

重量級鎖

執行緒競爭不使用自旋,不會消耗CPU。

執行緒阻塞,響應時間緩慢。

追求吞吐量。

同步塊執行速度較長。