1. 程式人生 > 實用技巧 >Kotlin 協程和 Android SQLite API 中的執行緒模型

Kotlin 協程和 Android SQLite API 中的執行緒模型

從 Room 2.1 版本之後,開發者們可以通過定義 suspend DAO 函式來使用 Kotlin 協程了。協程在處理非同步操作時表現得異常優秀,它可以讓您用順序自然的程式碼處理諸如操作資料庫一類的耗時操作,而不再需要專門線上程之間來回切換任務、處理結果或錯誤了。Room 支援協程後,可以在資料庫操作中使用由併發作用域、生命週期、巢狀所帶來的一些便利。

在我們為 Room 新增協程的支援期間,我們遇到並解決了在協程模型和 Android SQL API 中沒想到的一些問題。在本篇文章中,我們會向您闡述我們遇到的這些問題,以及我們的解決方案。

意料外的問題

先看一下下面這段程式碼,看似是安全的,但實際上這段程式碼存在著問題:

/**
 * 將指定的 [amount] 的金額,從 [accountA] 轉移到 [accountB]
 */
suspend fun transferMoney(accountA: String, accountB: String, amount: Int) {
   // 使用了 IO dispatcher,所以該 DB 的操作在 IO 執行緒上進行
    withContext(Dispatchers.IO) {
        database.beginTransaction() //在 IO-Thread-1 執行緒上開始執行事務
        try {
    // 協程可以在與排程器(這裡就是 Dispatchers.IO)相關聯的任何執行緒上繫結並繼續執行。同時,由於事務也是在 IO-Thread-1 中開始的,因此我們可能恰好可以成功執行查詢。
            moneyDao.decrease(accountA, amount) //掛起函式

    // 如果協程又繼續在 IO-Thread-2 上執行,那麼下列操作資料庫的程式碼可能會引起死鎖,因為它需要等到 IO-Thread-1 的執行緒執行結束後才可以繼續。
            moneyDao.increase(accountB, amount) //掛起函式

            database.setTransactionSuccessful() //永遠不會執行這一行
        } finally {
            database.endTransaction() //永遠不會執行這一行
        }
    }
}

Android 的 SQLite 事務受制於單個執行緒

上述程式碼中的問題在於 Android 的 SQLite 事務是受制於單個執行緒的。當一個正在進行的事務中的某個查詢在當前執行緒中被執行時,它會被視為是該事務的一部分並允許繼續執行。但當這個查詢在另外一個執行緒中被執行時,那它就不再屬於這個事務的一部分了,這樣的話就會導致這個查詢被阻塞,直到事務在另外一個執行緒執行完成。這也是 beginTransaction 和 endTransaction 這兩個 API 能夠保證原子性的一個前提。當資料庫的事務操作都是在一個執行緒上完成的,這樣的 API 不會有任何問題,但是使用協程之後問題就來了,因為協程是不繫結在任何特定的執行緒上的。也就是說,問題的根源就是在協程掛起之後會繼續執行所繫結的那個執行緒,而這樣是不能保證和掛起之前所繫結的執行緒是同一個執行緒。

在協程中使用資料庫事務操作可能會引起死鎖

簡單實現

為了解決 Android SQLite 的這個限制,我們需要一個類似於 runInTransaction 這樣可以接受掛起程式碼塊的 API,這個 API 實現起來就像寫一個單執行緒的排程器一樣:

suspend fun <T> RoomDatabase.runInTransaction(
    block: suspend () -> T
): T = withContext(newSingleThreadContext("DB")) {
    beginTransaction()
    try {
        val result = block.invoke(this)
        setTransactionSuccessful()
        return@runBlocking result
    } finally {
        endTransaction()
    }
}

以上實現僅僅是個開始,但是當在掛起程式碼塊中使用另一個排程器的話就會出問題了:

// 一個很簡單的退稅函式
suspend fun sendTaxRefund(federalAccount: String, taypayerList: List<Taxpayer>) {
    database.runInTransaction {
        val refundJobs = taypayerList.map { taxpayer ->
            coroutineScope {
             // 並行去計算退稅金額
                async(Dispatchers.IO) {
                    val amount = irsTool.calculateRefund(taxpayer)
                    moneyDao.decrease(federalAccount, amount)
                    moneyDao.increase(taxpayer.account, amount)
                }
            }
        }
        // 等待所有計算任務結束
        refundJobs.joinAll()
    }
}

因為接收的引數是一個掛起程式碼塊,所以這部分程式碼就有可能使用一個不同的排程器來啟動子協程,這樣就會導致執行資料庫操作的是另外的一個執行緒。因此,一個比較好的實現是應該允許使用類似於 async、launch 或 withContext 這樣的標準協程構造器。而在實際應用中,只有資料庫操作才需要被排程到單事務執行緒。

介紹 withTransaction

為了解決上面的問題,我們構建了withTransaction API,它模仿了 withContext API,但是提供了專為安全執行 Room 事務而構建的協程上下文,您可以按照如下方式編寫程式碼:

fun transferMoney(
    accountA: String,
    accountB: String,
    amount: Int
) = GlobalScope.launch(Dispatchers.Main) {
    roomDatabase.withTransaction {
        moneyDao.decrease(accountA, amount)
        moneyDao.increase(accountB, amount)
    }
    Toast.makeText(context, "Transfer Completed.", Toast.LENGTH_SHORT).show()
}

在深入研究 Room withTransaction API 的實現前,讓我們先回顧一下已提到的一些協程的概念。CoroutineContext包含了需要對協程任務進行排程的資訊,它攜帶了當前的CoroutineDispatcher和Job 物件,以及一些額外的資料,當然也可以對它進行擴充套件來使其包含更多資訊。CoroutineContext 的一個重要特徵是它們被同一協程作用域下的子協程所繼承,比如 withContext 程式碼塊的作用域。這一機制能夠讓子協程繼續使用同一個排程器,或在父協程被取消時,它們會被一起取消。本質上,Room 提供的掛起事務 API 會建立一個專門的協程上下文來在同一個事務作用域下執行資料庫操作。

withTransaction API 在上下文中建立了三個關鍵元素:

  • 單執行緒排程器,用於執行資料庫操作;
  • 上下文元素,幫助 DAO 函式判斷其是否處在事務中;
  • ThreadContextElement,用來標記事務協程中所使用的排程執行緒。

事務排程器

CoroutineDispatcher 會決定協程該繫結到哪個執行緒中執行。比如,Dispatchers.IO會使用一個共享執行緒池分流執行那些會發生阻塞的操作,而Dispatchers.Main會在 Android 主執行緒中執行協程。由 Room 建立的事務排程器能夠從Room 的 Executor獲取單一執行緒,並將事務分發給該執行緒,而不是分發給一個隨意建立的新執行緒。這一點很重要,因為 executor 可以由使用者來配置,並且可作為測試工具使用。在事務開始時,Room 會獲得 executor 中某個執行緒的控制權,直到事務結束。在事務執行期間,即使排程器因子協程發生了變化,已執行的資料庫操作仍會被分配到該事務執行緒上。

獲取一個事務執行緒並不是一個阻塞操作,它也不應該是阻塞操作,因為如果沒有可用執行緒的話,應該執行掛起操作,然後通知呼叫方,避免影響其他協程的執行。它還會將一個 runnable 插入佇列,然後等待其執行,這也是執行緒可執行的一個標誌。suspendCancellableCoroutine函式為我們搭建了連線基於回撥的 API 和協程之間的橋樑。在這種情況下,一旦之前入佇列的 runnable 執行了,就代表著一個執行緒可用,我們會使用 runBlocking 啟動一個事件迴圈來獲取此執行緒的控制權。然後runBlocking所建立的排程器會將要執行的程式碼塊分發給已獲得的執行緒。另外,Job 被用來掛起和保持執行緒的可用性,直到事務執行完成為止。要注意的是,一旦協程被取消了或者是無法獲取到執行緒,就要有防範措施。獲取事務執行緒的相關程式碼如下:

/**
 *構建並返回一個 [ContinuationInterceptor] 用來將協程分發到獲取到的執行緒中,並執行事務。[controlJob] 用來通過取消任務來控制執行緒的釋放。
 */
private suspend fun Executor.acquireTransactionThread(
    controlJob: Job
): ContinuationInterceptor = suspendCancellableCoroutine { continuation ->
    continuation.invokeOnCancellation {
        // 當我們在等待獲取到可用執行緒時,如果失敗了或者任務取消,我們是不能夠停止等待這一動作的,但我們可以取消 controlJob,這樣一旦獲取到控制權,很快就會被釋放。
        controlJob.cancel()
    }
    try {
        execute {
            // runBlocking 建立一個 event loop 來執行協程中的任務程式碼
            runBlocking {
               // 獲取到執行緒後,通過返回有 runBlocking 建立的攔截器來恢復 suspendCancellableCoroutine,攔截器將會被用來攔截和分發程式碼塊到獲取的執行緒中
                continuation.resume(coroutineContext[ContinuationInterceptor]!!)

               // 掛起 runBlocking 協程,直到 controlJob 完成。由於協程是空的,所以這將會阻止 runBlocking 立即結束。
                controlJob.join()
            }
        }
    } catch (ex: RejectedExecutionException) {
       // 無法獲取執行緒,取消協程
        continuation.cancel(
            IllegalStateException(
                "Unable to acquire a thread to perform the transaction.", ex)
        )
    }
}

事務上下文元素

有了排程器後,我們就可以建立事務中的元素來新增到上下文中,並保持著對排程器的引用。如果在事務作用域內呼叫了 DAO 函式,就可以把 DAO 函式重新路由到相應的執行緒中。我們建立的事務元素如下:

internal class TransactionElement(
    private val transactionThreadControlJob: Job,
    internal val transactionDispatcher: ContinuationInterceptor
) : CoroutineContext.Element {

   // Singleton key 用於檢索此上下文中的 element
    companion object Key : CoroutineContext.Key<TransactionElement>

    override val key: CoroutineContext.Key<TransactionElement>
        get() = TransactionElement

    /**
     *這個 element 用來統計事務數量(包含巢狀事務)。呼叫 [acquire] 來增加計數,呼叫 [release] 來減少計數。如果在呼叫 [release] 時計數達到 0,則事務被取消,事務執行緒會被釋放
     */
    private val referenceCount = AtomicInteger(0)

    fun acquire() {
        referenceCount.incrementAndGet()
    }

    fun release() {
        val count = referenceCount.decrementAndGet()
        if (count < 0) {
            throw IllegalStateException(
                "Transaction was never started or was already released.")
        } else if (count == 0) {
           // 取消控制事務執行緒的 job 會導致它被 release
            transactionThreadControlJob.cancel()
        }
    }
}

TransactionElement 函式中的 acquire 和 release 是用來跟蹤巢狀事務的。由於 beginTransaction 和 endTransaction 允許巢狀呼叫,我們也想保留這個特性,但是我們只需要在最外層事務完成時釋放事務執行緒即可。這些功能的用法在稍後的 withTransaction 實現中會介紹。

事務執行緒標記

上文中提到的建立事務上下文中所需的最後一個關鍵元素是ThreadContextElement。CoroutineContext 中的這個元素類似於ThreadLocal,它能夠跟蹤執行緒中是否有正在進行的事務。這個 element 是由一個 ThreadLocal 支援,對於排程器所用的每個執行緒,它都會在 ThreadLocal 上設定一個值來執行協程程式碼塊。執行緒一旦完成任務後,這個值會被重置。在我們的例子中,這個值是沒有意義的,在 Room 中也只需要確定這個值是否存在即可。如果協程上下文可以訪問平臺中存在的ThreadLocal<SQLiteSession>,則可以從協程所繫結的任何執行緒向其分發 begin/ends 命令,如果做不到,那在事務完成前只能阻塞執行緒。但我們仍然需要追蹤每個阻塞的資料庫方法是在哪個事務上執行,以及哪個執行緒負責平臺事務。

Room 的 withTransaction API 中使用的 ThreadContextElement 會標識資料庫中的阻塞函式。Room 中的阻塞函式,包含 DAO 生成的那些,在它們被事務協程呼叫後會被特殊處理,用來保證它們不會在其他的排程器上執行。如果您的 DAO 同時具有這兩種型別的功能,則可以在 withTransaction 塊中將阻塞函式與掛起函式混合和匹配。通過將 ThreadContextElement 新增到協程上下文中,並從 DAO 函式中訪問它,我們可以驗證阻塞函式是否處於正確的作用域中。如果不是,我們會丟擲異常而不是造成死鎖。在之後,我們計劃將阻塞函式也重新路由到事務執行緒中。

private final ThreadLocal<Integer> mSuspendingTransactionId = new ThreadLocal<>();
public void assertNotSuspendingTransaction() {
    if (!inTransaction() && mSuspendingTransactionId.get() != null) {
        throw new IllegalStateException("Cannot access database on a different"
                + " coroutine context inherited from a suspending transaction.");
    }
}

這三個元素的組合構成了我們的事務上下文:

private suspend fun RoomDatabase.createTransactionContext(): CoroutineContext {
    val controlJob = Job()
    val dispatcher = queryExecutor.acquireTransactionThread(controlJob)
    val transactionElement = TransactionElement(controlJob, dispatcher)
    val threadLocalElement =
        suspendingTransactionId.asContextElement(controlJob.hashCode())
    return dispatcher + transactionElement + threadLocalElement
}

事務 API 的實現

建立了事務上下文之後,我們終於可以提供一個安全的 API 用於在協程中執行資料庫事務。接下來要做的就是將這個上下文和通常的 begin/end 事務模式結合起來:

suspend fun <R> RoomDatabase.withTransaction(
    block: suspend () -> R
): R {
    // 如果可以的話就使用繼承的事務上下文,這樣允許巢狀掛起的事務
    val transactionContext =
        coroutineContext[TransactionElement]?.transactionDispatcher 
            ?: createTransactionContext()
    return withContext(transactionContext) {
        val transactionElement = coroutineContext[TransactionElement]!!
        transactionElement.acquire()
        try {
            beginTransaction()
            try {
                // 在一個新的 scope 中封裝 suspend 程式碼塊,來等待子協程
                val result = coroutineScope {
                    block.invoke(this)
                }
                setTransactionSuccessful()
                return@withContext result
            } finally {
                endTransaction()
            }
        } finally {
            transactionElement.release()
        }
    }
}

Android 中 SQLite 的執行緒限制是合理的,這在 Kotlin 還沒出現時已然如此設計了。協程引入了新的程式設計正規化,改變了傳統 Java 併發程式設計的一些思維模式。直接取消 Android 執行緒對 SQLite 事務的限制是不可行的,因為我們希望提供一個向後相容的解決方案,而上述這些方法的組合最終讓我們在使用協程和 Fluent API 的解決方案中發揮了創造性。

來源:烏蘭察布SEO