1. 程式人生 > >Android使用訊號量Semaphore進行多執行緒任務排程

Android使用訊號量Semaphore進行多執行緒任務排程

話不多說,先上程式碼

import android.os.Handler;
import android.os.Looper;
import android.os.Message;

import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class TaskDispatcher {

    private LinkedList<Runnable> mTaskList;         // 任務佇列
    private ExecutorService mThreadPool;            // 執行緒池
    private Thread mPollingThead;                   // 輪詢執行緒
    private Handler mPollingHanler;                 // 輪詢執行緒中的Handler
    private static int mThreadCount = 3;            // 執行緒池的執行緒數量,預設為3
    private Type mType = Type.LIFO;                 // 佇列的排程方式,預設為LIFO
    private volatile Semaphore mPollingSemaphore;   // 訊號量,由於執行緒池內部也有一個阻塞執行緒,若加入任務的速度過快,LIFO效果不明顯
    private volatile Semaphore mSemaphore = new Semaphore(0);  //訊號量,防止mPoolThreadHander未初始化完成

    private static TaskDispatcher mInstance;

    public enum Type { FIFO, LIFO }

    /**
     * 單例獲得例項物件
     * @return   例項物件
     */
    public static TaskDispatcher getInstance() {
        if (mInstance == null) {
            synchronized (TaskDispatcher.class) {
                if (mInstance == null) {
                    mInstance = new TaskDispatcher(mThreadCount, Type.LIFO);
                }
            }
        }
        return mInstance;
    }

    /**
     * 單例獲得例項物件
     * @param threadCount    執行緒池的執行緒數量
     * @param type           佇列的排程方式
     * @return   例項物件
     */
    public static TaskDispatcher getInstance(int threadCount, Type type) {
        if (mInstance == null) {
            synchronized (TaskDispatcher.class) {
                if (mInstance == null) {
                    mInstance = new TaskDispatcher(threadCount, type);
                }
            }
        }
        return mInstance;
    }

    /**
     * 建構函式
     * @param threadCount    執行緒池的執行緒數量
     * @param type           佇列的排程方式
     */
    private TaskDispatcher(int threadCount, Type type) {
        init(threadCount, type);
    }

    /**
     * 初始化
     * @param threadCount    執行緒池的執行緒數量
     * @param type           佇列的排程方式
     */
    private void init(int threadCount, Type type) {

        mThreadPool = Executors.newFixedThreadPool(threadCount);
        mPollingSemaphore = new Semaphore(threadCount);
        mTaskList = new LinkedList<Runnable>();
        mType = type == null ? Type.LIFO : type;

        // 開啟輪詢執行緒
        mPollingThead = new Thread() {
            @Override
            public void run() {
                Looper.prepare();
                mPollingHanler = new Handler() {
                    @Override
                    public void handleMessage(Message msg) {
                        mThreadPool.execute(getTask());
                        try {
                            mPollingSemaphore.acquire();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                };
                mSemaphore.release();   // 釋放一個訊號量
                Looper.loop();
            }
        };
        mPollingThead.start();
    }

    /**
     * 新增一個任務
     * @param task   任務
     */
    private synchronized void addTask(Runnable task) {
        try {
            // mPollingHanler為空時,請求訊號量,因為mPollingHanler建立完成會釋放一個訊號量
            if (mPollingHanler == null) mSemaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        mTaskList.add(task);

        mPollingHanler.sendEmptyMessage(0x110);
    }

    /**
     * 取出一個任務
     * @return 需要執行的任務
     */
    private synchronized Runnable getTask() {
        if (mType == Type.LIFO) {
            return mTaskList.removeLast();
        } else if (mType == Type.FIFO) {
            return mTaskList.removeFirst();
        }
        return null;
    }

    /**
     * 執行自定義的任務
     */
    public void doTask() {
        addTask(new Runnable() {
            @Override
            public void run() {

                // Todo what you like

                mPollingSemaphore.release();  //這句必須有,任務處理完成後釋放一個訊號量
            }
        });
    }
}

註釋都比較詳細了,使用起來也很簡單
TaskDispatcher.getInstance().doTask();
詳細分析,日後補充。