依賴Jetty實現流轉發功能的實踐
最近在寫檔案的下載服務,主要功能是將請求URL呼叫業務介面進行解析,解析到真實的下載地址,然後將下載流透傳出去。這一部分之前使用nginx-lua實現,由於維護過於困難(優雅上下線、截流、熔斷能力缺乏, 證書相關運維支援缺失),因此改用Java實現。本文也主要論述如何依賴Jetty實現流轉發功能。由於對Jetty的依賴非常深,建議使用EmbeddedJettyServer以引入和統一Jetty依賴。
為什麼要閱讀Jetty高層IO程式碼
- 下載服務必須採用AIO,以避免客戶端或儲存端頻寬對服務表現的影響。
- 對於AIO機制,servlet-api中並沒有相關標準。
- 自建AIO事件迴圈或自實現Http協議難度大,而且需要較長時間測試以保證程式碼健壯性。
- Jetty作為被依賴Web容器,本身就具有AIO能力。
開啟Servlet的非同步支援
首先,需要在HttpServletRequest中開啟非同步支援,如果不開啟非同步支援,會造成請求提前被返回引起流中斷(servlet-api 3.1.0版本以上支援此功能)。
private void startAsync(HttpServletRequest request) {
AsyncContext asyncContext = request.startAsync();
// 非同步任務設為不會超時
asyncContext.setTimeout(0);
}
確定非同步寫入Api
之後,我們需要了解Jetty是如何寫入Http響應體的,我們能不能呼叫Jetty的方法非同步寫入Http響應體。
最直接的想法就是檢視一下servlet-api中的javax.servlet.ServletOutputStream的實現類,有沒有相關非同步寫入的方法。
通過程式碼閱讀,很容易就可以找到javax.servlet.ServletOutputStream的實現類,也就是我們的主角org.eclipse.jetty.server.HttpOutput。
它身上有以下AIO相關的方法(僅擷取三個關心的方法):
/** * Asynchronous send of whole content. * * @param content The whole content to send * @param callback The callback to use to notify success or failure */ public void sendContent(ByteBuffer content, final Callback callback) { if (LOG.isDebugEnabled()) LOG.debug("sendContent(buffer={},{})", BufferUtil.toDetailString(content), callback); _written += content.remaining(); write(content, true, new Callback.Nested(callback) { @Override public void succeeded() { closed(); super.succeeded(); } @Override public void failed(Throwable x) { abort(x); super.failed(x); } }); } /** * Asynchronous send of stream content. * The stream will be closed after reading all content. * * @param in The stream content to send * @param callback The callback to use to notify success or failure */ public void sendContent(InputStream in, Callback callback) { if (LOG.isDebugEnabled()) LOG.debug("sendContent(stream={},{})", in, callback); new InputStreamWritingCB(in, callback).iterate(); } /** * Asynchronous send of channel content. * The channel will be closed after reading all content. * * @param in The channel content to send * @param callback The callback to use to notify success or failure */ public void sendContent(ReadableByteChannel in, Callback callback) { if (LOG.isDebugEnabled()) LOG.debug("sendContent(channel={},{})", in, callback); new ReadableByteChannelWritingCB(in, callback).iterate(); }
那麼問題來了,ByteBuffer、InputStream和ReadableByteChannel各有什麼限制呢?畢竟Jetty是一個Web容器,定位是資源的產生方,而不是消費方;產生流是沒有任何延遲的,但儲存服務的延遲是不可忽略的。那讓我們分別來剖析一下。
首先看ByteBuffer的非同步實現,實際包裝了一下org.eclipse.jetty.server.HttpChannel的write方法,方法描述如下:
/**
* <p>Non-Blocking write, committing the response if needed.</p>
* Called as last link in HttpOutput.Filter chain
* @param content the content buffer to write
* @param complete whether the content is complete for the response
* @param callback Callback when complete or failed
*/
@Override
public void write(ByteBuffer content, boolean complete, Callback callback)
可以看到,使用ByteBuffer非同步寫方法是不合適的,因為此時complete引數是true,也就是說必須要把所有內容全部放在ByteBuffer中,否則會引起EofException或WritePendingException(其後的呼叫)。
那麼InputStream的非同步實現能不能用呢?我們來看一下相關實現:
@Override
protected Action process() throws Exception
{
// Only return if EOF has previously been read and thus
// a write done with EOF=true
if (_eof)
{
if (LOG.isDebugEnabled())
LOG.debug("EOF of {}", this);
// Handle EOF
_in.close();
closed();
_channel.getByteBufferPool().release(_buffer);
return Action.SUCCEEDED;
}
// Read until buffer full or EOF
int len = 0;
while (len < _buffer.capacity() && !_eof)
{
int r = _in.read(_buffer.array(), _buffer.arrayOffset() + len, _buffer.capacity() - len);
if (r < 0)
_eof = true;
else
len += r;
}
// write what we have
_buffer.position(0);
_buffer.limit(len);
_written += len;
write(_buffer, _eof, this);
return Action.SCHEDULED;
}
可以看到InputStream的實現是同步的讀取一個buffer,然後非同步傳送。想一下,Jetty的作為資源的生產方,讀取的延時可以忽略不記,這樣的實現是合適的。但是儲存方的延時不能忽略,仍然對業務不適用。
現在只剩下ReadableByteChannel的實現了,我們來看一下相關實現:
protected Action process() throws Exception
{
// Only return if EOF has previously been read and thus
// a write done with EOF=true
if (_eof)
{
if (LOG.isDebugEnabled())
LOG.debug("EOF of {}", this);
_in.close();
closed();
_channel.getByteBufferPool().release(_buffer);
return Action.SUCCEEDED;
}
// Read from stream until buffer full or EOF
BufferUtil.clearToFill(_buffer);
while (_buffer.hasRemaining() && !_eof)
_eof = (_in.read(_buffer)) < 0;
// write what we have
BufferUtil.flipToFlush(_buffer, 0);
_written += _buffer.remaining();
write(_buffer, _eof, this);
return Action.SCHEDULED;
}
總算有符合要求的了!那有沒有planB呢?畢竟提供Channel的非同步HttpClient感覺不多啊…那換個思路,我能不能拿到Jetty的Channel呢?畢竟這樣我就可以用所有Channel的Api了,看了一下,HttpOutput裡還真有,666
public HttpChannel getHttpChannel()
{
return _channel;
}
4.確定HttpClient
HttpClient,第一個想到的是apache的HttpAsyncClient,看了一下,還真的提供了ReadableByteChannel(用法可以參考org.apache.http.nio.client.methods.ZeroCopyConsumer)。但是可能公司內jar包被人篡改了,也可能是HttpAsyncClient實現的channel並不標準,總之透傳的內容是損壞的….巨扎心….
那我還能用什麼Client呢?偶然間,居然發現Jetty本身就有HttpClient,還有這種操作?那就試一把
import com.meituan.xm.mbox.utils.response.RequestContext;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.server.HttpOutput;
import org.eclipse.jetty.util.Callback;
import java.io.IOException;
import java.nio.ByteBuffer;
public class StreamReadListener extends Response.Listener.Adapter {
private HttpOutput httpOutput;
public StreamReadListener(RequestContext requestContext) throws IOException {
httpOutput = (HttpOutput) requestContext.getResponse().getOutputStream();
}
@Override
public void onContent(Response response, ByteBuffer content, Callback callback) {
httpOutput.getHttpChannel().write(content, false, callback);
}
}
不愧是原裝的HttpClient,自己的Server呼叫自己Client的回撥,自己解決呼叫異常,連透傳都不用適配, 6的飛起。至此,所有關鍵問題已全部解決。