使用flink-cdc實現實時資料同步
阿新 • • 發佈:2022-03-23
使用flink-cdc實現實時資料庫同步
- Flink CDC Connectors 是Apache Flink的一組源聯結器,使用變更資料捕獲 (CDC) 從不同的資料庫中獲取變更。
- 基於查詢的 CDC:sqoop、dataX等,離線排程查詢作業,批處理。把一張表同步到其他系統,每次通過查詢去獲取表中最新的資料;無法保障資料一致性,查的過程中有可能資料已經發生了多次變更;不保障實時性,基於離線排程存在天然的延遲。
- 基於日誌的 CDC:canal、Oracle GoldenGate等,實時消費日誌,流處理,例如 MySQL 的 binlog 日誌完整記錄了資料庫中的變更,可以把 binlog 檔案當作流的資料來源;保障資料一致性,因為 binlog 檔案包含了所有歷史變更明細;保障實時性,因為類似 binlog 的日誌檔案是可以流式消費的,提供的是實時資料。
安裝 kali Linux 只用於安裝Linux環境
flinkcdc安裝
官方文件
下載flinkcdc
https://downloads.apache.org/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz
下載資料庫connector
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.12/1.13.6/flink-sql-connector-elasticsearch7_2.12-1.13.6.jar wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1.1/flink-sql-connector-mysql-cdc-2.1.1.jar wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.1.1/flink-sql-connector-postgres-cdc-2.1.1.jar
安裝flinkcdc
- 解壓flink-1.13.6-bin-scala_2.12.tgz到flink-1.13.6
tar -zxvf flink-1.13.6-bin-scala_2.12.tgz
- 將connector複製到flink-1.13.6/lib
cp flink-sql-connector-* flink-1.13.6/lib
- 啟動start-cluster
cd flink-1.13.6
./bin/start-cluster.sh
安裝es、kibana、mysql
開防火牆 非必須
firewall-cmd --permanent --zone=public --add-port=9200/tcp firewall-cmd --permanent --zone=public --add-port=9300/udp firewall-cmd --permanent --zone=public --add-port=5601/tcp --add-port=5601/udp firewall-cmd --reload
1.第一種方式 podman或者docker
# Create a dedicated container network and grant write access to the
# data directory so Elasticsearch can start without permission errors.
podman network create es_net
mkdir /opt/es/data/
chmod 776 /opt/es/data/ -R
# Install Elasticsearch (single-node discovery, 512m heap for a demo box).
podman run --name=elasticsearch -d \
-e cluster.name=elasticsearch \
-e bootstrap.memory_lock=true \
-e ES_JAVA_OPTS="-Xms512m -Xmx512m" \
-e discovery.type=single-node \
-v /opt/es/data/es/data:/usr/share/elasticsearch/data \
-v /opt/es/data/es/logs:/usr/share/elasticsearch/logs \
-v /opt/es/data/es/plugins:/usr/share/elasticsearch/plugins \
--net es_net \
--network-alias elasticsearch \
-p 9200:9200 -p 9300:9300 \
--ulimit memlock=-1:-1 \
--ulimit nofile=65536:65536 \
docker.elastic.co/elasticsearch/elasticsearch:7.6.0
# Install Kibana, pointing it at the "elasticsearch" alias on es_net.
podman run --name=kibana -d \
-e elasticsearch.hosts=http://elasticsearch:9200 \
-v /opt/es/data/kibana/data:/usr/share/kibana/data \
--net es_net \
--network-alias kibana \
-p 5601:5601 \
docker.elastic.co/kibana/kibana:7.6.0
2.第二種方式 docker-compose或者podman-compose
- 建立docker-compose.yml檔案
# docker-compose.yml — MySQL (binlog enabled), Elasticsearch and Kibana
# sharing one network so Kibana/Flink can resolve "elasticsearch" by name.
version: '3'
services:
  mysql:
    # Debezium's example image ships with binlog already turned on,
    # which is required for mysql-cdc to stream changes.
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
    networks:
      - es_net
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.6.0
    container_name: elasticsearch
    environment:
      - cluster.name=elasticsearch
      - bootstrap.memory_lock=true
      # Keep the heap small — this is a demo environment.
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    volumes:
      - ./data/es/data:/usr/share/elasticsearch/data
      - ./data/es/logs:/usr/share/elasticsearch/logs
      - ./data/es/plugins:/usr/share/elasticsearch/plugins
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      # Allow memory locking (bootstrap.memory_lock) and raise the
      # open-file limit that Elasticsearch requires at startup.
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    networks:
      - es_net
  kibana:
    image: docker.elastic.co/kibana/kibana:7.6.0
    container_name: kibana
    depends_on:
      - elasticsearch
    ports:
      - "5601:5601"
    environment:
      - "elasticsearch.hosts=http://elasticsearch:9200"
    networks:
      - es_net
networks:
  es_net: {}
- 啟動容器
docker-compose up -d
或者
podman-compose up -d
安裝es中文分詞器
podman exec -it elasticsearch bash
./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.6.0/elasticsearch-analysis-ik-7.6.0.zip
資料同步
mysql準備測試資料
- 登入
podman-compose exec mysql mysql -uroot -p123456
- 建表
-- Create the demo schema and seed rows consumed by the CDC pipeline.
CREATE DATABASE test;
USE test;
CREATE TABLE test_1 (
    id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    description VARCHAR(512)
);
-- Start ids at 10001 so synced documents are easy to spot in Elasticsearch.
ALTER TABLE test_1 AUTO_INCREMENT = 10001;
-- Use standard single-quoted string literals: double quotes are identifier
-- quotes under MySQL's ANSI_QUOTES sql_mode and would break there.
INSERT INTO test_1 (id, name, description)
VALUES (DEFAULT, 'scooter', 'Small 2-wheel scooter'),
       (DEFAULT, 'car battery', '12V car battery'),
       (DEFAULT, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3'),
       (DEFAULT, 'hammer', '12oz carpenter''s hammer');
SELECT t.id, t.name, t.description FROM test_1 t;
- 如果mysql沒有啟動binlog,需要開啟binlog
檢視是否開啟:
show variables like '%log_bin%'
# on為開啟
es預先建立索引,預設索引可能不滿足要求 非必須
- 建立索引
# 如果存在刪除索引 非必須
DELETE index_test_1
PUT index_test_1
PUT index_test_1/_mapping
{
"properties" : {
"_class" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"id" : {
"type" : "long"
},
"name" : {
"type" : "keyword"
},
"description" : {
"type" : "text",
"analyzer" : "ik_max_word"
}
}
}
在flink的sql-client中建立任務
- 啟動sql-client
./bin/sql-client.sh
- 分別建立對應mysql和es的表
-- mysql 生產
-- Flink source table backed by the MySQL binlog via the mysql-cdc connector.
-- Columns must match mysql table `test.test_1`.
CREATE TABLE test_1 (
    id INT,
    name STRING,
    description STRING,  -- comma was missing here: statement would not parse
    -- Flink cannot enforce uniqueness on external data, hence NOT ENFORCED.
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'test',
    'table-name' = 'test_1'
);
-- Flink sink table writing upserts into the Elasticsearch index
-- `index_test_1`; the primary key becomes the ES document _id.
CREATE TABLE index_test_1 (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'index_test_1'
);
- 開啟 checkpoint,每隔3秒做一次 checkpoint
-- Flink SQL
SET execution.checkpointing.interval = 3s;
- 建立任務
INSERT INTO index_test_1
SELECT d.* FROM test_1 d;
檢視結果
檢視任務地址:http://localhost:8081/#/job/running
檢視es資料:http://localhost:9200/index_test_1/_search
亦可以kibana檢視