Curator原始碼閱讀 - ConnectionState的管理與監聽
阿新 • • 發佈:2019-12-27
看看Curator
框架 為實現對 連線狀態ConnectionState
的監聽,都是怎麼構造框架的。後面我們也可以應用到業務的各種監聽中。
Curator2.13實現
介面 Listener
Listener
介面,給使用者實現stateChange()
傳入新的狀態,使用者實現對這新的狀態要做什麼邏輯處理。
public interface ConnectionStateListener { /** * Called when there is a state change in the connection * @param client the client * @param newState the new state */ public void stateChanged(CuratorFramework client, ConnectionState newState); }
介面 Listenable
/** * Abstracts a listenable object */ public interface Listenable<T> { /** * Add the given listener. The listener will be executed in the containing instance's thread. * * @param listener listener to add */ public void addListener(T listener); /** * Add the given listener. The listener will be executed using the given * executor * * @param listener listener to add * @param executor executor to run listener in */ public void addListener(T listener, Executor executor); public void removeListener(T listener); }
抽象類 ListenerContainer<T> implements Listenable<T>
/** * Abstracts an object that has listeners 裝Listener的容器 * <T> Listener型別 */ public class ListenerContainer<T> implements Listenable<T> { private final Map<T, ListenerEntry<T>> listeners = Maps.newConcurrentMap(); @Override public void addListener(T listener) { addListener(listener, MoreExecutors.sameThreadExecutor()); } @Override public void addListener(T listener, Executor executor) { listeners.put(listener, new ListenerEntry<T>(listener, executor)); } /** * 對 Listener 列表的遍歷進行封裝 * Utility - apply the given function to each listener. * @param function function to call for each listener */ public void forEach(final Function<T, Void> function) { for ( final ListenerEntry<T> entry : listeners.values() ) { entry.executor.execute ( new Runnable() { @Override public void run() { try { function.apply(entry.listener); } catch ( Throwable e ) { ThreadUtils.checkInterrupted(e); log.error(String.format("Listener (%s) threw an exception", entry.listener), e); } } } ); } } public void clear() { listeners.clear(); } public int size() { return listeners.size(); } }
ConnectionStateManager
// to manage connection state
public class ConnectionStateManager {
// 又是佇列? 玩訊息什麼的都是用佇列。現在是存放 ConnectionState
BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
// 持有 ListenerContainer
private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
/**
* Start the manager,起一個執行緒去執行 processEvents(),要是這執行緒掛了怎麼辦?異常怎麼處理的?框架怎麼處理的。。
*/
public void start()
{
service.submit
(
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
processEvents();
return null;
}
}
);
}
@Override
public void close()
{
if ( state.compareAndSet(State.STARTED, State.CLOSED) )
{
service.shutdownNow();
listeners.clear();
}
}
// 對不斷產生的 ConnectionState 進行處理,生產者?
private void processEvents(){
// 當 ConnectionStateManager 啟動完成
while ( state.get() == State.STARTED )
{
// 不斷從佇列拿 Conection 狀態
final ConnectionState newState = eventQueue.take();
// 對每個 狀態監聽介面 應用 Function, 狀態監聽介面作為 主語
// forEach 是 listeners封裝的 遍歷所有 listener 的方法而已。。。
listeners.forEach(
new Function<ConnectionStateListener, Void>() {
// ConnectionStateListener是我們自己要實現的介面,stateChanged是要實現的方法
@Override
public Void apply(ConnectionStateListener listener)
{
listener.stateChanged(client, newState);
return null;
}
}
);
/**
上面這段
如果沒有封裝 Listener 到 ListenerContainer 的話, 所有 Listener 就是個 List列表,就直接調 Listener 的 stateChanged 方法了吧。
for Listener {
listener.stateChanged(client, newState);
}
因為 封裝 Listener 到 ListenerContainer了, 上面的 forEach 方法內部就可以有些內部實現,比如 對每個 Listener 都是用對應的 executor 來執行。
**/
}
}
// 上面的方法是處理 ConnectionState 的,那 ConnectionState 是怎麼傳進來的呢? 生產者?
/**
* Post a state change. If the manager is already in that state the change
* is ignored. Otherwise the change is queued for listeners.
*
* @param newConnectionState new state
* @return true if the state actually changed, false if it was already at that state
*/
public synchronized boolean addStateChange(ConnectionState newConnectionState)
{
// 先判斷 ConnectionStateManager 是否已經啟動好, state 是內部 Enum
if ( state.get() != State.STARTED )
{
return false;
}
ConnectionState previousState = currentConnectionState;
if ( previousState == newConnectionState )
{
return false;
}
ConnectionState localState = newConnectionState;
// !!!
notifyAll();
while ( !eventQueue.offer(state) )
{
eventQueue.poll();
log.warn("ConnectionStateManager queue full - dropping events to make room");
}
return true;
}
}
呼叫
啟動
// 啟動 connectionStateManager,不斷檢測 connectionState 變化
connectionStateManager.start(); // must be called before client.start()
// 來個匿名預設的 ConnectionStateListener
final ConnectionStateListener listener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )
{
logAsErrorConnectionErrors.set(true);
}
}
};
this.getConnectionStateListenable().addListener(listener);
生產 ConnectionState
,把zk
那裡拿到的state
轉一下,然後addStateChange
void validateConnection(Watcher.Event.KeeperState state)
{
if ( state == Watcher.Event.KeeperState.Disconnected )
{
suspendConnection();
}
else if ( state == Watcher.Event.KeeperState.Expired )
{
connectionStateManager.addStateChange(ConnectionState.LOST);
}
else if ( state == Watcher.Event.KeeperState.SyncConnected )
{
connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
}
else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
{
connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
}
}
複用?
還有其他各種Listener
,都可以放到 ListenerContainer
private final ListenerContainer<CuratorListener> listeners;
private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;
/**
* Receives notifications about errors and background events
*/
public interface CuratorListener {
/**
* Called when a background task has completed or a watch has triggered
* @param event the event
* @throws Exception any errors
*/
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}
public interface UnhandledErrorListener
{
/**
* Called when an exception is caught in a background thread, handler, etc. Before this
* listener is called, the error will have been logged and a {@link ConnectionState#LOST}
* event will have been queued for any {@link ConnectionStateListener}s.
* @param message Source message
* @param e exception
*/
public void unhandledError(String message, Throwable e);
}
總結一下原始碼技巧
ConnectionStateManager
就是個 生產者消費者模式的程式碼,特點就是:public addStateChange()
暴露給外部使用者生產ConnectionState
,通過佇列eventQueue
傳遞,private processEvents()
在內部對ConnectionState
進行消費。- 直接
new
匿名類,對介面進行預設實現。 - 對
Listener
列表物件進行Container
封裝,然後 封裝foreach
方法,傳入Function
介面 就是foreach
每個元素要執行的業務邏輯,方法體就可以加一些其他福利。