
Atlas Kafka Data Import Failure and ZooKeeper Connect-String Parsing in ZkUtils

Atlas Kafka data import failure

Atlas version: apache-atlas-1.0.0

After installing Atlas, the Hive and Kafka metadata need to be imported.

I ran Atlas's import-kafka.sh script to import the Kafka data.

The script printed "Kafka Data Model imported successfully!!!", yet the Atlas web UI showed no imported data.

The import-kafka.log file contained no error messages either.

Log output:

2018-09-27 09:31:00,812 INFO  - [main:] ~ Looking for atlas-application.properties in classpath (ApplicationProperties:85)
2018-09-27 09:31:00,816 INFO  - [main:] ~ Loading atlas-application.properties from jar:file:/opt/apache-atlas/apache-atlas-1.0.0/hook/kafka/atlas-kafka-plugin-impl/atlas-intg-1.0.0.jar!/atlas-application.properties (ApplicationProperties:98)
2018-09-27 09:31:00,848 INFO  - [main:] ~ Property (set to default) atlas.graph.cache.db-cache = true (ApplicationProperties:242)
2018-09-27 09:31:00,848 INFO  - [main:] ~ Property (set to default) atlas.graph.cache.db-cache-clean-wait = 20 (ApplicationProperties:242)
2018-09-27 09:31:00,850 INFO  - [main:] ~ Property (set to default) atlas.graph.cache.db-cache-size = 0.5 (ApplicationProperties:242)
2018-09-27 09:31:00,850 INFO  - [main:] ~ Property (set to default) atlas.graph.cache.tx-cache.size = 15000 (ApplicationProperties:242)
2018-09-27 09:31:00,850 INFO  - [main:] ~ Property (set to default) atlas.graph.cache.tx-dirty-size = 120 (ApplicationProperties:242)
2018-09-27 09:31:04,472 INFO  - [main:] ~ Client has only one service URL, will use that for all actions: http://cdh005:21000 (AtlasBaseClient:291)
2018-09-27 09:31:04,486 INFO  - [ZkClient-EventThread-11-cdh003:2181/kafka,cdh005:2181/kafka,cdh006:2181/kafka:] ~ Starting ZkClient event thread. (ZkEventThread:65)
2018-09-27 09:31:04,492 INFO  - [main:] ~ Client environment:zookeeper.version=3.4.5-cdh5.9.0--1, built on 10/21/2016 08:08 GMT (Environment:100)
2018-09-27 09:31:04,492 INFO  - [main:] ~ Client environment:host.name=cdh005 (Environment:100)
2018-09-27 09:31:04,493 INFO  - [main:] ~ Client environment:java.version=1.8.0_144 (Environment:100)
2018-09-27 09:31:04,493 INFO  - [main:] ~ Client environment:java.vendor=Oracle Corporation (Environment:100)
2018-09-27 09:31:04,493 INFO  - [main:] ~ Client environment:java.home=/usr/java/jdk1.8.0_144/jre (Environment:100)
2018-09-27 09:31:04,494 INFO  - [main:] ~ Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (Environment:100)
2018-09-27 09:31:04,495 INFO  - [main:] ~ Client environment:java.io.tmpdir=/tmp (Environment:100)
2018-09-27 09:31:04,495 INFO  - [main:] ~ Client environment:java.compiler=<NA> (Environment:100)
2018-09-27 09:31:04,495 INFO  - [main:] ~ Client environment:os.name=Linux (Environment:100)
2018-09-27 09:31:04,495 INFO  - [main:] ~ Client environment:os.arch=amd64 (Environment:100)
2018-09-27 09:31:04,496 INFO  - [main:] ~ Client environment:os.version=3.10.0-327.el7.x86_64 (Environment:100)
2018-09-27 09:31:04,496 INFO  - [main:] ~ Client environment:user.name=cloudera-scm (Environment:100)
2018-09-27 09:31:04,496 INFO  - [main:] ~ Client environment:user.home=/var/lib/cloudera-scm-server (Environment:100)
2018-09-27 09:31:04,496 INFO  - [main:] ~ Client environment:user.dir=/opt/apache-atlas/apache-atlas-1.0.0/hook/kafka/atlas-kafka-plugin-impl (Environment:100)
2018-09-27 09:31:04,496 INFO  - [main:] ~ Initiating client connection, connectString=cdh003:2181/kafka,cdh005:2181/kafka,cdh006:2181/kafka sessionTimeout=60000 watcher=org.I0Itec.zkclient.ZkClient@… (ZooKeeper:438)
2018-09-27 09:31:04,508 INFO  - [main:] ~ Waiting for keeper state SyncConnected (ZkClient:936)
2018-09-27 09:31:04,510 INFO  - [main-SendThread(cdh003:2181):] ~ Opening socket connection to server cdh003/172.18.70.41:2181. Will not attempt to authenticate using SASL (unknown error) (ClientCnxn$SendThread:975)
2018-09-27 09:31:04,563 INFO  - [main-SendThread(cdh003:2181):] ~ Socket connection established, initiating session, client: /172.18.80.81:51920, server: cdh003/172.18.70.41:2181 (ClientCnxn$SendThread:852)
2018-09-27 09:31:04,580 INFO  - [main-SendThread(cdh003:2181):] ~ Session establishment complete on server cdh003/172.18.70.41:2181, sessionid = 0x26602e478142f04, negotiated timeout = 60000 (ClientCnxn$SendThread:1235)
2018-09-27 09:31:04,582 INFO  - [main-EventThread:] ~ zookeeper state changed (SyncConnected) (ZkClient:713)

The log shows that on startup the script first looks for the atlas-application.properties configuration file in the jar; since the configuration file had already been packed into the jar, this step is not the problem.

I suspected a ZooKeeper connection problem, but according to the log the ZooKeeper connection succeeded without any error, and the run still ended with "Kafka Data Model imported successfully!!!". The conclusion: the import never obtained any Kafka topic data at all, so although it reported success, no data actually landed in Atlas.

With nothing more to learn from the logs, the only option was to pull down the Atlas source and see how the Kafka data is imported, so I loaded the Atlas source into IDEA. The bridge that imports Kafka data lives at the following path.

(screenshot of the bridge module path omitted) Reading the source shows that KafkaBridge obtains the topics as follows:

this.zkUtils         = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), JaasUtils.isZkSecurityEnabled());
this.availableTopics = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics());

The topics are fetched via the zkUtils.getAllTopics() method.
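For context: in Kafka, brokers register topics under the /brokers/topics node, and ZooKeeper resolves every request path relative to the chroot of the session. A minimal sketch of where the read actually lands (the path-joining helper below is illustrative, not Kafka code):

```java
public class TopicPathDemo {
    // Sketch: getAllTopics lists the children of /brokers/topics;
    // ZooKeeper prefixes each request path with the session chroot,
    // so the node actually read is chroot + "/brokers/topics".
    static String effectivePath(String chroot) {
        return (chroot == null || chroot.equals("/") ? "" : chroot) + "/brokers/topics";
    }

    public static void main(String[] args) {
        // no chroot: reads the real /brokers/topics
        System.out.println(effectivePath(null));      // /brokers/topics
        // with chroot /kafka: reads /kafka/brokers/topics
        System.out.println(effectivePath("/kafka"));  // /kafka/brokers/topics
    }
}
```

An empty topic list therefore means that, from the client's point of view, chroot + /brokers/topics simply had no children.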

To make testing easier, I pulled this logic out into a standalone program to check whether it could fetch any topic information:

import java.util.List;

import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import scala.collection.JavaConversions;

public class TopicFetchTest {
    public static void main(String[] args) {
        // the same connect string the bridge was configured with
        String connection = "cdh003:2181/kafka,cdh005:2181/kafka,cdh006:2181/kafka";

        ZkClient client = new ZkClient(connection, 10 * 1000);
        ZkUtils zkUtils = new ZkUtils(client, new ZkConnection(connection), false);

        List<String> list = JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
        System.out.println(list);
    }
}

Running it, the returned list indeed had size 0. Suspecting a problem with the connection path, I dug deeper into the source and found that when the ZkConnection object is constructed, calling its connect method creates a ZooKeeper object:

public void connect(Watcher watcher) {
    this._zookeeperLock.lock();

    try {
        if (this._zk != null) {
            throw new IllegalStateException("zk client has already been started");
        }

        try {
            LOG.debug("Creating new ZookKeeper instance to connect to " + this._servers + ".");
            // the ZooKeeper object is created here
            this._zk = new ZooKeeper(this._servers, this._sessionTimeOut, watcher);
        } catch (IOException var6) {
            throw new ZkException("Unable to connect to " + this._servers, var6);
        }
    } finally {
        this._zookeeperLock.unlock();
    }

}

The ZooKeeper constructor ultimately invoked looks like this:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly)
    throws IOException
{
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

    watchManager.defaultWatcher = watcher;

    // note: this is where the connect string is parsed
    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    HostProvider hostProvider = new StaticHostProvider(
            connectStringParser.getServerAddresses());
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    cnxn.start();
}

It uses ConnectStringParser to parse the connect string; the parser's constructor is:

public ConnectStringParser(String connectString) {
    // parse out chroot, if any
    // find the index of the first '/' in the string
    int off = connectString.indexOf('/');
    if (off >= 0) {
        // use that index to split out chrootPath and connectString; here the cause becomes clear:
        // the connect string we used was "cdh003:2181/kafka,cdh005:2181/kafka,cdh006:2181/kafka"
        // so the extracted chrootPath is /kafka,cdh005:2181/kafka,cdh006:2181/kafka
        // but the zk root path holding the topic data is /kafka, so no topics can be found
        String chrootPath = connectString.substring(off);
        // ignore "/" chroot spec, same as null
        if (chrootPath.length() == 1) {
            this.chrootPath = null;
        } else {
            PathUtils.validatePath(chrootPath);
            this.chrootPath = chrootPath;
        }
        // everything before the first '/' becomes the connect string
        connectString = connectString.substring(0, off);
    } else {
        this.chrootPath = null;
    }

    String hostsList[] = connectString.split(",");
    for (String host : hostsList) {
        int port = DEFAULT_PORT;
        int pidx = host.lastIndexOf(':');
        if (pidx >= 0) {
            // otherwise : is at the end of the string, ignore
            if (pidx < host.length() - 1) {
                port = Integer.parseInt(host.substring(pidx + 1));
            }
            host = host.substring(0, pidx);
        }
        serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
    }
}

Changing the connect string to "cdh003:2181,cdh005:2181,cdh006:2181/kafka" yields a chroot of /kafka and a connectString of cdh003:2181,cdh005:2181,cdh006:2181, which is then split on ',' and ':' into the individual serverAddresses.
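The split performed by ConnectStringParser can be reproduced with a few lines of plain string handling (the helper methods below are illustrative, mirroring the indexOf('/') logic shown above):

```java
public class ConnectStringDemo {
    // Everything from the FIRST '/' onward becomes the chroot,
    // exactly as in ZooKeeper's ConnectStringParser.
    static String chrootOf(String connectString) {
        int off = connectString.indexOf('/');
        return off >= 0 ? connectString.substring(off) : null;
    }

    // Everything before the first '/' is the host list.
    static String hostsOf(String connectString) {
        int off = connectString.indexOf('/');
        return off >= 0 ? connectString.substring(0, off) : connectString;
    }

    public static void main(String[] args) {
        String bad  = "cdh003:2181/kafka,cdh005:2181/kafka,cdh006:2181/kafka";
        String good = "cdh003:2181,cdh005:2181,cdh006:2181/kafka";

        System.out.println(chrootOf(bad));   // /kafka,cdh005:2181/kafka,cdh006:2181/kafka
        System.out.println(chrootOf(good));  // /kafka
        System.out.println(hostsOf(good));   // cdh003:2181,cdh005:2181,cdh006:2181
    }
}
```

With the original string, everything after the first '/', including the other two hosts, ends up in the chroot, which is why the fix is to list all hosts before a single '/kafka' suffix.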

With that change the test fetched the topic information successfully. On the server, I updated the connection string in atlas-application.properties accordingly (screenshot omitted) and repacked the configuration file into the jar:

zip -u kafka-bridge-1.0.0.jar atlas-application.properties