1. 程式人生 > >多執行緒設計模式——Half-sync/Half-async(半同步/半非同步)模式

多執行緒設計模式——Half-sync/Half-async(半同步/半非同步)模式

這些都是根據我最近看的《Java實戰指南多執行緒程式設計(設計模式篇)》所得整理。

模式名稱

Half-sync/Half-async(半同步/半非同步)模式

模式解決的問題

同步和非同步各有各的優勢,有沒有一個方法,能夠既保持了同步程式設計的簡單性,又充分發揮非同步程式設計在提高系統併發性方面的優勢。

模式的實現思路

Half-sync/Half-async(半同步/半非同步)模式是一個分層架構。它包含三個層次:非同步任務層、同步任務層和佇列層。Half-sync/Half-async(半同步/半非同步)模式的核心思想是如何將系統中的任務進行恰當的分解,使各個子任務落入合適的層次中。
低階的任務或者耗時較短的任務可以安排在非同步任務層。而高階的任務或者耗時較長的任務可以安排在同步任務層。而非同步任務層和同步任務層這兩層之間的寫作通過佇列層進行解耦:佇列層負責非同步任務層和同步任務層之間的資料交換。
Half-sync/Half-async(半同步/半非同步)模式的參與者:
AsybcTask非同步任務,負責接收來自客戶端的輸入,對其進行初步處理,並通過對壘與相應的同步任務通訊:duspatch()對輸入進行初步處理,並構造相應訊息放入佇列由相應的同步任務進行處理。
Queue佇列,非同步任務層和同步任務層進行通訊的中介:enqueue()訊息入佇列,dequeue()訊息出佇列
SyncTask同步任務,負責處理佇列中訊息鎖對應的計算:run執行同步任務

Created with Raphaël 2.1.0ClientClientAsyncTaskAsyncTaskQueueQueueSyncTaskSyncTask1dispatch2enqueue()345run()6dequeue()7

例項程式碼

某系統的告警功能被封裝在一個模組中。告警模組的主要功能是將其接收到的告警資訊傳送到告警伺服器上。它有一下兩個問題:一、告警資訊最終要通過網路連結傳送高告警伺服器上的。二、當某個告警產生的條件持續存在是,相應的告警資訊會反覆地被髮送到告警模組。
這兩個問題前者是一個較慢的操作,後者是一個較快的操作。比較適合本模式。
示例業務模組對告警模組的呼叫

public class SampleAlarmClient {
    private final static Logger logger = Logger
        .getLogger(SampleAlarmClient.class);
    //告警日誌抑制閥值
    private static final int ALARM_MSG_SUPRESS_THRESHOLD = 10;
    static{
        //初始化告警模組
        AlarmMgr.getInstance().init();
    }

    public static void main
(String[]args){ SampleAlarmClient alarmClient = new SampleAlarmClient(); Connection dbConn = null; try{ dbConn = alarmClient.retrieveDBConnection(); }catch(Exception e){ final AlarmMgr alarmMgr = AlarmMgr.getInstance(); //告警被重複傳送至告警模組的次數 int duplicateSubmissionCount; String alarmId = "0000010000020"; final String alarmExtraInfo = "operation=GetDBConnection;detail=Failed" + "to get DB connection:" + e.getMessage(); dumplicateSubmissionCount = alarmMgr.sendAlarm(AlarmType.FAULT, alarmId, alarmExtraInfo); if(duplicateSubmissionCount <ALARM_MSG_SUPERSS_THRESHOLD){ logger.error("Alarm[" + alarmId +"] was raised more than" + alarmExtraInfo); }else{ if(duplicateSubmissionCount == ALARM_MSG_SUPRESS_THRESHOLD){ logger.error("Alarm[" + alarmId +"] was raised more than" + ALARM_MSG_SUPRESS_HRESHOLD + " times, it will no longer be logged."); } }return ; } alarmClient.doSomething(dbConn); } //獲取資料庫連線 private Connection retrieveDBCpnnection()throws Exception{ Connection cnn = null; //省略一些和模式無關的程式碼 return cnn; } private void doSomething(Connection conn){ //省略一些和模式無關的程式碼 } }

AlarmMgr

public class AlarmMgr {
    //儲存AlarmMgr類的唯一例項
    private static final AlarmMgr INSTANCE = new AlarmMgr();
    private volatile boolean shutdownRequested = false;
    //告警傳送執行緒
    private final AlarmSendint alarmSendingThread;
    private  AlarmMgr(){
        alarmSendingThread = new AlarmSendingThread();
    }

    public static AlarmMgr getInstance(){
        return INSTANCE;
    }

    //傳送告警
    public int sendAlarm(AlarmType type,Stirng id,String extraInfo){
        Debug.info("Trigger alarm " + type + "," + id + ',' + extraInfo);
        int duplicateSubmissionCount = 0;
        try{
            AlarmInfo alarmInfo =  new AlarmInfo(id,type);
            alarmInfo.setExtraInfo(extraInfo);
            duplicateSubmissionCount = alarmSendingThread.sendAlarm(alarmInfo);
        }catch(Throwable t){
            t.printStackTrace();
        }
        return duplicateSubmissionCount;
    }

    public void init(){
        alarmSendingThread.start();
    }

    public synchronized void shutdown(){
        if(shutdownRequested){
            throw new IllegalStateException("shutdown already requested!");
        }
        alarmSendingThread.terminate();
        shutdownRequested = true;
    }
}

AlarmSendingThread

public class AlarmSendingThread {
    private final AlarmAgent alarmAgent = new AlarmAgent();
    //告警佇列。模式角色:HalfSync/HalfAsync.Queue
    private final BlockingQueue<AlarmInfo> alaAtomicIntegerivate final ConcurrentMap<String,AtomicInteger> submittedAlarmRegistry;
    public AlarmSendingThread(){
        alarmQueue = new ArrayBlockingQueue<AlarmInfo>(100);
        submittedAlarmRegistry = new ConcurrentHashMap<String,AtomicInteger>();
        alarmAgent.init();
    }

    protected void doRun() throws Exception{
        AlarmInfo alarm;
        alarm = alarmQueue.take();
        terminationToken.reservations.decrementAndGet();
        try{
            //將告警資訊傳送至告警伺服器
            alarmAgent.sendAlarm(alarm);
        }catch(Exception e){
            e.printStackTrace();
        }
        /*處理恢復告警:將相應的故障告警從登錄檔中刪除,使得相應故障恢復後再次出現相同的故障,
         * 該故障資訊能夠上報到伺服器
         */
        if(AlarmType.RESUME == alarm.type){
            String key = AlarmType.FAULT.toString() + ':' + alarm.getId() + '@'
                + alarm.getExtraInfo();
            submittedAlarmRegistry.remove(key);
            key = AlarmType.RESUME.toString() + ':' + alarm.getId() + '@'
                + alarm.getExtraInfo();
            submittedAlarmRegistry.remove(key);
        }
    }

    public int sendAlarm(final AlarmInfo alarmInfo){
        AlarmType type = alarmInfo.type;
        String id = alarmInfo.getId();
        String extraInfo = alarmInfo.getExtraInfo();
        if(terminationToken.isToShutdown()){
            //記錄告警
            System.err.println("rejected alarm:" + id + "," + extraInfo);
            return -1;
        }
        int duplicateSubmissionCount = 0;
        try{
            AtomicInteger prevSibmittedCounter;
            prevSubmittedCounter
            submittedAlarmRegistry.putIfAbsent(type.toString()
                    + ':' + id + '@' + extraInfo,new AtomicInteger(0));
            if(null == preSubmittedCounter){
                terminationToken.reservations.incrementAndGet();
                alarmQueue.put(alarmInfo);
            }else{
                //故障未恢復,不用重複傳送該井資訊給伺服器,故僅增加計數
                duplicateSubmissionCount = prevSubmittedCounter.incrementAndGet();
            }
        }catch(Throwable t){
            t.printStackTrace();
        }
        return duplicateSubmissionCount;
    }

    protected void doCleanup(Exception exp){
        if(null !=exp && !(exp instanceof InterruptedException)){
            exp.printStackTrace();
        }
        alarmAgent.disconnecte();
    }
}

AlarmAgent

//負責連結告警伺服器,併發送告警資訊至告警伺服器
public class AlarmAgnet {
    //用於記錄AlarmAgent是否連線上告警伺服器
    private volatile boolean connectedToServer = false;

    //模式角色: GuardedSuspension.Blocker
    private final Predicate agentConnected = new Predicate(){
        @Override
        public boolean evaluate(){
            return connectedToServer;
        }
    };

    //模式角色;GuardedSuspension.Blocker
    private final Blocker blocker = new ConditionVarBlocker();
    //心跳定時器
    private final Timer heartbeatTimer = new Timer(true);
    //省略一些和模式無關的程式碼
    //傳送告警資訊
    public void sendAlarm(final AlarmInfo alarm) throws Exception{
        //可能需要等待,直到AlarmAgent連線上告警伺服器(或者連線中斷後重新連線上伺服器)
        //模式角色:GuardedSuspension.GuardedAction
        GuardedAction<Void> guardedAction = new GuardedAction<Void>(agentConnected){
            public Void call()throws Exception{
                doSendAlarm(alarm);
                return null;
            }
        };
        blocker.callWhisGuard(guardedAction);
    }

    //通過網路連線將告警資訊傳送給告警伺服器
    private void doSendAlarm(AlarmInfo alarm){
        //忽略其他與設計模式無關的程式碼
        Debug.info("sending alarm " + alarm);

        //模擬傳送告警至告警伺服器的耗時
        try{
            Thread.sleep(50);
        }catch(Exception e){

        }
    }

    public void init(){
        //忽略其他與設計模式無關的程式碼
        //告警連結執行緒
        Thread connectingThread = nerw Thread(new ConnectingTask());
        connectingThread.start();
        heartbeatTimer.schedule(new HeartbeatTask(), 60000,2000);
    }

    public void disconnect(){
        //忽略其他與設計模式無關的程式碼
        Debug.info("disconnected from alarm server.");
        connectedToServer = false;
    }

    protected void onConnected(){
        try{
            blocker.signalAfter(new Callable<Boolean>(){
                @Override
                public Boolean call(){
                    connectedtoServer = true;
                    Debug.info("connected to server");
                    return Boolean.TRUE;
                }
            });
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    protected void onDisconnected(){
        connectedToServer = false;
    }

    //負責與告警伺服器建立網路連線
    private class ConnectingTask implemnets Runnable{
        @Override
        public void run(){
            //忽略其他與設計模式無關的程式碼
            //模擬連結操作耗時
            try{
                Thread.sleep(100);
            }catch (InterruptedException e){
                ;
            }
            onConnected();
        }
    }

    //心跳定時任務:定時檢查與告警伺服器的連線是否正常,發現連線已超後自動重新連線
    private class HeartbaatTask extends TimerTask{
        //忽略其他與設計模式無關的程式碼

        public void run(){
            //忽略其他與設計模式無關的程式碼
            if(!testConnection()){
                onDisconnected();
                reconnect();
            }
        }
        private boolean testConnection(){
            //忽略其他與設計模式無關的程式碼
            return true;
        }

        private void reconnect(){
            ConnectingTask connectingThread = new ConnectingTask();
            //直接在心跳定時器執行緒中執行
            connectingThread.run();
        }
    }
}

模式的優點

本模式既發揮了非同步程式設計的優勢——增加系統的併發性,減少不必要的等待,又保持了同步程式設計的簡單性。Half-sync/Half-async(半同步/半非同步)模式通過把低階任務或者耗時較短的任務安排在同步層,減少了客戶端的等待,有利於提升系統的吞吐率。而高階任務或耗時較長的任務被安排在同步層,這使得我們可以在不影響客戶端程式碼處理效能的情況下保持了同步程式設計的簡單性。

模式需要注意的問題

  1. 佇列積壓:Half-sync/Half-async(半同步/半非同步)模式可以看成是生產者消費者模式的一個例項,也有可能在Queue中積壓物件,所以同步和非同步的平衡處理問題也是一大問題,具體處理和生產者消費者模式一樣。
  2. 避免同步層處理過慢:同步層中的高階任務往往涉及I/O這種比較慢的操作,這些任務執行的快慢往往取決於其依賴的外部資源,而這些外部資源又往往不在我們的控制範圍之內,因此要避免同步層處理過慢,還要從同步層自身的涉及入手。