
Real-time data synchronization with flink-cdc

Original post: https://www.cnblogs.com/muphy/p/16043924.html

  • Flink CDC Connectors is a set of source connectors for Apache Flink that use change data capture (CDC) to ingest changes from different databases.
  • Query-based CDC (Sqoop, DataX, etc.): offline, scheduled query jobs, i.e. batch processing. A table is synchronized to another system by repeatedly querying it for its latest data. Consistency is not guaranteed, because the data may already have changed several times while the query runs; freshness is not guaranteed either, because offline scheduling has inherent latency.
  • Log-based CDC (Canal, Oracle GoldenGate, etc.): real-time log consumption, i.e. stream processing. For example, MySQL's binlog is a complete record of the changes made to the database, so the binlog file can be treated as a streaming data source. Consistency is guaranteed, because the binlog contains the full history of changes, and freshness is guaranteed, because a log like the binlog can be consumed as a stream of real-time data. The sketch below illustrates the difference.
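A minimal SQL sketch of the difference, using a hypothetical product table and polling schedule (the table, values, and interval are all illustrative, not part of the setup below):

-- Two changes land between two polling runs of a query-based sync:
UPDATE product SET price = 90 WHERE id = 1;  -- change #1
UPDATE product SET price = 80 WHERE id = 1;  -- change #2, moments later

-- The next scheduled poll only sees the final state:
SELECT id, price FROM product WHERE id = 1;  -- returns 80; change #1 was never observed

-- With binlog_format = ROW, the binlog records both row images
-- (100 -> 90, then 90 -> 80), so a log-based consumer such as
-- flink-cdc replays every change in order, as it happens.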

Install Kali Linux (used here only to provide a Linux environment)

Install Flink CDC

Official documentation

Download Flink (the CDC connectors run as jobs inside a standard Flink cluster)

https://downloads.apache.org/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz

Download the connectors (the MySQL and Postgres CDC sources, plus the Elasticsearch sink)

wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.12/1.13.6/flink-sql-connector-elasticsearch7_2.12-1.13.6.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1.1/flink-sql-connector-mysql-cdc-2.1.1.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.1.1/flink-sql-connector-postgres-cdc-2.1.1.jar

Install Flink CDC

  • Extract flink-1.13.6-bin-scala_2.12.tgz to flink-1.13.6
tar -zxvf flink-1.13.6-bin-scala_2.12.tgz
  • Copy the connectors into flink-1.13.6/lib
cp flink-sql-connector-* flink-1.13.6/lib
  • Start the cluster; a quick verification follows the commands
cd flink-1.13.6
./bin/start-cluster.sh
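If startup succeeded, the JobManager answers on port 8081 (the default in conf/flink-conf.yaml):

curl http://localhost:8081/overview   # cluster overview as JSON
# or open http://localhost:8081 in a browser for the web dashboard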

Install Elasticsearch, Kibana, and MySQL

Open the firewall ports (optional)

firewall-cmd --permanent --zone=public --add-port=9200/tcp
firewall-cmd --permanent --zone=public --add-port=9300/tcp
firewall-cmd --permanent --zone=public --add-port=5601/tcp
firewall-cmd --reload

Option 1: podman or docker

# Create a network and the data directories, and grant permissions so ES can start
podman network create es_net
mkdir -p /opt/es/data/es/{data,logs,plugins}
chmod -R 777 /opt/es/data/   # ES in the container runs as uid 1000; chown -R 1000:1000 is a stricter alternative
# 安裝ES
podman run --name=elasticsearch -d \
-e cluster.name=elasticsearch \
-e bootstrap.memory_lock=true  \
-e ES_JAVA_OPTS="-Xms512m -Xmx512m" \
-e discovery.type=single-node \
-v /opt/es/data/es/data:/usr/share/elasticsearch/data \
-v /opt/es/data/es/logs:/usr/share/elasticsearch/logs \
-v /opt/es/data/es/plugins:/usr/share/elasticsearch/plugins \
--net es_net \
--network-alias elasticsearch \
-p 9200:9200 -p 9300:9300 \
--ulimit memlock=-1:-1 \
--ulimit nofile=65536:65536 \
docker.elastic.co/elasticsearch/elasticsearch:7.6.0
# Install Kibana
podman run --name=kibana -d \
-e ELASTICSEARCH_HOSTS=http://elasticsearch:9200 \
-v /opt/es/data/kibana/data:/usr/share/kibana/data \
--net es_net \
--network-alias kibana \
-p 5601:5601 \
docker.elastic.co/kibana/kibana:7.6.0
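A quick way to confirm both containers are healthy (Kibana can take a minute before its status API responds):

curl http://localhost:9200            # should return cluster info with "cluster_name" : "elasticsearch"
curl http://localhost:5601/api/status # Kibana status API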

Option 2: docker-compose or podman-compose

  • Create a docker-compose.yml file
version: '3'
services:
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
    networks:
      - es_net
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.6.0
    # localhost: elasticsearch
    container_name: elasticsearch
    # user: root
    environment:
      - cluster.name=elasticsearch
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    volumes:
      - ./data/es/data:/usr/share/elasticsearch/data
      - ./data/es/logs:/usr/share/elasticsearch/logs
      - ./data/es/plugins:/usr/share/elasticsearch/plugins
      # - /docker/es/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    networks:
      - es_net
  kibana:
    image: docker.elastic.co/kibana/kibana:7.6.0
    container_name: kibana
    # hostname: kibana
    # user: root
    depends_on:
      - elasticsearch
    ports:
      - "5601:5601"
    environment:
      - "elasticsearch.hosts=http://elasticsearch:9200"
    networks:
      - es_net
# networks
networks:
  es_net: {}
  • Start the containers
docker-compose up -d

or

podman-compose up -d
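As with option 1, confirm the containers are up before continuing:

docker-compose ps        # mysql, elasticsearch, and kibana should all show "Up"
curl http://localhost:9200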

Install the IK Chinese analyzer for ES

podman exec -it elasticsearch bash
./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.6.0/elasticsearch-analysis-ik-7.6.0.zip
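The plugin is only loaded at startup, so leave the container shell and restart it; the analyzer can then be smoke-tested through the _analyze API (the sample text is arbitrary):

exit
podman restart elasticsearch
curl -X POST 'http://localhost:9200/_analyze?pretty' -H 'Content-Type: application/json' \
  -d '{"analyzer": "ik_max_word", "text": "實時資料同步"}'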

Data synchronization

Prepare test data in MySQL

  • Log in
podman-compose exec mysql mysql -uroot -p123456
  • Create a table and insert rows
CREATE DATABASE test;
USE test;
CREATE TABLE test_1 (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE test_1 AUTO_INCREMENT = 10001;
INSERT INTO test_1
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer");
select t.id, t.name, t.description from test_1 t;
  • If MySQL does not have binlog enabled, it must be turned on first (a configuration sketch follows below)

Check whether it is enabled: show variables like '%log_bin%' -- a value of ON means it is enabled
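The debezium/example-mysql image used above already ships with binlog enabled, so this only applies to a self-managed MySQL. A minimal my.cnf sketch (the server-id value is an arbitrary unique id; restart MySQL after editing):

[mysqld]
server-id        = 1          # any unique, non-zero id
log_bin          = mysql-bin  # turn binary logging on
binlog_format    = ROW        # CDC needs row-level change events
binlog_row_image = FULL       # record full before/after row images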

Pre-create the ES index, since the default dynamic mapping may not meet your needs (optional)

  • Create the index (the commands below use Kibana's Dev Tools console; a curl check follows the block)
# Delete the index first if it already exists (optional)
DELETE index_test_1
PUT index_test_1
PUT index_test_1/_mapping
{
  "properties" : {
    "_class" : {
      "type" : "text",
      "fields" : {
        "keyword" : {
          "type" : "keyword",
          "ignore_above" : 256
        }
      }
    },
    "id" : {
      "type" : "long"
    },
    "name" : {
      "type" : "keyword"
    },
    "description" : {
      "type" : "text",
      "analyzer" : "ik_max_word"
    }
  }
}
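Having applied the mapping above, it can be double-checked from a shell (assuming ES is reachable on localhost:9200):

curl http://localhost:9200/index_test_1/_mapping?pretty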

Create the job in Flink's sql-client

  • Start the sql-client
./bin/sql-client.sh
  • Create a table for the MySQL source and one for the ES sink (a sanity check follows the two statements)
-- MySQL source
CREATE TABLE test_1 (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'test',
    'table-name' = 'test_1'
  );
  
-- Elasticsearch sink
CREATE TABLE index_test_1 (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
 ) WITH (
     'connector' = 'elasticsearch-7',
     'hosts' = 'http://localhost:9200',
     'index' = 'index_test_1'
 );
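Before wiring up the sink, the source table can be sanity-checked directly in the sql-client; the query runs as a small Flink job that first emits the snapshot and then any live changes:

SELECT * FROM test_1;
-- the four rows inserted earlier (ids 10001-10004) should appear; quit the result view with Q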
  • Enable checkpointing: take a checkpoint every 3 seconds
-- Flink SQL
SET execution.checkpointing.interval = 3s;
  • Create the synchronization job (a live test follows below)
INSERT INTO index_test_1 
SELECT d.* FROM test_1 d;
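With the job running, any further change in MySQL should appear in ES within seconds. For example, back in the MySQL client (the values are arbitrary; id 10004 is the hammer row inserted earlier):

INSERT INTO test_1 VALUES (default, 'spare tire', '24 inch spare tire');
UPDATE test_1 SET description = '18oz carpenter hammer' WHERE id = 10004;
DELETE FROM test_1 WHERE id = 10001;
-- re-run the _search request below after each statement to watch index_test_1 follow along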

View the results

Job status: http://localhost:8081/#/job/running
ES data: http://localhost:9200/index_test_1/_search
The data can also be browsed in Kibana.