【TDH】Kafka、Flume、Slipstream基本操作
【Kafka操作:在${KAFKA_HOME}/bin下執行Kafka操作】
1、在星環TDH叢集上操作Kafka的時候首先要進行相關的賦權操作
(1)賦予當前使用者(當前使用者以hive為例,可以使用kinit進行使用者的切換)操作叢集的許可權
./kafka-acls.sh --authorizer-properties zookeeper.connect=node3:2181,node2:2181,node1:2181 --add --allow-principal User:hive --cluster
2、建立topic
./kafka-broker-topics.sh --bootstrap-server node3:9092,node2:9092,node1:9092 --create --topic yxy --partitions 3 --replication-factor 3 --consumer.config ../config/consumer.properties
(1)賦予使用者生產資料的許可權
./kafka-acls.sh --authorizer-properties zookeeper.connect=node3:2181,node2:2181,node1:2181 --add --allow-principal User:hive --topic yxy --producer
(2)賦予使用者消費資料的許可權
./kafka-acls.sh --authorizer-properties zookeeper.connect=node3:2181,node2:2181,node1:2181 --add --allow-principal User:hive --topic yxy --consumer --group root
(3)檢視該topic具有哪些許可權
./kafka-acls.sh --authorizer-properties zookeeper.connect=node3:2181,node2:2181,node1:2181 --list --topic yxy
(4)列出所有的topic
./kafka-topics.sh --zookeeper node3:2181,node2:2181,node1:2181 --list
3、開啟生產者
./kafka-console-producer.sh --broker-list node3:9092,node2:9092,node1:9092 \ --topic yxy \ --producer.config ../config/producer.properties
4、開啟消費者
./kafka-console-consumer.sh --bootstrap-server node3:9092,node2:9092,node1:9092 \
--topic yxy \
--consumer.config ../config/consumer.properties
5、Kafka刪除topic操作
(1)在manager介面將delete.topic.enable配置成true,右上角配置服務後重啟Kafka元件。(僅在第一次刪除topic時操作)
(2)進入zookeeper裡面刪除/brokers/topics 、/consumers/、/admin/delete_topics/、/config/topics/目錄下相關topic資料夾
1)kubectl get pod |grep zookeeper
2)kubectl exec -it zookeeper-server-zookeeper1-3543915313-1dmn1 bash
3)cd /usr/lib/zookeeper/bin/
4)./zkCli.sh
./zookeeper-shell.sh node3:2181,node2:2181,node1:2181 (等同於以上四步操作),進入zookeeper命令列之後找到/brokers/topics 、/consumers/、/admin/delete_topics/、/config/topics/下對應的topic檔案,刪除即可,執行quit退出zookeeper命令列(如果執行的是上面的四步操作,還需要在退出zookeeper命令列之後執行exit退出登入的介面)。
(3)manager介面搜尋kmq.log.dirs配置目錄(目前叢集該目錄為/hadoop/kmp)
1)進入/hadoop/kmp刪除相關topic的目錄
【Slipstream操作:建立stream流並接收資料】
1、登入Slipstream
beeline -u 'jdbc:hive2://node2:10010/default' -n name -p passsword
2、建流
CREATE STREAM demos(id INT, letter STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES("topic"="yxy",
"kafka.zookeeper"="node3:2181",
"kafka.broker.list"="node3:9092",
"transwarp.consumer.security.protocol"="SASL_PLAINTEXT",
"transwarp.consumer.sasl.mechanism"="GSSAPI",
"transwarp.consumer.sasl.kerberos.service.name"="kafka",
"transwarp.consumer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/kafka.keytab\" principal=\"kafka/[email protected]\""
);
3、建表(表字段及型別必須與流一致)
CREATE TABLE demot(id INT, letter STRING);
4、觸發流
INSERT INTO demot SELECT * FROM demos;
5、列出正在執行的StreamJob
LIST STREAMJOBS;
6、停止某一個StreamJob
STOP STREAMJOB id;
【Flume操作:在${FLUME_HOME}下執行Flume操作】
1、啟動Flume
nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/flume-topic-test.conf -Dflume.root.logger=INFO,console -Djava.security.auth.login.config=/opt/flume/apache-flume-1.7.0-bin/conf/jaas.conf &