1. 程式人生 > >Curator原始碼閱讀 - ConnectionState的管理與監聽

Curator原始碼閱讀 - ConnectionState的管理與監聽

看看Curator框架 為實現對 連線狀態ConnectionState的監聽,都是怎麼構造框架的。後面我們也可以應用到業務的各種監聽中。

Curator2.13實現

介面 Listener

Listener介面,給使用者實現stateChange()傳入新的狀態,使用者實現對這新的狀態要做什麼邏輯處理。

public interface ConnectionStateListener
{
    /**
     * Called when there is a state change in the connection
     * @param client the client
     * @param newState the new state
     */
    public void stateChanged(CuratorFramework client, ConnectionState newState);
}

介面 Listenable

/**
 * Abstracts a listenable object
 */
public interface Listenable<T>
{
    /**
     * Add the given listener. The listener will be executed in the containing instance's thread.
     *
     * @param listener listener to add
     */
    public void     addListener(T listener);

    /**
     * Add the given listener. The listener will be executed using the given
     * executor
     *
     * @param listener listener to add
     * @param executor executor to run listener in
     */
    public void     addListener(T listener, Executor executor);

    public void     removeListener(T listener);
}

抽象類 ListenerContainer<T> implements Listenable<T>

/**
 * Abstracts an object that has listeners 裝Listener的容器
 * <T> Listener型別
 */
public class ListenerContainer<T> implements Listenable<T>
{
    private final Map<T, ListenerEntry<T>> listeners = Maps.newConcurrentMap();

    @Override
    public void addListener(T listener)
    {
        addListener(listener, MoreExecutors.sameThreadExecutor());
    }

    @Override
    public void addListener(T listener, Executor executor)
    {
        listeners.put(listener, new ListenerEntry<T>(listener, executor));
    }
    
    /**
     * 對 Listener 列表的遍歷進行封裝
     * Utility - apply the given function to each listener. 
     * @param function function to call for each listener
     */
    public void forEach(final Function<T, Void> function)
    {
        for ( final ListenerEntry<T> entry : listeners.values() )
        {
            entry.executor.execute
            (
                new Runnable()
                {
                    @Override
                    public void run()
                    {
                        try
                        {
                            function.apply(entry.listener);
                        }
                        catch ( Throwable e )
                        {
                            ThreadUtils.checkInterrupted(e);
                            log.error(String.format("Listener (%s) threw an exception", entry.listener), e);
                        }
                    }
                }
            );
        }
    }
    
    
    public void clear()
    {
        listeners.clear();
    }

    public int size()
    {
        return listeners.size();
    }
    
}

ConnectionStateManager

// to manage connection state
public class ConnectionStateManager {
    // 又是佇列? 玩訊息什麼的都是用佇列。現在是存放 ConnectionState
    BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
    // 持有 ListenerContainer
    private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
    
    /**
     * Start the manager,起一個執行緒去執行 processEvents(),要是這執行緒掛了怎麼辦?異常怎麼處理的?框架怎麼處理的。。
     */
    public void start()
    {
        service.submit
            (
                new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        processEvents();
                        return null;
                    }
                }
            );
    }
    
    @Override
    public void close()
    {
        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
        {
            service.shutdownNow();
            listeners.clear();
        }
    }
    
    // 對不斷產生的 ConnectionState 進行處理,生產者?
     private void processEvents(){
        // 當 ConnectionStateManager 啟動完成
        while ( state.get() == State.STARTED )
        {
            // 不斷從佇列拿 Conection 狀態
            final ConnectionState newState = eventQueue.take();
            // 對每個 狀態監聽介面  應用 Function, 狀態監聽介面作為 主語
            // forEach 是 listeners封裝的 遍歷所有 listener 的方法而已。。。
            listeners.forEach(
                new Function<ConnectionStateListener, Void>() {
                    // ConnectionStateListener是我們自己要實現的介面,stateChanged是要實現的方法
                    @Override 
                    public Void apply(ConnectionStateListener listener)
                    {
                        listener.stateChanged(client, newState);
                        return null;
                    }
                }
            );
            /**
            上面這段
            如果沒有封裝 Listener 到 ListenerContainer 的話, 所有 Listener 就是個 List列表,就直接調 Listener 的 stateChanged 方法了吧。
            for Listener {
                listener.stateChanged(client, newState);
            }
            
            因為 封裝 Listener 到 ListenerContainer了, 上面的 forEach 方法內部就可以有些內部實現,比如 對每個 Listener 都是用對應的 executor 來執行。
            **/
        }
    }
    
    
    // 上面的方法是處理 ConnectionState 的,那 ConnectionState 是怎麼傳進來的呢? 生產者?
    /**
     * Post a state change. If the manager is already in that state the change
     * is ignored. Otherwise the change is queued for listeners.
     *
     * @param newConnectionState new state
     * @return true if the state actually changed, false if it was already at that state
     */
    public synchronized boolean addStateChange(ConnectionState newConnectionState)
    {
        // 先判斷 ConnectionStateManager 是否已經啟動好, state 是內部 Enum
        if ( state.get() != State.STARTED )
        {
            return false;
        }

        ConnectionState previousState = currentConnectionState;
        if ( previousState == newConnectionState )
        {
            return false;
        }
        ConnectionState localState = newConnectionState;
        // !!!
        notifyAll();

        while ( !eventQueue.offer(state) )
        {
            eventQueue.poll();
            log.warn("ConnectionStateManager queue full - dropping events to make room");
        }
        return true;
    }
    
}
    
   

呼叫

啟動

// 啟動 connectionStateManager,不斷檢測 connectionState 變化
connectionStateManager.start(); // must be called before client.start()
// 來個匿名預設的 ConnectionStateListener
final ConnectionStateListener listener = new ConnectionStateListener()
{
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState)
    {
        if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )
        {
            logAsErrorConnectionErrors.set(true);
        }
    }
};
this.getConnectionStateListenable().addListener(listener);

生產 ConnectionState,把zk那裡拿到的state轉一下,然後addStateChange

void validateConnection(Watcher.Event.KeeperState state)
{
    if ( state == Watcher.Event.KeeperState.Disconnected )
    {
        suspendConnection();
    }
    else if ( state == Watcher.Event.KeeperState.Expired )
    {
        connectionStateManager.addStateChange(ConnectionState.LOST);
    }
    else if ( state == Watcher.Event.KeeperState.SyncConnected )
    {
        connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
    }
    else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
    {
        connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
    }
}

複用?

還有其他各種Listener,都可以放到 ListenerContainer

private final ListenerContainer<CuratorListener> listeners;
private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;


/**
 * Receives notifications about errors and background events
 */
public interface CuratorListener {
    /**
     * Called when a background task has completed or a watch has triggered
     * @param event the event
     * @throws Exception any errors
     */
    public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}

public interface UnhandledErrorListener
{
    /**
     * Called when an exception is caught in a background thread, handler, etc. Before this
     * listener is called, the error will have been logged and a {@link ConnectionState#LOST}
     * event will have been queued for any {@link ConnectionStateListener}s.
     * @param message Source message
     * @param e exception
     */
    public void     unhandledError(String message, Throwable e);
}

總結一下原始碼技巧

  1. ConnectionStateManager 就是個 生產者消費者模式的程式碼,特點就是: public addStateChange() 暴露給外部使用者生產 ConnectionState,通過佇列eventQueue傳遞,private processEvents()在內部對ConnectionState進行消費。
  2. 直接new匿名類,對介面進行預設實現。
  3. Listener列表物件進行Container封裝,然後 封裝foreach方法,傳入Function介面 就是foreach每個元素要執行的業務邏輯,方法體就可以加一些其他福利。