大眾點評的實時監控系統分析
今天我們從使用和實現兩個方面來聊一聊大眾點評的Java應用實時監控系統–CAT,它目前已成為一個開源專案,見Github。
目錄
CAT能做些什麼?
CAT客戶端的設計
CAT客戶端的實現
Message
MessageProducer
MessageManager
Context
TransportManager和MessageSender
最後
CAT能做些什麼?
在此之前,先來想一想對於線上應用我們希望能監控些什麼?可能有如下這些:
機器狀態資訊。CPU負載、記憶體資訊、磁碟使用率這些是必需的,另外可能還希望收集Java程序的資料,例如執行緒棧、堆、垃圾回收等資訊,以幫助出現問題時快速debug。
請求訪問情況。例如請求個數、響應時間、處理狀態,如果有處理過程中的時間分析那就更完美了。
異常情況。譬如快取服務時不時出現無響應,我們希望能夠監控到這種異常,從而做進一步的處理。
業務情況。例如訂單量統計,銷售額等等。
…
以上這些CAT都能支援。根據其官方文件,CAT支援如下5種監控訊息:
Transaction。記錄跨越系統邊界的程式訪問行為,比如遠端呼叫,資料庫呼叫,也適合執行時間較長的業務邏輯監控。
Event。用來記錄一件事發生的次數,比如記錄系統異常,它和transaction相比缺少了時間的統計,開銷比transaction要小。
Heartbeat。表示程式內定期產生的統計資訊, 如CPU%, MEM%, 連線池狀態, 系統負載等。
Metric。用於記錄業務指標、指標可能包含對一個指標記錄次數、記錄平均值、記錄總和,業務指標最低統計粒度為1分鐘。
Trace。用於記錄基本的trace資訊,類似於log4j的info資訊,這些資訊僅用於檢視一些相關資訊
在一個請求處理中可能產生有多種訊息,CAT將其組織成訊息樹的形式。在處理開始時,預設開始一個型別為URL的Transaction,在這個Transaction中業務本身可以產生子訊息。例如,產生一個數據庫訪問的子Transaction或者一個訂單統計的Metric。結構如下所示:
message-tree
CAT的使用比較簡單,介面也比較清晰,關於其使用請參考官方文件,這裡不再贅述。本文主要討論其客戶端的設計與實現。
CAT客戶端的設計
作為一個日誌上報的通用客戶端,考慮點至少有如下這些:
為了儘可能減少對業務的影響,需要對訊息進行非同步處理。即業務執行緒將訊息交給CAT客戶端與CAT客戶端上報這兩個過程需要非同步。
為了達到實時的目的以及適應高併發的情況,客戶端上報應該基於TCP而非HTTP開發。
線上程安全的前提下儘可能的資源低消耗以及低延時。我們知道,執行緒競爭的情況是由於資源共享造成的,要達到執行緒安全通常需要減少資源共享或者加鎖,而這兩點則會導致系統資源冗餘和高延時。
…
CAT客戶端實現並不複雜,但這些點都考慮到了。它的架構如下所示:
cat-architecture
大概步驟為:
業務執行緒產生訊息,交給訊息Producer,訊息Producer將訊息存放在該業務執行緒訊息棧中;
業務執行緒通知訊息Producer訊息結束時,訊息Producer根據其訊息棧產生訊息樹放置在同步訊息佇列中;
訊息上報執行緒監聽訊息佇列,根據訊息樹產生最終的訊息報文上報CAT服務端。
下面我們來一步一步分析其原始碼。
CAT客戶端的實現
CAT客戶端實現在原始碼目錄cat-client下,而cat-client的主要實現則依賴於它的com.dianping.cat.message包。該包結構如下:
category
com.dianping.cat.message中主要包含了internal、io、spi這三個目錄:
internal目錄包含主要的CAT客戶端內部實現類;
io目錄包含建立服務端連線、重連、訊息佇列監聽、上報等io實現類;
spi目錄為上報訊息工具包,包含訊息二進位制編解碼、轉義等實現類。
其uml圖如下所示(可以放大看):
uml
類的功能如下:
Message為所有上報訊息的抽象,它的子類實現有Transaction、Metric、Event、HeartBeat、Trace這五種。
MessageProducer封裝了所有介面,業務在使用CAT時只需要通過MessageProducer來操作。
MessageManager為CAT客戶端核心類,相當於MVC中的Controller。
Context類儲存訊息上下文。
TransportManager提供傳送訊息的sender,具體實現有DefaultTransportManager,呼叫其getSender介面返回一個TcpSocketSender。
TcpSocketSender類負責傳送訊息。
Message
上面說到,Message有五類,分別為Transaction、Metric、Event、HeartBeat、Trace。其中Metric、Event、HeartBeat、Trace基本相同,儲存的資料都為一個字串;而Transaction則儲存一個Message列表。換句話說,Transaction的結構為一個遞迴包含的結構,其他結構則為原子性結構。
下面為DefaultTransaction的關鍵資料成員及操作:
public class DefaultTransaction extends AbstractMessage implements Transaction {
private List m_children;
private MessageManager m_manager;
…
//新增子訊息
public DefaultTransaction addChild(Message message) {
...
}
//Transaction結束時呼叫此方法
public void complete() {
...
m_manager.end(this); //呼叫MessageManager來結束Transaction
...
}
值得一提的是,Transaction(或者其他的Message)在建立時自動開始,訊息結束時需要業務方呼叫complete方法,而在complete方法內部則呼叫MessageManager來完成訊息。
MessageProducer
MessageProducer對業務方封裝了CAT內部的所有細節,它的主要方法如下:
public void logError(String message, Throwable cause);
public void logEvent(String type, String name, String status, String nameValuePairs);
public void logHeartbeat(String type, String name, String status, String nameValuePairs);
public void logMetric(String name, String status, String nameValuePairs);
public void logTrace(String type, String name, String status, String nameValuePairs);
…
public Event newEvent(String type, String name);
public Event newEvent(Transaction parent, String type, String name);
public Heartbeat newHeartbeat(String type, String name);
public Metric newMetric(String type, String name);
public Transaction newTransaction(String type, String name);
public Trace newTrace(String type, String name);
…
logXXX方法為方法糖(造詞小能手呵呵),這些方法在呼叫時需要傳入訊息資料,方法結束後訊息自動結束。
newXXX方法返回相應的Message,業務方需要呼叫Message方法設定資料,並最終呼叫Message.complete()方法結束訊息。
MessageProducer只是介面封裝,訊息處理主要實現依賴於MessageManager這個類。
MessageManager
MessageManager為CAT的核心類,但它只是定義了介面,具體實現為DefaultMessageManager。DefaultMessageManager這個類裡面主要包含了兩個功能類,Context和TransportManager,分別用於儲存上下文和訊息傳輸。TransportManager執行期間為單例物件,而Context則包裝成ThreadLocal為每個執行緒儲存上下文。
我們通過介面來了解DefaultMessageManager的主要功能:
public void add(Message message);
public void start(Transaction transaction, boolean forked);
public void end(Transaction transaction);
public void flush(MessageTree tree);
add()方法用來新增原子性的Message,也就是Metric、Event、HeartBeat、Trace。
start()和end()方法用來開始和結束Transaction這種訊息。
flush()方法用來將當前業務執行緒的所有訊息重新整理到CAT服務端,當然,是非同步的。
Context
Context用來儲存訊息上下文,我們可以通過它的主要介面來了解它功能:
public void add(Message message) {
if (m_stack.isEmpty()) {
MessageTree tree = m_tree.copy();
tree.setMessage(message);
flush(tree);
} else {
Transaction parent = m_stack.peek();
addTransactionChild(message, parent);
}
}
add方法主要新增原子性訊息,它先判斷該訊息是否有上文訊息(即判斷是否處於一個Transaction中)。如果有則m_stack不為空並且將該訊息新增到上文Transaction的子訊息佇列中;否則直接呼叫flush來將此原子性訊息重新整理到服務端。
public void start(Transaction transaction, boolean forked) {
if (!m_stack.isEmpty()) {
…
Transaction parent = m_stack.peek();
addTransactionChild(transaction, parent);
} else {
m_tree.setMessage(transaction);
}
if (!forked) {
m_stack.push(transaction);
}
}
start方法用來開始Transaction(Transaction是訊息裡比較特殊的一種),如果當前訊息棧為空則證明該Transaction為第一個Transaction,使用訊息樹儲存該訊息,同時將該訊息壓棧;否則將當前Transaction儲存到上文Transaction的子訊息佇列中,同時將該訊息壓棧。
public boolean end(DefaultMessageManager manager, Transaction transaction) {
if (!m_stack.isEmpty()) {
Transaction current = m_stack.pop();
…
if (m_stack.isEmpty()) {
MessageTree tree = m_tree.copy();
m_tree.setMessageId(null);
m_tree.setMessage(null);
...
manager.flush(tree); //重新整理訊息到CAT服務端
return true;
}
}
return false;
}
end方法用來結束Transaction,每次呼叫都會pop訊息棧,如果棧為空則呼叫flush來重新整理訊息到CAT服務端。
綜上,Context的m_stack的結構如下:
message-stack
Transaction之間是有引用的,因此在end方法中只需要將第一個Transaction(封裝在MessageTree中)通過MessageManager來flush,在拼接訊息時可以根據這個引用關係來找到所有的Transaction :)。
TransportManager和MessageSender
這兩個類用來發送訊息到服務端。MessageManager通過TransportManager獲取到MessageSender,呼叫sender.send()方法來發送訊息。 TransportManager和MessageSender關係如下:
transport
TCPSocketSender為MessageSender的具體子類,它裡面主要的資料成員為:
private MessageCodec m_codec;
private MessageQueue m_queue = new DefaultMessageQueue(SIZE);
private ChannelManager m_manager;
MessageCodec:CAT基於TCP傳輸訊息,因此在傳送訊息時需要對字元訊息編碼成位元組流,這個編碼的工作由MessageCodec負責實現。
MessageQueue:還記得剛才說業務方在新增訊息時,CAT非同步傳送到服務端嗎?在新增訊息時,訊息會被放置在TCPSocketSender的m_queue中,如果超出queue大小則拋棄訊息。
ChannelManager:CAT底層使用netty來實現TCP訊息傳輸,ChannelManager負責維護通訊Channel。通俗的說,維護連線。
TCPSocketSender主要方法為initialize、send和run,分別介紹如下:
public void initialize() {
m_manager = new ChannelManager(m_logger, m_serverAddresses, m_queue, m_configManager, m_factory);
Threads.forGroup("cat").start(this);
Threads.forGroup("cat").start(m_manager);
...
}
initialize方法為初始化方法,在執行時主要建立兩個執行緒,一個用來執行自身run方法(TCPSocketSender實現了Runnable介面)監聽訊息佇列;另一個則用來執行ChannelManager維護通訊Channel。
public void send(MessageTree tree) {
if (isAtomicMessage(tree)) {
boolean result = m_atomicTrees.offer(tree, m_manager.getSample());
if (!result) {
logQueueFullInfo(tree);
}
} else {
boolean result = m_queue.offer(tree, m_manager.getSample());
if (!result) {
logQueueFullInfo(tree);
}
}
}
send方法被MessageManager呼叫,把訊息放置在訊息佇列中。
public void run() {
m_active = true;
while (m_active) {
ChannelFuture channel = m_manager.channel();
if (channel != null && checkWritable(channel)) {
try {
MessageTree tree = m_queue.poll();
if (tree != null) {
sendInternal(tree);
tree.setMessage(null);
}
} catch (Throwable t) {
m_logger.error("Error when sending message over TCP socket!", t);
}
} else {
try {
Thread.sleep(5);
} catch (Exception e) {
// ignore it
m_active = false;
}
}
}
}
private void sendInternal(MessageTree tree) {
ChannelFuture future = m_manager.channel();
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K
m_codec.encode(tree, buf);
int size = buf.readableBytes();
Channel channel = future.channel();
channel.writeAndFlush(buf);
if (m_statistics != null) {
m_statistics.onBytes(size);
}
}
run方法會一直執行直到程序退出,在迴圈裡先獲取通訊Channel,然後傳送訊息。值得注意的是,sendInternal方法在執行時呼叫m_codec.encode(tree, buf),引數為訊息樹和緩衝區。訊息樹裡面其實只儲存了一個訊息,還記得剛才說的Transaction上下文引用嗎?m_codec在encode的時候會判斷訊息型別是否為Transaction,如果為Transaction則會遞迴獲取子Transaction,否則直接將該訊息編碼。具體實現可以參考原始碼的PlainTextMessageCodec類的encode方法,此處不再贅述。