小程聊微服務-自己動手擴充套件分散式呼叫鏈
一、說在前面
微服務是當下最火的詞語,現在很多公司都在推廣微服務,當服務越來越多的時候,我們是否會糾結以下幾個問題:
- 面對一筆超時的訂單,究竟是哪一步處理時間超長呢?
- 資料由於併發莫名篡改,到底都誰有重大嫌疑呢?
- 處理遺漏了一筆訂單,曾經是哪個環節出錯把它落下了?
- 系統莫名的報錯,究竟是哪一個服務報的錯誤?
- 每個服務那麼多例項伺服器,如何快速定位到是哪一個例項伺服器報錯的呢?
現在很多系統都要求可用性達到99.9%以上,那麼我們除了增加系統健壯性減少故障的同時,我們又如何在真正發生故障的時候,快速定位和解決問題,也將是我們的重中之重。
在做微服務框架選擇的時候,Spring Cloud無疑是當下最火的,但是因為Spring Cloud是近二年的後起新秀,以及在使用方式上面的差別,目前在很多中小企業還是以dubbo為主,不過遺憾的是,dubbo從官方來講已經不維護了,很多公司都是自己再去維護,那麼今天我就來給大家介紹一下,我們是如何通過修改dubbo原始碼實現了分散式呼叫鏈的第一階段:呼叫鏈日誌的列印。
二、什麼是分散式呼叫鏈
1、什麼是呼叫鏈
基於Google Dapper論文,使用者每次請求都會生成一個全域性ID(traceId),通過它將不同系統的“孤立”的日誌串在一起,重組成呼叫鏈。
2、呼叫鏈的呼叫過程
- 當用戶發起一個請求時,首先到達前端A服務,然後分別對B服務和C服務進行RPC呼叫。
- B服務處理完給A做出響應,但是C服務還需要和後端的D服務和E服務互動之後再返還給A服務,最後由A服務來響應使用者的請求。
3、對整個呼叫過程的追蹤
- 請求到來生成一個全域性TraceID,通過TraceID可以串聯起整個呼叫鏈,一個TraceID代表一次請求。
- 除了TraceID外,還需要SpanID用於記錄呼叫父子關係。每個服務會記錄下Parent id和Span id,通過他們可以組織一次完整呼叫鏈的父子關係。
- 一個沒有Parent id的span成為root span,可以看成呼叫鏈入口。
- 所有這些ID可用全域性唯一的64位整數表示;
- 整個呼叫過程中每個請求都要透傳TraceID和SpanID。
- 每個服務將該次請求附帶的TraceID和附帶的SpanID作為Parent id記錄下,並且將自己生成的SpanID也記錄下。
- 要檢視某次完整的呼叫則只要根據TraceID查出所有呼叫記錄,然後通過Parent id和Span id組織起整個呼叫父子關係。
最終的TraceId和SpanId的呼叫關係圖如下所示:
三、基於Dubbo的實現
1、Dubbo的呼叫過程
在我們分析原始碼的時候,有一行程式碼是:
Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
這行程式碼實際上是利用SPI機制,動態載入指定的Protocol注入到ProtocolFilterWrapper中,再通過Wrapper訪問到可執行的Invoker物件,Dubbo預設使用的是DubboProtocol最終通過netty的方式進行通訊,具體呼叫過程請看下圖:
可以看到基本的流程是:
InvokerInvocationHandler ->ClusterInvoker ->LoadBalance -> ProtocolFilterWrapper -> Protocol -> DubboInvoker
而在呼叫鏈的實現過程中技術難點主要是有二個:
- 在哪裡暫存呼叫鏈
- 呼叫鏈資訊如何傳遞
2、Dubbo協議下的呼叫鏈傳遞過程
那麼在預設的Dubbo協議下,實現呼叫鏈的過程很簡單隻需要在應用專案或者Dubbo原始碼中使用如下程式碼就可以實現呼叫鏈的傳遞。
RpcContext.getContext().setAttachment(CallChainContext.TRACEID, traceIdValue);
RpcInvocation rpcInvocation = (RpcInvocation) inv;
rpcInvocation.setAttachment(CallChainContext.TRACEID, traceIdValue);
rpcInvocation.setAttachment(CallChainContext.SPANID, spanIdValue);
在DubboInvoker中最終通訊的時候會將上述程式碼的RpcInvocation物件傳遞出去,那麼我們只需要在接收端獲取既可。
3、Hessian協議下的呼叫鏈傳遞過程
大家都知道,Dubbo在實現通訊的協議上使用的有Netty、Hessian、Rest等方式,由於我們專案的特殊性,目前採用的是Dubbo的Hessian協議。
先看如下程式碼:
protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
HessianProxyFactory hessianProxyFactory = new HessianProxyFactory();
String client = url.getParameter(Constants.CLIENT_KEY, Constants.DEFAULT_HTTP_CLIENT);
if ("httpclient".equals(client)) {
hessianProxyFactory.setConnectionFactory(new HttpClientConnectionFactory());
} else if (client != null && client.length() > 0 && ! Constants.DEFAULT_HTTP_CLIENT.equals(client)) {
throw new IllegalStateException("Unsupported http protocol client=\"" + client + "\"!");
}
int timeout = url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
hessianProxyFactory.setConnectTimeout(timeout);
hessianProxyFactory.setReadTimeout(timeout);
return (T) hessianProxyFactory.create(serviceType, url.setProtocol("http").toJavaURL(), Thread.currentThread().getContextClassLoader());
}
通過程式碼可以看到,實際上在使用Hessian通訊的時候並沒有將RpcInvocation裡面設定的TraceId和SpanId傳遞出去,呼叫在這一塊中止了。
那我們如何自己來實現呢?
- 第一步、我們在Dubbo原始碼中自己實現了一個Filter(不是Dubbo的Filter),用來產生TraceId和SpanId,以及最後的清理工作,請看程式碼如下:
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
// 將請求轉換成HttpServletRequest請求
HttpServletRequest httpServletRequest = (HttpServletRequest) request;
try {
archieveId(request);
} catch (Throwable e) {
log.log(Level.SEVERE, "traceId或spanId解析出錯!", e);
}
try {
chain.doFilter(request, response);
} catch (IOException e) {
//還原執行緒名稱
throw e;
} catch (ServletException e) {
//還原執行緒名稱
throw e;
} finally {
CallChainContext.getContext().clearContext();
}
}
在Filter中產生TraceId和SpanId以後,會將二個值放到我們封裝好的CallChainContext中進行暫存。
- 第二步、我們將HessianProxyFactory進行繼承改造
public class HessianProxyWrapper extends HessianProxy {
private static final long serialVersionUID = 353338409377437466L;
private static final Logger log = Logger.getLogger(HessianProxyWrapper.class
.getName());
public HessianProxyWrapper(URL url, HessianProxyFactory factory, Class<?> type) {
super(url, factory, type);
}
protected void addRequestHeaders(HessianConnection conn) {
super.addRequestHeaders(conn);
conn.addHeader("traceId", CallChainContext.getContext().getTraceId());
conn.addHeader("spanId", CallChainContext.getContext().getSpanId());
}
}
我們將CallChainContext中暫存的TraceId和SpanId放入到Hessian的header中。
繼承Dubbo的HessianProxyFactory這個類,新類名是HessianProxyFactoryWrapper,在create方法中將HessianProxy替換為新封裝的HessianProxyWrapper,程式碼如下:
public Object create(Class<?> api, URL url, ClassLoader loader) {
if (api == null)
throw new NullPointerException(
"api must not be null for HessianProxyFactory.create()");
InvocationHandler handler = null;
//將HessianProxy修改為HessianProxyWrapper
handler = new HessianProxyWrapper(url, this, api);
return Proxy.newProxyInstance(loader, new Class[] { api,
HessianRemoteObject.class }, handler);
}
修改後的HessianProtocol的程式碼如下:
protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
//新繼承的
HessianProxyFactoryWrapper hessianProxyFactory = new HessianProxyFactoryWrapper();
String client = url.getParameter(Constants.CLIENT_KEY, Constants.DEFAULT_HTTP_CLIENT);
if ("httpclient".equals(client)) {
hessianProxyFactory.setConnectionFactory(new HttpClientConnectionFactory());
} else if (client != null && client.length() > 0 && ! Constants.DEFAULT_HTTP_CLIENT.equals(client)) {
throw new IllegalStateException("Unsupported http protocol client=\"" + client + "\"!");
}
int timeout = url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
hessianProxyFactory.setConnectTimeout(timeout);
hessianProxyFactory.setReadTimeout(timeout);
return (T) hessianProxyFactory.create(serviceType, url.setProtocol("http").toJavaURL(), Thread.currentThread().getContextClassLoader());
}
通過以上方式可以將我們產生的TraceId和SpanId通過Hessian的方式傳遞出去,我們在接收請求的時候,只需要使用如下程式碼的方式就可以獲取到二個值。
String traceIdValue = request.getHeader("traceId");
String spanIdValue = request.getHeader("spanId");
- 第三步、如何列印呼叫鏈資訊
我們在專案中使用的是Logback的方式列印日誌,首先想到的是繼承一個ClassicConverter物件,實現Logback的自定義格式轉換器,參考程式碼如下:
public class CallChainConverter extends ClassicConverter {
@Override
public String convert(ILoggingEvent event) {
Map<String,String> globalMap = CallChainContext.getContext().get();
StringBuilder builder = new StringBuilder();
if(null == globalMap) {
globalMap = new HashMap<String, String>();
CallChainContext.getContext().add(globalMap);
} else {
String traceId = globalMap.get("traceId");
String spainId = globalMap.get("spanId");
if(traceId == null) {
traceId = String.valueOf(Thread.currentThread().getId());
}
if(spainId == null) {
spainId = "1";
}
builder.append("GUID[");
builder.append(traceId);
builder.append("] - LEVEL[");
builder.append(spainId);
builder.append("] ");
}
return builder.toString();
}
}
在Logback配置檔案中進行如下修改:
<conversionRule conversionWord="callContext" converterClass="com.ulpay.dubbox.core.util.CallChainConverter" />
<layout class="com.ulpay.dubbox.core.util.CallChainPatternLayout">
<pattern>%d %-5p %c [%t] %callContext - %m%n</pattern>
</layout>
最終列印的日誌格式如下樣式:
[RMI TCP Connection(127.0.0.1:2181)] GUID[760a1fedd7ab4ff8a309cebaa01cc61d] - LEVEL[15.27.1] - [執行時間] - [xxx專案] - [xxx服務.xxx方法] - 耗時[7]毫秒
4、採集日誌資訊實現分散式呼叫鏈介面展示
一個最簡單的demo示意圖如下:
- 通過logstash採集日誌到kafka
- kafka負責提供資料給Hbase
- 通過Hbase進行資料分析
最終效果展示圖如下:
四、總結
對於分散式呼叫鏈來說,目前市面上有很多開源的工具,比如:pinpoint,Cat以及sky-walking等等,將這些工具與我們擴充套件的呼叫鏈日誌結合起來將起到更好的效果。
出於公司的考慮,以上的程式碼採用的是虛擬碼,但也具有一定參考價值,我寫這篇文章的目的也是希望能夠給大家提供一些思路,希望大家能夠多提建議,我會持續改進。