學習tomcat-如何建立連線,處理請求
tomcat如何建立連線,處理請求
學習探討tomcat如何建立網路連線協議,並處理客戶端過來的請求
建立http網路連線,指定通訊協議
tomcat在建立時,會建立連線物件,負責處理客戶端的請求,基於socket
connector 連線 protocol 協議 endpoint終端 socket插座,埠連線
建立初始化
connector -> protocol -> endpoint -> socket
接收請求建立任務
acceptor.socket.acceptor()->
socketWrapper(攜帶通訊資訊)
-> poller(socketWrapper)
-> execute(socketWrapper) 建立執行緒
建立聯結器
Conector類
org.apache.catalina.connector.Connector
空參構造connector() -> connector(http/1.1)
/**
* Defaults to using HTTP/1.1 NIO implementation.
*/
public Connector() {
this("HTTP/1.1");
}
指定網路連線協議http11
類
org.apache.coyote.http11.Http11NioProtocol
-> new Http11NioProtocol()
public Http11NioProtocol() {
super(new NioEndpoint());
}
指定服務終端處理模型非阻塞nio
類
org.apache.tomcat.util.net.NioEndpoint
-> new NioEndPoint()
建立之後如何被啟動?見springboot啟動tomcat方式
終端處理執行緒和執行緒池初始化
啟動之後
NioEndpoint執行bind()方法,
一些初始化,繫結埠
@Override public void bind() throws Exception { initServerSocket(); setStopLatch(new CountDownLatch(1)); // Initialize SSL if needed initialiseSsl(); selectorPool.open(getName()); } //socket相關 initServerSocket()具體如下 // Separated out to make it easier for folks that extend NioEndpoint to // implement custom [server]sockets protected void initServerSocket() throws Exception { //....... //根據平臺不同,反回具體底層類物件(windows,linux,unix) serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); //繫結地址和埠號 InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset()); serverSock.socket().bind(addr,getAcceptCount()); //....... }
NioEndpoint初始化之後,呼叫start()執行startInternal()
程式碼如下
// Create worker collection
if (getExecutor() == null) {
//建立執行緒池
createExecutor();
}
initializeConnectionLatch();
// Start poller thread
// 建立客戶端佇列(客戶端過來的請求)
poller = new Poller();
Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
//建立接收遠端請求執行緒
startAcceptorThread();
初始化執行緒池配置
-> createExecutor() 用於處理使用者請求
指定 備用執行緒,對大執行緒數,佇列型別,超時時間,和執行緒工廠
public void createExecutor() {
internalExecutor = true;
TaskQueue taskqueue = new TaskQueue();
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
}
建立Poller執行緒
poller = new Poller();
Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
建立Acceptor執行緒
protected void startAcceptorThread() {
acceptor = new Acceptor<>(this);
String threadName = getName() + "-Acceptor";
acceptor.setThreadName(threadName);
Thread t = new Thread(acceptor, threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
處理請求的相關物件(執行緒)
Acceptor
類
org.apache.tomcat.util.net.Acceptor
Acceptor 負責迴圈等待遠端請求,將請求以socket形式攜帶資訊,呼叫setSocketOptions()將socket包裝配置為socketWrapper,
setSocketOptions: 對socket包裝處理配置,使用poller物件註冊到佇列,讓poller執行緒做後續的處理
Acceptor 類的run方法:
public void run() {
int errorDelay = 0;
//......以下省略部分程式碼
try {
// Loop until we receive a shutdown command
// 一直迴圈等待遠端請求
while (!stopCalled) {
// Accept the next incoming connection from the server socket
// 1 接收請求
socket = endpoint.serverSocketAccept();
// setSocketOptions() will hand the socket off to
// 2 處理請求,setSocketOptions() 內部呼叫poller 將新請求任務放入佇列
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
}
} finally {
stopLatch.countDown();
}
state = AcceptorState.ENDED;
}
Poller
類
org.apache.tomcat.util.net.NioEndpoint.Poller
Poller負責接收包裝後的socket請求,放入佇列,
並在run方法中迴圈去poll()請求任務,將與流讀寫有關的元件IOChannel Selector socketWrapper 繫結關聯
再通過selector獲取selectionKeys
迭代迴圈獲取對應的socket,提交任務(執行緒),執行緒讀寫處理socketWrapper等後續操作
public void run() {
// Loop until destroy() is called
while (true) {
// poller佇列任務處理 將IOChannel Selector socketWrapper 關聯
hasEvents = events();
//......省略
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// 非阻塞io api 任務處理
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
iterator.remove();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (socketWrapper != null) {
// 如果有等待處理的任務,則處理
processKey(sk, socketWrapper);
//processKey內部會呼叫processSocket方法,最終用執行緒池提交任務
}
}
// Process timeouts
timeout(keyCount,hasEvents);
}
getStopLatch().countDown();
}
其他
events佇列
private final SynchronizedQueue<PollerEvent> events =
new SynchronizedQueue<>(); //事件佇列(socket請求)
//註冊請求到佇列
public void rigister(final NioSocketWrapper socketWrapper)
{
event = new PollerEvent(socketWrapper, OP_REGISTER);
addEvent(event);
}
private void addEvent(PollerEvent event) {
events.offer(event);
if (wakeupCounter.incrementAndGet() == 0) {
selector.wakeup();
}
}
events()繫結及後面的 processSocket()最終提交實際處理任務到執行緒
/**
* Processes events in the event queue of the Poller.
*
* @return <code>true</code> if some events were processed,
* <code>false</code> if queue was empty
*/
public boolean events() {
boolean result = false;
PollerEvent pe = null;
for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
result = true;
NioSocketWrapper socketWrapper = pe.getSocketWrapper();
SocketChannel sc = socketWrapper.getSocket().getIOChannel();
int interestOps = pe.getInterestOps();
if (sc == null) {
log.warn(sm.getString("endpoint.nio.nullSocketChannel"));
socketWrapper.close();
} else if (interestOps == OP_REGISTER) {
try {
//註冊繫結
sc.register(getSelector(), SelectionKey.OP_READ, socketWrapper);
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
} else {
final SelectionKey key = sc.keyFor(getSelector());
if (key == null) {
// The key was cancelled (e.g. due to socket closure)
// and removed from the selector while it was being
// processed. Count down the connections at this point
// since it won't have been counted down when the socket
// closed.
socketWrapper.close();
} else {
final NioSocketWrapper attachment = (NioSocketWrapper) key.attachment();
if (attachment != null) {
// We are registering the key to start with, reset the fairness counter.
try {
int ops = key.interestOps() | interestOps;
attachment.interestOps(ops);
key.interestOps(ops);
} catch (CancelledKeyException ckx) {
cancelledKey(key, socketWrapper);
}
} else {
cancelledKey(key, socketWrapper);
}
}
}
if (running && !paused && eventCache != null) {
pe.reset();//清空任務socketWrapper
eventCache.push(pe);
}
}
return result;
}
setSocketOptions 中的socket任務註冊
protected boolean setSocketOptions(SocketChannel socket) {
NioSocketWrapper socketWrapper = null;
try {
// Allocate channel and wrapper
NioChannel channel = null;
if (nioChannels != null) {
channel = nioChannels.pop();
}
//...... 部分省略
NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
poller.register(socketWrapper);
return true;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error(sm.getString("endpoint.socketOptionsError"), t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
if (socketWrapper == null) {
destroySocket(socket);
}
}