深入詳解美團點評CAT跨語言服務監控(二) CAT服務端初始化
Cat模組
Cat-client : cat客戶端,編譯後生成 cat-client-2.0.0.jar ,使用者可以通過它來向cat-home上報統一格式的日誌資訊,可以整合到 mybatis、spring、微服務 dubbo 的監控等等流行框架。
Cat-consumer: 用於實時分析從客戶端提供的資料。在實際開發和部署中,Cat-consumer和Cat-home是部署在一個JVM內部,每個CAT服務端都可以作為consumer也可以作為home,這樣既能減少整個層級結構,也可以增加系統穩定性。
Cat-core:Cat核心模組
Cat-hadoop : 大資料統計依賴模組。
cat-home:大眾點評CAT伺服器端主程式,編譯安裝之後生成 cat-alpha-2.0.0.war 包部署於servlet容器中,我們用的是Tomcat,war包依賴cat-client.jar、cat-consumer.jar, cat-core.jar, cat-hadoop.jar 包,通過web.xml 配置,看到Cat會啟動 cat-servlet 和 mvc-servlet , mvc-servlet 是一個類似 spring MVC 的框架,用於處理使用者WEB管理平臺請求。cat-servlet是CAT服務端監聽入口,CAT會在這裡開啟監聽埠,接收處理客戶端的日誌記錄請求,本章主要介紹cat-servlet。
<?xml version="1.0" encoding="UTF-8"?> <web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" version="2.5"> ... <servlet> <servlet-name>cat-servlet</servlet-name> <servlet-class>com.dianping.cat.servlet.CatServlet</servlet-class> <load-on-startup>1</load-on-startup> </servlet> <servlet> <servlet-name>mvc-servlet</servlet-name> <servlet-class>org.unidal.web.MVC</servlet-class> <init-param> <param-name>cat-client-xml</param-name> <param-value>client.xml</param-value> </init-param> <init-param> <param-name>init-modules</param-name> <param-value>false</param-value> </init-param> <load-on-startup>2</load-on-startup> </servlet> ....
Cat-servlet初始化
圖1 - 容器初始化類圖
CatServlet 首先會呼叫父類 AbstractContainerServlet 的init方法做初始化工作, 可以認為這是CatServlet的入口,他主要做了3件事情,首先呼叫基類HttpServlet的init方法對Servlet進行初始化,然後初始化Plexus容器,最後呼叫子類initComponents初始化Module模組。
public abstract class AbstractContainerServlet extends HttpServlet {
public void init(ServletConfig config) throws ServletException {
super.init(config);
try {
if(this.m_container == null) {
this.m_container = ContainerLoader.getDefaultContainer();
}
this.m_logger = this.m_container.getLogger();
this.initComponents(config);
} catch (Exception var3) {
...
}
}
}
plexus - IOC容器
上面講到init(...)方法在初始化完Servlet之後呼叫 ContainerLoader.getDefaultContainer() 初始化plexus容器。
注:這裡可能大家不太瞭解plexus,它相當於Spring的IoC容器,但是它和Spring框架不同,它並不是一個完整的,擁有各種元件的大型框架,僅僅是一個純粹的IoC容器,它的開發者與Maven的開發者是同一撥人,最初開發Maven的時候,Spring並不成熟,所以Maven的開發者決定使用自己維護的IoC容器Plexus,它與Spring在語法和描述方式稍有不同。在Plexus中,有ROLE的概念,相當於Spring中的一個Bean。支援元件生命週期管理。
非JAVA開發者不懂IOC容器?簡單來說,IOC容器就相當於一個物件裝載器,物件不是由程式設計師new建立,而是框架在初始化的時候從配置檔案中讀取需要例項化的類資訊,將資訊裝入一個物件裝載器,然後在需要的時候,從物件裝載器中找是否存在該類的資訊,存在則返回類的物件。
plexus容器是如何工作的呢?就上面的類圖來說,
a. AbstractContainerServlet 通過容器工廠ContainerLoader 的 getDefaultContainer方法,該方法會建立 MyPlexusContainer 容器,MyPlexusContainer是介面 PlexusContainer 的實現,MyPlexusContainer在建構函式中會建立元件管理器(ComponentManager),可以認為每個類都是容器中的一個元件,ComponentManager就是用來管理這些元件的,包括他的生命週期,元件在Plexus容器配置檔案中配置。
b.元件管理器(ComponentManager)會建立元件模型管理器(ComponentModelManager)以及元件生命週期管理器(ComponentLifecycle),ComponentModelManager用於儲存Plexus容器配置檔案中的所有component元件資訊,它的loadComponentsFromClasspath()方法會掃描各個jar包中存在的plexus容器配置檔案,如圖2,將xml內容解析之後放入PlexusModel 列表中。
public class ComponentManager {
private Map<String, ComponentBox<?>> m_components = new HashMap();
private PlexusContainer m_container;
private ComponentLifecycle m_lifecycle;
private ComponentModelManager m_modelManager;
private LoggerManager m_loggerManager;
public ComponentManager(PlexusContainer container, InputStream in) throws Exception {
this.m_container = container;
this.m_modelManager = new ComponentModelManager();
this.m_lifecycle = new ComponentLifecycle(this);
if(in != null) {
this.m_modelManager.loadComponents(in);
}
this.m_modelManager.loadComponentsFromClasspath();
this.m_loggerManager = (LoggerManager)this.lookup(new ComponentKey(LoggerManager.class, (String)null));
this.register(new ComponentKey(PlexusContainer.class, (String)null), container);
this.register(new ComponentKey(Logger.class, (String)null), this.m_loggerManager.getLoggerForComponent(""));
}
}
我們也可以將我們自己寫的類交給容器管理,只需要將類配置到容器配置檔案中,例如:cat-consumer/src/main/resources/META-INF/plexus/components-cat-consumer.xml, 只要是存在於 META-INF/plexus/ 目錄下,並且檔名以"components-" 開頭的 ".xml" 檔案,都會被 ComponentModelManager 認為是容器配置檔案。
圖2 - plexus IOC容器類配置檔案
c.然後就可以通過lookup方法找到類,並在首次使用的時候例項化,並且xml配置中的該類依賴的其它類也會被一併例項化,另外如果類方法實現了 Initializable 介面,建立物件後會執行類的 initialize() 方法做一些初始化的工作。
if(component instanceof Initializable) {
try {
((Initializable)component).initialize();
} catch (Throwable var5) {
ComponentModel model = ctx.getComponentModel();
throw new ComponentLookupException("Error when initializing component!", model.getRole(), model.getHint(), var5);
}
}
模組的載入 - 模型模式
init(...)函式最後會呼叫CatServlet的initComponents()方法初始化Module模組。
圖3 - 模組初始化類圖
initComponents()方法首先建立一個模組上下文 DefaultModuleContext物件,該物件擁有plexus容器的指標,以及server.xml、client.xml配置檔案資訊 ,服務端配置server.xml中有訊息儲存路徑、HDFS上傳等一系列配置,由於cat-home預設是服務端也是客戶端,也就是說cat-home自身也會被監控,所以我們在這裡看到有client.xml配置,配置檔案所在目錄由環境變數CAT_HOME指定,如果未指定,預設是/data/appdatas/cat。
隨後CatServlet建立一個模組初始器 DefaultModuleInitializer,並呼叫他的execute(ctx)方法建立並初始化模組。
注:DefaultModuleInitializer有一個模組管理器DefaultModelManager m_manager, 讀者可能沒有看見m_manager的建立過程,實際上,物件在components-foundation-service.xml配置檔案中配置的,然後在plexus容器例項化類物件的過程中建立的,後面還有很多物件的屬性也是通過plexus容器注入的。比如DefaultModuleManager的m_topLevelModules屬性通過以下配置注入。
<component>
<role>org.unidal.initialization.ModuleManager</role>
<implementation>org.unidal.initialization.DefaultModuleManager</implementation>
<configuration>
<topLevelModules>cat-home</topLevelModules>
</configuration>
</component>
上面XML配置顯示m_topLevelModules 指定為cat-home,這樣DefaultModuleInitializer通過DefaultModelManager的getTopLevelModules()方法獲取的就是CatHomeModule模組物件,可以認為cat-home是一個頂層模組,所有Module都包含getDependencies方法,該方法會找到當前模組所依賴的其他模組,並例項化模組,比如下面cat-home就依賴cat-consumer模組,
public class CatHomeModule extends AbstractModule {
@Override
public Module[] getDependencies(ModuleContext ctx) {
return ctx.getModules(CatConsumerModule.ID);
}
}
從cat-consumer的getDependencies看出他依賴cat-core模組,cat-core模組又依賴cat-client模組,這樣子我們就從頂層模組引出了所有依賴的其它模組,在例項化模組的同時呼叫模組的setup方法安裝模組。在所有模組安裝完成之後,依次呼叫模組的execute方法完成初始化,但是初始化順序則是按照安裝順序反著來的,cat-client -> cat-core -> cat-consumer -> cat-home ,Modules之間的設計使用了典型的模板模式。
cat-home的setup
在上一章講到模組初始化的時候, 講到setup安裝cat-home模組,對於客戶端的請求的監聽處理,就是在這裡完成的。
@Named(type = Module.class, value = CatHomeModule.ID)
public class CatHomeModule extends AbstractModule {
@Override
protected void setup(ModuleContext ctx) throws Exception {
if (!isInitialized()) {
File serverConfigFile = ctx.getAttribute("cat-server-config-file");
ServerConfigManager serverConfigManager = ctx.lookup(ServerConfigManager.class);
final TcpSocketReceiver messageReceiver = ctx.lookup(TcpSocketReceiver.class);
serverConfigManager.initialize(serverConfigFile);
messageReceiver.init();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
messageReceiver.destory();
}
});
}
}
}
1、讀取 server.xml 配置,裝進配置管理器(ServerConfigManager)。
2、建立訊息接收器 final TcpSocketReceiver messageReceiver;
3、messageReceiver.init() 初始化服務,採用的經典的 netty reactor 模型。
4、註冊一個JVM關閉的鉤子,在程序掛掉的時候,執行一些清理現場的程式碼。
TcpSocketReceiver--- netty reactor 模式的應用
我們來看看CatHomeModule對TcpSocketReceiver的初始化做了什麼,如下原始碼:
public final class TcpSocketReceiver implements LogEnabled {
public void init() {
try {
startServer(m_port);
} catch (Throwable e) {
m_logger.error(e.getMessage(), e);
}
}
public synchronized void startServer(int port) throws InterruptedException {
boolean linux = getOSMatches("Linux") || getOSMatches("LINUX");
int threads = 24;
ServerBootstrap bootstrap = new ServerBootstrap();
m_bossGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
m_workerGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
bootstrap.group(m_bossGroup, m_workerGroup);
bootstrap.channel(linux ? EpollServerSocketChannel.class : NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decode", new MessageDecoder());
}
});
bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
try {
m_future = bootstrap.bind(port).sync();
m_logger.info("start netty server!");
} catch (Exception e) {
m_logger.error("Started Netty Server Failed:" + port, e);
}
}
}
1、建立EventLoopGroup物件, EventLoopGroup是用來處理IO操作的多執行緒事件迴圈器,m_bossGroup作為一個acceptor負責接收來自客戶端的請求,然後分發給m_workerGroup用來所有的事件event和channel的IO。
2、建立ServerBootstrap物件,ServerBootstrap 是一個啟動Epoll(非Linux為NIO)服務的輔助啟動類,他將設定bossGroup和workerGroup兩個多執行緒時間迴圈器。
3、接下來的channel()方法設定了ServerBootstrap 的 ChannelFactory,這裡傳入的引數是EpollServerSocketChannel.class (非Linux為NioServerSocketChannel.class),也就是說這個ChannelFactory建立的就是EpollServerSocketChannel/NioServerSocketChannel的例項。
Channel是Netty的核心概念之一,它是Netty網路通訊的主體,他從EventLoopGroup獲得一個EventLoop,並註冊到該EventLoop,channel生命週期內都和該EventLoop在一起,由它負責對網路通訊連線的開啟、關閉、連線和讀寫操作。如果是對於讀寫事件,執行執行緒排程pipeline來處理使用者業務邏輯。
4、接下來bootstrap.childHandler的目的是新增一個handler,用來監聽已經連線的客戶端的Channel的動作和狀態,傳入的 ChannelInitializer重寫了initChannel方法,這個方法在Channel被註冊到EventLoop的時候會被呼叫。
5、initChannel會建立ChannelPipeline物件,並呼叫addLast新增ChannelHandler。有網路請求時,ChannelPipeline會呼叫ChannelHandler來處理,有ChannelInboundHandler和ChannelOutboundHandler兩種,ChannelPipeline會從頭到尾順序呼叫ChannelInboundHandler處理網路請求內容,從尾到頭呼叫ChannelOutboundHandler處理網路請求內容。這也是Netty用來靈活處理網路請求的機制之一,因為使用的時候可以用多個decoder和encoder進行組合,從而適應不同的網路協議。而且這種類似分層的方式可以讓每一個Handler專注於處理自己的任務而不用管上下游,這也是pipeline機制的特點。這跟TCP/IP協議中的五層和七層的分層機制有異曲同工之妙。
在這裡,ChannelPipeline新增的 ChannelHandler 是MessageDecoder ,MessageDecoder的祖先類實現了ChannelHandler介面,他本質上還是一個Handler,是網路IO事件具體處理類,當客戶端將日誌資料上傳到伺服器之後,會交給MessageDecoder 解碼資料,然後進行後續處理。
6、呼叫 childOption 設定 channel 的引數。
7、最後呼叫bind()方法啟動服務。
關於netty ,我就講到這裡,網上關於netty框架的文章非常多,大家可以自行去查。
訊息的解碼
上一章我們講到Netty將接收到的訊息交給 MessageDecoder 去做解碼,解碼是交由PlainTextMessageCodec物件將接收到的位元組碼反序列化為MessageTree物件(所有的訊息都是由訊息樹來組織),具體的解碼邏輯在這裡暫不做詳細闡述,在第三章我們會闡述編碼過程,解碼只是編碼的一個逆過程。
解碼之後呼叫 DefaultMessageHandler 的 handle方法對訊息進行處理,handle方法就幹了一件事情,就是呼叫 m_consumer.consume(tree) 方法去消費訊息樹,在消費模組,CAT實現了佇列化,非同步化,在訊息消費章節會詳細闡述。
當然netty handler也是支援非同步處理的,我們也可以將 DefaultMessageHandler 像 MessageDecoder那樣向netty註冊handler, 再由netty來做執行緒池分發。
public class MessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
if (buffer.readableBytes() < 4) {
return;
}
buffer.markReaderIndex();
int length = buffer.readInt();
...
ByteBuf readBytes = buffer.readBytes(length + 4);
...
DefaultMessageTree tree = (DefaultMessageTree) m_codec.decode(readBytes);
readBytes.resetReaderIndex();
tree.setBuffer(readBytes);
m_handler.handle(tree);
m_processCount++;
...
}
}