1. 程式人生 > >Jetty NIO模型

Jetty NIO模型

概述

jetty NIO是典型reactor模型,如下圖所示:


即:mainReactor負責監聽server socket,接受新連線,並將建立的socket分派給subReactor。subReactor負責多路分離已連線的socket,讀寫網路資料,扔給worker執行緒池來處理。本文主要是講解jetty中mainReactor、subReactor、執行緒池的實現。

mainReactor

jetty中的server就相當於一個容器,一個jetty容器包含多個聯結器和一個執行緒池,聯結器實現了LifeCycle介面,隨容器啟動而啟動,下圖是聯結器啟動後,監聽server socket,建立連線的過程:


可見,jetty利用了執行緒池來建立連線,每一個連線任務被當成一個job被放到了job佇列裡面,負責連線的執行緒會從佇列中取出任務來執行,將得到的ServerSocket交給subReactor,下面來看subReactor的實現。

subReactor

這裡需要提一下jetty nio很重要的一個類SelectorManager,它負責channel註冊,select,wakeup等操作。在SelectorManager中有SelectSet陣列,可以把SelectSet理解為SelectorManager的代理,因為真正做事的是SelectSet,這裡面SelectSet設計為一個數組,應該也是分而治之的思想,讓一個selector監聽更少的selectionkey。

SelectSet中有一個非常重要的成員changes,changes中存放了所有有變化的channel、endpoint、attachement。分別在以下情況觸發addChannel方法:當有新的通道加入時,當有新的事件到來時,當有資料到來時。

subReactor的執行流程如下圖:


在這裡導致addChange除了selectorManager.register之外,還有endpoint.updatekey()以及selectionkey資料有變化時等等。

ThreadPool

jetty的執行緒池相當簡單,其實mainReactor與subReactor共用同一個執行緒池,執行緒池的實現類是QueuedThreadPool,當然在jetty.xml中可以設定自己的執行緒池類。簡單看下執行緒池的run方法

private Runnable _runnable = new Runnable()
  {
      public void run()
      {
          boolean shrink=false;
          try
          {
              Runnable job=_jobs.poll();
              while (isRunning())
              {
                  // Job loop
                  while (job!=null && isRunning())
                  {
                      runJob(job);
                      job=_jobs.poll();
                  }

                  // Idle loop
                  try
                  {
                      _threadsIdle.incrementAndGet();

                      while (isRunning() && job==null)
                      {
                          if (_maxIdleTimeMs<=0)
                              job=_jobs.take();
                          else
                          {
                              // maybe we should shrink?
                              final int size=_threadsStarted.get();
                              if (size>_minThreads)
                              {
                                  long last=_lastShrink.get();
                                  long now=System.currentTimeMillis();
                                  if (last==0 || (now-last)>_maxIdleTimeMs)
                                  {
                                      shrink=_lastShrink.compareAndSet(last,now) &&
                                      _threadsStarted.compareAndSet(size,size-1);
                                      if (shrink)
                                          return;
                                  }
                              }
                              job=idleJobPoll();
                          }
                      }
                  }
                  finally
                  {
                      _threadsIdle.decrementAndGet();
                  }
              }
          }
          catch(InterruptedException e)
          {
         		...
          }
      }
  };
1、執行緒池有個最小執行緒數_minThreads=8,當執行緒池啟動時會建立_minThreads個執行緒,並啟動它們。第12行,執行緒從任務佇列中取出一個任務,並執行。這裡使用了while迴圈表示這裡會阻塞等待任務執行完,當任務佇列中沒有任務時,才會退出while迴圈;

2、退出while迴圈後,這個執行緒就空閒了,在這裡需要有個回收策略,在等待_maxIdleTimeMs時間後,如果當前執行緒數大於_minThreads時,就會回收這個執行緒。

那麼執行緒數什麼時候會大於_minThreads?來看看dispatch()方法中的核心程式碼

 // If we had no idle threads or the jobQ is greater than the idle threads
                if (idle==0 || jobQ>idle)
                {
                    int threads=_threadsStarted.get();
                    if (threads<_maxThreads)
                        startThread(threads);
                }
如果沒有空閒的執行緒或者空閒執行緒數太少,在保證執行緒數沒有超過_maxThreads時會新建執行緒。