Big Data in Practice (85): E-commerce Data Warehouse (69) Security: Kerberos Authentication (5) — Working in a Kerberos-Secured Environment
0 Working in a Kerberos-Secured Environment
After Kerberos is enabled, communication between systems (e.g. Flume to Kafka) and between users and systems (e.g. a user accessing HDFS) must first pass security authentication; only then can the communication proceed.
Therefore, once Kerberos is enabled, every script used in the data warehouse must add an authentication step in order to keep working.
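As a minimal sketch of this pattern (assuming the hive keytab at /var/lib/hive/hive.keytab and the hive/[email protected] principal that are created later in this article), a warehouse script simply authenticates with kinit before doing any Hadoop work:
#!/bin/bash
# Obtain a Kerberos ticket from the keytab before touching HDFS or Hive
kinit -kt /var/lib/hive/hive.keytab hive/[email protected]
# Any subsequent Hadoop command now runs with a valid ticket
hadoop fs -ls /warehouse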
1 User Authentication for Service Access
After Kerberos authentication is enabled, routine service access (for example accessing HDFS or consuming a Kafka topic) requires authenticating first.
1) Create a user principal/instance in the Kerberos database
[root@hadoop102 ~]# kadmin.local -q "addprinc hive/[email protected]"
2) Authenticate the user
[root@hadoop102 ~]# kinit hive/[email protected]
3) Access HDFS
[root@hadoop102 ~]# hadoop fs -ls /
Found 4 items
drwxr-xr-x   - hive hive              0 2019-10-02 01:29 /origin_data
drwxrwxrwt   - hdfs supergroup        0 2019-10-03 00:20 /tmp
drwxr-xr-x   - hdfs supergroup        0 2019-10-02 01:35 /user
drwxr-xr-x   - hive hive              0 2019-10-02 01:38 /warehouse
4) Hive query
[root@hadoop102 ~]# hive
WARNING: Use "yarn jar" to launch YARN applications.
Logging initialized using configuration in jar:file:/opt/cloudera/parcels/CDH-6.2.1-1.cdh6.2.1.p0.1425774/jars/hive-common-2.1.1-cdh6.2.1.jar!/hive-log4j2.properties Async: false
WARNING: Hive CLI is deprecated and migration to Beeline is recommended.
hive>
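The Hive CLI itself warns that it is deprecated. You can connect through Beeline instead; under Kerberos the JDBC URL must carry the HiveServer2 service principal, which is the same URL the warehouse scripts later in this article use:
[root@hadoop102 ~]# beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/[email protected]" -n hive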
5) Consume a Kafka topic
(1) Modify the Kafka configuration
1. In the Kafka configuration, search for "security.inter.broker.protocol" and set it to SASL_PLAINTEXT.
2. In the Kafka configuration, search for "ssl.client.auth" and set it to none.
(2) Create the jaas.conf file
[root@hadoop102 hive]# vim /var/lib/hive/jaas.conf
The file contents are as follows:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true;
};
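This configuration relies on the ticket cache populated by the earlier kinit. As a hedged alternative sketch (useKeyTab, storeKey, keyTab and principal are standard Krb5LoginModule options, but the jaas-keytab.conf file name here is only illustrative), the client can instead authenticate directly from a keytab so that no interactive kinit is required:
cat > /var/lib/hive/jaas-keytab.conf <<'EOF'
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/var/lib/hive/hive.keytab"
  principal="hive/[email protected]";
};
EOF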
(3) Create the consumer.properties file
[root@hadoop102 conf]# vim /etc/kafka/conf/consumer.properties
The file contents are as follows:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
(4) Declare the path of the jaas.conf file
[root@hadoop102 conf]# export KAFKA_OPTS="-Djava.security.auth.login.config=/var/lib/hive/jaas.conf"
(5) Consume Kafka topic data with kafka-console-consumer
[root@hadoop102 ~]# kafka-console-consumer --bootstrap-server hadoop102:9092 --topic topic_start --from-beginning --consumer.config /etc/kafka/conf/consumer.properties
6) HDFS Web UI browser authentication
After we configure CDH to support Kerberos, the HDFS Web UI runs into the following problem:
You can open the NameNode Web UI on port 9870, but you cannot browse directories or files, because the local environment has not passed authentication.
Next we set up authentication for the local environment (a command-line check with curl is also sketched after step (4) below).
Note: due to browser limitations we use Firefox here; other browsers such as Chrome and IE will run into problems.
(1) Download Firefox
(2) Configure the browser
1. Open Firefox and enter about:config in the address bar to open the settings page.
2. Search for "network.negotiate-auth.trusted-uris" and change its value to your own server hostname.
3. Search for "network.auth.use-sspi" and double-click it to set the value to false.
(3) Install kfw (MIT Kerberos for Windows)
1. Install the provided kfw-4.1-amd64.msi.
2. Copy the contents of the cluster's /etc/krb5.conf into C:\ProgramData\MIT\Kerberos5\krb.ini, removing the path-related settings.
[logging]

[libdefaults]
 default_realm = HADOOP.COM
 dns_lookup_realm = false
 dns_lookup_kdc = false
 ticket_lifetime = 24h
 renew_lifetime = 7d
 forwardable = true
 udp_preference_limit = 1

[realms]
 HADOOP.COM = {
  kdc = hadoop102
  admin_server = hadoop102
 }

[domain_realm]
3. Open MIT Kerberos and enter the principal name and password:
(4) Test
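If you only want to verify SPNEGO from an already-authenticated Linux shell rather than from a browser, a hedged sketch using curl's --negotiate support against WebHDFS looks like this (host and port follow the defaults used above; adjust them for your cluster):
# Get a ticket first, then let curl perform SPNEGO negotiation
kinit hive/[email protected]
curl --negotiate -u : "http://hadoop102:9870/webhdfs/v1/?op=LISTSTATUS"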
2 User Behavior Data Warehouse
1) Log collection Flume configuration
The log collection Flume sends data to Kafka, so it acts as a Kafka producer and needs the Kafka client authentication described above. No manual configuration is required here, though: after Kerberos is enabled, CM configures it automatically.
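For reference, a hedged sketch of what the equivalent manual producer-side settings would look like (CM generates its own versions of these; the property names mirror the consumer.properties created earlier, the jaas.conf path is the one from above, and the producer.properties file name is only illustrative):
# Producer-side SASL settings, mirroring the consumer configuration above
cat > /etc/kafka/conf/producer.properties <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
EOF

# Point the Kafka client JVM at the JAAS file, then send a few test messages
export KAFKA_OPTS="-Djava.security.auth.login.config=/var/lib/hive/jaas.conf"
kafka-console-producer --broker-list hadoop102:9092 --topic topic_start --producer.config /etc/kafka/conf/producer.properties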
2) Kafka consumer Flume configuration
The Kafka-consuming Flume moves data from Kafka to HDFS, so it acts as a Kafka consumer and likewise needs the Kafka client authentication described above (no manual work is needed; CM configures it automatically). In addition, it needs HDFS client authentication, which we do have to configure manually.
(1) Generate a keytab file for the hive user
A user can authenticate either by entering a password or by using a keytab key file; here we authenticate with a keytab file.
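As a minimal sketch of the difference between the two modes (principal and keytab path as used elsewhere in this article):
# Password-based: interactive, prompts for the principal's password
kinit hive/[email protected]
# Keytab-based: non-interactive, suitable for scripts and services
kinit -kt /var/lib/hive/hive.keytab hive/[email protected]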
[root@hadoop102 hive]# kadmin.local -q "xst -k /var/lib/hive/hive.keytab hive/[email protected]"
(2) Add read permission
chmod +r /var/lib/hive/hive.keytab
(3) Distribute the keytab file
xsync /var/lib/hive/hive.keytab
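To verify, you can list the principals stored in the keytab and confirm that a non-interactive login works on any node it was distributed to (klist -kt is a standard MIT Kerberos option; the paths are the ones used above):
# Show the principals contained in the keytab, then check the ticket cache
klist -kt /var/lib/hive/hive.keytab
kinit -kt /var/lib/hive/hive.keytab hive/[email protected]
klist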
(4) Modify the Flume agent configuration file
## Components
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_start

## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics=topic_event

## channel1
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=10000

## channel2
a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=10000

## sink1
a1.sinks.k1.type = hdfs
#a1.sinks.k1.hdfs.proxyUser=hive
a1.sinks.k1.hdfs.kerberosPrincipal=hive/[email protected]
a1.sinks.k1.hdfs.kerberosKeytab=/var/lib/hive/hive.keytab
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second

## sink2
a1.sinks.k2.type = hdfs
#a1.sinks.k2.hdfs.proxyUser=hive
a1.sinks.k2.hdfs.kerberosPrincipal=hive/[email protected]
a1.sinks.k2.hdfs.kerberosKeytab=/var/lib/hive/hive.keytab
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = second

## Avoid producing large numbers of small files
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

## Control the output file type (compressed stream)
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

## Bind sources and sinks to channels
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
3) ODS layer
Edit the ODS layer data loading script
[root@hadoop102 bin]# vim ods_db.sh
The contents are as follows:
#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/hive

# Define a variable to make the script easy to modify
APP=gmall

# If a date is passed in, use it; otherwise default to the day before today
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

echo "=== log date: $do_date ==="
sql="
load data inpath '/origin_data/gmall/log/topic_start/$do_date' into table "$APP".ods_start_log partition(dt='$do_date');
"
beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/[email protected]" -n hive -e "$sql"
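A hedged usage sketch (assuming, as in earlier parts of this series, that the script is executable and placed in a directory on the PATH):
chmod +x ods_db.sh
# Load the log data for a specific day; omit the date to default to yesterday
ods_db.sh 2019-10-02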
4) DWD layer
Edit the DWD data loading script
[root@hadoop102 bin]# vim dwd_start_log.sh
The contents are as follows:
#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/hive

# Define a variable to make the script easy to modify
APP=gmall

# If a date is passed in, use it; otherwise default to the day before today
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table "$APP".dwd_start_log
PARTITION (dt='$do_date')
select
    get_json_object(line,'$.mid') mid_id,
    get_json_object(line,'$.uid') user_id,
    get_json_object(line,'$.vc') version_code,
    get_json_object(line,'$.vn') version_name,
    get_json_object(line,'$.l') lang,
    get_json_object(line,'$.sr') source,
    get_json_object(line,'$.os') os,
    get_json_object(line,'$.ar') area,
    get_json_object(line,'$.md') model,
    get_json_object(line,'$.ba') brand,
    get_json_object(line,'$.sv') sdk_version,
    get_json_object(line,'$.g') gmail,
    get_json_object(line,'$.hw') height_width,
    get_json_object(line,'$.t') app_time,
    get_json_object(line,'$.nw') network,
    get_json_object(line,'$.ln') lng,
    get_json_object(line,'$.la') lat,
    get_json_object(line,'$.entry') entry,
    get_json_object(line,'$.open_ad_type') open_ad_type,
    get_json_object(line,'$.action') action,
    get_json_object(line,'$.loading_time') loading_time,
    get_json_object(line,'$.detail') detail,
    get_json_object(line,'$.extend1') extend1
from "$APP".ods_start_log
where dt='$do_date';
"
beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/[email protected]" -n hive -e "$sql"
5) DWS layer
Edit the DWS data loading script
[root@hadoop102 bin]# vim dws_log.sh
The contents are as follows:
#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/hive

# Define a variable to make the script easy to modify
APP=gmall

# If a date is passed in, use it; otherwise default to the day before today
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table "$APP".dws_uv_detail_day
partition(dt='$do_date')
select
    mid_id,
    concat_ws('|', collect_set(user_id)) user_id,
    concat_ws('|', collect_set(version_code)) version_code,
    concat_ws('|', collect_set(version_name)) version_name,
    concat_ws('|', collect_set(lang)) lang,
    concat_ws('|', collect_set(source)) source,
    concat_ws('|', collect_set(os)) os,
    concat_ws('|', collect_set(area)) area,
    concat_ws('|', collect_set(model)) model,
    concat_ws('|', collect_set(brand)) brand,
    concat_ws('|', collect_set(sdk_version)) sdk_version,
    concat_ws('|', collect_set(gmail)) gmail,
    concat_ws('|', collect_set(height_width)) height_width,
    concat_ws('|', collect_set(app_time)) app_time,
    concat_ws('|', collect_set(network)) network,
    concat_ws('|', collect_set(lng)) lng,
    concat_ws('|', collect_set(lat)) lat
from "$APP".dwd_start_log
where dt='$do_date'
group by mid_id;
"
beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/[email protected]" -n hive -e "$sql"
6) ADS layer
Edit the ADS data loading script
[root@hadoop102 bin]# vim ads_uv_log.sh
The contents are as follows:
#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/hive

# Define a variable to make the script easy to modify
APP=gmall

# If a date is passed in, use it; otherwise default to the day before today
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
set hive.exec.dynamic.partition.mode=nonstrict;

insert into table "$APP".ads_uv_count
select
    '$do_date' dt,
    daycount.ct
from
(
    select
        '$do_date' dt,
        count(*) ct
    from "$APP".dws_uv_detail_day
    where dt='$do_date'
)daycount;
"
beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/[email protected]" -n hive -e "$sql"
3 Business Data Warehouse
1) Sqoop import
Edit the Sqoop import script
[root@hadoop102 bin]# vim sqoop_import.sh
The contents are as follows:
#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/hive
#export HADOOP_USER_NAME=hive

db_date=$2
echo $db_date
db_name=gmall

import_data() {
sqoop import \
--connect jdbc:mysql://hadoop102:3306/$db_name \
--username root \
--password Atguigu.123456 \
--target-dir /origin_data/$db_name/db/$1/$db_date \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by "\t" \
--query "$2"' and $CONDITIONS;'
}

import_sku_info(){
  import_data "sku_info" "select id, spu_id, price, sku_name, sku_desc, weight, tm_id, category3_id, create_time from sku_info where 1=1"
}

import_user_info(){
  import_data "user_info" "select id, name, birthday, gender, email, user_level, create_time from user_info where 1=1"
}

import_base_category1(){
  import_data "base_category1" "select id, name from base_category1 where 1=1"
}

import_base_category2(){
  import_data "base_category2" "select id, name, category1_id from base_category2 where 1=1"
}

import_base_category3(){
  import_data "base_category3" "select id, name, category2_id from base_category3 where 1=1"
}

import_order_detail(){
  import_data "order_detail" "select od.id, order_id, user_id, sku_id, sku_name, order_price, sku_num, o.create_time from order_info o, order_detail od where o.id=od.order_id and DATE_FORMAT(create_time,'%Y-%m-%d')='$db_date'"
}

import_payment_info(){
  import_data "payment_info" "select id, out_trade_no, order_id, user_id, alipay_trade_no, total_amount, subject, payment_type, payment_time from payment_info where DATE_FORMAT(payment_time,'%Y-%m-%d')='$db_date'"
}

import_order_info(){
  import_data "order_info" "select id, total_amount, order_status, user_id, payment_way, out_trade_no, create_time, operate_time from order_info where (DATE_FORMAT(create_time,'%Y-%m-%d')='$db_date' or DATE_FORMAT(operate_time,'%Y-%m-%d')='$db_date')"
}

case $1 in
  "base_category1")
     import_base_category1
;;
  "base_category2")
     import_base_category2
;;
  "base_category3")
     import_base_category3
;;
  "order_info")
     import_order_info
;;
  "order_detail")
     import_order_detail
;;
  "sku_info")
     import_sku_info
;;
  "user_info")
     import_user_info
;;
  "payment_info")
     import_payment_info
;;
  "all")
     import_base_category1
     import_base_category2
     import_base_category3
     import_order_info
     import_order_detail
     import_sku_info
     import_user_info
     import_payment_info
;;
esac
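A hedged usage sketch (the first argument selects a single table or "all", and the second is the business date, matching the case statement and db_date=$2 above):
# Import every configured table for the given business date
sqoop_import.sh all 2019-10-02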
2) ODS layer
Edit the ODS layer loading script
[root@hadoop102 bin]# vim ods_db.sh
The contents are as follows:
#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/hive

APP=gmall

# If a date is passed in, use it; otherwise default to the day before today
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
load data inpath '/origin_data/$APP/db/order_info/$do_date' OVERWRITE into table "$APP".ods_order_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/order_detail/$do_date' OVERWRITE into table "$APP".ods_order_detail partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/sku_info/$do_date' OVERWRITE into table "$APP".ods_sku_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/user_info/$do_date' OVERWRITE into table "$APP".ods_user_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/payment_info/$do_date' OVERWRITE into table "$APP".ods_payment_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category1/$do_date' OVERWRITE into table "$APP".ods_base_category1 partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category2/$do_date' OVERWRITE into table "$APP".ods_base_category2 partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category3/$do_date' OVERWRITE into table "$APP".ods_base_category3 partition(dt='$do_date');
"
beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/[email protected]" -n hive -e "$sql"
3) DWD layer
Edit the DWD layer loading script
[root@hadoop102 bin]# vim dwd_db.sh
The contents are as follows:
#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/hive

# Define a variable to make the script easy to modify
APP=gmall

# If a date is passed in, use it; otherwise default to the day before today
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table "$APP".dwd_order_info partition(dt)
select * from "$APP".ods_order_info
where dt='$do_date' and id is not null;

insert overwrite table "$APP".dwd_order_detail partition(dt)
select * from "$APP".ods_order_detail
where dt='$do_date' and id is not null;

insert overwrite table "$APP".dwd_user_info partition(dt)
select * from "$APP".ods_user_info
where dt='$do_date' and id is not null;

insert overwrite table "$APP".dwd_payment_info partition(dt)
select * from "$APP".ods_payment_info
where dt='$do_date' and id is not null;

insert overwrite table "$APP".dwd_sku_info partition(dt)
select
    sku.id,
    sku.spu_id,
    sku.price,
    sku.sku_name,
    sku.sku_desc,
    sku.weight,
    sku.tm_id,
    sku.category3_id,
    c2.id category2_id,
    c1.id category1_id,
    c3.name category3_name,
    c2.name category2_name,
    c1.name category1_name,
    sku.create_time,
    sku.dt
from "$APP".ods_sku_info sku
join "$APP".ods_base_category3 c3 on sku.category3_id=c3.id
join "$APP".ods_base_category2 c2 on c3.category2_id=c2.id
join "$APP".ods_base_category1 c1 on c2.category1_id=c1.id
where sku.dt='$do_date' and c2.dt='$do_date' and c3.dt='$do_date' and c1.dt='$do_date'
and sku.id is not null;
"
beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/[email protected]" -n hive -e "$sql"
4) DWS layer
Edit the DWS layer loading script
[root@hadoop102 bin]# vim dws_db_wide.sh
The contents are as follows:
#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/hive

# Define a variable to make the script easy to modify
APP=gmall

# If a date is passed in, use it; otherwise default to the day before today
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
with
tmp_order as
(
    select
        user_id,
        count(*) order_count,
        sum(oi.total_amount) order_amount
    from "$APP".dwd_order_info oi
    where date_format(oi.create_time,'yyyy-MM-dd')='$do_date'
    group by user_id
),
tmp_payment as
(
    select
        user_id,
        sum(pi.total_amount) payment_amount,
        count(*) payment_count
    from "$APP".dwd_payment_info pi
    where date_format(pi.payment_time,'yyyy-MM-dd')='$do_date'
    group by user_id
)
insert overwrite table "$APP".dws_user_action partition(dt='$do_date')
select
    user_actions.user_id,
    sum(user_actions.order_count),
    sum(user_actions.order_amount),
    sum(user_actions.payment_count),
    sum(user_actions.payment_amount)
from
(
    select
        user_id,
        order_count,
        order_amount,
        0 payment_count,
        0 payment_amount
    from tmp_order
    union all
    select
        user_id,
        0 order_count,
        0 order_amount,
        payment_count,
        payment_amount
    from tmp_payment
) user_actions
group by user_id;
"
beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/[email protected]" -n hive -e "$sql"
5) ADS layer
Edit the ADS layer loading script
[root@hadoop102 bin]# vim ads_db_gmv.sh
The contents are as follows:
#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/hive

# Define a variable to make the script easy to modify
APP=gmall

# If a date is passed in, use it; otherwise default to the day before today
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
insert into table "$APP".ads_gmv_sum_day
select
    '$do_date' dt,
    sum(order_count) gmv_count,
    sum(order_amount) gmv_amount,
    sum(payment_amount) payment_amount
from "$APP".dws_user_action
where dt='$do_date'
group by dt;
"
beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/[email protected]" -n hive -e "$sql"
6) Sqoop export
Edit the Sqoop export script
[root@hadoop102 bin]# vim sqoop_export.sh
The contents are as follows:
#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/hive

db_name=gmall

export_data() {
sqoop export \
--connect "jdbc:mysql://hadoop102:3306/${db_name}?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password Atguigu.123456 \
--table $1 \
--num-mappers 1 \
--export-dir /warehouse/$db_name/ads/$1 \
--input-fields-terminated-by "\t" \
--update-mode allowinsert \
--update-key $2 \
--input-null-string '\\N' \
--input-null-non-string '\\N'
}

case $1 in
  "ads_uv_count")
     export_data "ads_uv_count" "dt"
;;
  "ads_user_action_convert_day")
     export_data "ads_user_action_convert_day" "dt"
;;
  "ads_gmv_sum_day")
     export_data "ads_gmv_sum_day" "dt"
;;
  "all")
     export_data "ads_uv_count" "dt"
     export_data "ads_user_action_convert_day" "dt"
     export_data "ads_gmv_sum_day" "dt"
;;
esac
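A hedged usage sketch (the single argument selects which ADS table to export, or "all", matching the case statement above):
# Export all ADS result tables back to MySQL
sqoop_export.sh all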