Big Data in Practice (85): E-Commerce Data Warehouse (69), Security: Kerberos Authentication (5), Hands-On in a Kerberos-Secured Environment

0 Hands-On in a Kerberos-Secured Environment

After Kerberos is enabled, system-to-system communication (e.g. Flume to Kafka) and user-to-system communication (e.g. a user accessing HDFS) must both pass security authentication before any data can be exchanged.

Therefore, once Kerberos is enabled, every script used in the data warehouse needs an extra authentication step in order to keep working.
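
A minimal sketch of that extra step, assuming a keytab has already been exported for the hive/[email protected] principal (as done later in this article): each script simply authenticates with kinit before touching any Hadoop service.

# Obtain a Kerberos ticket from the keytab, then run Hadoop commands as usual
kinit -kt /var/lib/hive/hive.keytab hive/[email protected]
hadoop fs -ls /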

1 Authenticating Users to Access Services

After Kerberos authentication is enabled, routine service access (for example accessing HDFS or consuming a Kafka topic) requires security authentication first.

1) Create a user principal/instance in the Kerberos database

[root@hadoop102 ~]# kadmin.local -q "addprinc hive/[email protected]"

2) Authenticate as the user

[root@hadoop102 ~]# kinit hive/[email protected]
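
To confirm that the ticket was obtained, list the credential cache with klist; it should show a ticket for hive/[email protected]:

[root@hadoop102 ~]# klist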

3) Access HDFS

[root@hadoop102 ~]# hadoop fs -ls /
Found 4 items
drwxr-xr-x   - hive hive                0 2019-10-02 01:29 /origin_data
drwxrwxrwt   - hdfs supergroup          0 2019-10-03 00:20 /tmp
drwxr-xr-x   - hdfs supergroup          0 2019-10-02 01:35 /user
drwxr-xr-x   - hive hive                0 2019-10-02 01:38 /warehouse

4) Run a Hive query

[root@hadoop102 ~]# hive 
WARNING: Use "yarn jar" to launch YARN applications.

Logging initialized using configuration in jar:file:/opt/cloudera/parcels/CDH-6.2.1-1.cdh6.2.1.p0.1425774/jars/hive-common-2.1.1-cdh6.2.1.jar!/hive-log4j2.properties Async: false
WARNING: Hive CLI is deprecated and migration to Beeline is recommended.
hive>

5) Consume a Kafka topic

(1) Modify the Kafka configuration

① In the Kafka configuration, search for "security.inter.broker.protocol" and set it to SASL_PLAINTEXT.

② In the Kafka configuration, search for "ssl.client.auth" and set it to none.

(2) Create the jaas.conf file

[root@hadoop102 hive]# vim /var/lib/hive/jaas.conf

The file contents are as follows:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true;
};

(3) Create the consumer.properties file

[root@hadoop102 conf]# vim /etc/kafka/conf/consumer.properties

The file contents are as follows:

security.protocol=SASL_PLAINTEXT

sasl.kerberos.service.name=kafka

(4) Declare the path to the jaas.conf file

[root@hadoop102 conf]# export KAFKA_OPTS="-Djava.security.auth.login.config=/var/lib/hive/jaas.conf"

(5) Consume Kafka topic data with kafka-console-consumer

[root@hadoop102 ~]# kafka-console-consumer --bootstrap-server hadoop102:9092 --topic topic_start --from-beginning --consumer.config /etc/kafka/conf/consumer.properties
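
For a quick end-to-end check you can also produce a few test messages under the same JAAS settings with kafka-console-producer. This is only a sketch, and producer.properties is a hypothetical file holding the same two properties as consumer.properties above:

[root@hadoop102 ~]# kafka-console-producer --broker-list hadoop102:9092 --topic topic_start --producer.config /etc/kafka/conf/producer.properties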

6) Browser Authentication for the HDFS Web UI

After enabling Kerberos support in CDH, we can still open the NameNode Web UI on port 9870, but we cannot browse directories or files, because our local environment has not passed Kerberos authentication.

Next we set up authentication for the local machine.

Note: because of browser limitations we use Firefox here; other browsers such as Chrome and IE will run into problems.

1) Download Firefox

2) Configure the browser

① Open Firefox and enter about:config in the address bar to open the settings page.

② Search for "network.negotiate-auth.trusted-uris" and set the value to your own server hostname.

③ Search for "network.auth.use-sspi" and double-click it to change the value to false.

3) Install kfw (MIT Kerberos for Windows)

① Install the provided kfw-4.1-amd64.msi.

② Copy the contents of the cluster's /etc/krb5.conf into C:\ProgramData\MIT\Kerberos5\krb.ini, removing any path-related settings.

[logging]

 [libdefaults]
  default_realm = HADOOP.COM
  dns_lookup_realm = false
  dns_lookup_kdc = false
  ticket_lifetime = 24h
  renew_lifetime = 7d
  forwardable = true
  udp_preference_limit = 1

[realms]
 HADOOP.COM = {
  kdc = hadoop102  
  admin_server = hadoop102
 }

[domain_realm]

③ Open MIT Kerberos and enter the principal name and password:

4) Test
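
In addition to the browser test, a valid ticket can be verified from any Linux host that has already run kinit by calling the WebHDFS REST API with SPNEGO negotiation. This is a sketch; it requires curl built with GSS support and assumes the Hadoop 3 NameNode Web UI port 9870:

[root@hadoop102 ~]# curl --negotiate -u : "http://hadoop102:9870/webhdfs/v1/?op=LISTSTATUS"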

2 User Behavior Data Warehouse

1) Log-Collection Flume Configuration

The log-collection Flume sends data to Kafka, so this Flume acts as a Kafka producer and needs the Kafka client authentication described above. However, no manual configuration is required here: after Kerberos is enabled, CM configures it automatically.

2) Kafka-Consuming Flume Configuration

The Kafka-consuming Flume moves data from Kafka to HDFS, so it acts as a Kafka consumer and likewise needs the Kafka client authentication described above (no manual step is needed; CM configures it automatically). In addition it needs HDFS client authentication, which we must configure by hand.

(1) Generate a keytab file for the hive user

A user can authenticate either by entering a password or by using a keytab key file; here we need keytab-based authentication.

[root@hadoop102 hive]# kadmin.local -q "xst -k /var/lib/hive/hive.keytab hive/[email protected]"
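
For reference, the two authentication styles look like this; the keytab path is the one just exported and the principal is the one created earlier:

[root@hadoop102 hive]# kinit hive/[email protected]                                  # interactive, prompts for the password
[root@hadoop102 hive]# kinit -kt /var/lib/hive/hive.keytab hive/[email protected]   # keytab-based, non-interactive, suitable for scripts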

(2) Grant read permission

chmod +r /var/lib/hive/hive.keytab

(3) Distribute the keytab file

xsync /var/lib/hive/hive.keytab

(4) Modify the Flume agent configuration file

## Components
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_start

## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics=topic_event

## channel1
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=10000

## channel2
a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=10000

## sink1
a1.sinks.k1.type = hdfs
#a1.sinks.k1.hdfs.proxyUser=hive
a1.sinks.k1.hdfs.kerberosPrincipal=hive/[email protected]
a1.sinks.k1.hdfs.kerberosKeytab=/var/lib/hive/hive.keytab
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second

##sink2
a1.sinks.k2.type = hdfs
#a1.sinks.k2.hdfs.proxyUser=hive
a1.sinks.k2.hdfs.kerberosPrincipal=hive/[email protected]
a1.sinks.k2.hdfs.kerberosKeytab=/var/lib/hive/hive.keytab
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = second

## Avoid generating large numbers of small files
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

## Control the output file type (compressed stream, lzop codec below)
a1.sinks.k1.hdfs.fileType = CompressedStream 
a1.sinks.k2.hdfs.fileType = CompressedStream 

a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

## Wiring: bind sources and sinks to channels
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
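
In CM this configuration is normally pasted into the Flume service's agent configuration text box. If you instead run the agent by hand, a typical start command looks like the following sketch (the --conf-file path is an assumption; use wherever you saved the configuration above):

flume-ng agent --name a1 --conf /etc/flume-ng/conf --conf-file /opt/module/flume/conf/kafka-hdfs.conf -Dflume.root.logger=INFO,console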

3) ODS

Edit the ODS-layer data load script

[root@hadoop102 bin]# vim ods_db.sh

The contents are as follows:

#!/bin/bash

kinit -kt /var/lib/hive/hive.keytab hive/[email protected]
# Define variables for easy modification
APP=gmall

# If a date argument is given, use it; otherwise default to the day before today
if [ -n "$1" ] ;then
   do_date=$1
else 
   do_date=`date -d "-1 day" +%F`
fi 

echo "=== Log date: $do_date ==="
sql="
load data inpath '/origin_data/gmall/log/topic_start/$do_date' into table "$APP".ods_start_log partition(dt='$do_date');

"

beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/[email protected]" -n hive -e "$sql"
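
After saving the script, make it executable and run it with a date argument (the date below is just an example):

[root@hadoop102 bin]# chmod +x ods_db.sh
[root@hadoop102 bin]# ods_db.sh 2019-10-03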

4) DWD

Edit the DWD-layer data load script

[root@hadoop102 bin]# vim dwd_start_log.sh

The contents are as follows:

#!/bin/bash

kinit -kt /var/lib/hive/hive.keytab hive/[email protected]

# Define variables for easy modification
APP=gmall

# If a date argument is given, use it; otherwise default to the day before today
if [ -n "$1" ] ;then
    do_date=$1
else 
    do_date=`date -d "-1 day" +%F`  
fi 

sql="
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table "$APP".dwd_start_log
PARTITION (dt='$do_date')
select 
    get_json_object(line,'$.mid') mid_id,
    get_json_object(line,'$.uid') user_id,
    get_json_object(line,'$.vc') version_code,
    get_json_object(line,'$.vn') version_name,
    get_json_object(line,'$.l') lang,
    get_json_object(line,'$.sr') source,
    get_json_object(line,'$.os') os,
    get_json_object(line,'$.ar') area,
    get_json_object(line,'$.md') model,
    get_json_object(line,'$.ba') brand,
    get_json_object(line,'$.sv') sdk_version,
    get_json_object(line,'$.g') gmail,
    get_json_object(line,'$.hw') height_width,
    get_json_object(line,'$.t') app_time,
    get_json_object(line,'$.nw') network,
    get_json_object(line,'$.ln') lng,
    get_json_object(line,'$.la') lat,
    get_json_object(line,'$.entry') entry,
    get_json_object(line,'$.open_ad_type') open_ad_type,
    get_json_object(line,'$.action') action,
    get_json_object(line,'$.loading_time') loading_time,
    get_json_object(line,'$.detail') detail,
    get_json_object(line,'$.extend1') extend1
from "$APP".ods_start_log 
where dt='$do_date';
"

beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/[email protected]" -n hive -e "$sql"

5) DWS

Edit the DWS-layer data load script

[root@hadoop102 bin]# vim dws_log.sh

The contents are as follows:

#!/bin/bash

kinit -kt /var/lib/hive/hive.keytab hive/[email protected]

# Define variables for easy modification
APP=gmall

# If a date argument is given, use it; otherwise default to the day before today
if [ -n "$1" ] ;then
    do_date=$1
else 
    do_date=`date -d "-1 day" +%F`  
fi 


sql="
  set hive.exec.dynamic.partition.mode=nonstrict;

  insert overwrite table "$APP".dws_uv_detail_day partition(dt='$do_date')
  select  
    mid_id,
    concat_ws('|', collect_set(user_id)) user_id,
    concat_ws('|', collect_set(version_code)) version_code,
    concat_ws('|', collect_set(version_name)) version_name,
    concat_ws('|', collect_set(lang)) lang,
    concat_ws('|', collect_set(source)) source,
    concat_ws('|', collect_set(os)) os,
    concat_ws('|', collect_set(area)) area, 
    concat_ws('|', collect_set(model)) model,
    concat_ws('|', collect_set(brand)) brand,
    concat_ws('|', collect_set(sdk_version)) sdk_version,
    concat_ws('|', collect_set(gmail)) gmail,
    concat_ws('|', collect_set(height_width)) height_width,
    concat_ws('|', collect_set(app_time)) app_time,
    concat_ws('|', collect_set(network)) network,
    concat_ws('|', collect_set(lng)) lng,
    concat_ws('|', collect_set(lat)) lat
  from "$APP".dwd_start_log
  where dt='$do_date'  
  group by mid_id;
"

beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/[email protected]" -n hive -e "$sql"

6) ADS

Edit the ADS-layer data load script

[root@hadoop102 bin]# vim ads_uv_log.sh

The contents are as follows:

#!/bin/bash

kinit -kt /var/lib/hive/hive.keytab hive/[email protected]

# Define variables for easy modification
APP=gmall

# If a date argument is given, use it; otherwise default to the day before today
if [ -n "$1" ] ;then
    do_date=$1
else 
    do_date=`date -d "-1 day" +%F`  
fi 

sql="
  set hive.exec.dynamic.partition.mode=nonstrict;

insert into table "$APP".ads_uv_count 
select  
  '$do_date' dt,
   daycount.ct
from 
(
   select  
      '$do_date' dt,
       count(*) ct
   from "$APP".dws_uv_detail_day
   where dt='$do_date'  
)daycount;
"

beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/[email protected]" -n hive -e "$sql"

3 Business Data Warehouse

1) Sqoop import

Edit the Sqoop import script

[root@hadoop102 bin]# vim sqoop_import.sh

The contents are as follows:

#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/[email protected]
#export HADOOP_USER_NAME=hive
db_date=$2
echo $db_date
db_name=gmall

import_data() {
sqoop import \
--connect jdbc:mysql://hadoop102:3306/$db_name \
--username root \
--password Atguigu.123456 \
--target-dir  /origin_data/$db_name/db/$1/$db_date \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by "\t" \
--query "$2"' and  $CONDITIONS;'
}

import_sku_info(){
  import_data  "sku_info"  "select 
id, spu_id, price, sku_name, sku_desc, weight, tm_id,
category3_id, create_time 
  from sku_info  where 1=1"
}

import_user_info(){
  import_data "user_info" "select 
id, name, birthday, gender, email, user_level, 
create_time 
from user_info where 1=1"
}

import_base_category1(){
  import_data "base_category1" "select 
id, name from base_category1 where 1=1"
}

import_base_category2(){
  import_data "base_category2" "select 
id, name, category1_id from base_category2 where 1=1"
}

import_base_category3(){
  import_data "base_category3" "select id, name, category2_id from base_category3 where 1=1"
}

import_order_detail(){
  import_data   "order_detail"   "select 
    od.id, 
    order_id, 
    user_id, 
    sku_id, 
    sku_name, 
    order_price, 
    sku_num, 
    o.create_time  
  from order_info o , order_detail od 
  where o.id=od.order_id 
  and DATE_FORMAT(create_time,'%Y-%m-%d')='$db_date'"
}

import_payment_info(){
  import_data "payment_info"   "select 
    id,  
    out_trade_no, 
    order_id, 
    user_id, 
    alipay_trade_no, 
    total_amount,  
    subject, 
    payment_type, 
    payment_time 
  from payment_info 
  where DATE_FORMAT(payment_time,'%Y-%m-%d')='$db_date'"
}

import_order_info(){
  import_data   "order_info"   "select 
    id, 
    total_amount, 
    order_status, 
    user_id, 
    payment_way, 
    out_trade_no, 
    create_time, 
    operate_time  
  from order_info 
  where  (DATE_FORMAT(create_time,'%Y-%m-%d')='$db_date' or DATE_FORMAT(operate_time,'%Y-%m-%d')='$db_date')"
}

case $1 in
  "base_category1")
     import_base_category1
;;
  "base_category2")
     import_base_category2
;;
  "base_category3")
     import_base_category3
;;
  "order_info")
     import_order_info
;;
  "order_detail")
     import_order_detail
;;
  "sku_info")
     import_sku_info
;;
  "user_info")
     import_user_info
;;
  "payment_info")
     import_payment_info
;;
   "all")
   import_base_category1
   import_base_category2
   import_base_category3
   import_order_info
   import_order_detail
   import_sku_info
   import_user_info
   import_payment_info
;;
esac
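Example invocation (the first argument selects one table or "all", the second is the business date; the date below is just an example):

[root@hadoop102 bin]# sqoop_import.sh all 2019-10-03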

2) ODS

Edit the ODS-layer load script

[root@hadoop102 bin]# vim ods_db.sh

The contents are as follows:

#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/[email protected]
APP=gmall

# If a date argument is given, use it; otherwise default to the day before today
if [ -n "$1" ] ;then
    do_date=$1
else 
    do_date=`date -d "-1 day" +%F`  
fi

sql=" 
load data inpath '/origin_data/$APP/db/order_info/$do_date'  OVERWRITE into table "$APP".ods_order_info partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/order_detail/$do_date'  OVERWRITE into table "$APP".ods_order_detail partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/sku_info/$do_date'  OVERWRITE into table "$APP".ods_sku_info partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/user_info/$do_date' OVERWRITE into table "$APP".ods_user_info partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/payment_info/$do_date' OVERWRITE into table "$APP".ods_payment_info partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/base_category1/$do_date' OVERWRITE into table "$APP".ods_base_category1 partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/base_category2/$do_date' OVERWRITE into table "$APP".ods_base_category2 partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/base_category3/$do_date' OVERWRITE into table "$APP".ods_base_category3 partition(dt='$do_date'); 
"
beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/[email protected]" -n hive -e "$sql"

3) DWD

Edit the DWD-layer load script

[root@hadoop102 bin]# vim dwd_db.sh

The contents are as follows:

#!/bin/bash

kinit -kt /var/lib/hive/hive.keytab hive/[email protected]

# Define variables for easy modification
APP=gmall
# If a date argument is given, use it; otherwise default to the day before today
if [ -n "$1" ] ;then
    do_date=$1
else 
    do_date=`date -d "-1 day" +%F`  
fi 

sql="

set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table   "$APP".dwd_order_info partition(dt)
select * from "$APP".ods_order_info 
where dt='$do_date'  and id is not null;
 
insert overwrite table   "$APP".dwd_order_detail partition(dt)
select * from "$APP".ods_order_detail 
where dt='$do_date'   and id is not null;

insert overwrite table   "$APP".dwd_user_info partition(dt)
select * from "$APP".ods_user_info
where dt='$do_date'   and id is not null;
 
insert overwrite table   "$APP".dwd_payment_info partition(dt)
select * from "$APP".ods_payment_info
where dt='$do_date'  and id is not null;

insert overwrite table   "$APP".dwd_sku_info partition(dt)
select  
    sku.id,
    sku.spu_id, 
    sku.price,
    sku.sku_name,  
    sku.sku_desc,  
    sku.weight,  
    sku.tm_id,  
    sku.category3_id,  
    c2.id category2_id ,  
    c1.id category1_id,  
    c3.name category3_name,  
    c2.name category2_name,  
    c1.name category1_name,  
    sku.create_time,
    sku.dt
from
    "$APP".ods_sku_info sku 
join "$APP".ods_base_category3 c3 on sku.category3_id=c3.id 
    join "$APP".ods_base_category2 c2 on c3.category2_id=c2.id 
    join "$APP".ods_base_category1 c1 on c2.category1_id=c1.id 
where sku.dt='$do_date'  and c2.dt='$do_date'  
and  c3.dt='$do_date' and  c1.dt='$do_date' 
and sku.id is not null;
"
beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/[email protected]" -n hive -e "$sql"

4) DWS

Edit the DWS-layer load script

[root@hadoop102 bin]# vim dws_db_wide.sh

The contents are as follows:

#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/[email protected]
# Define variables for easy modification
APP=gmall

# If a date argument is given, use it; otherwise default to the day before today
if [ -n "$1" ] ;then
    do_date=$1
else 
    do_date=`date -d "-1 day" +%F`  
fi 

sql="
with  
tmp_order as
(
    select 
        user_id, 
        count(*)  order_count,
        sum(oi.total_amount) order_amount
    from "$APP".dwd_order_info oi
    where date_format(oi.create_time,'yyyy-MM-dd')='$do_date'
    group by user_id
) ,
tmp_payment as
(
    select
        user_id, 
        sum(pi.total_amount) payment_amount, 
        count(*) payment_count 
    from "$APP".dwd_payment_info pi 
    where date_format(pi.payment_time,'yyyy-MM-dd')='$do_date'
    group by user_id
)
insert overwrite table "$APP".dws_user_action partition(dt='$do_date')
select
    user_actions.user_id,
    sum(user_actions.order_count),
    sum(user_actions.order_amount),
    sum(user_actions.payment_count),
    sum(user_actions.payment_amount)
from 
(
    select
        user_id,
        order_count,
        order_amount,
        0 payment_count,
        0 payment_amount
    from tmp_order

    union all
    select
        user_id,
        0 order_count,
        0 order_amount,
        payment_count,
        payment_amount
    from tmp_payment
 ) user_actions
group by user_id;
"

beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/[email protected]" -n hive -e "$sql"

5) ADS

Edit the ADS-layer load script

[root@hadoop102 bin]# vim ads_db_gmv.sh

The contents are as follows:

#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/[email protected]

# Define variables for easy modification
APP=gmall

# If a date argument is given, use it; otherwise default to the day before today
if [ -n "$1" ] ;then
    do_date=$1
else 
    do_date=`date -d "-1 day" +%F`
fi 

sql="
insert into table "$APP".ads_gmv_sum_day 
select 
    '$do_date' dt,
    sum(order_count)  gmv_count,
    sum(order_amount) gmv_amount,
    sum(payment_amount) payment_amount 
from "$APP".dws_user_action 
where dt ='$do_date'
group by dt;
"

beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/[email protected]" -n hive -e "$sql"

6) Sqoop export

Edit the Sqoop export script

[root@hadoop102 bin]# vim sqoop_export.sh

The contents are as follows:

#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/[email protected]

db_name=gmall

export_data() {
sqoop export \
--connect "jdbc:mysql://hadoop102:3306/${db_name}?useUnicode=true&characterEncoding=utf-8"  \
--username root \
--password Atguigu.123456 \
--table $1 \
--num-mappers 1 \
--export-dir /warehouse/$db_name/ads/$1 \
--input-fields-terminated-by "\t" \
--update-mode allowinsert \
--update-key $2 \
--input-null-string '\\N'    \
--input-null-non-string '\\N'
}

case $1 in
  "ads_uv_count")
     export_data "ads_uv_count" "dt"
;;
  "ads_user_action_convert_day") 
     export_data "ads_user_action_convert_day" "dt"
;;
  "ads_gmv_sum_day")
     export_data "ads_gmv_sum_day" "dt"
;;
   "all")
     export_data "ads_uv_count" "dt"
     export_data "ads_user_action_convert_day" "dt"
     export_data "ads_gmv_sum_day" "dt"
;;
esac
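Example invocation (the argument selects one ADS table or "all"):

[root@hadoop102 bin]# sqoop_export.sh all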