Vert.x系列(五)--ContextImpl原始碼分析
前言:
執行緒安全是Vert.x的重要特性,但這一特性是由它依賴的netty實現的,Vert.x只是直接拿過來使用。
這裡涉及到很多個類。
ContextImpl、EventLoopContext、NioEventLoop、和NioEventLoop的父類SingleThreadEventLoop、和NioEventLoop的爺爺類SingleThreadEventExecutor。
原理:
Netty定義了EventExecutor事件執行器,用做對任務處理的封裝。執行器內部維護了Queue<Runable>
,實現了任務的順序執行。還定義了MultithreadEventExecutorGroup類,維護陣列變數EventExecutor[] children,實現了多核CPU的利用; (陣列佇列結構,非常像Hashmap的陣列連結串列結構)。一個Verticle和一個ContextImpl對應,再有一個ContextImpl和一個EventExecutor對應,使所有對Verticle的操作都在一個Queue<Runable>中依次執行,實現了執行緒安全。
程式碼:
程式碼1.構造器
對於佔了大部分的普通Verticle來說一般來說,會依次由VertxImpl.getOrCreateContext()、createEventLoopContext()、EventLoopContext構造方法、ContextImpl構造方法呼叫後,進入ContextImpl類
在建立ContextImpl 時 ,這下面的三個方法(或構造方法),
// 利用next(),從group中取一個。next()也實現了對group的平衡獲取
private static EventLoop getEventLoop(VertxInternal vertx) { EventLoopGroup group = vertx.getEventLoopGroup(); if (group != null) { return group.next(); } else { return null; } }
// 需要注意this的第2個引數是getEventLoop(vertx)方法的呼叫。才
protected ContextImpl(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config, ClassLoader tccl) { this(vertx, getEventLoop(vertx), internalBlockingPool, workerPool, deploymentID, config, tccl); }
// 簡單的賦值
protected ContextImpl(VertxInternal vertx, EventLoop eventLoop, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config,ClassLoader tccl) { if (DISABLE_TCCL && !tccl.getClass().getName().equals("sun.misc.Launcher$AppClassLoader")) { log.warn("You have disabled TCCL checks but you have a custom TCCL to set."); } this.deploymentID = deploymentID; this.config = config; this.eventLoop = eventLoop; this.tccl = tccl; this.owner = vertx; this.workerPool = workerPool; this.internalBlockingPool = internalBlockingPool; this.orderedTasks = new TaskQueue(); this.internalOrderedTasks = new TaskQueue(); this.closeHooks = new CloseHooks(log); }
完成對屬性private final EventLoop eventLoop;的賦值,即對ContextImpl和EventLoop的1對1繫結。
VertxImpl的構造方法中,會對它的成員變數 eventLoopGroup 賦值
eventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO);
在eventLoopGroup()方法為:
public EventLoopGroup eventLoopGroup(int nThreads, ThreadFactory threadFactory, int ioRatio) { NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(nThreads, threadFactory); eventLoopGroup.setIoRatio(ioRatio); return eventLoopGroup; }
可以看到例項化了一個 NioEventLoopGroup 作為返回值。NioEventLoopGroup 就是若干個NioEventLoop的封裝,主要還是看NioEventLoop。
用ctrl+alt+U檢視下類圖,發現NioEventLoop的繼承結構有點複雜。可以看到 Executor、SingleThreadEventExecutor。
Executor 定義了 void execute(Runnable command); -- 處理任務的方法
SingleThreadEventExecutor 實現了void execute(Runnable command);
並定義了重要的任務佇列 private final Queue<Runnable> taskQueue;
也看看 NioEventLoopGroup的類圖:
在他的父類MultithreadEventExecutorGroup,定義了private final EventExecutor[] children;
那麼,對前面的eventLoopGroup()方法裡的
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(nThreads, threadFactory);
這句話歸納,就是擁有 EventExecutor[] children; 的物件,而EventExecutor實現了對Queue<Runnable> taskQueue;的操作。就是“原理”裡說的陣列佇列結構。
程式碼2. runOnContext
對Verticle的操作,最後都會統一到 ContextImpl.runOnContext()方法處理,比如EventBusImpl.deliverToHandler()
runOnContext作為入口方法很簡單:
// Run the task asynchronously on this same context @Override public void runOnContext(Handler<Void> task) { try { executeAsync(task); } catch (RejectedExecutionException ignore) { // Pool is already shut down } }
executeAsync 有 abstract 關鍵字修飾,需要檢視 ContextImpl 的子類EventLoopContext ,看看它是怎麼實現的
public void executeAsync(Handler<Void> task) { // No metrics, we are on the event loop. nettyEventLoop().execute(wrapTask(null, task, true, null)); }
這個wrapTask(程式碼略)方法把屬於Vertx的Handler封裝成JDK的Runable,傳給netty框架處理。再使用execute()執行。下面的邏輯就是netty如何處理Runnable.
程式碼3 SingleThreadEventExecutor.execute()
execute() 最上層的介面Executor定義的。NioEventLoop的父類SingleThreadEventExecutor 重寫了此方法.SingleThreadEventExecutor去執行execute() ,自己仍然還是一個代理,會把真正執行執行執行緒的邏輯(類似方法名doExecute做的事情)的邏輯交給 private final Executor executor;執行
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); // 對Queue新增 addTask(Runnable task)--offerTask(Runnable task) --taskQueue.offer(task); 這一系列操作, // 完成了對 Queue<Runnable>的新增操作。 addTask(task); if (!inEventLoop) { // 執行 //SingleThreadEventExecutor.execute--> SingleThreadEventExecutor.startThread--> // SingleThreadEventExecutor.doStartThread. -->成員 Executor executor的execute(),實現是ThreadPerTaskExecutor的execute() startThread(); // 對Queue減少 if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
這個Executor executor 是在SingleThreadEventExecutor的構造方法中例項化的ThreadPerTaskExecutor,是屬於Netty框架的。但是,ThreadPerTaskExecutor包含一個介面屬性ThreadFactory threadFactory。針對Vertx框架的場景,new ThreadPerTaskExecutor(threadFactory) 中的 threadFactory是屬於Vertx框架的VertxThreadFactory。
protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory,boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { // 粗體程式碼 this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler); }
public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; // 這個實際是Vertx框架下的VertxThreadFactory public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.threadFactory = threadFactory; } @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); // 最最最底層的Thread.start()方法。 } }
這個變數的源頭,很早很早前,由VertxImpl在呼叫時傳入的
eventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO);
所以,弄到現在,各種Factory包裹,N層邏輯。才最終還是使用抽象工廠模式,呼叫了Vertx實現的工廠。
需要注意的是 , NioEventLoop重寫了newTaskQueue()方法,
@Override protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { // This event loop never calls takeTask() return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue(): PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks); }
所以Queue<Runnable> taskQueue 擁有的不是在SingleThreadEventExecutor.newTaskQueue()裡的 LinkedBlockingQueue , 而是 MpscUnboundedArrayQueue。
public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.threadFactory = threadFactory; } @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); } }