1. 程式人生 > >使用Thrift API監控Storm叢集和Topology

使用Thrift API監控Storm叢集和Topology

如要監控Storm叢集和執行在其上的Topology,該如何做呢?

Storm已經為你考慮到了,Storm支援Thrift的C/S架構,在部署Nimbus元件的機器上啟動一個Thrift Server程序來提供服務,我們可以通過編寫一個Thrift Client來請求Thrift Server,來獲取你想得到的叢集和Topology的相關資料,來接入監控平臺,如Zabbix等,我目前使用的就是Zabbix。

整體的流程已經清楚了,下面就來實踐吧。

1 安裝Thrift

由於我們要使用Thrift來編譯Storm的原始碼來獲得Thrift Client相關的Java原始碼,所以需要先安裝Thrift,這裡選取的版本為0.9.2。

到官網下載好安裝包:http://thrift.apache.org/

編譯安裝:configure && make && make install

驗證:thrift --version

如果打印出Thrift version 0.9.2,代表安裝成功。

2 編譯Thrift Client程式碼

首先下載Storm原始碼,這裡使用最新的0.9.3版本:http://mirrors.hust.edu.cn/apache/storm/apache-storm-0.9.3/apache-storm-0.9.3-src.tar.gz

解壓後進行編譯:thrift -gen java  apache-storm-0.9.3/storm-core/src/storm.thrift

在當前目錄下出現gen-java資料夾,此資料夾下就是Thrift Client的Java原始碼了。

3 使用Thrift Client API

然後建立一個Maven專案來進行執行監控資料的獲取。

專案生成一個Jar檔案,輸入一些命令和自定義引數,然後輸出結果。

以命令列的形式進行呼叫,這樣可以方便的接入監控系統,當然使用形式可以根據自身情況施行。

建立好後,把gen-java生成的程式碼拷貝進來。

在pom.xml裡引入Thrift對應版本的庫:

<dependency>
	<groupId>org.apache.thrift</groupId>
	<artifactId>libthrift</artifactId>
	<version>0.9.2</version>
</dependency>

首先寫一些Thrift相關的輔助類。

ClientInfo.java

package com.damacheng009.storm.monitor.thrift;

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;

import backtype.storm.generated.Nimbus;

/**
 * 代表一個Thrift Client的資訊
 * @author jb-xingchencheng
 *
 */
public class ClientInfo {
	private TSocket tsocket;
	private TFramedTransport tTransport;
	private TBinaryProtocol tBinaryProtocol;
	private Nimbus.Client client;

	public TSocket getTsocket() {
		return tsocket;
	}

	public void setTsocket(TSocket tsocket) {
		this.tsocket = tsocket;
	}

	public TFramedTransport gettTransport() {
		return tTransport;
	}

	public void settTransport(TFramedTransport tTransport) {
		this.tTransport = tTransport;
	}

	public TBinaryProtocol gettBinaryProtocol() {
		return tBinaryProtocol;
	}

	public void settBinaryProtocol(TBinaryProtocol tBinaryProtocol) {
		this.tBinaryProtocol = tBinaryProtocol;
	}

	public Nimbus.Client getClient() {
		return client;
	}

	public void setClient(Nimbus.Client client) {
		this.client = client;
	}
}
ClientManager.java
package com.damacheng009.storm.monitor.thrift;

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;

import backtype.storm.generated.Nimbus;

/**
 * Thrift Client管理類
 * @author jb-xingchencheng
 *
 */
public class ClientManager {
	public static ClientInfo getClient(String nimbusHost, int nimbusPort) throws TTransportException {
		ClientInfo client = new ClientInfo();
		TSocket tsocket = new TSocket(nimbusHost, nimbusPort);
		TFramedTransport tTransport = new TFramedTransport(tsocket);
		TBinaryProtocol tBinaryProtocol = new TBinaryProtocol(tTransport);
		Nimbus.Client c = new Nimbus.Client(tBinaryProtocol);
		tTransport.open();
		client.setTsocket(tsocket);
		client.settTransport(tTransport);
		client.settBinaryProtocol(tBinaryProtocol);
		client.setClient(c);
		
		return client;	
	}
	
	public static void closeClient(ClientInfo client) {
		if (null == client) {
			return;
		}
		
		if (null != client.gettTransport()) {
			client.gettTransport().close();
		}
		
		if (null != client.getTsocket()) {
			client.getTsocket().close();
		}
	}
}
然後就可以寫自己的邏輯去獲取叢集和拓撲的資料了,Storm提供的UI介面上展示的資料基本都可以獲取到,這裡只舉出一個簡單的例子,我們想獲得某個拓撲發生異常的次數,和發生的異常的堆疊。剩下的專案你可以隨意的定製。

下面是入口類:

Main.java

package com.damacheng009.storm.monitor;

import com.damacheng009.storm.monitor.logic.Logic;

/**
 * 入口類
 * @author jb-xingchencheng
 *
 */
public class Main {
	// NIMBUS的資訊
	public static String NIMBUS_HOST = "192.168.180.36";
	public static int NIMBUS_PORT = 6627;

	/**
	 * 命令格式 CMD(命令) [ARG0] [ARG1] ...(更多引數)
	 * @param args
	 */
	public static void main(String[] args) {
		if (args.length < 3) {
			return;
		}
		
		NIMBUS_HOST = args[0];
		NIMBUS_PORT = Integer.parseInt(args[1]);
		
		String cmd = args[2];
		String result = "-1";
		if (cmd.equals("get_topo_exp_size")) {
			String topoName = args[3];
			result = Logic.getTopoExpSize(topoName);
		} else if (cmd.equals("get_topo_exp_stack_trace")) {
			String topoName = args[3];
			result = Logic.getTopoExpStackTrace(topoName);
		}
		
		System.out.println(result);
	}
}

測試的時候把具體的HOST和PORT改一下即可。

然後是具體的邏輯類。

Logic.java

package com.damacheng009.storm.monitor.logic;

import java.util.Date;
import java.util.List;
import java.util.Set;

import com.damacheng009.storm.monitor.Main;
import com.damacheng009.storm.monitor.thrift.ClientInfo;
import com.damacheng009.storm.monitor.thrift.ClientManager;

import backtype.storm.generated.ClusterSummary;
import backtype.storm.generated.ErrorInfo;
import backtype.storm.generated.TopologyInfo;
import backtype.storm.generated.TopologySummary;

public class Logic {
	/**
	 * 取得某個拓撲的異常個數
	 * @param topoName
	 * @return
	 */
	public static String getTopoExpSize(String topoName) {
		ClientInfo client = null;
		int errorTotal = 0;
		
		try {
			client = ClientManager.getClient(Main.NIMBUS_HOST, Main.NIMBUS_PORT);
			
			ClusterSummary clusterSummary = client.getClient().getClusterInfo();
			List<TopologySummary> topoSummaryList = clusterSummary.getTopologies();
			for (TopologySummary ts : topoSummaryList) {
				if (ts.getName().equals(topoName)) {
					TopologyInfo topologyInfo = client.getClient().getTopologyInfo(ts.getId());
					Set<String> errorKeySet = topologyInfo.getErrors().keySet();
					for (String errorKey : errorKeySet) {
						List<ErrorInfo> listErrorInfo = topologyInfo.getErrors().get(errorKey);
						errorTotal += listErrorInfo.size();
					}
					break;
				}
			}
			
			return String.valueOf(errorTotal);
		} catch (Exception e) {
			return "-1";
		} finally {
			ClientManager.closeClient(client);
		}	
	}

	/**
	 * 返回某個拓撲的異常堆疊
	 * @param topoName
	 * @return
	 */
	public static String getTopoExpStackTrace(String topoName) {
		ClientInfo client = null;
		StringBuilder error = new StringBuilder();
		
		try {
			client = ClientManager.getClient(Main.NIMBUS_HOST, Main.NIMBUS_PORT);
			
			ClusterSummary clusterSummary = client.getClient().getClusterInfo();
			List<TopologySummary> topoSummaryList = clusterSummary.getTopologies();
			for (TopologySummary ts : topoSummaryList) {
				if (ts.getName().equals(topoName)) {
					TopologyInfo topologyInfo = client.getClient().getTopologyInfo(ts.getId());
					// 得到錯誤資訊
					Set<String> errorKeySet = topologyInfo.getErrors().keySet();
					for (String errorKey : errorKeySet) {
						List<ErrorInfo> listErrorInfo = topologyInfo.getErrors().get(errorKey);
						for (ErrorInfo ei : listErrorInfo) {
							// 發生異常的時間
							long expTime = (long) ei.getError_time_secs() * 1000;
							// 現在的時間
							long now = System.currentTimeMillis();
							
							// 由於獲取的是全量的錯誤堆疊,我們可以設定一個範圍來獲取指定範圍的錯誤,看情況而定
							// 如果超過5min,那麼就不用記錄了,因為5min檢查一次
							if (now - expTime > 1000 * 60 * 5) {
								continue;
							}
							
							error.append(new Date(expTime) + "\n");
							error.append(ei.getError() + "\n");
						}
					}
					
					break;
				}
			}
			
			return error.toString().isEmpty() ? "none" : error.toString();
		} catch (Exception e) {
			return "-1";
		} finally {
			ClientManager.closeClient(client);
		}
	}
}

最後打成一個Jar包,就可以跑起來接入監控系統了,如在Zabbix中,可以把各個監控項設定為自定義的item,在Zabbix Client中配置命令列來執行Jar取得資料。

接下來的測試過程先略過。

對於Storm監控的實踐,目前就是這樣了。