Atlas Kafka Data Import Failure, and How ZkUtils Parses the ZooKeeper Connection String
Atlas Kafka data import failure
Atlas version: apache-atlas-1.0.0
After installing Atlas, the Hive and Kafka metadata need to be imported.
Calling Atlas's import-kafka.sh to import the Kafka data
ends with "Kafka Data Model imported successfully!!!",
but the Atlas web UI shows that no data was actually imported,
and import-kafka.log contains no error messages either.
Log output:
2018-09-27 09:31:00,812 INFO - [main:] ~ Looking for atlas-application.properties in classpath (ApplicationProperties:85)
2018-09-27 09:31:00,816 INFO - [main:] ~ Loading atlas-application.properties from jar:file:/opt/apache-atlas/apache-atlas-1.0.0/hook/kafka/atlas-kafka-plugin-impl/atlas-intg-1.0.0.jar!/atlas-application.properties (ApplicationProperties:98)
2018-09-27 09:31:00,848 INFO - [main:] ~ Property (set to default) atlas.graph.cache.db-cache = true (ApplicationProperties:242)
2018-09-27 09:31:00,848 INFO - [main:] ~ Property (set to default) atlas.graph.cache.db-cache-clean-wait = 20 (ApplicationProperties:242)
2018-09-27 09:31:00,850 INFO - [main:] ~ Property (set to default) atlas.graph.cache.db-cache-size = 0.5 (ApplicationProperties:242)
2018-09-27 09:31:00,850 INFO - [main:] ~ Property (set to default) atlas.graph.cache.tx-cache.size = 15000 (ApplicationProperties:242)
2018-09-27 09:31:00,850 INFO - [main:] ~ Property (set to default) atlas.graph.cache.tx-dirty-size = 120 (ApplicationProperties:242)
2018-09-27 09:31:04,472 INFO - [main:] ~ Client has only one service URL, will use that for all actions: http://cdh005:21000 (AtlasBaseClient:291)
2018-09-27 09:31:04,486 INFO - [ZkClient-EventThread-11-cdh003:2181/kafka,cdh005:2181/kafka,cdh006:2181/kafka:] ~ Starting ZkClient event thread. (ZkEventThread:65)
2018-09-27 09:31:04,492 INFO - [main:] ~ Client environment:zookeeper.version=3.4.5-cdh5.9.0--1, built on 10/21/2016 08:08 GMT (Environment:100)
2018-09-27 09:31:04,492 INFO - [main:] ~ Client environment:host.name=cdh005 (Environment:100)
2018-09-27 09:31:04,493 INFO - [main:] ~ Client environment:java.version=1.8.0_144 (Environment:100)
2018-09-27 09:31:04,493 INFO - [main:] ~ Client environment:java.vendor=Oracle Corporation (Environment:100)
2018-09-27 09:31:04,493 INFO - [main:] ~ Client environment:java.home=/usr/java/jdk1.8.0_144/jre (Environment:100)
2018-09-27 09:31:04,494 INFO - [main:] ~ Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (Environment:100)
2018-09-27 09:31:04,495 INFO - [main:] ~ Client environment:java.io.tmpdir=/tmp (Environment:100)
2018-09-27 09:31:04,495 INFO - [main:] ~ Client environment:java.compiler=<NA> (Environment:100)
2018-09-27 09:31:04,495 INFO - [main:] ~ Client environment:os.name=Linux (Environment:100)
2018-09-27 09:31:04,495 INFO - [main:] ~ Client environment:os.arch=amd64 (Environment:100)
2018-09-27 09:31:04,496 INFO - [main:] ~ Client environment:os.version=3.10.0-327.el7.x86_64 (Environment:100)
2018-09-27 09:31:04,496 INFO - [main:] ~ Client environment:user.name=cloudera-scm (Environment:100)
2018-09-27 09:31:04,496 INFO - [main:] ~ Client environment:user.home=/var/lib/cloudera-scm-server (Environment:100)
2018-09-27 09:31:04,496 INFO - [main:] ~ Client environment:user.dir=/opt/apache-atlas/apache-atlas-1.0.0/hook/kafka/atlas-kafka-plugin-impl (Environment:100)
2018-09-27 09:31:04,496 INFO - [main:] ~ Initiating client connection, connectString=cdh003:2181/kafka,cdh005:2181/kafka,cdh006:2181/kafka sessionTimeout=60000 [email protected] (ZooKeeper:438)
2018-09-27 09:31:04,508 INFO - [main:] ~ Waiting for keeper state SyncConnected (ZkClient:936)
2018-09-27 09:31:04,510 INFO - [main-SendThread(cdh003:2181):] ~ Opening socket connection to server cdh003/172.18.70.41:2181. Will not attempt to authenticate using SASL (unknown error) (ClientCnxn$SendThread:975)
2018-09-27 09:31:04,563 INFO - [main-SendThread(cdh003:2181):] ~ Socket connection established, initiating session, client: /172.18.80.81:51920, server: cdh003/172.18.70.41:2181 (ClientCnxn$SendThread:852)
2018-09-27 09:31:04,580 INFO - [main-SendThread(cdh003:2181):] ~ Session establishment complete on server cdh003/172.18.70.41:2181, sessionid = 0x26602e478142f04, negotiated timeout = 60000 (ClientCnxn$SendThread:1235)
2018-09-27 09:31:04,582 INFO - [main-EventThread:] ~ zookeeper state changed (SyncConnected) (ZkClient:713)
The log shows that on startup the script looks for atlas-application.properties inside the jar, and the configuration file had already been packed into the jar, so that is not the problem.
The next suspicion was the ZooKeeper connection, but according to the log the ZooKeeper connection succeeds without any error, and the run still finishes with "Kafka Data Model imported successfully!!!".
The conclusion: no Kafka topic data was ever fetched in the first place, so although the script reports success, nothing actually lands in Atlas.
With nothing more to learn from the logs, the only option left is to pull down the Atlas source (imported into IDEA) and see how the Kafka data import works. Reading the source of the Kafka import bridge shows that KafkaBridge fetches the topics as follows:
this.zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), JaasUtils.isZkSecurityEnabled());
this.availableTopics = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
Topics are obtained via the zkUtils.getAllTopics() method.
To make testing easier, this logic can be extracted into a standalone program to check whether topic information can be retrieved:
import java.util.List;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import kafka.utils.ZkUtils;
import scala.collection.JavaConversions;

public static void main(String[] args) {
    String connection = "cdh003:2181/kafka,cdh005:2181/kafka,cdh006:2181/kafka";
    ZkClient client = new ZkClient(connection, 10 * 1000);
    ZkUtils zkUtils = new ZkUtils(client, new ZkConnection(connection), false);
    List<String> list = JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
    System.out.println(list);
}
Running this, the returned list indeed has size 0. Suspecting the connection path, digging further into the source shows that when the ZkConnection object's connect method is invoked, it creates a ZooKeeper object:
public void connect(Watcher watcher) {
    this._zookeeperLock.lock();
    try {
        if (this._zk != null) {
            throw new IllegalStateException("zk client has already been started");
        }
        try {
            LOG.debug("Creating new ZookKeeper instance to connect to " + this._servers + ".");
            // the ZooKeeper object is created here
            this._zk = new ZooKeeper(this._servers, this._sessionTimeOut, watcher);
        } catch (IOException var6) {
            throw new ZkException("Unable to connect to " + this._servers, var6);
        }
    } finally {
        this._zookeeperLock.unlock();
    }
}
The ZooKeeper constructor (the three-argument call above ends up in this overload) looks like this:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly)
    throws IOException
{
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
    watchManager.defaultWatcher = watcher;
    // note: this is where the connection string is parsed
    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    HostProvider hostProvider = new StaticHostProvider(
            connectStringParser.getServerAddresses());
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    cnxn.start();
}
It uses ConnectStringParser to parse the connection string; the ConnectStringParser constructor is:
public ConnectStringParser(String connectString) {
    // parse out chroot, if any
    // first find the index of the FIRST '/' in the string
    int off = connectString.indexOf('/');
    if (off >= 0) {
        // chrootPath and connectString are split at that index, and here
        // the cause becomes clear: with the connection string we used,
        // "cdh003:2181/kafka,cdh005:2181/kafka,cdh006:2181/kafka",
        // the extracted chrootPath is "/kafka,cdh005:2181/kafka,cdh006:2181/kafka",
        // while the zk root path that actually holds the topic information
        // is "/kafka" -- so no topic information can be found
        String chrootPath = connectString.substring(off);
        // ignore "/" chroot spec, same as null
        if (chrootPath.length() == 1) {
            this.chrootPath = null;
        } else {
            PathUtils.validatePath(chrootPath);
            this.chrootPath = chrootPath;
        }
        // cut the server list out of connectString
        connectString = connectString.substring(0, off);
    } else {
        this.chrootPath = null;
    }

    String hostsList[] = connectString.split(",");
    for (String host : hostsList) {
        int port = DEFAULT_PORT;
        int pidx = host.lastIndexOf(':');
        if (pidx >= 0) {
            // otherwise : is at the end of the string, ignore
            if (pidx < host.length() - 1) {
                port = Integer.parseInt(host.substring(pidx + 1));
            }
            host = host.substring(0, pidx);
        }
        serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
    }
}
Changing the connection string to "cdh003:2181,cdh005:2181,cdh006:2181/kafka", the extracted chroot becomes /kafka and the connectString becomes cdh003:2181,cdh005:2181,cdh006:2181, which is then split on ',' and ':' into the individual serverAddresses.
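The difference between the two connection strings can be demonstrated with a small self-contained sketch. ChrootDemo, chrootOf, and hostsOf are hypothetical helpers that mirror the indexOf('/') logic shown above; they are not part of the ZooKeeper API:

```java
import java.util.Arrays;
import java.util.List;

public class ChrootDemo {

    // everything from the FIRST '/' onward becomes the chroot path
    static String chrootOf(String connectString) {
        int off = connectString.indexOf('/');
        return off >= 0 ? connectString.substring(off) : null;
    }

    // the part before the first '/' is split on ',' into host entries
    static List<String> hostsOf(String connectString) {
        int off = connectString.indexOf('/');
        String hosts = off >= 0 ? connectString.substring(0, off) : connectString;
        return Arrays.asList(hosts.split(","));
    }

    public static void main(String[] args) {
        // chroot repeated on every host: only cdh003 survives as a server,
        // and the chroot is a nonsense path
        String bad = "cdh003:2181/kafka,cdh005:2181/kafka,cdh006:2181/kafka";
        System.out.println(chrootOf(bad)); // /kafka,cdh005:2181/kafka,cdh006:2181/kafka
        System.out.println(hostsOf(bad));  // [cdh003:2181]

        // chroot only after the LAST host: all three servers kept, chroot is /kafka
        String good = "cdh003:2181,cdh005:2181,cdh006:2181/kafka";
        System.out.println(chrootOf(good)); // /kafka
        System.out.println(hostsOf(good));  // [cdh003:2181, cdh005:2181, cdh006:2181]
    }
}
```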
Testing again, the topic information is now retrieved successfully. On the server, fix the connection string in atlas-application.properties, then repack the configuration file into the jar:
zip -u kafka-bridge-1.0.0.jar atlas-application.properties
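For reference, the corrected setting would look roughly like this (the atlas.kafka.zookeeper.connect property name is taken from the stock Atlas configuration; the host names are this cluster's):

```properties
# chroot appears exactly once, after the last host:port pair
atlas.kafka.zookeeper.connect=cdh003:2181,cdh005:2181,cdh006:2181/kafka
```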