1. 程式人生 > 實用技巧 >Hbase基礎(十九):億級資料從 MySQL 到 Hbase 的三種同步方案與實踐(三)同步利器

Hbase基礎(十九):億級資料從 MySQL 到 Hbase 的三種同步方案與實踐(三)同步利器

原文:https://blog.csdn.net/rlnlo2pnefx9c/article/details/108288956

4.1 簡單粗暴Sqoop

首先來回顧一下Sqoop架構圖:

架構圖

這裡大家記住一個規則:大資料需要切分!如果不切分,這個億級資料直接匯入會崩潰!!!

什麼是Sqoop?

Sqoop是一個用來將Hadoop和關係型資料庫中的資料相互轉移的開源工具,可以將一個關係型資料庫中的資料導進到Hadoop的HDFS或者HBase等。

sqoop核心引數與程式碼解釋:

sqoop import 
--connect jdbc:mysql://localhost:3306/loaddb
--username root
--password xxxx
--query "
${sql}"
--hbase-row-key id
--hbase-create-table
--column-family info
--hbase-table mysql_data
--split-by id -m 4

--connect 指定連線的資料庫,如果你的資料庫不是本地的,記得修改地址!--username 使用者名稱 --password 密碼 --query sql語句 --hbase-row-key 指定rowkey,如果存在則修改為該值 --hbase-create-table 建立Hbase表 --column-family 列簇 --hbase-table hbase表名

注意:當-m 設定的值大於1時,split-by必須設定欄位!

由於資料太大,需要分片匯入,具體匯入程式碼見倉庫:

up=185941000
for((i=1; i>0; i++))
do   
    start=$(((${i} - 1) * 40000 + 1))
    end=$((${i} * 40000))
    if [ $end -ge $up ]
    then 
        end=185941000
    fi  
 
    sql="select id,carflag, touchevent, opstatus,gpstime,gpslongitude,gpslatitude,gpsspeed,gpsorientation,gpsstatus from loaddb.loadTable1 where id>=${start} and id<=${end} and \$CONDITIONS
"; sqoop import --connect jdbc:mysql://localhost:3306/loaddb --username root --password xxxx --query "${sql}" --hbase-row-key id --hbase-create-table --column-family info --hbase-table mysql_data --split-by id -m 4 echo Sqoop import from: ${start} to: ${end} success.................................... if [ $end -eq $up ] then break fi done

思路是每隔4萬匯入一次,當然您也可以修改。

耗時:(使用linux的time統計bash指令碼執行時間)

enter image description here

匯入結果:

enter image description here

如果遇到問題,顯示虛擬記憶體溢位,不斷新開程序,殺死之前的程序,解決方案:關閉虛擬記憶體。

enter image description here

修改yarn-site.xml

<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>

以上就是MySQL同步Hbase方案1。

4.2 Kafka-thrift同步


enter image description here

4.2.1 binlog

binlog是sever層維護的一種二進位制日誌,與innodb引擎中的redo/undo log是完全不同的日誌。

可以簡單的理解該log記錄了sql標中的更新刪除插入等操作記錄。通常應用在資料恢復、備份等場景。

開啟binlog

對於我的mysql的配置檔案在下面這個資料夾,當然直接編輯my.cnf也是可以的。

  1. vi /etc/mysql/mysql.conf.d/mysqld.cnf

對配置檔案設定如下:

openbinlog

檢視是否啟用

進入mysql客戶端輸入:

  1. show variables like '%log_bin%';

binlog

binlog介紹

我的log存放在var下面的log的mysql下面:

loglook

在mysql-bin.index中包含了所有的log檔案,比如上述圖就是包含了1與2檔案,檔案長度超過相應大小就會新開一個log檔案,索引遞增,如上面的000001,000002。

binlog實戰

首先建立一個表:

  1. create table house(id int not null primary key,house int,price int);

向表中插入資料:

  1. insert into loaddb.house(id,house,price) values(1,2,3);

上面提到插入資料後,binlog會更新,那麼我們去檢視上面log檔案,應該會看到插入操作。

Mysql binlog日誌有ROW,Statement,MiXED三種格式;

  1. set global binlog_format='ROW/STATEMENT/MIXED'

命令列:

  1. show variables like 'binlog_format'

row

對於mysql5.7的,binlog格式預設為ROW,所以不用修改。

那麼為何要了解binlog格式呢,原因很簡單,我要檢視我的binlog日誌,而該日誌為二進位制檔案,開啟後是亂碼的。對於不同的格式,檢視方式不一樣!

對於ROW模式生成的sql編碼需要解碼,不能用常規的辦法去生成,需要加上相應的引數,如下程式碼:

  1. sudo /usr/bin/mysqlbinlog mysql-bin.000002 --base64-output=decode-rows -v

使用mysqlbinlog工具檢視日誌檔案:

binlog

4.2.2 啟動thrift介面

thrift為其他語言與hbase操縱介面。啟動目的為後面資料插入做準備。

enter image description here

4.2.3 kafka-thrift流程小結

使用github倉庫程式碼將原始資料進行每2w一個檔案切分!

切分輸出:

split上述切分速度非常快,2分鐘左右即可切完,可以自定義檔案大小。

編寫Kafka資料入Hbase,完整程式碼見github倉庫程式碼:

def batchTokafka(self,start_time,table_name):
    table = self.conn.table(table_name)
    i = 1
    with table.batch(batch_size=1024*1024) as bat:
        for m in self.consumer:
            t = time.time()
            database = json.loads(m.value.decode('utf-8'))["database"]
            name = json.loads(m.value.decode('utf-8'))["table"]
            row_data = json.loads(m.value.decode('utf-8'))["data"]
            if database=='loaddb' and name == 'sqlbase1':
                row_id = row_data["id"]
                row = str(row_id)
                print(row_data)
                del row_data["id"]
                data = {}
                for each in row_data:
                    neweach = 'info:' + each
                    data[neweach] = row_data[each]
                data['info:gpslongitude'] = str(data['info:gpslongitude'])
                data['info:gpslatitude'] = str(data['info:gpslatitude'])
                data['info:gpsspeed'] = str(data['info:gpsspeed'])
                data['info:gpsorientation'] = str(data['info:gpsorientation'])
                # self.insertData(table_name, row, data)
                print(data)
                bat.put(row,data)
                if i%1000==0:
                    print("===========插入了" + str(i) + "資料!============")
                    print("===========累計耗時:" + str(time.time() - start_time) + "s=============")
                    print("===========距離上次耗時"+ str(time.time() - t)  +"=========")
                i+=1

上述執行後,開始MySQL資料插入,這裡插入採用4個多程序進行程式插入,速度非常快。

當MySQL資料在插入的同時,資料流向如下:

mysql插入->入庫mysql->記錄binlog->maxwell提取binlog->返回json給kafka->kafka消費端通過thrift介面->寫入hbase。

上述同步的結果如下,為了明確是否真正資料同步,只看了一條資料,作為驗證。

多個程序插入圖

mutprocess

kafka消費入hbase圖

enter image description here

MySQL資料圖

enter image description here

Hbase資料圖

enter image description here

以上就是從Mysql到Hbase的同步方案2。

4.3 Kafka-Flink

enter image description here

4.3.1 實時同步Flink

方案3為方案2的改進,上述是通過Python寫入Hbase,這裡改成java,並使用最新的流處理技術:Flink。

Flink在ETL場景中使用頻繁,非常適合資料同步,於是在這個方案中採用Flink進行同步。

核心程式碼實現,完整程式碼見github倉庫地址:

SingleOutputStreamOperator<Student> student = env.addSource(
new FlinkKafkaConsumer011<>(
        "test",   //這個 kafka topic 需要和上面的工具類的 topic 一致
        new SimpleStringSchema(),
        props)).setParallelism(9)
        .map(string -> JSON.parseObject(string, Student.class))
        .setParallelism(9);
 
long start =System.currentTimeMillis();
student.timeWindowAll(Time.seconds(3)).apply(new AllWindowFunction<Student,
List<Student>, TimeWindow>() {
    @Override
    public void apply(TimeWindow window, Iterable<Student> values,
    Collector<List<Student>> out) throws Exception {
        ArrayList<Student> students = Lists.newArrayList(values);
        if (students.size() > 0) {
            System.out.println("1s內收集到 mysql表 的資料條數是:" 
            + students.size());
            long end =System.currentTimeMillis();
            System.out.printf("已經用時time:%d\n",end-start);
            out.collect(students);
        }
    }
}).addSink(new SinkToHBase()).setParallelism(9);

使用Flink進行批量入Hbase。

4.3.2 Flink小結

首先啟動maxwell與kafka,hbase也要啟動,接著在資料寫入端,可以採用load data infile或者python程式插入法進行資料插入,資料會通過maxwell到kafka再到Flink,然後sink到Hbase。

插入端為load data infile的同步

flink_sink

插入端為Python程式的同步

flink_load