1. 程式人生 > >依賴Jetty實現流轉發功能的實踐

依賴Jetty實現流轉發功能的實踐

最近在寫檔案的下載服務,主要功能是將請求URL呼叫業務介面進行解析,解析到真實的下載地址,然後將下載流透傳出去。這一部分之前使用nginx-lua實現,由於維護過於困難(優雅上下線、截流、熔斷能力缺乏, 證書相關運維支援缺失),因此改用Java實現。本文也主要論述如何依賴Jetty實現流轉發功能。由於對Jetty的依賴非常深,建議使用EmbeddedJettyServer以引入和統一Jetty依賴。

為什麼要閱讀Jetty高層IO程式碼

  1. 下載服務必須採用AIO,以避免客戶端或儲存端頻寬對服務表現的影響。
  2. 對於AIO機制,servlet-api中並沒有相關標準。
  3. 自建AIO事件迴圈或自實現Http協議難度大,而且需要較長時間測試以保證程式碼健壯性。
  4. Jetty作為被依賴Web容器,本身就具有AIO能力。

開啟Servlet的非同步支援

首先,需要在HttpServletRequest中開啟非同步支援,如果不開啟非同步支援,會造成請求提前被返回引起流中斷(servlet-api 3.1.0版本以上支援此功能)。

private void startAsync(HttpServletRequest request) {
    AsyncContext asyncContext = request.startAsync();
    // 非同步任務設為不會超時
    asyncContext.setTimeout(0);
} 

確定非同步寫入Api

之後,我們需要了解Jetty是如何寫入Http響應體的,我們能不能呼叫Jetty的方法非同步寫入Http響應體。

最直接的想法就是檢視一下servlet-api中的javax.servlet.ServletOutputStream的實現類,有沒有相關非同步寫入的方法。

通過程式碼閱讀,很容易就可以找到javax.servlet.ServletOutputStream的實現類,也就是我們的主角org.eclipse.jetty.server.HttpOutput。

它身上有以下AIO相關的方法(僅擷取三個關心的方法):

 /**
 * Asynchronous send of whole content.
 *
 * @param content  The whole content to send
 * @param callback The callback to use to notify success or failure
 */
public void sendContent(ByteBuffer content, final Callback callback)
{
    if (LOG.isDebugEnabled())
        LOG.debug("sendContent(buffer={},{})", BufferUtil.toDetailString(content), callback);

    _written += content.remaining();
    write(content, true, new Callback.Nested(callback)
    {
        @Override
        public void succeeded()
        {
            closed();
            super.succeeded();
        }

        @Override
        public void failed(Throwable x)
        {
            abort(x);
            super.failed(x);
        }
    });
}

/**
 * Asynchronous send of stream content.
 * The stream will be closed after reading all content.
 *
 * @param in       The stream content to send
 * @param callback The callback to use to notify success or failure
 */
public void sendContent(InputStream in, Callback callback)
{
    if (LOG.isDebugEnabled())
        LOG.debug("sendContent(stream={},{})", in, callback);

    new InputStreamWritingCB(in, callback).iterate();
}

/**
 * Asynchronous send of channel content.
 * The channel will be closed after reading all content.
 *
 * @param in       The channel content to send
 * @param callback The callback to use to notify success or failure
 */
public void sendContent(ReadableByteChannel in, Callback callback)
{
    if (LOG.isDebugEnabled())
        LOG.debug("sendContent(channel={},{})", in, callback);

    new ReadableByteChannelWritingCB(in, callback).iterate();
}

那麼問題來了,ByteBuffer、InputStream和ReadableByteChannel各有什麼限制呢?畢竟Jetty是一個Web容器,定位是資源的產生方,而不是消費方;產生流是沒有任何延遲的,但儲存服務的延遲是不可忽略的。那讓我們分別來剖析一下。

首先看ByteBuffer的非同步實現,實際包裝了一下org.eclipse.jetty.server.HttpChannel的write方法,方法描述如下:

/**
 * <p>Non-Blocking write, committing the response if needed.</p>
 * Called as last link in HttpOutput.Filter chain
 * @param content  the content buffer to write
 * @param complete whether the content is complete for the response
 * @param callback Callback when complete or failed
 */
@Override
public void write(ByteBuffer content, boolean complete, Callback callback)

可以看到,使用ByteBuffer非同步寫方法是不合適的,因為此時complete引數是true,也就是說必須要把所有內容全部放在ByteBuffer中,否則會引起EofException或WritePendingException(其後的呼叫)。

那麼InputStream的非同步實現能不能用呢?我們來看一下相關實現:

@Override
protected Action process() throws Exception
{
    // Only return if EOF has previously been read and thus
    // a write done with EOF=true
    if (_eof)
    {
        if (LOG.isDebugEnabled())
            LOG.debug("EOF of {}", this);
        // Handle EOF
        _in.close();
        closed();
        _channel.getByteBufferPool().release(_buffer);
        return Action.SUCCEEDED;
    }

    // Read until buffer full or EOF
    int len = 0;
    while (len < _buffer.capacity() && !_eof)
    {
        int r = _in.read(_buffer.array(), _buffer.arrayOffset() + len, _buffer.capacity() - len);
        if (r < 0)
            _eof = true;
        else
            len += r;
    }

    // write what we have
    _buffer.position(0);
    _buffer.limit(len);
    _written += len;
    write(_buffer, _eof, this);
    return Action.SCHEDULED;
}

可以看到InputStream的實現是同步的讀取一個buffer,然後非同步傳送。想一下,Jetty的作為資源的生產方,讀取的延時可以忽略不記,這樣的實現是合適的。但是儲存方的延時不能忽略,仍然對業務不適用。

現在只剩下ReadableByteChannel的實現了,我們來看一下相關實現:

protected Action process() throws Exception
{
    // Only return if EOF has previously been read and thus
    // a write done with EOF=true
    if (_eof)
    {
        if (LOG.isDebugEnabled())
            LOG.debug("EOF of {}", this);
        _in.close();
        closed();
        _channel.getByteBufferPool().release(_buffer);
        return Action.SUCCEEDED;
    }

    // Read from stream until buffer full or EOF
    BufferUtil.clearToFill(_buffer);
    while (_buffer.hasRemaining() && !_eof)
        _eof = (_in.read(_buffer)) < 0;

    // write what we have
    BufferUtil.flipToFlush(_buffer, 0);
    _written += _buffer.remaining();
    write(_buffer, _eof, this);

    return Action.SCHEDULED;
}

總算有符合要求的了!那有沒有planB呢?畢竟提供Channel的非同步HttpClient感覺不多啊…那換個思路,我能不能拿到Jetty的Channel呢?畢竟這樣我就可以用所有Channel的Api了,看了一下,HttpOutput裡還真有,666

public HttpChannel getHttpChannel()
{
    return _channel;
}

4.確定HttpClient

HttpClient,第一個想到的是apache的HttpAsyncClient,看了一下,還真的提供了ReadableByteChannel(用法可以參考org.apache.http.nio.client.methods.ZeroCopyConsumer)。但是可能公司內jar包被人篡改了,也可能是HttpAsyncClient實現的channel並不標準,總之透傳的內容是損壞的….巨扎心….

那我還能用什麼Client呢?偶然間,居然發現Jetty本身就有HttpClient,還有這種操作?那就試一把

import com.meituan.xm.mbox.utils.response.RequestContext;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.server.HttpOutput;
import org.eclipse.jetty.util.Callback;

import java.io.IOException;
import java.nio.ByteBuffer;

public class StreamReadListener extends Response.Listener.Adapter {
    private HttpOutput httpOutput;

    public StreamReadListener(RequestContext requestContext) throws IOException {
        httpOutput = (HttpOutput) requestContext.getResponse().getOutputStream();
    }

    @Override
    public void onContent(Response response, ByteBuffer content, Callback callback) {
        httpOutput.getHttpChannel().write(content, false, callback);
    }

}

不愧是原裝的HttpClient,自己的Server呼叫自己Client的回撥,自己解決呼叫異常,連透傳都不用適配, 6的飛起。至此,所有關鍵問題已全部解決。