Debezium SQL Server Source Connector + Kafka + Spark + MySQL: Real-Time Data Processing

Preface

A while back I worked on capturing SQL Server database changes in real time, and the whole process was, to put it mildly, bumpy. So I want to write it down here.

The stack covered in this post: Debezium SQL Server Source Connector + Kafka + Spark + MySQL

P.S.: the data will probably be moved onto Kudu later.

The focus here is on recording the points of attention and pitfalls in using each component and wiring them together.

Let's Get Started

When processing data in real time, we need to pick up changes to database tables the moment they happen and forward those changes to Kafka. Different databases need different components for this.

For the ubiquitous MySQL there is fairly broad support: canal, maxwell and the like, all variations on the "MySQL binlog incremental subscribe & consume" pattern. For Microsoft's SQL Server, however, the open-source ecosystem is not nearly as supportive.

1. Choosing a Connector

Debezium's SQL Server connector is a source connector that captures a snapshot of the existing data in a SQL Server database and then monitors and records all subsequent row-level changes to that data. All events for each table are recorded in a separate Kafka topic, where applications and services can easily consume them. The connector itself is built on SQL Server's Change Data Capture (CDC) feature.

2. Installing the Connector

Following the official documentation, the installation went without problems for me.

2.1 Installing Confluent Hub Client

The Confluent Hub client is installed locally as part of Confluent Platform and lives in the /bin directory.
Linux
Download and unzip the Confluent Hub tarball.

[root@hadoop001 softs]# ll confluent-hub-client-latest.tar 
-rw-r--r--. 1 root root 6909785 Sep 24 10:02 confluent-hub-client-latest.tar
[root@hadoop001 softs]# tar -xf confluent-hub-client-latest.tar -C ../app/conn/
[root@hadoop001 softs]# ll ../app/conn/
total 6748
drwxr-xr-x. 2 root root      27 Sep 24 10:43 bin
-rw-r--r--. 1 root root 6909785 Sep 24 10:02 confluent-hub-client-latest.tar
drwxr-xr-x. 3 root root      34 Sep 24 10:05 etc
drwxr-xr-x. 2 root root       6 Sep 24 10:08 kafka-mssql
drwxr-xr-x. 4 root root      29 Sep 24 10:05 share
[root@hadoop001 softs]#

Add the bin directory to the system environment variables:

export CONN_HOME=/root/app/conn
export PATH=$CONN_HOME/bin:$PATH

Verify the installation succeeded:

[root@hadoop001 ~]# source /etc/profile
[root@hadoop001 ~]# confluent-hub
usage: confluent-hub <command> [ <args> ]

Commands are:
    help      Display help information
    install   install a component from either Confluent Hub or from a local file

See 'confluent-hub help <command>' for more information on a specific command.
[root@hadoop001 ~]# 

2.2 Install the SQL Server Connector
Use the confluent-hub command:

[root@hadoop001 ~]# confluent-hub install debezium/debezium-connector-sqlserver:0.9.4
The component can be installed in any of the following Confluent Platform installations: 
  1. / (installed rpm/deb package) 
  2. /root/app/conn (where this tool is installed) 
Choose one of these to continue the installation (1-2): 2
Do you want to install this into /root/app/conn/share/confluent-hub-components? (yN) n

Specify installation directory: /root/app/conn/share/java/confluent-hub-client
 
Component's license: 
Apache 2.0 
https://github.com/debezium/debezium/blob/master/LICENSE.txt 
I agree to the software license agreement (yN) y

You are about to install 'debezium-connector-sqlserver' from Debezium Community, as published on Confluent Hub. 
Do you want to continue? (yN) y

Note: for the "Specify installation directory" prompt, this directory should preferably be /share/java/confluent-hub-client under the confluent-hub home you set up above. The remaining prompts are routine.

3. Configuring the Connector

First the Connect worker needs to be configured. The configuration file is $KAFKA_HOME/config/connect-distributed.properties:

# These are defaults. This file just demonstrates how to override some settings.
# Kafka cluster address; mine is a single node running multiple brokers
bootstrap.servers=hadoop001:9093,hadoop001:9094,hadoop001:9095

# Name of the Connect cluster; all workers in the same cluster must share this group.id
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
# Format of the data stored in Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. These apply to offsets, config and status, and rarely need changing.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Topic to use for storing offsets. This topic should have many partitions and be replicated.
# The offsets topic should have multiple partitions and replicas, sized to your actual cluster
# Kafka Connect will create this topic automatically, but you can also create it yourself as needed
offset.storage.topic=connect-offsets-2
offset.storage.replication.factor=3
offset.storage.partitions=1

# Stores connector and task configurations; should have exactly 1 partition and 3 replicas
config.storage.topic=connect-configs-2
config.storage.replication.factor=3


# Stores statuses; may have multiple partitions and replicas
# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated.
status.storage.topic=connect-status-2
status.storage.replication.factor=3
status.storage.partitions=1


offset.storage.file.filename=/root/data/kafka-logs/offset-storage-file

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000


# REST port
rest.port=18083

# Path where connector plugins live
#plugin.path=/root/app/kafka_2.11-0.10.1.1/connectors
plugin.path=/root/app/conn/share/java/confluent-hub-client

4. Creating the Kafka Topics

My Kafka here is a single node running multiple brokers, so the topics can be created as follows:

kafka-topics.sh --zookeeper hadoop001:2181 --create --topic connect-offsets-2 --replication-factor 3 --partitions 1

kafka-topics.sh --zookeeper hadoop001:2181 --create --topic connect-configs-2 --replication-factor 3 --partitions 1

kafka-topics.sh --zookeeper hadoop001:2181 --create --topic connect-status-2 --replication-factor 3 --partitions 1

Check their status <very important>:

[root@hadoop001 ~]# kafka-topics.sh --describe --zookeeper hadoop001:2181 --topic connect-offsets-2
Topic:connect-offsets-2 PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: connect-offsets-2    Partition: 0    Leader: 3   Replicas: 3,1,2 Isr: 3,1,2

[root@hadoop001 ~]# kafka-topics.sh --describe --zookeeper hadoop001:2181 --topic connect-configs-2
Topic:connect-configs-2 PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: connect-configs-2    Partition: 0    Leader: 1   Replicas: 1,2,3 Isr: 1,2,3

[root@hadoop001 ~]# kafka-topics.sh --describe --zookeeper hadoop001:2181 --topic connect-status-2
Topic:connect-status-2  PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: connect-status-2 Partition: 0    Leader: 3   Replicas: 3,1,2 Isr: 3,1,2
[root@hadoop001 ~]#

5. Enabling SQL Server Change Data Capture (CDC)

Change Data Capture records the insert, update and delete activity applied to SQL Server tables and makes the details of those changes available in an easy-to-consume relational format. The change tables it uses contain columns mirroring the column structure of the tracked source tables, plus the metadata needed to understand each change. CDC thus provides information about DML changes to tables and databases, sparing you expensive alternatives such as user triggers, timestamp columns and join queries.

The change-history tables keep growing as the business runs, so by default change data is retained locally for 3 days (check the retention column of msdb.dbo.cdc_jobs; you can also update that table to change the retention period). A SQL Server Agent background job purges expired data nightly at 2 a.m., so it is advisable to move the change data into a data warehouse on a regular schedule.

The following commands cover most needs:

--Check whether CDC is enabled on the database
  GO
  SELECT [name], database_id, is_cdc_enabled
  FROM sys.databases      
  GO
    
--Enable CDC on the database
 USE test1
 GO
 EXEC sys.sp_cdc_enable_db
 GO
 
--Disable CDC on the database
 USE test1
 go
 exec sys.sp_cdc_disable_db
 go
 
--Check whether tables have CDC enabled
USE test1
GO
SELECT [name], is_tracked_by_cdc 
FROM sys.tables
GO
--Enable CDC on a table (the database must have CDC enabled first)
USE Demo01
GO
EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo',
@source_name   = 'user',
@capture_instance='user',
@role_name     = NULL
GO

--Disable CDC on a table
USE test1
GO
EXEC sys.sp_cdc_disable_table
@source_schema = 'dbo',
@source_name   = 'user',
@capture_instance='user'
GO
 
--If you don't remember which tables have capture enabled, this returns the change-capture configuration of all tables

EXECUTE sys.sp_cdc_help_change_data_capture;
GO

--Show which columns of a capture instance (i.e. a table) are being captured:

EXEC sys.sp_cdc_get_captured_columns
@capture_instance = 'user'
 
 
--Query the job configuration; retention = minutes that change data is kept
SELECT * FROM msdb.dbo.cdc_jobs

--Change the retention to 1440 minutes (one day)
EXECUTE sys.sp_cdc_change_job
@job_type = N'cleanup',
@retention=1440
GO

--Stop the capture job
exec sys.sp_cdc_stop_job N'capture'
go
--Start the capture job
exec sys.sp_cdc_start_job N'capture'
go
 

6. Running the Connector

How do we run it? As follows:

[root@hadoop001 bin]# pwd
/root/app/kafka_2.11-1.1.1/bin
[root@hadoop001 bin]# ./connect-distributed.sh 
USAGE: ./connect-distributed.sh [-daemon] connect-distributed.properties
[root@hadoop001 bin]#
[root@hadoop001 bin]# ./connect-distributed.sh ../config/connect-distributed.properties

... plenty of log output appears here

Verify:

[root@hadoop001 ~]# netstat -tanp |grep 18083
tcp6       0      0 :::18083                :::*                    LISTEN      29436/java          
[root@hadoop001 ~]#

6.1 Get the Worker's Information

P.S.: you may need to install jq first (yum -y install jq); alternatively, just open the URL in a browser.

[root@hadoop001 ~]# curl -s hadoop001:18083 | jq
{
  "version": "1.1.1",
  "commit": "8e07427ffb493498",
  "kafka_cluster_id": "dmUSlNNLQ9OyJiK-bUc6Tw"
}
[root@hadoop001 ~]#

6.2 List the Connectors Installed on the Worker

[root@hadoop001 ~]# curl -s hadoop001:18083/connector-plugins | jq
[
  {
    "class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "type": "source",
    "version": "0.9.5.Final"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "1.1.1"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "1.1.1"
  }
]
[root@hadoop001 ~]# 

You can see io.debezium.connector.sqlserver.SqlServerConnector, the connector we just installed.

6.3 List the Currently Running Connectors (Tasks)

[root@hadoop001 ~]#  curl -s hadoop001:18083/connectors | jq
[]
[root@hadoop001 ~]# 

6.4 Submit the Connector Configuration <Key Step>

Submitting a user configuration starts a connector task, and the connector task performs the actual work.
The configuration is a JSON document, submitted through the same REST API:

curl -s -X POST -H "Content-Type: application/json" --data '{
 "name": "connector-mssql-online-1",
 "config": {
     "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
     "tasks.max" : "1",
     "database.server.name" : "test1",
     "database.hostname" : "hadoop001",
     "database.port" : "1433",
     "database.user" : "sa",
     "database.password" : "xxx",
     "database.dbname" : "test1",
     "database.history.kafka.bootstrap.servers" : "hadoop001:9093",
     "database.history.kafka.topic": "test1.t201909262.bak"
     }
}' http://hadoop001:18083/connectors

Immediately check the connector's status and make sure it is RUNNING:

[root@hadoop001 ~]# curl -s hadoop001:18083/connectors/connector-mssql-online-1/status | jq
{
  "name": "connector-mssql-online-1",
  "connector": {
    "state": "RUNNING",
    "worker_id": "xxx:18083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "xxx:18083"
    }
  ],
  "type": "source"
}
[root@hadoop001 ~]# 

Now list the Kafka topics:

[root@hadoop001 ~]#  kafka-topics.sh --list --zookeeper hadoop001:2181
__consumer_offsets
connect-configs-2
connect-offsets-2
connect-status-2
# Auto-created topic that records schema changes; named by database.history.kafka.topic in your connector config
test1.t201909262.bak

[root@hadoop001 ~]# 

List the running connectors (tasks) again:

[root@hadoop001 ~]#  curl -s hadoop001:18083/connectors | jq
[
  "connector-mssql-online-1"
]
[root@hadoop001 ~]# 

6.5 View the Connector's Information

[root@hadoop001 ~]# curl -s hadoop001:18083/connectors/connector-mssql-online-1 | jq
{
  "name": "connector-mssql-online-1",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.user": "sa",
    "database.dbname": "test1",
    "tasks.max": "1",
    "database.hostname": "hadoop001",
    "database.password": "xxx",
    "database.history.kafka.bootstrap.servers": "hadoop001:9093",
    "database.history.kafka.topic": "test1.t201909262.bak",
    "name": "connector-mssql-online-1",
    "database.server.name": "test1",
    "database.port": "1433"
  },
  "tasks": [
    {
      "connector": "connector-mssql-online-1",
      "task": 0
    }
  ],
  "type": "source"
}
[root@hadoop001 ~]#

6.6 View the Tasks Running Under the Connector

[root@hadoop001 ~]# curl -s hadoop001:18083/connectors/connector-mssql-online-1/tasks | jq
[
  {
    "id": {
      "connector": "connector-mssql-online-1",
      "task": 0
    },
    "config": {
      "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
      "database.user": "sa",
      "database.dbname": "test1",
      "task.class": "io.debezium.connector.sqlserver.SqlServerConnectorTask",
      "tasks.max": "1",
      "database.hostname": "hadoop001",
      "database.password": "xxx",
      "database.history.kafka.bootstrap.servers": "hadoop001:9093",
      "database.history.kafka.topic": "test1.t201909262.bak",
      "name": "connector-mssql-online-1",
      "database.server.name": "test1",
      "database.port": "1433"
    }
  }
]
[root@hadoop001 ~]#

A task's configuration is inherited from the connector's configuration.

6.7 Pause / Resume / Delete a Connector

# curl -s -X PUT hadoop001:18083/connectors/connector-mssql-online-1/pause
# curl -s -X PUT hadoop001:18083/connectors/connector-mssql-online-1/resume
# curl -s -X DELETE hadoop001:18083/connectors/connector-mssql-online-1

7. Reading the Change Data from Kafka

# Records schema changes; the topic name comes from database.history.kafka.topic in your connector config
kafka-console-consumer.sh --bootstrap-server hadoop001:9093 --topic test1.t201909262.bak --from-beginning


# Records data changes; topics are named <database.server.name>.<schema>.<table>, here test1.dbo.t201909262

kafka-console-consumer.sh --bootstrap-server hadoop001:9093 --topic test1.dbo.t201909262 --from-beginning

Here that is:

kafka-console-consumer.sh --bootstrap-server hadoop001:9093 --topic test1.dbo.t201909262 --from-beginning

kafka-console-consumer.sh --bootstrap-server hadoop001:9093 --topic test1.dbo.t201909262
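
Each record on the data topic is a Debezium change-event envelope: a JSON value carrying the row image before and after the change, plus an op code ("c" insert, "u" update, "d" delete, "r" snapshot read). Below is a minimal Scala sketch of pulling those fields out; Jackson is an assumed dependency, and note that with value.converter.schemas.enable=false, as configured above, there is no outer schema/payload wrapper, which the code allows for.

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}

object DebeziumEnvelope {
  private val mapper = new ObjectMapper()

  // Returns (op, before, after) from a Debezium change-event value.
  // op: "c" = insert, "u" = update, "d" = delete, "r" = snapshot read.
  def parse(json: String): (String, JsonNode, JsonNode) = {
    val root = mapper.readTree(json)
    // With schemas.enable=true the event sits under "payload";
    // with schemas.enable=false it is the root object itself.
    val payload = if (root.has("payload")) root.get("payload") else root
    (payload.path("op").asText(),
     payload.path("before"),   // row image before the change (JSON null for inserts)
     payload.path("after"))    // row image after the change (JSON null for deletes)
  }
}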

8. Running DML Statements Against the Table

Insert some data:
the Kafka console consumer immediately prints the corresponding change event.

Spark consumes from Kafka in 10-second batches,

and the newly inserted rows are then written into MySQL.
I'll take the time to document the detailed processing logic later; a rough sketch is given below.
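
As a placeholder until that write-up, here is a minimal sketch of such a Spark Streaming job. It assumes the spark-streaming-kafka-0-10 integration and the MySQL JDBC driver on the classpath, reuses the DebeziumEnvelope helper sketched in section 7, and writes to a made-up sink table t201909262_copy(id, name); brokers, credentials and columns are placeholders to adapt.

import java.sql.DriverManager
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object MssqlCdcToMysql {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("mssql-cdc-to-mysql"), Seconds(10)) // 10s batches, as above

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "hadoop001:9093,hadoop001:9094,hadoop001:9095",
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "cdc-to-mysql",
      "auto.offset.reset"  -> "earliest")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent,
      Subscribe[String, String](Seq("test1.dbo.t201909262"), kafkaParams))

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // One JDBC connection per partition; REPLACE keeps inserts/updates idempotent.
        val conn = DriverManager.getConnection(
          "jdbc:mysql://hadoop001:3306/sink_db", "user", "password")
        val ps = conn.prepareStatement(
          "REPLACE INTO t201909262_copy (id, name) VALUES (?, ?)")
        records.filter(_.value != null).foreach { rec => // skip delete tombstones
          val (op, _, after) = DebeziumEnvelope.parse(rec.value)
          if (op == "c" || op == "u" || op == "r") {
            ps.setInt(1, after.path("id").asInt())
            ps.setString(2, after.path("name").asText())
            ps.executeUpdate()
          } // a delete ("d") would read the before image and issue a DELETE instead
        }
        ps.close(); conn.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}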

Updates and deletes work just as well; I won't demo them here.

Questions are welcome in the comments~

More posts: https://blog.csdn.net/liuge36

References:
https://docs.confluent.io/current/connect/debezium-connect-sqlserver/index.html#sqlserver-source-connector
https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/track-data-changes-sql-server?view=sql-server-2017
https://blog.csdn.net/qq_19518987/article/details/89329464
http://www.tracefact.net/tech/087.html