1. 程式人生 > 其它 >【工具】簡陋的非同步轉同步佇列

【工具】簡陋的非同步轉同步佇列

前言

最近碰到一個場景,在開發一個需求的過程中將系統的介面封裝了一層,但是系統介面全部是以非同步的形式回撥的,這導致兩個問題:

  1. 外部操作傳入封裝類的時候,上一次的操作還未完成,導致封裝內部物件狀態並不正確
  2. 某些場景下外部操作會併發地建立,因此需要一個佇列來完成有序執行和生產者阻塞

基於以上兩點,需要實現一個簡易的佇列,滿足開發中的需要

過程分析

首先需要建立一個佇列,用來儲存 Runnable 任務,在這裡選擇的是 LinkedBlockingQueue 佇列,當然也可以使用 ArrayBlockingQueue 佇列限制任務數,或者在 SimpleAsyncToSyncQueue 內部自己實現

SimpleAsyncToSyncQueue 本身作為任務分配者,單獨執行在一個執行緒內,此時生產者可能在不同執行緒中同時向 LinkedBlockingQueue 傳入任務,這部分存在多執行緒操作的情況,好在 LinkedBlockingQueue 內部已經實現

因為生產者或消費者可能處於不同的執行緒中,因此實際控制的邏輯均放在 SimpleAsyncToSyncQueue 中,此處用到的是 Java 提供的執行緒鎖工具 LockSupport,在運行了 Runnable 之後,SimpleAsyncToSyncQueue 會呼叫 LockSupport.park() 將自身執行緒鎖定,等待任務完成

Runnable 執行完成並不一定是任務完成,真正的任務可能在多個呼叫鏈之後才結束,因此在任務的呼叫鏈中無論是正常結束、非法值返回又或者丟擲異常,都需要呼叫 unlock() 方法解鎖SimpleAsyncToSyncQueue 執行緒

結論

package xyz.slkagura.thread;

import android.os.Handler;
import android.os.Looper;

import androidx.annotation.NonNull;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.LockSupport;

public class SimpleAsyncToSyncQueue {
    private static final String SIMPLE_ASYNC_TO_SYNC_QUEUE_TAG = SimpleAsyncToSyncQueue.class.getSimpleName();
    
    private final LinkedBlockingQueue<Runnable> mQueue = new LinkedBlockingQueue<>();
    
    private final Thread mThread = new Thread(this::run);
    
    private final Handler mHandler = new Handler(Looper.getMainLooper());
    
    private boolean mIsAuto = false;
    
    public void start() {
        if (!mThread.isAlive()) {
            mThread.start();
        }
    }
    
    public void stop() {
        if (mThread.isAlive() && !mThread.isInterrupted()) {
            mThread.interrupt();
        }
    }
    
    public void setAuto(boolean isAuto) {
        mIsAuto = isAuto;
    }
    
    private void run() {
        try {
            if (mIsAuto) {
                mQueue.take().run();
            }
            while (!mThread.isInterrupted()) {
                LockSupport.park();
                Runnable runnable = mQueue.take();
                runnable.run();
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    
    public void offer(@NonNull Runnable runnable, boolean isMain) {
        if (isMain) {
            mQueue.offer(() -> {
                mHandler.post(runnable);
            });
        } else {
            mQueue.offer(runnable);
        }
    }
    
    public void offer(@NonNull Runnable runnable) {
        offer(runnable, false);
    }
    
    public void unlock() {
        LockSupport.unpark(mThread);
    }
    
    public static void test() {
        SimpleAsyncToSyncQueue consumer = new SimpleAsyncToSyncQueue();
        consumer.setAuto(true);
        consumer.start();
        for (int i = 0; i < 100; i++) {
            final int id = i;
            boolean isSync = Math.random() < 0.9D;
            String groupId = isSync ? "group-1" : "group-2";
            consumer.offer(() -> {
                Log.d(SIMPLE_ASYNC_TO_SYNC_QUEUE_TAG, "task: ", id, " group: ", groupId, " sync: ", String.valueOf(isSync), " start: ", System.nanoTime());
                new Thread(() -> {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    consumer.unlock();
                    Log.d(SIMPLE_ASYNC_TO_SYNC_QUEUE_TAG, "task: ", id, " group: ", groupId, " sync: ", String.valueOf(isSync), " unlock: ", System.nanoTime());
                }).start();
                Log.d(SIMPLE_ASYNC_TO_SYNC_QUEUE_TAG, "task: ", id, " group: ", groupId, " sync: ", String.valueOf(isSync), " end: ", System.nanoTime());
            });
        }
    }
}