python實現Phoenix批量匯入資料
官網文件:
Phoenix provides two methods for bulk loading data into Phoenix tables:
Single-threaded client loading tool for CSV formatted data via the psql command
MapReduce-based bulk load tool for CSV and JSON formatted data
The psql tool is typically appropriate for tens of megabytes, while the MapReduce-based loader is typically better for larger load volumes.
上述大意為:phoenix有兩種方式供批量寫資料。一種是單執行緒psql方式,另一種是mr分散式。單執行緒適合一次寫入十來兆的檔案,mr方式更加適合寫大批量資料。
下面分別用兩種方式進行測試批量寫資料
準備階段
1、 建立phoenix表(對應的hbase表並不存在)
CREATE TABLE example (
my_pk bigint not null,
m.first_name varchar(50),
m.last_name varchar(50)
CONSTRAINT pk PRIMARY KEY (my_pk));
2、建立二級索引
create index example_first_name_index on example(m.first_name);
3、建立data.csv檔案
# -*- coding: UTF-8 -*- import csv #python2可以用file替代open,追加改模式w 為a with open("test.csv","w") as csvfile: writer = csv.writer(csvfile) #先寫入columns_name # writer.writerow(["class_id","f.student_info"]) # 寫入多行用writerows,分批寫入用writerow # writer.writerows([[1,'aaaa'],[2,'bbbbb'],[3,'ccccccc']]) list_all = [] for i in range(0, 3): list=[] list.append(i) list.append(str(i)+'aaaaaa') list_all.append(list) writer.writerows(list_all) csvFile.close()
4、上傳至hdfs
hadoop fs -rm /kk/kk_test.csv
hadoop fs -put /root/kangkai/kk_test.csv /kk
1.單執行緒psql方式如下:
[[email protected] Templates]#
/usr/hdp/2.5.3.0-37/phoenix/bin/psql.py -t EXAMPLE hdp14:2181 /root/Templates/data.csv
注:
(1)/root/Templates/data.csv為本地檔案
(2) hdp14:2181為zookeeper對應的主機以及埠
(3) 上述語句還支援不少引數,如-t為表名,-d為檔案內容分割符,預設為英文符的逗號。
驗證資料是否寫入正常以及索引表是否有進行同步更新
通過上述結果可知批量匯入資料正常以及批量匯入資料是會自動更新索引表的。
2. mr批量寫資料方式
[[email protected] ~]# hadoop jar /home/hadoop/apache-phoenix-4.14.0-cdh5.11.2-bin/phoenix-4.14.0-cdh5.11.2-client.jar org.apache.phoenix.mapreduce.CsvBulkLoadTool --table kk_test --input /kk/kk_test.csv
注:
1、官網指出如果為phoenix4.0及以上要用如下方式(HADOOP_CLASSPATH=/path/to/hbase-protocol.jar:/path/to/hbase/conf hadoop jar phoenix--client.jar org.apache.phoenix.mapreduce.CsvBulkLoadTool –table EXAMPLE –input /data/example.csv)
2、用hadoop命令建立檔案上傳檔案等操作都用root使用者執行,/tmp/YCB/data.csv為hdfs上對應的檔案路徑
3、該命令可隨意挑叢集中一臺機器,當然也可以通過指定具體機器執行,如新增-z 機器:埠便可(HADOOP_CLASSPATH=/usr/hdp/2.5.3.0-37/hbase/lib/hbase-protocol.jar:/usr/hdp/2.5.3.0-37/hbase/conf/ hadoop jar /usr/hdp/2.5.3.0-37/phoenix/phoenix-4.7.0.2.5.3.0-37-client.jar org.apache.phoenix.mapreduce.CsvBulkLoadTool -z hdp15:2181 –table EXAMPLE –input /tmp/YCB/data.csv)
驗證結果:
0: jdbc:phoenix:hdp14,hdp15> SELECT * FROM example1_first_name_index;
+---------------+---------+
| M:FIRST_NAME | :MY_PK |
+---------------+---------+
| Joddhn | 12345 |
| Joddhn | 123452 |
| Maryddd | 67890 |
| Maryddd | 678902 |
+---------------+---------+
4 rows selected (0.042 seconds)
0: jdbc:phoenix:hdp14,hdp15> SELECT * FROM example1;
+---------+-------------+---------------+
| MY_PK | FIRST_NAME | LAST_NAME |
+---------+-------------+---------------+
| 12345 | Joddhn | Dois |
| 67890 | Maryddd | Poppssssins |
| 123452 | Joddhn | Dois |
| 678902 | Maryddd | Poppssssins2 |
+---------+-------------+---------------+
由此可知,匯入資料的同時,可以建立索引。
測試批量匯入的速度
在各環境一般的情況下(5個節點,64G記憶體),大概批量寫入200萬資料用時兩分鐘左右。
小結:
1、速度:
CSV data can be bulk loaded with built in utility named psql. Typical upsert rates are 20K - 50K rows per second (depends on how wide are the rows).
解釋上述意思是:通過bulk loaded 的方式批量寫資料速度大概能達到20K-50K每秒。具體這個數值筆者也只是粗糙測試過,速度也還算挺不錯的。官網出處:https://phoenix.apache.org/faq.html
2、 通過測試確認批量匯入會自動更新phoenix二級索引(這個結果不受是否先有hbase表的影響)。
3、匯入檔案編碼預設是utf-8格式。
4、mr方式支援的引數還有其他的具體如下:
5、mr方式匯入資料預設會自動更新指定表的所有索引表,如果只需要更新指定的索引表可用-it 引數指定更新的索引表。對檔案預設支援的分割符是逗號,引數為-d.
6、如果是想通過程式碼方式批量匯入資料,可以通過程式碼先將資料寫到hdfs中,將mr批量匯入方式寫到shell指令碼中,再通過程式碼呼叫shell指令碼(寫批量執行命令的指令碼)執行便可(這種方式筆者也沒有試過,等實際有需求再試試了,理論上應該是沒問題的)。
參考官網