關於資料同步的幾種實現
關於資料同步的幾種實現
轉載:https://blog.csdn.net/xuemoyao/article/details/14002209
概述
關於資料同步主要有兩個層面的同步,一是通過後臺程式編碼實現資料同步,二是直接作用於資料庫,在資料庫層面實現資料的同步。通過程式編碼實現資料同步,其主要的實現思路很容易理解,即有就更新,無則新增,其他情況日誌記錄,就不做過多的介紹,這裡主要講述的是第二個層面的資料同步,即在資料庫層面實現資料同步。
資料庫層面的資料庫同步主要有三種方式:通過釋出/訂閱的方式實現同步,通過SQL JOB方式實現資料同步,通過Service Broker 訊息佇列的方式實現資料同步。
下面分別就這三種資料同步方式,一一詳解。
- 通過釋出/訂閱的方式實現同步
釋出/訂閱是Sql Server自帶的一種資料庫備份的機制,通過該機制可以快速的實現資料的備份同步,不用編寫任何的程式碼。
此種資料同步的方式存在的以下的一些問題:
表結構不能更改,同步雙方的表結構必須一致,一旦表結構發生更改需要重新生成資料庫快照。
對於大資料量的同步沒有可靠的保證。
網路不穩定的情況下同步也不能保證。
總的來說,這種資料備份同步的方式,在表結構一致、資料量不是特別大的情況下還是非常高效的一種同步方式。
網上有很多的關於如何使用釋出/訂閱的方式實現資料同步的操作示例,這裡就不再重複的演示了,有興趣想要了解的朋友可以參考下面這篇文章:
- 通過SQL JOB方式實現資料同步
通過Sql Job定時作業的方式實現同步其基本原理就是通過目標伺服器和源伺服器的連線,然後通過編寫Sql語句,從源伺服器中讀取資料,再更新到目標伺服器。
這種資料同步的方式比較靈活。建立過sql定時作業之後,主要需要執行以下關鍵的兩步。
2.1 建立資料庫連線(一般作為定時作業執行的第一步)
不同資料庫之間的連線可以通過系統的儲存過程實現。下面就直接用一個示例來講一下如何建立資料庫連線。
–新增一個連線
–系統儲存過程sp_addlinkedserver 引數:
———————-1:目標伺服器的IP或別名,本例中為:’WIN-S1PO3UA6J7I’;———————-2:” (srvproduct,預設);
———————-3:’SQLOLEDB’(provider,預設值);
———————-4:目標伺服器的IP或別名(datasrc),本例中為:’WIN-S1PO3UA6J7I’
exec sp_addlinkedserver ‘WIN-S1PO3UA6J7I’,”,’SQLOLEDB’,’WIN-S1PO3UA6J7I’
–新增登入使用者連線
–系統儲存過程sp_addlinkedsrvlogin 引數:
———————-1:目標伺服器的IP或別名,本例中為:’WIN-S1PO3UA6J7I’;
———————-2:’false’,預設值;
———————-3:null,預設值;
———————-4:’sa’,登入使用者名稱;
———————-5:’[email protected]’,登入密碼;
exec sp_addlinkedsrvlogin ‘WIN-S1PO3UA6J7I’,’false’,null,’sa’,’[email protected]’
建立資料庫連線主要用到了以上的兩個儲存過程,但是在實際操作的過程中可能會遇到“仍有對伺服器XXX的遠端登入或連線登入問題”這樣的問題,如果遇到此類問題,在執行上邊的新增連線和登入使用者連線之前還需要先刪除某個已存在的連結,具體如下:
–系統儲存過程sp_droplinkedsrvlogin 引數:
———————-1:目標伺服器的IP或別名,本例中為:’WIN-S1PO3UA6J7I’;———————-2:null
exec sp_droplinkedsrvlogin ‘WIN-S1PO3UA6J7I’,null
–系統儲存過程sp_dropserver 引數:
———————-1:目標伺服器的IP或別名,本例中為:’WIN-S1PO3UA6J7I’
exec sp_dropserver ‘WIN-S1PO3UA6J7I’
2.2 使用SQL 語句 實現資料同步
主要的同步思路:
1:在目標資料庫中先清空要同步的表的資料
2:使用insert into Table (Cloumn….) select Column….. from 伺服器別名或IP.目標資料庫名.dbo.TableName 的語法將資料從源資料庫讀取並插入到目標資料庫
Truncate table Org_DepartmentsExt –刪除現有系統中已存在的部門表
insert into Org_DepartmentsExt –從名為WIN-S1PO3UA6J7I的伺服器上的DBFrom資料庫上獲取源資料,並同步到目標資料庫中
(
[DeptID] ,[DeptStatus] ,[DeptTel] ,[DeptBrief] ,[DeptFunctions] )
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
SELECT [DeptID]
,[DeptStatus] ,[DeptTel] ,[DeptBrief] ,[DeptFunctions]
- 1
- 2
- 3
- 4
- 5
- 6
- 7
FROM [WIN-S1PO3UA6J7I].[DBFrom].[dbo].[Org_DepartmentsExt]
以上這兩步便是通過SQL Job實現資料同步的關鍵步驟,在完成以上兩步之後,如果沒有其他的表要進行同步,則可建立同步計劃以完善定時作業。帶作業建立完後,便可以執行。
這裡主要只是演示了通過Sql Job方式實現資料同步的關鍵步驟。網上有很多具體的例項演示。有興趣的朋友可以參考以下文章進行練習檢驗:
http://www.cnblogs.com/tyb1222/archive/2011/05/27/2060075.html
- 通過SQL Server Service Broker 訊息佇列的方式實現資料同步
3.1 SQL Server Service Broker概述
SQL Server Service Broker 是資料庫引擎的組成部分,為 SQL Server 提供佇列和可靠的訊息傳遞。既可用於使用單個 SQL Server 例項的應用程式,也可用於在多個例項間分發工作的應用程式。
在單個 SQL Server 例項內,Service Broker 提供了一個功能強大的非同步程式設計模型。資料庫應用程式通常使用非同步程式設計來縮短互動式響應時間,並增加應用程式總吞吐量。
在多個SQL Server例項之間Service Broker 還可以提供可靠的訊息傳遞服務。Service Broker 可幫助開發人員通過稱為服務的獨立、自包含的元件來編寫應用程式。需要使用這些服務中所包含功能的應用程式可以使用訊息來與這些服務進行互動。Service Broker 使用 TCP/IP 在例項間交換訊息。Service Broker 中所包含的功能有助於防止未經授權的網路訪問,並可以對通過網路傳送的訊息進行加密。
3.2 具體的實現演示
在這一小節裡,主要是通過一個完整的資料同步的流程向大家演示,如何實現同一個資料庫例項不同資料庫的資料同步。關於不同的資料庫例項間的資料庫的資料同步整體上跟同一個例項的資料庫同步是一樣的,只不過在不同的資料庫例項間同步時還需啟用傳輸安全、對話安全,建立路由、遠端服務繫結等額外的操作。
這裡邊用到了大量的SQL Server XML的東西,如果有不理解的地方可以參考以下連結:http://www.cnblogs.com/Olive116/p/3355840.html
這是我在做技術準備時,自己的一點學習記錄。
下面就是具體的實現步驟:
3.2.1為資料庫啟動Service Broker活動
這一步主要是用來對要進行資料同步的資料啟用Service Broker 活動,並且授信。
- 1
USE master
GO
–如果資料庫DBFrom、DBTo不存在,則建立相應的資料庫
IF NOT EXISTS (SELECT name FROM sys.databases WHERE name =’DBFrom’)
CREATE DATABASE DBFrom
GO
IF NOT EXISTS (SELECT name FROM sys.databases WHERE name =’DBTo’)
CREATE DATABASE DBTo
GO
–分別為該資料庫啟用Service Broker活動並且授權信任
ALTER DATABASE DBFrom SET ENABLE_BROKER
GO
ALTER DATABASE DBFrom SET TRUSTWORTHY ON
GO
ALTER AUTHORIZATION ON DATABASE::DBFrom To sa
GO
ALTER DATABASE DBTo SET ENABLE_BROKER
GO
ALTER DATABASE DBTo SET TRUSTWORTHY ON
GO
ALTER AUTHORIZATION ON DATABASE::DBTo TO sa
GO
3.2.2 建立資料庫主密匙
這一步主要用來建立資料庫主密匙,上邊有提到Service Broker可以對要傳送的訊息進行加密。
Use DBFrom
go
create master key
encryption by password=’[email protected]’
go
Use DBTo
go
create master key
encryption by password=’[email protected]’
go
3.2.3 建立訊息型別、協定
這裡主要用來建立訊息型別和訊息協定,源資料庫和目標資料庫的訊息型別和協定都要一致。
Use DBFrom
go
–資料同步—訊息型別
create message type [http://oa.founder.com/Data/Sync]
validation=well_formed_xml
go
–資料同步–錯誤反饋訊息型別
create message type [http://oa.founder.com/Data/Sync/Error]
validation=well_formed_xml
go
–資料同步協議
create contract[http://oa.founder.com/Data/SyncContract]
(
[http://oa.founder.com/Data/Sync]
sent by initiator,
[http://oa.founder.com/Data/Sync/Error]
sent by target
)
go
Use DBTo
go
–資料同步—訊息型別
create message type [http://oa.founder.com/Data/Sync]
validation=well_formed_xml
go
–資料同步–錯誤反饋訊息型別
create message type [http://oa.founder.com/Data/Sync/Error]
validation=well_formed_xml
go
–資料同步協議
create contract[http://oa.founder.com/Data/SyncContract]
(
[http://oa.founder.com/Data/Sync]
sent by initiator,
[http://oa.founder.com/Data/Sync/Error]
sent by target
)
Go
建立過之後效果如下圖:
3.2.4 建立訊息佇列
這裡主要用來建立訊息佇列,源資料庫和目標資料庫都要建立,佇列名字可以自主命名。
- 1
use DBFrom
go
create queue [DBFrom_DataSyncQueue]
with status=on
go
use DBTo
go
create queue [DBFrom_DataSyncQueue]
with status=on
go
建立之後效果如下圖:
3.2.5 建立資料同步服務
這裡我們通過利用上邊建立的訊息協定和訊息佇列來建立資料同步的服務。
use DBFrom
go
create service [http://oa.founder.com/DBFrom/Data/SyncService]
on queue dbo.DBFrom_DataSyncQueue
go
–資料同步服務
use DBTo
go
create service [http://oa.founder.com/DBTo/Data/SyncService]
on queue dbo.DBFrom_DataSyncQueue
go
建立後效果如下圖:
- 1
3.2.6 在源資料庫上建立服務配置列表
這裡需要在源資料庫上建立一個服務配置列表,主要用來儲存之前建立過的服務名稱,本例只是用來演示,所以只建立了一個服務,只能是同步一個數據表,如果有多個數據表需要同步,則需建立多個服務,所以這裡建立一個服務配置列表,用來儲存多個服務的服務名稱。
需要注意的是,下面的指令碼在執行完建立表的操作之後又插入了一條資料,也就是上邊我們建立的服務名,如果有多個服務的話,依次插入該表即可。
use DBFrom
go
–同步資料–目標服務配置
create table SyncDataFarServices
(
ServiceID uniqueidentifier,
ServiceName nvarchar(256)
)
go
–將上邊建立的服務名,插入此表中
insert into SyncDataFarServices (ServiceID,ServiceName)
values
(NEWID(),’http://oa.founder.com/DBTo/Data/SyncService‘)
go
效果如下圖:
3.2.7 傳送資料同步訊息
這裡建立了一個儲存過程主要用來發送同步訊息,該訊息內容主要包括操作型別、主鍵、表名、正文內容,分別對應@DMLType,@PrimaryKeyField,@TableName,@XMLData。然後通過建立一個遊標來條的讀取上邊建立的服務列表中的列表資訊,向不同的服務傳送訊息。
- 1
Use DBFrom
go
–傳送同步資料訊息
Create procedure UP_SyncDataSendMsg
(
@PrimaryKeyField nvarchar(128),
@TableName nvarchar(128),
@DMLType char(1),
@XMLData xml
)
as
begin
SET @XMLData.modify(‘insert {sql:variable(“@DMLType”)} as first into /’);
SET @XMLData.modify(‘insert {sql:variable(“@PrimaryKeyField”)} as first into /’);
SET @XMLData.modify(‘insert
{sql:variable(“@TableName”)}as first into /’);
DECLARE FarServices CURSOR FOR SELECT ServiceName FROM SyncDataFarServices;
open FarServices
declare @FarServiceName nvarchar(256);
fetch FarServices into @FarServiceName;
while @@FETCH_STATUS=0
begin
begin Transaction
declare @Conv_Handler uniqueidentifier
begin DIALOG conversation @Conv_Handler –開始一個會話
from service [http://oa.founder.com/DBFrom/Data/SyncService]
to service @FarServiceName
on contract [http://oa.founder.com/Data/SyncContract];
send on conversation @Conv_Handler
Message type http://oa.founder.com/Data/Sync;
fetch FarServices into @FarServiceName;
commit;
end
close FarServices;
deallocate FarServices;
end
go
3.2.8 建立資料同步異常資訊記錄表
這裡建立該表主要用來記錄在資料同步過程中出現的異常資訊。
use DBFrom
go
create Table dbo.SyncException
(
ErrorID uniqueidentifier,
ConversationHandleID uniqueidentifier,
ErrorNumber int,
ErrorSeverity int,
ErrorState int,
ErrorProcedure nvarchar(126),
ErrorLine int,
ErrorMessage nvarchar(2048),
MessageContent nvarchar(max),
CreateDate DateTime
)
go
–修改異常資訊記錄表
alter table dbo.SyncException
add
PrimaryKeyField nvarchar(128),
TableName nvarchar(128),
DMLType char(1),
DBName nvarchar(128)
Go
效果如下圖:
3.2.9 資料同步反饋
這裡主要用來在源資料庫中接收佇列中的訊息,將同時出錯的資訊,解析一下,然後插入到異常資訊記錄表裡邊。
–資料同步回饋
use DBFrom
go
create procedure UP_SyncDataFeedback
as
begin
set nocount on
–會話變數宣告
declare @ConversationHandle uniqueidentifier;–會話控制代碼
declare @Msg_Body nvarchar(max);
declare @Msg_Type_Name sysname;
–變數賦值
while(1=1)
begin
begin transaction
–從佇列中接收訊息
waitfor
(
receive top(1)
@Msg_Type_Name=message_type_name,
@ConversationHandle=[conversation_handle],
@Msg_Body=message_body
from dbo.[DBFrom_DataSyncQueue]
),timeout 1000
–如果接收到訊息處理,否則跳過
if(@@ROWCOUNT<=0)
break;
if @Msg_Type_Name=’http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog’
end conversation @ConversationHandle;
else if @Msg_Type_Name=’http://oa.founder.com/Data/Sync/Error’
begin
declare @DataSource xml;
set @DataSource=Convert(xml,@Msg_Body);
insert into dbo.SyncException(ErrorID,ConversationHandleID,ErrorNumber,ErrorSeverity,ErrorState,ErrorProcedure,ErrorLine,ErrorMessage,
PrimaryKeyField,TableName,DMLType,MessageContent,DBName,CreateDate)
select
NEWID(),@ConversationHandle,
T.c.value(‘./@ErrNumber’,’INT’),
T.c.value(‘./@ErrSeverity’,’INT’),
T.c.value(‘./@ErrState’,’INT’),
T.c.value(‘./@ErrProcedure’,’Nvarchar(126)’),
T.c.value(‘./@ErrLine’,’INT’),
T.c.value(‘./@ErrMessage’,’nvarchar(2048)’),
T.c.value(‘./@PrimaryKeyField’,’nvarchar(128)’),
T.c.value(‘./@TableName’,’nvarchar(128)’),
T.c.value(‘./@DMLType’,’char(1)’),
T.c.value(‘./@MessageContent’,’nvarchar(max)’),
T.c.value(‘./@DBName’,’nvarchar(128)’),
GETDATE()
from @DataSource.nodes(‘/row’) as T(c);
end
else if @Msg_Type_Name=’http://schemas.microsoft.com/SQL/ServiceBroker/Error’
end conversation @ConversationHandle;
commit Transaction;
end
end
commit;
go
3.2.10對Service Broker佇列使用內部啟用,並指定將呼叫的儲存過程
這裡主要用來啟用源資料庫的訊息佇列,併為其指定呼叫的儲存過程,即上邊3.2.9 中建立的儲存過程。
- 1
–對Service Broker佇列使用內部啟用,並指定將呼叫的儲存過程
use DBFrom
go
alter queue dbo.DBFrom_DataSyncQueue with activation
(
status=on,
max_queue_Readers=1,
procedure_name=UP_SyncDataFeedback,
execute as owner
);
Go
3.2.11 在源資料庫中為需要同步的資料表建立觸發器
這裡就以使用者表為例,具體操作如下,這裡通過查詢系統的Inserted和Deleted臨時表來判斷執行同步的操作型別是更新(U)、新增(A)還是刪除(D),最後呼叫3.2.7 中建立的儲存過程來對資料進行處理併發送。
use DBFrom
Go
–使用者資訊同步
Create Trigger UT_DataSync_Users
on dbo.Org_Users
after insert,update,delete
as
set nocount on ;
–變數宣告
declare @PrimaryKeyField nvarchar(128),@TableName nvarchar(128),@DMLType char(1);
declare @InsertCount int ,@DeleteCount int ;
declare @XMLData xml;
–變數賦值
set @PrimaryKeyField=’ID’ –組合主鍵,多個主鍵使用”,”隔開
set @TableName=’Org_Users’
set @InsertCount=(select COUNT(*) from inserted)
set @DeleteCount=(select COUNT(*) from deleted)
if @[email protected] and @InsertCount<>0 —-Update
begin
select @XMLData=(select * from inserted For xml raw,binary base64,ELEMENTS XSINIL);
set @DMLType=’U’;
end
else if(@InsertCount<>0 and @DeleteCount=0) —-Insert
begin
select @XMLData=(select * from inserted for xml raw ,Binary base64,ELEMENTS XSINIL)
set @DMLType=’A’;
end
else—-Delete
begin
select @XMLData=(select *from deleted for xml raw,binary base64,ELEMENTS XSINIL)
set @DMLType=’D’;
end
if(@XMLData is not null)
begin
exec UP_SyncDataSendMsg @PrimaryKeyField,@TableName,@DMLType,@XMLData;
end
go
3.2.12 目標資料庫中建立,字元分割函式
該函式主要是用來進行字元分割,用來處理主鍵有多個欄位的情況。
–目標資料庫
use DBTo
go
–轉換用‘,’分割的字串@str
create Function dbo.uf_SplitString
(
@str nvarchar(max),
@Separator nchar(1)=’,’
)
returns nvarchar(2000)
as
begin
declare @Fields xml;–結果欄位列表
declare @Num int;—–記錄迴圈次數
declare @Pos int;—–記錄開始搜尋位置
declare @NextPos int;–搜尋位置臨時變數
declare @FieldValue nvarchar(256);–搜尋結果
set @Num=0;
set @Pos=1;
set @Fields=CONVERT(xml,’‘);
while (@Pos<=LEN(@Str))
begin
select @NextPos=CHARINDEX(@Separator,@Str,@Pos)
if(@NextPos=0 OR @NextPos is null)
select @NextPos=LEN(@Str)+1;
select @FieldValue=RTRIM(ltrim(substring(@Str,@Pos,@[email protected])))
select @[email protected]+1
set @[email protected]+1;
if @FieldValue<> ”
begin
set @Fields.modify(‘insert {sql:variable(“@FieldValue”)} as last into /Fields[1]’);
end
end
return Convert(nvarchar(2000),@Fields);
end
go
3.2.13 將解析過的訊息資訊,根據操作型別的不同同步到資料表中
這是所有的資料同步中最關鍵也是最複雜的一步了,在整個開發的過程中,大部分時間都花在這上邊了,具體的操作都在下面解釋的很清楚了。
- 1
–將XML資料來源中的資料同步到資料表中(包括增刪改)
Use DBTo
go
create function dbo.UF_XMLDataSourceToSQL
(
@DataSource XML,–資料來源
@TableName varchar(128),–同步資料表名稱
@PrimaryKeyField varchar(128),–需要同步的表的主鍵,主鍵為多個時用‘,’隔開
@DMLType char(1) –A:新建;U:編輯;D:刪除
)
returns nvarchar(4000)
as
begin
–變數宣告及資料初始化
–宣告資料表@TableName列Column相關資訊變數
declare @ColumnName nvarchar(128),@DataType nvarchar(128),@MaxLength int;
–宣告用於拼接SQL的變數
declare @FieldsList nvarchar(4000),@QueryStatement nvarchar(4000);
declare @Sql nvarchar(4000);
declare @StrLength int;
–變數初始化
set @FieldsList=’ ‘;–初始化變數不為null,否則對變數使用’+=’操作符無效
set @QueryStatement=’ ‘;
–主鍵資訊,根據引數求解如:ID1ID2
declare @PKs xml;
–當前欄位是否主鍵-在‘更新’,‘刪除’同步資料時使用
declare @IsPK nvarchar(128);
–初始化遊標–遊標內容包括目標資料表TableName列資訊
DECLARE ColumnNameList CURSOR FOR SELECT COLUMN_NAME,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH FROM INFORMATION_SCHEMA.COLUMNS WHERE [email protected] AND
DATA_TYPE<>’xml’;
–資料處理
if @DMLType=’A’–插入資料
begin
open ColumnNameList
fetch ColumnNameList into @ColumnName,@DataType,@MaxLength;
while @@FETCH_STATUS=0
begin
–判斷資料來源列中是否存在屬性:@ColumnName
–判斷資料來源列中是否存在–元素:@ColumnName
If @DataSource.exist(‘/row/*[local-name()=sql:variable(“@ColumnName”)]’)=1
begin
–拼接SQL
set @FieldsList+=(@ColumnName+’,’);
set @QueryStatement+=(‘T.c.value(”(./’[email protected]+’[not(@xsi:nil)])[1]”,”’[email protected]);–元素讀取(包含空值情況)
if @MaxLength is not null and @MaxLength<>-1
begin
set @QueryStatement+=’(‘+CONVERT(nvarchar,@MaxLength)+’)’;
end
else if @MaxLength=-1 and @DataType<>’xml’–已調整
begin
set @QueryStatement+=’(MAX)’;
end
set @QueryStatement+=(”’) as ‘[email protected]+’,’);
end
fetch ColumnNameList into @ColumnName,@DataType,@MaxLength
end
close ColumnNameList;
deallocate ColumnNameList;
set @StrLength=LEN(@FieldsList);
–去掉@FieldsList結尾的’,’
set @FieldsList=SUBSTRING(@FieldsList,1,@StrLength-1);
set @StrLength=LEN(@QueryStatement);
–去掉@QueryStatement結尾的’,’
set @QueryStatement=SUBSTRING(@QueryStatement,1,@StrLength-1);
set @Sql=N’insert into ‘[email protected]+’(‘[email protected]+’) select ‘[email protected]+’ from @DataSource.nodes(”/row/”) as T(c)’;
end
else if @DMLType='U'--更新資料 begin --更新語句where 後的條件表示式 declare @Condition nvarchar(1000); set @Condition=' '; set @PKs=CONVERT(xml,dbo.uf_SplitString(@PrimaryKeyField,',')); Open ColumnNameList fetch ColumnNameList into @ColumnName,@DataType,@MaxLength; while @@FETCH_STATUS=0 begin --判斷資料來源列中是否存在元素:@ColumnName if @DataSource.exist('/row/*[local-name()=sql:variable("@ColumnName")]')=1 begin set @IsPK=null; SELECT @IsPk=Fs.F FROM (SELECT T.c.value('.[text()]','Nvarchar(128)') AS F FROM @PKs.nodes('/Fields/Field') AS T(c))Fs Where Fs.F=@ColumnName if @IsPK is null or @IsPK='' begin --非主鍵,更新欄位值 set @FieldsList+=(@ColumnName+'=Source.'+@ColumnName+','); end else begin --主鍵,作為要更新條件 set @Condition+=@TableName+'.'+@ColumnName+'=Source.'+@ColumnName+' And '; end --XML查詢 set @QueryStatement+=('T.c.value(''(./'+@ColumnName+'[not(@xsi:nil)])[1]'','''+@DataType);--元素讀取(包含空值情況) if @MaxLength is not null and @MaxLength<>-1 begin set @QueryStatement+='('+CONVERT(nvarchar,@MaxLength)+')'; end else if @MaxLength=-