軟體事務記憶體導論(四)建立事務
宣告:本文是《Java虛擬機器併發程式設計》的第六章,感謝華章出版社授權併發程式設計網站釋出此文,禁止以任何形式轉載此文。
建立事務
我們建立事務的目的是為了協調針對多個託管引用的變更。事務將會保證這些變更是原子的,也就是說,所有的託管引用要麼全部被提交要麼全部被丟棄,所以在事務之外我們將不會看到有任何區域性變更(partial changes)出現。此外,我們也可以用建立事務的方式來解決對單個ref先讀後寫所引發的相關問題。
Akka是用Scala開發出來的,所以如果我們工作中用的是Scala的話,就可以直接幸福地享用Akka簡潔明瞭的API了。對於那些日常工作中不能使用Scala開發的程式設計師,Akka同樣也提供了一組方便的API,以幫助他們通過Java語言來使用該類庫的功能。本節我們將會看到如何利用Akka在Java和Scala中建立事務。
首先我們需要選一個適合用事務來解決的例子。我們在第5章中重構的EnergySource類使用了顯式的加鎖和解鎖操作(其最終版本詳見5.7節),下面讓就我們將這些顯式的加鎖/解鎖操作換用Akka的事務API來實現。
在Java中建立事務
為了將程式碼邏輯封裝到一個事務中,我們需要建立一個Atomic類的例項並將程式碼放到該類的atomically()函式裡。隨後,我們可以通過呼叫Atomic例項的execute()函式來執行事務程式碼。類似於下面這樣:
return new Atomic<Object>() { public Object atomically() { //code to run in a transaction... return resultObject; } }.execute();
呼叫execute()函式的執行緒將負責執行atomically()函式裡的程式碼。然而如果呼叫者本身並沒有處在一個事務中的話,那麼這個呼叫將會被封裝在一個新的事務中。
下面讓我們用Akka事務來重新實現EnergySource。首先,讓我們將不可變狀態封裝到可變的Akka託管引用中去。
public class EnergySource { private final long MAXLEVEL = 100; final Ref<Long> level = new Ref<Long>(MAXLEVEL); final Ref<Long> usageCount = new Ref<Long>(0L); final Ref<Boolean> keepRunning = new Ref<Boolean>(true); private static final ScheduledExecutorService replenishTimer = Executors.newScheduledThreadPool(10);
在這段變數定義的程式碼中,level和usageCount都被宣告為Akka Ref,並且各自持有一個不可變的Long型別的值。於是在Java中我們就不能更改這些Long型別的值了,但我們仍然可以通過更改託管引用(即實體)使其安全地指向新值。
在EnergySource的上一個版本中,ScheduledExecutorService會週期性地(每秒鐘一次)呼叫replenish()函式直至整個任務結束,這就要求stopEnergySource()必須是同步的。而在這個版本中,我們不用再週期性地呼叫replenish()函式,而只會在物件例項初始化的時候執行一下排程操作。在每次呼叫replenish()函式時,我們都會根據keepRunning的值來決定該函式是否應該在1秒之後再次被排程執行。這一變化消除了stopEnergySource()函式和排程器/計時器(timer)之間的耦合。相反地,stopEnergySource()函式現在只依賴於keepRunning這個標誌,而該標誌可以很容易地通過STM事務來行管理。
在這一版的程式碼中,由於可以依賴事務來保證安全性,所以我們沒必要再對stopEnergySource()函式進行同步了。同時,由於swap()函式本身就是以事務方式執行的,所以我們也無需顯式地為其建立事務。
private EnergySource() {} private void init() { replenishTimer.schedule(new Runnable() { public void run() { replenish(); if (keepRunning.get()) replenishTimer.schedule( this, 1, TimeUnit.SECONDS); } }, 1, TimeUnit.SECONDS); } public static EnergySource create() { final EnergySource energySource = new EnergySource(); energySource.init(); return energySource; } public void stopEnergySource() { keepRunning.swap(false); }
如下所示,返回當前電量和使用次數的方法將會用到託管引用,但也只是需要呼叫一下get()函式而已。
public long getUnitsAvailable() { return level.get(); } public long getUsageCount() { return usageCount.get(); }
在getUnitsAvailable()函式和getUsageCount()函式中,由於其中的get()函式都是以事務方式執行的,所以無需顯式地將它們封裝在事務裡。
由於我們會在useEnergy()函式中同時修改電量和使用次數,所以useEnergy()函式需要使用一個顯式的事務來完成這些操作。在這裡,我們需要保證對所有被讀取的值的變更都能保持一致性,即確保對這兩個欄位的變更是原子的。為了實現這一目標,我們將使用Atomic介面,並用atomically()函式將我們的邏輯程式碼封裝到一個事務中。
public boolean useEnergy(final long units) { return new Atomic<Boolean>() { public Boolean atomically() { long currentLevel = level.get(); if(units > 0 && currentLevel >= units) { level.swap(currentLevel - units); usageCount.swap(usageCount.get() + 1); return true; } else { return false; } } }.execute(); }
useEnergy()函式的功能是從當前電量中減掉所消耗的電量(即unit——譯者注)。為了實現這一目標,我們需要保證所涉及到的get和set操作都在同一個事務中完成,所以我們把所有相關操作都用atomically()函式封裝了起來。最後,我們會呼叫execute()函式來啟動事務並順序執行的所有操作。
除了上述方法之外,我們還需要關注一下負責給電源充電的replenish()函式。由於這個方法也需要使用事務,所以其實現程式碼同樣需要用Atomic進行封裝。
private void replenish() { new Atomic() { public Object atomically() { long currentLevel = level.get(); if (currentLevel < MAXLEVEL) level.swap(currentLevel + 1); return null; } }.execute(); } }
下面是針對EnergySource類的測試程式碼。其主要功能是,用多個執行緒併發地使用電池,每使用一次消耗一格電,每個執行緒最多會消耗7格電量。
public class UseEnergySource { private static final EnergySource energySource = EnergySource.create(); public static void main(final String[] args) throws InterruptedException, ExecutionException { System.out.println("Energy level at start: " + energySource.getUnitsAvailable()); List<Callable<Object>> tasks = new ArrayList<Callable<Object>>(); for(int i = 0; i < 10; i++) { tasks.add(new Callable<Object>() { public Object call() { for(int j = 0; j < 7; j++) energySource.useEnergy(1); return null; } }); } final ExecutorService service = Executors.newFixedThreadPool(10); service.invokeAll(tasks); System.out.println("Energy level at end: " + energySource.getUnitsAvailable()); System.out.println("Usage: " + energySource.getUsageCount()); energySource.stopEnergySource(); service.shutdown(); } }
上述程式碼需要把Akka相關的Jar新增到Java的classpath中才能編譯通過,所以首先我們需要建立一個標識jar位置的環境變數:
export AKKA_JARS="$AKKA_HOME/lib/scala-library.jar:\ $AKKA_HOME/lib/akka/akka-stm-1.1.3.jar:\ $AKKA_HOME/lib/akka/akka-actor-1.1.3.jar:\ $AKKA_HOME/lib/akka/multiverse-alpha-0.6.2.jar:\ $AKKA_HOME/config:\ ."
Classpath的定義取決於你使用的作業系統以及Akka在你的作業系統中被安裝的位置。我們可以用javac編譯器來編譯程式碼,並用java命令來負責執行,具體細節如下所示:
javac -classpath $AKKA_JARS -d . EnergySource.java UseEnergySource.java java -classpath $AKKA_JARS com.agiledeveloper.pcj.UseEnergySource
萬事俱備,下面讓我們來編譯並執行這段程式碼。通過程式碼的實現邏輯我們知道,電源初始有100格電量,而我們建立的10個執行緒將會消耗掉其中的70格電量,所以最後電源應該淨剩30格電量。但由於電池電量會每秒回覆一格,所以每次執行結果可能會稍有不同,比如最後淨剩電量可能是31格而不是30格。
Energy level at start: 100 Energy level at end: 30 Usage: 70
預設情況下,Akka會將額外的日誌訊息列印到標準輸出上。停掉這個預設的輸出也很容易,我們只需要在$AKKA_HOME/config目錄下建立一個名為logback.xml的檔案,並在裡面新增這項配置即可。由於這個檔案位於classpath中,所以logger會自動找到這個檔案、讀取其中的配置並停掉訊息輸出。除此之外,我們還可以在這個配置檔案中設定很多其他有用的配置項。詳情請見http://logback.qos.ch/manual/configuration.html。
正如我們在本例中所看到的那樣,Akka是在後臺默默地對事務進行管理的,所以請你多花些時間研究一下上述示例程式碼,並對事務和執行緒的運作過程多做一些嘗試以便加深對這塊知識的理解。
在Scala中建立事務
我們之前已經看到了如何在Java中建立事務(並且我假設你已經閱讀過那一部分,所以這裡我們就不再贅述了),下面我們將會在Scala中用更少的程式碼來完成同樣的功能。我們之所以能兼顧簡潔與功能,部分得益於Scala自身簡潔的特點,但更多還是由於Akka API使用了閉包/函式值(closures/function values)的緣故。
相比Java的繁冗,我們在Scala中可以通過很簡潔的方法來建立事務。我們所需要做的只是呼叫一下Stm的auomic()函式就行了,如下所示:
atomic { //code to run in a transaction.... /* return */ resultObject }
其中,我們傳給atomic()的閉包/函式值僅在當前執行緒所執行的那個事務內可見。
下面就是使用了Akka事務的Scala版本的EnergySource實現程式碼:
class EnergySource private() { private val MAXLEVEL = 100L val level = Ref(MAXLEVEL) val usageCount = Ref(0L) val keepRunning = Ref(true) private def init() = { EnergySource.replenishTimer.schedule(new Runnable() { def run() = { replenish if (keepRunning.get) EnergySource.replenishTimer.schedule( this, 1, TimeUnit.SECONDS) } }, 1, TimeUnit.SECONDS) } def stopEnergySource() = keepRunning.swap(false) def getUnitsAvailable() = level.get def getUsageCount() = usageCount.get def useEnergy(units : Long) = { atomic { val currentLevel = level.get if(units > 0 && currentLevel >= units) { level.swap(currentLevel - units) usageCount.swap(usageCount.get + 1) true } else false } } private def replenish() = atomic { if(level.get < MAXLEVEL) level.swap(level.get + 1) } } object EnergySource { val replenishTimer = Executors.newScheduledThreadPool(10) def create() = { val energySource = new EnergySource energySource.init energySource } }
作為一個完全的面嚮物件語言,Scala認為靜態方法是不適合放在類的定義中的,所以工廠方法create()就被移到其伴生物件裡面去了。餘下的程式碼和Java版本非常相近,只是較之更為簡潔。同時,由於使用了優雅的atomic()函式,我們就可以拋開Atomic類和execute()函式呼叫了。
Scala版本的EnergySource的測試用例如下所示。在併發和執行緒控制的實現方面,我們既可以像Java版本那樣採用JDK的ExecutorService來管理執行緒,也可以使用Scala的角色(actor)[1] 來為每個併發任務分配執行執行緒。這裡我們將採用第二種方式。當任務完成之後,每個任務都會給呼叫者返回一個響應,而呼叫者則需要等待所有任務結束之後才能繼續執行。
object UseEnergySource { val energySource = EnergySource.create() def main(args : Array[String]) { println("Energy level at start: " + energySource.getUnitsAvailable()) val caller = self for(i <- 1 to 10) actor { for(j <- 1 to 7) energySource.useEnergy(1) caller ! true } for(i <- 1 to 10) { receiveWithin(1000) { case message => } } println("Energy level at end: " + energySource.getUnitsAvailable()) println("Usage: " + energySource.getUsageCount()) energySource.stopEnergySource() } }
我們可以採用如下命令來引入Akka相關的Jar並編譯執行上述程式碼,其中環境變數AKKA_JARS與我們在Java示例中的定義相同:
scalac -classpath $AKKA_JARS *.scala java -classpath $AKKA_JARS com.agiledeveloper.pcj.UseEnergySource
Scala版本程式碼的輸出結果與我們在Java版本中所看到的沒什麼兩樣,並同樣依賴於電量恢復的節奏,即可能最終剩餘電量是31而不是30。
Energy level at start: 100 Energy level at end: 30 Usage: 70
[1]這裡提到Scala的角色(actor)僅僅是為了說明有這種方法可供使用。後面我們還將會學習如何使用功能更為強大的Akka actor。