Java高併發(五)——Lock優化,提高效能
前邊我們講了,多執行緒的世界,多執行緒的基礎操作,多執行緒協作,多執行緒管理——執行緒池。其中多執行緒為什麼麻煩,就因為執行緒並行操作,對共享資源的爭奪,會出現執行緒安全問題。而我們解決執行緒安全問題的方案是同步(鎖資源,序列使用),序列就會出現效能問題。舉個例子:大家在大道上並行前進的幾列人(多執行緒併發),突然遇到河流,只有一個獨木橋,大家只能一個個過(鎖共享資源,序列使用)。顯而易見,時間更多的消耗在過獨木橋上,這也是效能瓶頸的所在。如何進行優化呢?這篇來看看,好,先看下這篇思維導圖:
可以思考下,我們可以從那幾個方面優化?1,程式碼處理,即如何讓執行緒更高效(安全的前提)的使用共享資源;2,JVM優化,如何通過經驗,技巧更好的利用鎖,提高執行效率;3,如何利用無鎖(樂觀鎖代替悲觀鎖)替代鎖,提高執行效率。而死鎖會導致執行緒相互等待,造成非常嚴重後果。好,下邊我們展開具體分析:
一,程式碼——提高鎖效能:
1,減小鎖持有時間:鎖持有時間越長,相對的鎖競爭程度也就也激烈。方法就是在我們編碼時,只在必要的程式碼段進行同步,沒有執行緒安全的程式碼,則不要進行鎖。這樣有助於降低鎖衝突的可能性,進而提升系統的併發能力。
2,減小鎖粒度:字面意思就是如何鎖更小的資源。最典型的例子就是HashTable和ConcurrentHashMap,前者是鎖住了整個物件,而後者只是鎖住其中的要處理的Segment段(因為真正併發處理的這個Segment)。好比,人吃飯的,還可以看電視(鎖住的是嘴,吃飯的時候不能喝茶,而不是整個人)。這裡看下這個
3,讀寫鎖替換獨佔鎖:在讀多寫少的情況下,使用ReadWriteLock可以大大提高程式執行效率。相對容易理解不再贅述。
4,鎖分離:讀寫鎖分離了讀操作和寫操作,我們在進一步想,將更多的操作進行鎖資源分離,就鎖分離。典型的例子:LinkedBlockingQueue的實現,裡邊的take()和put(),雖然都操作整個佇列進行修改資料,但是分別操作的隊首和隊尾,所以理論上是不衝突的。當然JDK是通過兩個鎖進行處理的,我們這裡看下:
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
/**
* put 操作
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
/**
* take 操作
*/
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
5,鎖粗化:前邊我們說了:減少鎖持有時間、減小鎖粒度。這裡怎麼又有鎖粗化了。前者是從鎖佔有時間和範圍的角度去考慮,這裡我們從鎖的請求、同步、釋放的頻率進行考慮,如果頻率過高也會消耗系統的寶貴資源。典型場景:對於需要鎖的資源在迴圈當中,我們可以直接將鎖粗化到迴圈外層,而不是在內層(避免每次迴圈都申請鎖、釋放鎖的操作)。
二,看下JVM對鎖的優化:
1,偏向鎖:如果第一個執行緒獲得了鎖,則進行入偏向模式,如果接下來沒有其他執行緒獲取,則持有偏向鎖的執行緒將不需要再進行同步。節省了申請所、釋放鎖,大大提高執行效率。如果鎖競爭激烈的話,偏向鎖將失效,還不如不用。可以通過-XX:+UseBiasedLocking進行控制開關。
2,輕量級鎖:將資源物件頭部作為指標,指向持有鎖的執行緒堆疊內部,來判斷一個執行緒是否持有物件鎖,如果執行緒獲得(CAS)輕量級鎖成功,則可以順利進入同步塊繼續執行,否則輕量級鎖失敗,膨脹為重量級鎖。 其提升程式同步效能的依據是:對於絕大部分的鎖,在整個同步週期內都是不存在競爭的(前人經驗資料)。
3,自旋鎖:偏向鎖和輕量級鎖都失敗了,JVM還不會立刻掛起此執行緒,JVM認為很快就可以得到鎖,於是會做若干個空迴圈(自旋)來重新獲取鎖,如果獲取鎖成功,則進入同步塊執行,否則,才會將其掛起。
4,鎖消除:JVM在編譯時,通過對執行上下文的掃描,去除不可能存在共享資源競爭的鎖。我們在利用JDK一些類的時候,自己沒有加鎖,但是內部實現使用鎖,而程式又不會出現併發安全問題,這時JVM就會幫我們進行鎖消除,提高效能。例如:
//1,無鎖程式碼
private String spliceString(String s1,String s2,String s3){
StringBuffer sb = new StringBuffer();
sb.append(s1);
sb.append(s2);
sb.append(s3);
return sb.toString()
}
//append的實現,這樣JVM的鎖消除就起作用了
@Override
public synchronized StringBuffer append(String str) {
toStringCache = null;
super.append(str);
return this;
}
三,CAS——樂觀鎖:悲觀鎖VS樂觀鎖,比較好理解,不再贅述。看下JDK通過CAS(Compare and swap)樂觀鎖實現的一些類。JDK的atomic包提供了一些直接使用CAS操作的執行緒安全的類。
1,AtomicInteger,通過CAS實現的一個執行緒安全的Integer,類似的還有AtomicBoolean、AtomicLong等。這裡看下原始碼實現:
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
//volatile修飾
private volatile int value;
/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
2,AtomicReference:無鎖的物件應用,和AtomicInteger非常相似的。底層原始碼實現也一直不在展開說。
3,AtomicStampedReference:帶有時間戳的物件引用。上邊幾個Atomic類都是通過compare比較修改前和修改後的值是否一樣,來保證資料正確的,但是如果就是出現了中間被人修改,但是又修改為原來的值(ABA),那麼它們是不知道。而AtomicStampedReference通過新增時間戳解決了這個問題。
/**
* 添加了新舊時間戳,資料庫我們經常用version來做時間戳
* Atomically sets the value of both the reference and stamp
* to the given update values if the
* current reference is {@code ==} to the expected reference
* and the current stamp is equal to the expected stamp.
*
* @param expectedReference the expected value of the reference
* @param newReference the new value for the reference
* @param expectedStamp the expected value of the stamp
* @param newStamp the new value for the stamp
* @return {@code true} if successful
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
4,AtomicIntegerArray:無鎖陣列,還有其他:AtomicLongArray、AtomicReferenceArray。看個簡單例子吧:
//執行緒安全保證所有值都一樣
public class AtomicIntegerArrayDemo {
static AtomicIntegerArray arr = new AtomicIntegerArray(10);
public static class AddThread implements Runnable{
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
arr.getAndIncrement(i%arr.length());
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread[] ts = new Thread[10];
for(int k = 0;k<10;k++){
ts[k] = new Thread(new AddThread());
}
for(int k=0;k<10;k++){
ts[k].start();
}
for(int k=0;k<10;k++){
ts[k].join();
}
System.out.println(arr);
}
}
5,AtomicIntegerFieldUpdater:讓普通變數也享受原子操作,可以很容易幫我們以擴充套件的方式將原來不是執行緒安全的欄位屬性進行安全控制。還包括:AtomicLongFieldUpdater、AtomicReferenceFieldUpdater。看例子理解:
/**
* 1,Updater只能修改它可見範圍內的變數,因為是通過反射獲取的,如果不可見就出錯;
* 2,為了確保變數被正確的讀取,必須為volatile型別的;
* 3,不支援static欄位操作
*/
public class AtomicIntegerFieldUpdaterDemo {
public static class Candidate{
int id;
volatile int score;
}
public final static AtomicIntegerFieldUpdater<Candidate> scoreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class,"score");
public static AtomicInteger allSocre = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
final Candidate stu = new Candidate();
Thread[] t =new Thread[10000];
for (int i = 0; i < 10000; i++) {
t[i] = new Thread(){
@Override
public void run() {
if(Math.random()>0.4){
scoreUpdater.incrementAndGet(stu);
allSocre.incrementAndGet();
}
}
};
t[i].start();
}
for (int i = 0; i < 10000; i++) {
t[i].join();
}
System.out.println("score=" + stu.score);
System.out.println("allscore=" + allSocre);
}
}
四,避免死鎖:舉個例子:喝水需要水壺和杯子,當兩個執行緒出現一個持有水壺的鎖,一個持有杯子的鎖,而彼此都在等待彼此釋放鎖時,就會出現了死鎖。出現死鎖,執行緒就會一直永遠等待,對程式而言是非常嚴重的bug。好看個例子:
public class DeadLock extends Thread {
protected Object tool;
static Object fork1 = new Object();
static Object fork2 = new Object();
public DeadLock(Object obj){
this.tool = obj;
if(tool == fork1){
this.setName("哲學家A");
}
if(tool == fork2){
this.setName("哲學家B");
}
}
@Override
public void run() {
if(tool == fork1){
synchronized (fork1){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (fork2){
System.out.println("哲學家A開始吃飯了");
}
}
}
if(tool == fork2){
synchronized (fork2){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (fork1){
System.out.println("哲學家B開始吃飯了");
}
}
}
}
public static void main(String[] args) throws InterruptedException {
DeadLock zhexuejiaA = new DeadLock(fork1);
DeadLock zhexuejiaB = new DeadLock(fork2);
zhexuejiaA.start();
zhexuejiaB.start();
Thread.sleep(1000);
}
}
好了,多執行緒併發處理是提高了程式執行效率,但是帶了資源安全問題。鎖解決了資源安全問題,但是又有了效能問題,然後優化……程式碼級、jvm級、無鎖……又沒鎖了,但是要求編碼水平更高了,利用無鎖實現類似有鎖的功能,還是需要非常高的內功的,這樣解決方案轉轉轉……總是用合適的方法解決適合的問題。好了,多思考。繼續中……