1. 程式人生 > >【TDH】Kafka、Flume、Slipstream基本操作

【TDH】Kafka、Flume、Slipstream基本操作

【Kafka操作:在${KAFKA_HOME}/bin下執行Kafka操作】

1、在星環TDH叢集上操作Kafka的時候首先要進行相關的賦權操作

(1)賦予當前使用者(當前使用者以hive為例,可以使用kinit進行使用者的切換)操作叢集的許可權

./kafka-acls.sh --authorizer-properties zookeeper.connect=node3:2181,node2:2181,node1:2181 --add --allow-principal User:hive --cluster

2、建立topic

./kafka-broker-topics.sh --bootstrap-server node3:9092,node2:9092,node1:9092 --create --topic yxy --partitions 3 --replication-factor 3 --consumer.config ../config/consumer.properties

(1)賦予使用者生產資料的許可權

./kafka-acls.sh --authorizer-properties zookeeper.connect=node3:2181,node2:2181,node1:2181 --add --allow-principal User:hive --topic yxy --producer

(2)賦予使用者消費資料的許可權

./kafka-acls.sh --authorizer-properties zookeeper.connect=node3:2181,node2:2181,node1:2181 --add --allow-principal User:hive --topic yxy --consumer --group root

(3)檢視該topic具有哪些許可權

./kafka-acls.sh --authorizer-properties zookeeper.connect=node3:2181,node2:2181,node1:2181 --list --topic yxy

(4)列出所有的topic

./kafka-topics.sh --zookeeper node3:2181,node2:2181,node1:2181 --list

3、開啟生產者

./kafka-console-producer.sh --broker-list node3:9092,node2:9092,node1:9092 \
--topic yxy \
--producer.config ../config/producer.properties

4、開啟消費者

./kafka-console-consumer.sh --bootstrap-server node3:9092,node2:9092,node1:9092 \
--topic yxy \
--consumer.config ../config/consumer.properties

5、Kafka刪除topic操作

(1)在manager介面將delete.topic.enable配置成true,右上角配置服務後重啟Kafka元件。(僅在第一次刪除topic時操作)

(2)進入zookeeper裡面刪除/brokers/topics 、/consumers/、/admin/delete_topics/、/config/topics/目錄下相關topic資料夾

         1)kubectl get pod |grep zookeeper
         2)kubectl exec -it zookeeper-server-zookeeper1-3543915313-1dmn1 bash
         3)cd /usr/lib/zookeeper/bin/
         4)./zkCli.sh 
         ./zookeeper-shell.sh node3:2181,node2:2181,node1:2181 (等同於以上四步操作),進入zookeeper命令列之後找到/brokers/topics 、/consumers/、/admin/delete_topics/、/config/topics/下對應的topic檔案,刪除即可,執行quit退出zookeeper命令列(如果執行的是上面的四步操作,還需要在退出zookeeper命令列之後執行exit退出登入的介面)。

(3)manager介面搜尋kmq.log.dirs配置目錄(目前叢集該目錄為/hadoop/kmp)
        1)進入/hadoop/kmp刪除相關topic的目錄

 

【Slipstream操作:建立stream流並接收資料】

1、登入Slipstream

beeline -u 'jdbc:hive2://node2:10010/default' -n name -p passsword

2、建流

CREATE STREAM demos(id INT, letter STRING)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  TBLPROPERTIES("topic"="yxy",
  "kafka.zookeeper"="node3:2181",
  "kafka.broker.list"="node3:9092",
  "transwarp.consumer.security.protocol"="SASL_PLAINTEXT",
  "transwarp.consumer.sasl.mechanism"="GSSAPI",
  "transwarp.consumer.sasl.kerberos.service.name"="kafka",
  "transwarp.consumer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/kafka.keytab\" principal=\"kafka/[email protected]\""
  );

3、建表(表字段及型別必須與流一致)

CREATE TABLE demot(id INT, letter STRING);

4、觸發流

INSERT INTO demot SELECT * FROM demos;

5、列出正在執行的StreamJob

LIST STREAMJOBS;

6、停止某一個StreamJob

STOP STREAMJOB id;

 

【Flume操作:在${FLUME_HOME}下執行Flume操作】

1、啟動Flume

nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/flume-topic-test.conf -Dflume.root.logger=INFO,console -Djava.security.auth.login.config=/opt/flume/apache-flume-1.7.0-bin/conf/jaas.conf &