k8s叢集中 spark訪問hbase中資料
hbase資料分割槽是按照region進行的,分割槽的location就是各個region的location。那麼後續分配executor時可以按照region所在機器分配對應executor,直接在本機讀取資料計算。
我們先來往hbase裡面寫兩個資料
hbase shell whoami list # 如果list出錯,說正在初始化中,要等待,可以在dashboard中看 exists 't1' create 't1',{NAME => 'f1', VERSIONS => 2},{NAME => 'f2', VERSIONS => 2} put 't1','rowkey001','f1:col1','value01' put 't1','rowkey002','f1:col1','value01' put 't1','rowkey003','f1:col1','value01' put 't1','rowkey004','f1:col1','value01'
我們使用python來實現連線
python程式碼如下
# 這些函式只在driver驅動中執行. 只有DRR的mapreduce才會在exec程式中執行 # 需要設定hbase的hostname和subdomain 並在dns增加一條重定向 from pyspark.sql import SparkSession from pyspark.sql import SQLContext spark = SparkSession.builder.appName("hbase_test").getOrCreate() sc = spark.sparkContext zookeeper = 'zookeeper-1.cloudai-2.svc.cluster.local,zookeeper-2.cloudai-2.svc.cluster.local,zookeeper-3.cloudai-2.svc.cluster.local' table = 't1' # # 讀取 conf = { "hbase.zookeeper.quorum": zookeeper, "hbase.zookeeper.property.clientPort":"2181", "hbase.regionserver.port":"60010", # "hbase.master":"10.233.9.21:60000", # "hbase.master.port":"60000", "zookeeper.znode.parent":"/hbase", "hbase.defaults.for.version.skip":"true", "hbase.mapreduce.inputtable": table } keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter" hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf) count = hbase_rdd.count() hbase_rdd.cache() output = hbase_rdd.collect() for (k, v) in output: print((k, v))
在程式碼中我們配置了zookeeper的地址, spark會先訪問spark來獲取hbase的所有資訊,包含master,regionserver的資訊.
為了保證正確,我們可以先自己查詢一遍看看是否正確.
進入zookeeper的pod,
./bin/zkCli.sh # 計入zookeeper命令列 ls / # 檢視zookeeper根目錄儲存: 包含zookeeper hbase兩個資料夾(資料節點) ls2 /zookeeper 檢視zookeeper節點目錄 ls2 /hbase 檢視hbase節點資訊: [meta-region-server, backup-masters, table, draining, region-in-transition, table-lock, running, master, namespace, hbaseid, online-snapshot, replication, splitWAL, recovering-regions, rs, flush-table-proc] 檢視hbase叢集在zookeeper記錄的資訊,比如:regionserver1-slave-1,regionserver2-slave-2 ls2 /hbase/rs [hbase-master-deployment-1-ddb859944-ctbrm,16201,1541399059310] 這裡可以看出, 記錄在zookeeper中的資料是pod的主機名hostname 而不是ip 表鎖節點會有所有表。 [zk: localhost:2181(CONNECTED) 10] ls2 /hbase/table-lock [TestTable, product, device, t1, rec_history, hbase:namespace, face_detect] cZxid = 0x100000009 ctime = Wed Sep 19 07:25:49 UTC 2018 mZxid = 0x100000009 mtime = Wed Sep 19 07:25:49 UTC 2018 pZxid = 0x300000749 cversion = 21 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 7 檢視所有表 [zk: localhost:2181(CONNECTED) 11] ls2 /hbase/table [TestTable, product, t1, rec_history, device, hbase:meta, hbase:namespace, face_detect] cZxid = 0x100000006 ctime = Wed Sep 19 07:25:49 UTC 2018 mZxid = 0x100000006 mtime = Wed Sep 19 07:25:49 UTC 2018 pZxid = 0x30000074b cversion = 22 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 8 檢視hbase的meta表資訊,包含server資訊。 [zk: localhost:2181(CONNECTED) 14] get /hbase/table/hbase:meta ?master:60000o,?OG?e?PBUF cZxid = 0x100000029 ctime = Wed Sep 19 07:25:57 UTC 2018 mZxid = 0x10000002a mtime = Wed Sep 19 07:25:57 UTC 2018 pZxid = 0x100000029 cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 31 numChildren = 0
由於hbase的通訊是通過hostname連線的. 所以我們在python程式碼中設定了將hostname寫入到hosts檔案,但是這個程式碼只在driver驅動中執行,只有DRR資料才會分解到exec驅動中執行. 所以只有driver的pod中成功修改了hosts 而exec仍然無法解析hostname
有幾種方案:
1 將hbase的hostname和pod的ip想辦法能配置到映象中 因為pod的ip可能是變化的,所以必須以配置的方式新增進去.
2 新增代理,或者自定義DNS 將hbase的hostname代理到pod的ip
3 重定向 將hostname 重定向到pod的訪問地址
{pod-name}.{subdomain}.{namespace}.svc.cluster.local
4 建立service-headless服務的名稱和hbase的hostname設定成一樣,並且讓spark和hbase在同一個名稱空間下 ( 這種最簡單 )
我們先按照第4種方案在hbase所在的名稱空間建立和hbase的pod的hostname相同的service-headless,並且讓spark也在這個名稱空間下執行
apiVersion: v1
kind: Service
metadata:
name: hbase-master-1
namespace: cloudai-2
spec:
selector:
app: hbase-master-1
clusterIP: None
ports:
- name: rpc
port: 60000
targetPort: 60000 # 配置的hbase的連結埠
- name: info
port: 60001 # hbase執行狀態的網頁查詢埠
targetPort: 60001
- name: region
port: 60010
targetPort: 60010
- name: regioninfo
port: 60011
targetPort: 60011
- name: thrift
port: 9090 # thrift伺服器ip,讓外界通過thrift來訪問hbase
targetPort: 9090
如果我們想讓spark在一個獨立的名稱空間執行,就要另尋它法了.
建立hbase的deployment時指定pod的hostname和subdomain
hostname: hbase-master-1 # 設定主機名
subdomain: hbase-headless # 設定子域名
為subdomain建一個service-headless
apiVersion: v1
kind: Service
metadata:
name: hbase-headless
namespace: cloudai-2
spec:
selector:
app: hbase-master-1
clusterIP: None
ports:
- name: rpc
port: 60000
targetPort: 60000 # 配置的hbase的連結埠
- name: info
port: 60001 # hbase執行狀態的網頁查詢埠
targetPort: 60001
- name: region
port: 60010
targetPort: 60010
- name: regioninfo
port: 60011
targetPort: 60011
- name: thrift
port: 9090 # thrift伺服器ip,讓外界通過thrift來訪問hbase
targetPort: 9090
這樣就能訪問hbase的pod了,只不過域名必須是
hbase-master-1.hbase-headless.cloudai-2.svc.cluster.local
下面我們來設定代理, 在使用hbase的hostname訪問pod時也能正常訪問到hbase的pod.
我這裡在dns中增加了一個重定向解析記錄
我這裡的dns使用的coredns,在建立Corefile的configmap時使用下面的方法
apiVersion: v1
data:
Corefile: |
.:53 {
errors
health
rewrite name hbase-master-1 hbase-master-1.hbase-headless.cloudai-2.svc.cluster.local
kubernetes cluster.local in-addr.arpa ip6.arpa {
pods insecure
upstream
fallthrough in-addr.arpa ip6.arpa
}
prometheus :9153
proxy . /etc/resolv.conf
cache 30
reload
}
kind: ConfigMap
metadata:
name: coredns
namespace: kube-system
這樣dns在解析hbase-master-1時返回的是hbase-master-1.hbase-headless.cloudai-2.svc.cluster.local的ip
如果你使用的是kube-dns,可以參考https://blog.csdn.net/luanpeng825485697/article/details/84108166