1. 程式人生 > 其它 >PostgreSQL/lightdb邏輯複製詳解

PostgreSQL/lightdb邏輯複製詳解

  之所以有邏輯複製,是因為物理複製是基於資料塊的複製,每個例項的資料塊是自己維護的,無法做到全域性,所以只能藉助邏輯塊複製,即使是核心整合的HTAP,在行存和列存之間同步時,也採用的是邏輯塊複製。邏輯複製可用於很多場景,例如部分資料同步、DW整合、同步到大資料、ES、做流式計算、快取更新等等,在這些場景中,CDC是非常重要的。

  Postgres 10開始原生支援邏輯複製。

  邏輯複製也稱為行級複製或CDC,所以vacuum、index update這些都不會包含。主要用於雙主、同步到kafka/redis/gp等場景,因為基於複製協議,理論上也可以做到半同步,效能上可達到流複製的2/3,預設不支援DDL、序列(pglogical可配置)。是否支援多主?(是否可以源端不decode?直接到目標庫,技術上可以的。但是因為要基於當時的catalog進行decode以便精確解析出定義,所以會比較麻煩。比如oracle logminer/OGG/xstream就支援在三個地方進行解碼,也是為了需要資料字典同步)是否支援可以支援DDL?(BDR支援)

邏輯複製的架構

 

  邏輯複製涉及的元件包括:複製槽(pg_replication_slots)、訂閱(pg_subscription)、複製源(pg_replication_origin)、解碼外掛(plugin)、釋出(pg_publication、pg_publication_tables、pg_publication_rel)。其中邏輯複製的消費者不一定要是subscription,可以是其他比如pg_recvlogical。subscription和pg_subscription的存在是為了pg例項之間邏輯複製可以開箱即用。

  在PG的架構上,邏輯解碼是發生在wal_sender上的,而不是消費者負責解碼(oracle則支援在主機或其它例項,其他例項要求包含catalog,和維護

replslotreplication slots 是從 postgresql 9.4 引入的,主要是提供了一種自動化的方法來確保主控機在所有的後備機收到 WAL 段之前不會移除它們,並且主控機也不會移除可能導致恢復衝突的行,即使後備機斷開也是如此。 為了防止 WAL 被刪,SLOT restart_lsn 之後的WAL檔案都不會刪除。(wal_keep_segments 則是一個人為設定的邊界,slot 是自動設定的邊界(無上限),所以使用 slot 並且下游斷開後,可能導致資料庫的 WAL 堆積爆滿)。)中的min_catalog是一樣的,否則就解碼不出了,wal中包含了oid和blockid,同時pg的wal是full record的,所以apply和解析時都可以非常快),如下:

  對於任何一個有訂閱或消費者的複製槽,都有一個對應的walsender(這一點和流複製是一樣的)程序,實時等待被wal writer或bgwriter喚醒去讀取剛剛提交的xlog,通過訊號通知。 

  啟動一個pg_recvlogical消費者,

[zjh@hs-10-20-30-193 ~]$ nohup lt_recvlogical -p25432 -Uzjh -d postgres --slot test_for_recvlogical --start -f - &
[1] 237063
[zjh@hs-10-20-30-193 ~]$ nohup: ignoring input and appending output to ??ohup.out?

[zjh@hs-10-20-30-193 ~]$ 
[zjh@hs-10-20-30-193 ~]$ 
[zjh@hs-10-20-30-193 ~]$ 
[zjh@hs-10-20-30-193 ~]$ tail -fn 100 nohup.out 
BEGIN 2594732
table public.users: INSERT: user_id[integer]:5 user_name[character varying]:'data5' gender[integer]:null salary[numeric]:null dept_id[integer]:null create_date[timestamp without time zone]:null update_date[timestamp without time zone]:'2022-04-10 08:30:42.870449'
COMMIT 2594732

  再對應的walsender程序狀態。如下:

     libc.so.6!__epoll_wait_nocancel    
>    WaitEventSetWaitBlock(set = 0x1841828, cur_timeout = 30000, occurred_events = 0x7ffd43ffa980, nevents = 1)    C++ (gdb)
     WaitEventSetWait(set = 0x1841828, timeout = 30000, occurred_events = 0x7ffd43ffa980, nevents = 1, wait_event_info = 100663303)    C++ (gdb)
     WaitLatchOrSocket(latch = 0x7f9fcafdb2ec, wakeEvents = 43, sock = 11, timeout = 30000, wait_event_info = 100663303)    C++ (gdb)
     WalSndWaitForWal(loc = 1882311624)    C++ (gdb)
     logical_read_xlog_page(state = 0x18f2030, targetPagePtr = 1882308608, reqLen = 3016, targetRecPtr = 1882311600, cur_page = 0x18fdc18 "\006\321\005")    C++ (gdb)
     ReadPageInternal(state = 0x18f2030, pageptr = 1882308608, reqLen = 3016)    C++ (gdb)
     XLogReadRecord(state = 0x18f2030, errormsg = 0x7ffd43ffab58)    C++ (gdb)
     XLogSendLogical    C++ (gdb)
     WalSndLoop(send_data = 0x891d26 <XLogSendLogical>)    C++ (gdb)   ## 一直等待,直到有wal後呼叫send_data函式指標指向的XLogSendLogical函式開始寫邏輯解碼資料
     StartLogicalReplication(cmd = 0x18c1b58)    C++ (gdb)
     exec_replication_command(cmd_string = 0x1812418 "START_REPLICATION SLOT \"test_for_recvlogical\" LOGICAL 0/0")    C++ (gdb)
     PostgresMain(argc = 1, argv = 0x18407b0, dbname = 0x1840720 "postgres", username = 0x1840708 "zjh")    C++ (gdb)
     BackendRun(port = 0x183c740)    C++ (gdb)
     BackendStartup(port = 0x183c740)    C++ (gdb)
     ServerLoop    C++ (gdb)
     PostmasterMain(argc = 3, argv = 0x180cf00)    C++ (gdb)
     main(argc = 3, argv = 0x180cf00)    C++ (gdb)

  收到訊號後,會呼叫訊號處理器,如procsignal_sigusr1_handler。這一點上和流複製基本一致。

postgresql wal中的origin

 

  origin主要用於記錄邏輯複製中記錄來源於哪個源,在邏輯分析外掛中使用,用於過濾掉不需要的資料來源。

    維護在pg_replication_origin中。既可以由訂閱自動建立,也可以人工建立。

--publication
postgres=# \d
       List of relations
 Schema | Name | Type  | Owner  
--------+------+-------+--------
 public | t1   | table | movead
(1 row)
postgres=# create publication pub1 for all tables ;
CREATE PUBLICATION

--------------------------------------------------
--subscription
postgres=# create subscription sub1 connection 'host=xxxxxxxx port=5432 dbname=postgres user=movead' publication pub1;
NOTICE:  created replication slot "sub1" on publisher
CREATE SUBSCRIPTION
postgres=#

postgres=# select * from pg_replication_origin;
 roident |   roname    
---------+-------------
       1 | pg_16389

postgres=# select pg_replication_origin_create('test_origin');   -- 用給定的外部名稱建立一個複製源,並且返回分配給它的內部ID。
 pg_replication_origin_create 
------------------------------
                            2  -- 返回的是origin id
(1 row)

postgres=# select * from pg_replication_origin;
 roident |   roname    
---------+-------------
       1 | pg_16389
       2 | test_origin
(2 rows)
postgres=#

  對於訂閱產生的origin,可以通過在pub端插入資料,然後分析wal,就可以看出wal打標記了。

rmgr: Transaction len (rec/tot):     65/    65, tx:        519, lsn: 0/03000068, prev 0/03000028, desc: COMMIT 2020-04-16 17:09:01.989257 CST; origin: node 1, lsn 0/0, at 2020-04-16 17:05:49.767511 CST


rmgr: Transaction len (rec/tot):     65/    65, tx:        520, lsn: 0/03000128, prev 0/030000E8, desc: COMMIT 2020-04-16 17:09:09.327941 CST; origin: node 2, lsn 0/156BB28, at 2020-04-16 17:09:09.268948 CST

  originid通過roident標識:

postgres=#  \d pg_replication_origin
    Table "pg_catalog.pg_replication_origin"
 Column  | Type | Collation | Nullable | Default 
---------+------+-----------+----------+---------
 roident | oid  |           | not null | 
 roname  | text | C         | not null | 

  對於手工建立的origin,需要呼叫pg_replication_origin_session_setup () API繫結會話到origin。

postgres=# select pg_replication_origin_session_setup('test_origin'); -- 將當前會話標記為從給定的原點回放,從而允許跟蹤回放進度。 只能在當前沒有選擇原點時使用。使用pg_replication_origin_session_reset 命令來撤銷。
 pg_replication_origin_session_setup 
-------------------------------------
(1 row)
postgres=# insert into t1 values(100);select pg_current_wal_lsn();   -- 會話必須繫結到origin會話,wal中才會標記origin
INSERT 0 1
 pg_current_wal_lsn 
--------------------
 0/4000230
(1 row)

   自帶外掛test_decoding可以實現Origin的解析使用。

   就一般使用而言,解碼器可以考慮wal2json、pglogical。但是他們倆都屬於元件級別,算不到產品級。如果沒有自研能力,基本使用可以考慮Debezium。

http://www.postgres.cn/docs/13/functions-admin.html 

https://www.highgo.ca/2020/04/18/the-origin-in-postgresql-logical-decoding/

https://www.postgresql.org/docs/14/replication-origins.html 

https://www.postgresql.org/docs/14/logicaldecoding.html 

https://www.postgresql.fastware.com/blog/logical-replication-tablesync-workers   雙程序體系

https://wiki.postgresql.org/wiki/Logical_replication_and_physical_standby_failover 

邏輯複製入門系列

https://blog.anayrat.info/en/2017/07/29/postgresql-10-and-logical-replication-overview/

https://blog.anayrat.info/en/2017/08/05/postgresql-10-and-logical-replication-setup/

https://blog.anayrat.info/2017/08/27/postgresql-10-et-la-r%C3%A9plication-logique-restrictions/

https://blog.anayrat.info/en/2018/03/10/logical-replication-internals/ 

 

https://www.slideshare.net/PetrJelinek1/logical-replication-in-postgresql-flossuk-2016

https://www.slideshare.net/UmairShahid16/logical-replication-with-pglogical

https://www.slideshare.net/AlexanderShulgin3/streaming-huge-databases-using-logical-decoding  每行記錄都包含的元資料,非常耗費資源,需要考慮下其他序列化機制。

avro(相當於帶schema定義的JSONB/BSON),kafka中的應用

https://docs.oracle.com/database/121/XSTRM/xstrm_pt_concepts.htm#XSTRM72454  oracle邏輯複製

https://zhuanlan.zhihu.com/p/163204827