1. 程式人生 > >ABP 框架 資料庫底層遷移 Mysql 叢集

ABP 框架 資料庫底層遷移 Mysql 叢集

技術交流,請加QQ群:538327407

我的各種github 開源專案和程式碼:https://github.com/linbin524

背景

 

筆者 目前架構的IOT 專案是使用abp 框架作為後臺,雖然abp的框架適用於中小型專案框架,但由於架構優美,筆者認為還是可以經過改造,作為大型專案中使用。但IOT 的這個專案目前剛上線不久,十幾天資料庫已經有了上百GB,而且由於實施檢查裝置狀態,呼叫裝置狀態維護表,審計日誌壓力很大,單單審計日誌一天的資料量就有幾十萬,目前在架構上,筆者做了幾個優化處理;

1、針對審計日誌,筆者重寫了Abp 原有的 IAuditingStore,實現mongodb和redis 兩種轉移,並且針對審計日誌內容做了過濾,DisableAuditing特性標記指定的類或方法不進行記錄。

ps:abp 雖然有mongodb 的封裝,但它的出發點是和EF 同一個模式,左右系統唯一的ORM,如果要使用abp 的mongo 封裝,必須要替代EF,或者重寫ABP UnitOfWorkOptions,否則直接用會出現工作單元轉換失敗的問題。

2、站點層面使用nginx 做了反向代理,進行多站點服務,通訊模式由原來的佇列、改為服務化,EventBus等方式

3、資料庫底層 做了Percona XtraDB Cluster—MySQL 叢集處理遷移。

 

思考評估:1、審計日誌這樣處理,從源頭做了縮減,並且進行Nosql拆分,有助於緩解資料庫壓力。

                 2、中間層的處理是一般IOT 中介軟體各種腳手架的組合,成熟,也有經過多年生產環境的檢驗。

                 3、資料庫底層 使用Percona XtraDB Cluster,是因為它支援叢集,可以緩解資料庫請求壓力,又支援abp的事務;

               但從真正大系統考慮,其實最理性的模式應該是分片,結合SOA、或者微服務才能真正解決底層壓力,目前考量了Tidb(張善友 張隊推薦的)、oceanbase(淘寶 自有資料庫,生產環境十年)、mycat中介軟體(聽說這個坑多)等,

               為了暫時不做大改造,只能先使用 Percona XtraDB Cluster,後續可能使用Orleans(Azure 雲框架)、akka.net(大型的框架) 或者 

Service Fabric(微服務框架)

     

二、Percona XtraDB Cluster 評估

 

 

優點如下:

1.當執行一個查詢時,在本地節點上執行。因為所有資料都在本地,無需遠端訪問。

2.無需集中管理。可以在任何時間點失去任何節點,但是叢集將照常工作。

3.良好的讀負載擴充套件,任意節點都可以查詢。

缺點如下:

1.加入新節點,開銷大。需要複製完整的資料。

2.不能有效的解決寫縮放問題,所有的寫操作都將發生在所有節點上。

3.有多少個節點就有多少重複的資料。

 

Percona XtraDB Cluster是MySQL高可用性和可擴充套件性的解決方案.

Percona XtraDB Cluster提供的特性有:

1.同步複製,事務要麼在所有節點提交或不提交。

2.多主複製,可以在任意節點進行寫操作。

3.在從伺服器上並行應用事件,真正意義上的並行複製。

4.節點自動配置。

5.資料一致性,不再是非同步複製。

Percona XtraDB Cluster完全相容MySQL和Percona Server,表現在:

1.資料的相容性

2.應用程式的相容性:無需更改應用程式

 

1.叢集是有節點組成的,推薦配置至少3個節點,但是也可以執行在2個節點上。

2.每個節點都是普通的mysql/percona伺服器,可以將現有的資料庫伺服器組成叢集,反之,也可以將叢集拆分成單獨的伺服器。

3.每個節點都包含完整的資料副本。

 

三、部署流程

 

1、環境準備

 

  在騰訊雲上開設三個測試伺服器,系統 映象 CentOS 7.5 64

 

 

用遠端工具連線三臺測試伺服器,完成如下操作

 

(1)  關閉firewalld防火牆

# systemctl disable firewalld --now

關閉防火牆或者允許3306, 4444, 4567和4568四個埠的連線

 

(2)關閉SELINUX

# setenforce 0
# sed -i 's,^SELINUX=enforcing,SELINUX=disabled,g' /etc/selinux/config

 

2、主節點部署

 

(1)安裝PXC yum源

yum install http://www.percona.com/downloads/percona-release/redhat/0.1-3/percona-release-0.1-3.noarch.rpm

(2) 安裝PXC

# yum install Percona-XtraDB-Cluster-56

 

最終下載下來的版本是Percona-XtraDB-Cluster-56-5.6.30

(3) 修改    /etc/my.cnf

     vim  /etc/my.cnf

 

[mysqld]

datadir=/var/lib/mysql
user=mysql

wsrep_provider=/usr/lib64/galera3/libgalera_smm.so
#叢集的ip
wsrep_cluster_address=gcomm://節點ip1,節點ip2,節點ip3

binlog_format=ROW

default_storage_engine=InnoDB

innodb_autoinc_lock_mode=2

#當前主節點的ip
wsrep_node_address=當前節點ip


wsrep_sst_method=xtrabackup-v2

wsrep_cluster_name=my_centos_cluster
#初始化一個mysql的使用者和密碼
wsrep_sst_auth="admin:123456"

 

(4)啟動主節點

 systemctl start [email protected]

(5)進入mysql

登入 (初始化狀態,無密碼,遇到要輸密碼直接回車)

mysql -uroot -p

 

(6) 登入客戶端檢視資料庫的狀態,在進行許可權配置允許ip訪問,預設無法遠端訪問,但是我們需要遠端通過圖形化等介面檢視,所以要做如下配置

mysql> show status like 'wsrep%';

CREATE USER 'admin'@'localhost' IDENTIFIED BY '123456';//如果這裡報錯,看一下是否有 使用者存在了

GRANT RELOAD, LOCK TABLES, REPLICATION CLIENT ON *.* TO 'admin'@'localhost'; 

FLUSH PRIVILEGES;

 

完成後可以用Navicat For mysql 連線看一下是否可以成功訪問

 

3、其他兩個節點的配置

(1)安裝PXC yum源

yum install http://www.percona.com/downloads/percona-release/redhat/0.1-3/percona-release-0.1-3.noarch.rpm

(2) 安裝PXC

# yum install Percona-XtraDB-Cluster-56

(3) 修改    /etc/my.cnf

     vim  /etc/my.cnf

 

[mysqld]

datadir=/var/lib/mysql
user=mysql

wsrep_provider=/usr/lib64/galera3/libgalera_smm.so
#叢集的ip
wsrep_cluster_address=gcomm://節點ip1,節點ip2,節點ip3

binlog_format=ROW

default_storage_engine=InnoDB

innodb_autoinc_lock_mode=2

#當前主節點的ip
wsrep_node_address=當前節點ip


wsrep_sst_method=xtrabackup-v2

wsrep_cluster_name=my_centos_cluster
#初始化一個mysql的使用者和密碼
wsrep_sst_auth="admin:123456"

 (4)啟動當前節點(這一步和主節點不一樣)

systemctl start mysql

(5)進入mysql

登入 (初始化狀態,無密碼,遇到要輸密碼直接回車)

mysql -uroot -p

 

(6) 登入客戶端檢視資料庫的狀態,在進行許可權配置允許ip訪問,預設無法遠端訪問,但是我們需要遠端通過圖形化等介面檢視,所以要做如下配置

mysql> show status like 'wsrep%';

CREATE USER 'admin'@'localhost' IDENTIFIED BY '123456';//如果這裡報錯,看一下是否有 使用者存在了

GRANT RELOAD, LOCK TABLES, REPLICATION CLIENT ON *.* TO 'admin'@'localhost'; 

FLUSH PRIVILEGES;

 

完成後可以用Navicat For mysql 連線看一下是否可以成功訪問

 (7)可以在mysql中執行如下命令檢視

 show status like 'wsrep%';

 如果正常,可以出現如下介面,標識當前三個叢集節點

 

(8)如果出現啟動節點時候出現異常,可以檢視提示的操作,看看日誌,百度一下看看是什麼錯誤,怎麼解決,因為各種錯誤都有,就不好一一解釋了。

比如筆者在操作過程中就出現如下錯誤

ecStop=/usr/bin/mysql-systemd stop (code=exited, status=2)

後面查詢原因應該是 防火牆等問題,進行關閉攔截等操作,就是一開始 環境準備的後面那一步,關閉防火牆、SELINUX,

 

 主節點重啟

systemctl stop [email protected]
systemctl start [email protected]

其他節點也再次啟動

systemctl start mysql

 

4、abp 進行資料庫遷移

 

(1)abp 想要進行mysql 支援,網上的教程有,我就不重複造輪子自己參考(不要要注意 元件的版本,如果出現差異可能會失敗)

 https://www.jianshu.com/p/543e34da16a7?winzoom=1

 

(2) 將資料庫連線字串改為 主節點

 <add name="Default" connectionString="server=主節點ip;port=3306;database=abpzero4_6db;uid=admin;password=123456;" providerName="MySql.Data.MySqlClient" />

(3) 執行遷移 

 

 (4)檢視對應的三臺伺服器叢集都自動同步該資料庫

 

 

(5)在Appservice 中建立測試服務進行增刪改查、事務等測試

using Abp.Application.Services;
using Abp.Application.Services.Dto;
using Abp.AutoMapper;
using Abp.Domain.Repositories;
using Abp.Domain.Uow;
using AutoMapper; using System; using System.Collections.Generic; using System.Data.Entity; using System.Linq; using System.Linq.Expressions; using System.Text; using System.Threading.Tasks; using System.Linq.Dynamic; using Abp.Linq.Extensions; using MyCompanyName.AbpZeroTemplate; using MyCompanyName.AbpZeroTemplate.ZLDB_Domain; using MyCompanyName.AbpZeroTemplate.Authorization.Consignee.Exporting; using MyCompanyName.AbpZeroTemplate.ZLDB_Domain.Dtos; using MyCompanyName.AbpZeroTemplate.Dto; namespace MyCompanyName.AbpZeroTemplate { /// <summary> /// 收貨地址 業務實現介面 /// </summary> public class ConsigneeAppService : AbpZeroTemplateAppServiceBase, IConsigneeAppService { private readonly IRepository<Consignee, Guid> _consigneeRepository; private readonly IConsigneeListExcelExporter _iConsigneeListExcelExporter; /// <summary> /// 建構函式自動注入我們所需要的類或介面 /// </summary> public ConsigneeAppService(IRepository<Consignee, Guid> consigneeRepository, IConsigneeListExcelExporter iConsigneeListExcelExporter) { _consigneeRepository = consigneeRepository; _iConsigneeListExcelExporter = iConsigneeListExcelExporter; //_consigneeMongoDbRepository = consigneeMongoDbRepository;  } /// <summary> /// 獲取所有資料列表 /// </summary> /// <returns>返回資料集合</returns> public async Task<List<ConsigneeDto>> GetAllList(string guid = "") { //try //{ // var model = new Consignee() { DealerId = System.Guid.NewGuid() }; // var mr = _consigneeMongoDbRepository.Insert(model); //} //catch (Exception ex) //{ // throw; //} //呼叫Task倉儲的特定方法GetAllWithPeople var resultList = await _consigneeRepository.GetAllListAsync(); return Mapper.Map<List<ConsigneeDto>>(resultList).ToList(); } /// <summary> /// 獲取分頁資料列表 分頁具體程式碼需要適當修改,如orderby 需要匹配 建立時間 或者其他資料Id(int) /// </summary> /// <returns>返回資料集合</returns> public async Task<PagedResultDto<ConsigneeDto>> GetPagedListAsync(PagedAndFilteredInputDto input) { var query = _consigneeRepository.GetAll(); //TODO:根據傳入的引數新增過濾條件 var resultCount = await query.CountAsync(); var resultconsignee = await query .OrderBy(x => x.Id) .PageBy(input) .ToListAsync(); var resultListDtos = resultconsignee.MapTo<List<ConsigneeDto>>(); return new PagedResultDto<ConsigneeDto>( resultCount, resultListDtos ); } /// <summary> /// 獲取指定條件的資料列表 webapi 無法使用 /// </summary> /// <returns>返回資料集合</returns> public async Task<List<ConsigneeDto>> GetListByCodition(Expression<Func<Consignee, bool>> predicate) { var resultList = await _consigneeRepository.GetAllListAsync(predicate); return Mapper.Map<List<ConsigneeDto>>(resultList).ToList(); } /// <summary> /// 匯出excel 具體方法 /// </summary> /// <returns>excel檔案</returns> /// public async Task<FileDto> GetConsigneeToExcel() ///{ /// var resultList = await _consigneeRepository.GetAllListAsync(); /// var consigneeDtos= Mapper.Map<List<ConsigneeDto>>(resultList).ToList(); /// return _iConsigneeListExcelExporter.ExportToFile(consigneeDtos); /// } /// <summary> /// 根據指定id 獲取資料實體 /// </summary> /// <param name="input">當前id</param> /// <returns></returns> public async Task<ConsigneeDto> GetConsigneeForEditAsync(NullableIdDto<System.Guid> input) { var output = new ConsigneeDto(); ConsigneeDto consigneeEditDto; if (input.Id.HasValue) { var entity = await _consigneeRepository.GetAsync(input.Id.Value); consigneeEditDto = entity.MapTo<ConsigneeDto>(); } else { consigneeEditDto = new ConsigneeDto(); } output = consigneeEditDto; return output; } /// <summary> /// 根據Id建立或編輯操作 /// </summary> /// <param name="input">實體</param> /// <returns></returns> public async Task CreateOrUpdateConsigneeAsync(ConsigneeDto input) { if (!string.IsNullOrWhiteSpace(input.Id)) { await Update(input); } else { await Create(input); } } /// <summary> /// 新增 /// </summary> /// <param name="input">新增引數</param> /// <returns>新增實體</returns> public async Task<Guid> Create(ConsigneeDto input) { input.Id = new Consignee().Id.ToString(); var resultObj = input.MapTo<Consignee>(); var result = await _consigneeRepository.InsertAsync(resultObj); return result.Id; } /// <summary> /// 新增 /// </summary> /// <param name="input">新增引數</param> /// <returns>新增實體</returns> public async Task<int> CreateList(List<ConsigneeDto> list) { foreach (var input in list) { if (input.Contact.Contains("ex")) { throw new Exception("測試分散式異常!"); } input.Id = new Consignee().Id.ToString(); var resultObj = input.MapTo<Consignee>(); var result = await _consigneeRepository.InsertAsync(resultObj); } return list.Count(); } /// <summary> /// 修改 /// </summary> /// <param name="input">修改引數</param> /// <returns>修改實體</returns> public async Task<ConsigneeDto> Update(ConsigneeDto input) { Consignee obj = await _consigneeRepository.GetAsync(new Guid(input.Id)); input.MapTo(obj); var result = await _consigneeRepository.UpdateAsync(obj); return obj.MapTo<ConsigneeDto>(); } /// <summary> /// 刪除 /// </summary> /// <param name="input">刪除Dto</param> /// <returns>無返回值</returns> public async System.Threading.Tasks.Task Delete(EntityDto<string> input) { await _consigneeRepository.DeleteAsync(new Guid(input.Id)); } /// <summary> /// 刪除 webapi 無法使用 /// </summary> /// <param name="predicate">刪除條件</param> /// <returns>無返回值</returns> public async System.Threading.Tasks.Task DeleteByCondition(Expression<Func<Consignee, bool>> predicate) { await _consigneeRepository.DeleteAsync(predicate); } } }

在swagger ui中增刪改查都已經正常,而且資料在三個資料庫中正常同步

 

 

針對事務,做了人為異常處理,確認會實現回滾(abp 自帶工作單元處理事務)

 

 

 五、後記

 這一次只是做了簡單的實驗性測試,後續需要在加強深入檢測,才可以用生產環境中。