Hbase基礎(十九):億級資料從 MySQL 到 Hbase 的三種同步方案與實踐(三)同步利器
原文:https://blog.csdn.net/rlnlo2pnefx9c/article/details/108288956
4.1 簡單粗暴Sqoop
首先來回顧一下Sqoop架構圖:
架構圖
這裡大家記住一個規則:大資料需要切分!如果不切分,這個億級資料直接匯入會崩潰!!!
★什麼是Sqoop?
”
Sqoop是一個用來將Hadoop和關係型資料庫中的資料相互轉移的開源工具,可以將一個關係型資料庫中的資料導進到Hadoop的HDFS或者HBase等。
sqoop核心引數與程式碼解釋:
sqoop import
--connect jdbc:mysql://localhost:3306/loaddb
--username root
--password xxxx
--query "${sql}"
--hbase-row-key id
--hbase-create-table
--column-family info
--hbase-table mysql_data
--split-by id -m 4
--connect 指定連線的資料庫,如果你的資料庫不是本地的,記得修改地址!--username 使用者名稱 --password 密碼 --query sql語句 --hbase-row-key 指定rowkey,如果存在則修改為該值 --hbase-create-table 建立Hbase表 --column-family 列簇 --hbase-table hbase表名
注意:當-m 設定的值大於1時,split-by必須設定欄位!
由於資料太大,需要分片匯入,具體匯入程式碼見倉庫:
up=185941000 for((i=1; i>0; i++)) do start=$(((${i} - 1) * 40000 + 1)) end=$((${i} * 40000)) if [ $end -ge $up ] then end=185941000 fi sql="select id,carflag, touchevent, opstatus,gpstime,gpslongitude,gpslatitude,gpsspeed,gpsorientation,gpsstatus from loaddb.loadTable1 where id>=${start} and id<=${end} and \$CONDITIONS"; sqoop import --connect jdbc:mysql://localhost:3306/loaddb --username root --password xxxx --query "${sql}" --hbase-row-key id --hbase-create-table --column-family info --hbase-table mysql_data --split-by id -m 4 echo Sqoop import from: ${start} to: ${end} success.................................... if [ $end -eq $up ] then break fi done
思路是每隔4萬匯入一次,當然您也可以修改。
耗時:(使用linux的time統計bash指令碼執行時間)
enter image description here
匯入結果:
enter image description here
如果遇到問題,顯示虛擬記憶體溢位,不斷新開程序,殺死之前的程序,解決方案:關閉虛擬記憶體。
enter image description here
修改yarn-site.xml
<property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property>
以上就是MySQL同步Hbase方案1。
4.2 Kafka-thrift同步
enter image description here
4.2.1 binlog
binlog是sever層維護的一種二進位制日誌,與innodb引擎中的redo/undo log是完全不同的日誌。
可以簡單的理解該log記錄了sql標中的更新刪除插入等操作記錄。通常應用在資料恢復、備份等場景。
★開啟binlog
”
對於我的mysql的配置檔案在下面這個資料夾,當然直接編輯my.cnf
也是可以的。
- vi /etc/mysql/mysql.conf.d/mysqld.cnf
對配置檔案設定如下:
openbinlog
★檢視是否啟用
”
進入mysql客戶端輸入:
- show variables like '%log_bin%';
binlog
★binlog介紹
”
我的log存放在var下面的log的mysql下面:
loglook
在mysql-bin.index中包含了所有的log檔案,比如上述圖就是包含了1與2檔案,檔案長度超過相應大小就會新開一個log檔案,索引遞增,如上面的000001,000002。
★binlog實戰
”
首先建立一個表:
- create table house(id int not null primary key,house int,price int);
向表中插入資料:
- insert into loaddb.house(id,house,price) values(1,2,3);
上面提到插入資料後,binlog會更新,那麼我們去檢視上面log檔案,應該會看到插入操作。
Mysql binlog日誌有ROW,Statement,MiXED三種格式;
- set global binlog_format='ROW/STATEMENT/MIXED'
命令列:
- show variables like 'binlog_format'
row
對於mysql5.7的,binlog格式預設為ROW,所以不用修改。
那麼為何要了解binlog格式呢,原因很簡單,我要檢視我的binlog日誌,而該日誌為二進位制檔案,開啟後是亂碼的。對於不同的格式,檢視方式不一樣!
對於ROW模式生成的sql編碼需要解碼,不能用常規的辦法去生成,需要加上相應的引數,如下程式碼:
- sudo /usr/bin/mysqlbinlog mysql-bin.000002 --base64-output=decode-rows -v
使用mysqlbinlog工具檢視日誌檔案:
binlog
4.2.2 啟動thrift介面
thrift為其他語言與hbase操縱介面。啟動目的為後面資料插入做準備。
enter image description here
4.2.3 kafka-thrift流程小結
使用github倉庫程式碼將原始資料進行每2w一個檔案切分!
切分輸出:
split上述切分速度非常快,2分鐘左右即可切完,可以自定義檔案大小。
編寫Kafka資料入Hbase,完整程式碼見github倉庫程式碼:
def batchTokafka(self,start_time,table_name): table = self.conn.table(table_name) i = 1 with table.batch(batch_size=1024*1024) as bat: for m in self.consumer: t = time.time() database = json.loads(m.value.decode('utf-8'))["database"] name = json.loads(m.value.decode('utf-8'))["table"] row_data = json.loads(m.value.decode('utf-8'))["data"] if database=='loaddb' and name == 'sqlbase1': row_id = row_data["id"] row = str(row_id) print(row_data) del row_data["id"] data = {} for each in row_data: neweach = 'info:' + each data[neweach] = row_data[each] data['info:gpslongitude'] = str(data['info:gpslongitude']) data['info:gpslatitude'] = str(data['info:gpslatitude']) data['info:gpsspeed'] = str(data['info:gpsspeed']) data['info:gpsorientation'] = str(data['info:gpsorientation']) # self.insertData(table_name, row, data) print(data) bat.put(row,data) if i%1000==0: print("===========插入了" + str(i) + "資料!============") print("===========累計耗時:" + str(time.time() - start_time) + "s=============") print("===========距離上次耗時"+ str(time.time() - t) +"=========") i+=1
上述執行後,開始MySQL資料插入,這裡插入採用4個多程序進行程式插入,速度非常快。
當MySQL資料在插入的同時,資料流向如下:
mysql插入->入庫mysql->記錄binlog->maxwell提取binlog->返回json給kafka->kafka消費端通過thrift介面->寫入hbase。
上述同步的結果如下,為了明確是否真正資料同步,只看了一條資料,作為驗證。
★
多個程序插入圖
”
mutprocess
★kafka消費入hbase圖
”
enter image description here
★MySQL資料圖
”
enter image description here
★Hbase資料圖
”
enter image description here
以上就是從Mysql到Hbase的同步方案2。
4.3 Kafka-Flink
enter image description here
4.3.1 實時同步Flink
方案3為方案2的改進,上述是通過Python寫入Hbase,這裡改成java,並使用最新的流處理技術:Flink。
Flink在ETL場景中使用頻繁,非常適合資料同步,於是在這個方案中採用Flink進行同步。
核心程式碼實現,完整程式碼見github倉庫地址:
SingleOutputStreamOperator<Student> student = env.addSource( new FlinkKafkaConsumer011<>( "test", //這個 kafka topic 需要和上面的工具類的 topic 一致 new SimpleStringSchema(), props)).setParallelism(9) .map(string -> JSON.parseObject(string, Student.class)) .setParallelism(9); long start =System.currentTimeMillis(); student.timeWindowAll(Time.seconds(3)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() { @Override public void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) throws Exception { ArrayList<Student> students = Lists.newArrayList(values); if (students.size() > 0) { System.out.println("1s內收集到 mysql表 的資料條數是:" + students.size()); long end =System.currentTimeMillis(); System.out.printf("已經用時time:%d\n",end-start); out.collect(students); } } }).addSink(new SinkToHBase()).setParallelism(9);
使用Flink進行批量入Hbase。
4.3.2 Flink小結
首先啟動maxwell與kafka,hbase也要啟動,接著在資料寫入端,可以採用load data infile或者python程式插入法進行資料插入,資料會通過maxwell到kafka再到Flink,然後sink到Hbase。
★插入端為load data infile的同步
”
flink_sink
★插入端為Python程式的同步
”
flink_load