C#多線程編程(6)--線程安全2 互鎖構造Interlocked
在線程安全1中,我介紹了線程同步的意義和一種實現線程同步的方法:volatile。volatile關鍵字屬於原子操作的一種,若對一個關鍵字使用volatile,很多時候會顯得很“浪費”,因為只有在並發訪問的情況下才需要“易變”讀寫,單線程訪問時並不需要。在命名空間System.Threading命名空間中提供了InterLock類,該類中提供了一些原子方法。本文來介紹如何使用這些方法。
- 互鎖
在搶占式系統中,一個線程在執行到任何階段都有可能被其他線程“打斷”,原子操作能夠保證該操作不被其他線程打斷,其他線程的“打斷”只可能發生在該操作的之前或之後。InterLock類中的方法就是”原子“式的,最常用的方法有:
public static class InterLock{ // return (++location) public static int Increment(ref int location); // return(--location) public static int Decrement(ref int location); // return(location += value) //註意,也可能是負數,從而實現減法運算。 public static int Add(ref int location, int value) // int old = location; location = value; return old;public static int Exchange(ref int location, int value); //old = location1; //if(location1 = comparand) location1 = value; //return old; public static int CompareExchange(ref int location1, int value, int comparand); ... }
《CLR via C#》的作者在書中說他喜歡Interlocked中的方法,因為它不但很快,而且不會阻塞線程。為了驗證InterLock真的很快,我們對變量進行一百萬次的寫,與Volatile.Write()來進行對比,看看是否真的快。
static void Main(string[] args){ TestInterLock(); Console.ReadLine(); } private static volatile int v = 0; private static int n = 0; static void TestInterLock(){ Stopwatch sw = Stopwatch.StartNew(); for (int i = 0; i < 1000000; i++) v++; Console.WriteLine("volatile write 1000000 times takes:{0}", sw.ElapsedMilliseconds); sw = Stopwatch.StartNew(); for (int i = 0; i < 1000000; i++) Interlocked.Increment(ref n); Console.WriteLine("InterLock write 1000000 times takes:{0}", sw.ElapsedMilliseconds); n = 0; sw = Stopwatch.StartNew(); for (int i = 0; i < 1000000; i++) n++; Console.WriteLine("n++ write 1000000 times takes:{0}", sw.ElapsedMilliseconds); }
運行結果如下:
volatile write 1000000 times takes:2
InterLock write 1000000 times takes:8
n++ write 1000000 times takes:2
我運行了好幾次,結果會有些出入,但是大部分的結果都是volatile的寫入速度和原生的n++的速度是一樣的。InterLock也確實如Jeffrey Richter所說,很快,只是沒有volatile關鍵字修飾的變量的讀寫快。這是在非並發情況下,下面來看一下並發情況下是否還是很快。
static void TestInterLock1(){ Stopwatch sw = Stopwatch.StartNew(); Task[] t2 = new[] { new Task(() => { for (int i = 0; i < 1000000; i++) Interlocked.Increment(ref n); }), new Task(() => { for (int i = 0; i < 1000000; i++) Interlocked.Increment(ref n);}) }; Task[] t1 = new[] { new Task(() => { for (int i = 0; i < 1000000; i++) v++; }), new Task(() => { for (int i = 0; i < 1000000; i++) v++;}) }; Task.WhenAll(t1) .ContinueWith(t => Console.WriteLine("volatile write 2000000 times takes:{0}", sw.ElapsedMilliseconds)); Task.WhenAll(t2) .ContinueWith(t => Console.WriteLine("InterLock write 2000000 times takes:{0}", sw.ElapsedMilliseconds)); t2[0].Start(); t1[0].Start(); t1[1].Start(); t2[1].Start(); }
先看運行結果,
volatile write 2000000 times takes:94
InterLock write 2000000 times takes:101
多次運行,運行時間會有不同,但是在並發情況下,volatile的寫入和InterLock的寫入速度幾乎相同。上述代碼寫的如此醜陋,而不是直接寫Task.Run(),是為了保證初始化部分都運行完成後,再Start(),且兩個任務的先後順序進行了打亂,最大限度減少誤差。可以看到並發情況下,volatile和InterLock幾乎一樣,且在InterLock中的方法要比Volatile的功能要全,但是在串行時,Volatile的性能要比InterLock要好。結論是,若只對變量讀寫,沒有替換或者其他復雜操作時,可以使用volatile關鍵字,但是一些復雜操作,需要原子操作時,就得使用InterLock中的方法了,如果使用volatile關鍵字修飾的變量來進行交換的話,很難保證原子性,只有引入鎖才能保證線程同步。且InterLock中提供了幾個重載方法,能夠接受object類型,還有泛型版本。
可以利用InterLock來實現一個簡單的自旋鎖,代碼如下:
public struct SimpleSpinLock{ private int m_Lock; public void Enter(){ while (true){ if (Interlocked.Exchange(ref m_Lock, 1) == 0) return; //此處可以添加黑科技 } } public void Leave(){ Volatile.Write(ref m_Lock, 0); } } //下面是如何使用SimpleSpinLock的例子 public class Simple{ private SimpleSpinLock m_lock = new SimpleSpinLock(); public void AccessResource(){ m_lock.Enter(); //執行某些程序,只有一個線程可以進入這裏 m_lock.Leave(); } }
這個簡單的自旋鎖在一個線程調用Enter()時,其他線程在調用m_lock.Enter()方法時,if (Interlocked.Exchange(ref m_lock, 1) == 0)會失敗,因為該方法會將m_lock和1交換,並返回舊值,在已有線程調用m_lock.Enter()時,m_lock的舊值是1,因此該方法會在whle(true)處自旋,不斷嘗試獲得鎖。該鎖的問題是,該線程沒有被阻塞(掛起),而是一直在占用CPU資源,其他需要CPU資源的線程無法運行(可以在while內,我加註釋的地方,加入”黑科技“來嘗試解決此問題。其思路是在線程自旋的過程中,立刻交出CPU資源,可通過調用Thread.Sleep(0)或者Thread.Yield()來實現。嘗試獲得鎖的線程交出時間片,這樣當前獲得鎖的線程能夠有更多的資源來運行程序,從而運行結束並交出鎖,具體細節在這裏不展開)。因此,自旋鎖只適合那些運行非常快的方法。
- Interlocked Anything 模式
Interlocked中全部是原子性操作,那是否提供了一個方法,該方法可以接受一個委托,保證該委托在運行時是原子的。答案是沒有,但是可以利用Interlocked.CompareExchange來自己實現一個。我們先來看一下利用CompareExchange來實現原子性的Maximum方法。
public static int Maximum(ref int target, int value){ int currentValue = target, startValue, desireValue; do{ startValue = currentValue; //可以在此處添加任何想要保證“原子性”的操作,此處是求最大值。 desireValue = Math.Max(startValue, value); //註意,此處有可能被其他線程搶占,也有可能target的值被修改,因此if()語句會出問題,要使用Interlocked的方法。 //if (startValue = target) target = desireValue; currentValue = Interlocked.CompareExchange(ref target, desireValue, startValue); } //若在此時target已被修改,則重新計算最大值 while (startValue != currentValue); return currentValue; }
此方法的思路是:從CPU將target的值讀到寄存器中,到計算最大值結束,期間的任何時間target都有可能被其他線程修改。因此保證原子性就被轉換成保證計算最大值時,target的值沒有變過,如果變過,就重新計算。因此,在最開始的時候,startValue=currentValue,currentValue是開始計算時target的值。然後求得最大值,並保存到desireValue中。註意,此時target有可能被修改,因此調用CompareExchange方法,該方法會將target與startValue比較,如果此時兩值相等,那相當於我之前說的,target從開始到最後沒有改變,那麽這個最大值是準確的,並將target的舊值付給currentValue,最後如果startValue==currentValue,則計算完成,否則繼續循環。
《CLR via C#》的作者Jeff很喜歡上面的方法,他在實際開發中,都是使用上面的方法,並對其進行了包裝,使之能夠支持Interlocked Anything。其原理就是在desireValue=Math.Max()處替換成其他方法,只要在返回結果時保證舊值和最開始讀取的值一致就可以。我們來看一下他的封裝:
delegate int Morpher<TResult, in TArgument>(int startValue, TArgument argument, out TResult result); static TResult Morph<TResult, TArgumen>(ref int target, TArgumen argumen, Morpher<TResult, TArgumen> morpher){ TResult mophorResult; int currentValue = target, desireValue, startValue; do{ startValue = currentValue; desireValue = morpher(startValue, argumen, out mophorResult); currentValue = Interlocked.CompareExchange(ref target, desireValue, startValue); } while (currentValue != startValue); return mophorResult; }
說實話,我並不能非常好了理解這個封裝,並不是不能理解做法,而是不能確定此方法到底能不能實現效果,那我們來測試一下。測試的基本思路是對一個變量執行1000次的result+=10。分別是不帶線程同步的和利用Morph方法對result+=10的方法進行互鎖,保證其原子性。我省去了Morpher和Morph的聲明部分。之所以要在DelayAdd和DelayAdd1方法中調用ThreadSleep(20),是為了模擬當在運行較長的方法時,Morph方法是否還能夠保證該方法運行的原子性。
static void Main(string[] args){ Test(new TestAction(Add), "Add"); Test(new TestAction(MorphAdd), "MorphAdd"); Console.ReadLine(); } //DelayAdd1的變種,使之能夠符合Morpher的簽名 static int DelayAdd(int startValue, int argument, out int result){ Thread.Sleep(20); result = startValue + argument; return result; } static void DelayAdd1(int argument, ref int result){ Thread.Sleep(20); result += argument; } //測試的不具有線程同步的方法。 static void Add(ref int result){ DelayAdd1(10, ref result); } //具有線程同步的方法。 static void MorphAdd(ref int result){ Morph(ref result, 10, new Morpher<int, int>(DelayAdd)); } //要測試的委托簽名 delegate void TestAction(ref int result); //公共測試方法 static void Test(TestAction action, string actionName){ int result = 0; var tList = new Task[1000]; for (int i = 0; i < 1000; i++) tList[i] = Task.Run(() => { action(ref result); }); Task.WhenAll(tList).GetAwaiter().OnCompleted( () => Console.WriteLine("{0}, Result is {1}", actionName, result)); }
運行,得到的結果是:
Add, Result is 8440
MorphAdd, Result is 10000
運行1000次result += 10,普通的Add不能夠得到正確結果,但是MorphAdd可以。這是因為在1000次的Add中,某幾個Add是同時調用的,result+=10在同一時間調用了多次,因此有156次的Add因為並行而被吞噬了。值得註意的是,MorphAdd方法因為需要線程同步,因此執行時間要慢很多。但是這些付出是值得的,因為這保證了結果的正確。
上述例子證明了Morph方法能夠保證委托的原子性,且該方法既不會阻塞線程也不會長時間的自旋,推薦大家在實際中使用該方法。
本文中,我先介紹了Interlocked類中較常用的方法,以及Interlocked.Increment()方法與volatile關鍵字的對比,結論是雖然將變量設置為所有讀寫都是“易變的”看起來很浪費,但是該關鍵字能夠保證在單線程時幾乎沒有性能損失,大部分情況下和原生的讀寫是一樣的速度,且volatile比Interlocked類中提供的寫要快2-4倍,但是在並發狀態下其性能和volatile關鍵字是沒有差別的。之後我介紹了用Interlocked類中的方法來實現簡單的自旋鎖,該鎖的優點是在非竟態情況下非常快,但是在竟態情況下,未獲得鎖的線程會一直處於自旋狀態,白白浪費CPU。最後介紹了《CLR via C#》書中提到的Interlocked Anything的方法(文中其他的知識點大多也是提取自《CLR via C#》),並測試了該方法確實可以保證委托的原子性,且不會阻塞線程,沒有鎖,不會造成死鎖。至此線程同步中互鎖構造就講完了,後面我會給大家介紹內核構造的信號量和其他鎖。
C#多線程編程(6)--線程安全2 互鎖構造Interlocked