1. 程式人生 > 其它 >lua+canal+oepnresty實現入口網站高併發訪問

lua+canal+oepnresty實現入口網站高併發訪問

首頁門戶系統需要展示各種各樣的廣告資料,但是通常情況下,首頁(門戶系統的流量一般非常的高)不適合直接通過mysql資料庫直接訪問的方式來獲取展示。

如下思路:

1.首先訪問nginx ,我們可以採用快取的方式,先從nginx本地快取中獲取,獲取到直接響應

2.如果沒有獲取到,再次訪問redis,我們可以從redis中獲取資料,如果有 則返回,並快取到nginx中

3.如果沒有獲取到,再次訪問mysql,我們從mysql中獲取資料,再將資料儲存到redis中,返回。

而這裡面,我們都可以使用LUA指令碼嵌入到程式中執行這些查詢相關的業務。

 

因為tomcat的併發承受量相對而言,有可能承受不住這種高併發場景,所以整個獲取資料步驟都不用java實現,因為只要這個步驟中出現java程式碼實現,就需要tomcat,這樣就限制住了併發量上限,一切都毫無意義。

 

 

Lua

  Lua 是一種輕量小巧的指令碼語言,用標準C語言編寫並以原始碼形式開放, 其設計目的是為了嵌入應用程式中,從而為應用程式提供靈活的擴充套件和定製功能。

lua的安裝

我們採用linux版本的安裝,首先我們需要準備一個centos

安裝步驟,在linux系統中執行下面的命令。

curl -R -O http://www.lua.org/ftp/lua-5.3.5.tar.gz
tar zxf lua-5.3.5.tar.gz
cd lua-5.3.5
make linux test

注意:此時安裝,有可能會出現如下錯誤:

此時需要安裝lua相關依賴庫的支援,執行如下命令即可:

yum install libtermcap-devel ncurses-devel libevent-devel readline-devel

此時再執行lua命令測試看lua是否安裝成功

LUA的基本語法(跟java語法整體上格式差不多,但又不一樣)

(1)互動式程式設計

  Lua 提供了互動式程式設計模式。我們可以在命令列中輸入程式並立即檢視效果。

  Lua 互動式程式設計模式可以通過命令 lua -i 或 lua 來啟用:

(2)指令碼式程式設計

  我們可以將 Lua 程式程式碼保持到一個以 lua 結尾的檔案,並執行,該模式稱為指令碼式程式設計,例如上面入門程式中將lua語法寫到hello.lua檔案中。

 

註釋:--行註釋:兩個減號是單行註釋:

變數

全域性變數,預設的情況下,定義一個變數都是全域性變數,

如果要用區域性變數 需要宣告為local

如果變數沒有初始化:則 它的值為nil 這和java中的null不同。

 

模組

  模組類似於一個封裝庫,從 Lua 5.1 開始,Lua 加入了標準的模組管理機制,可以把一些公用的程式碼放在一個檔案裡,以 API 介面的形式在其他地方呼叫,有利於程式碼的重用和降低程式碼耦合度。(類似於java的jar包,寫完程式碼後,可以打成jar包供其他人依賴使用,lua中是寫一個lua指令碼,然後其他地方想要使用,直接匯入這個寫好的指令碼(模組)   也可以理解成java中的封裝)

lua的使用在菜鳥教程中有,直接搜尋菜鳥教程lua即可,非常全面,這裡就不再贅述了。

 

 

OpenResty介紹

  OpenResty(又稱:ngx_openresty) 是一個基於 nginx的可伸縮的 Web 平臺,由中國人章亦春發起,提供了很多高質量的第三方模組。

  OpenResty 簡單理解成 就相當於封裝了nginx,並且集成了LUA指令碼,開發人員只需要簡單的其提供了模組就可以實現相關的邏輯,而不再像之前,還需要在nginx中自己編寫lua的指令碼,再進行呼叫了。

 

安裝openresty

centos安裝openresty:

1.新增倉庫執行命令

 yum install yum-utils
 yum-config-manager --add-repo https://openresty.org/package/centos/openresty.repo

2.執行安裝

yum install openresty

3.安裝成功後 會在預設的目錄如下:

/usr/local/openresty

安裝nginx

  預設已經安裝好了nginx,在目錄:/usr/local/openresty/nginx 下。

修改/usr/local/openresty/nginx/conf/nginx.conf,將配置檔案使用的根設定為root,目的就是將來要使用lua指令碼的時候 ,直接可以載入在root下的lua指令碼。

cd /usr/local/openresty/nginx/conf
vi nginx.conf

在第一行新增程式碼如下:

 配置改完後,需要去指定目錄啟動nginx:

cd /usr/local/openresty/nginx/sbin   
啟動命令:./nginx
可以使用 ps -ef|grep nginx檢視已經啟動的nginx

 

測試訪問

//埠預設是80,可以去nginx配置檔案中修改
http://{ip地址}/

openresty+lua+redis實現多級快取

  openresty有著百萬級的併發承受量,並且openresty內建nginx,也可以做快取,加上redis實現多級快取

1、先建立一個資料夾,用於存放lua指令碼(lua指令碼中連線資料庫、redis獲取資料)

ngx.header.content_type="application/json;charset=utf8"
local uri_args = ngx.req.get_uri_args();
local id = uri_args["id"];
--獲取本地快取物件
local cache_ngx = ngx.shared.dis_cache;
--根據ID 獲取本地快取資料
local contentCache = cache_ngx:get('content_cache_'..id);

--判斷,如果openresty本地沒有自己想要的快取資料的話
if contentCache == "" or contentCache == nil then
    --則連線redis查詢
    local redis = require("resty.redis");
    local red = redis:new()
    red:set_timeout(2000)
    --傳入連線redis的ip和埠
    red:connect("ip", prot)
    --根據ID獲取redis中的資料  這裡的 ..(兩個點)  表示拼接
    local rescontent=red:get("content_"..id);

    --如果redis中沒有查到資料,則……
    if ngx.null == rescontent then
        --連線mysql
        local cjson = require("cjson");
        local mysql = require("resty.mysql");
        local db = mysql:new();
        db:set_timeout(2000)
        --下面的database表示要連線的資料庫  庫名
        local props = {
            host = "127.0.0.1",
            port = 3306,
            database = "xxxx",
            user = "root",
            password = "haozigg"
        }
        local res = db:connect(props);
        local select_sql = "select url,pic from tb_content where status ='1' and category_id="..id.." order by sort_order";
        res = db:query(select_sql);
        
        --使用json格式
        local responsejson = cjson.encode(res);
        --將查詢到的資料往redis中儲存一份,並返回資料
        red:set("content_"..id,responsejson);
        ngx.say(responsejson);
        db:close()
    else
        --如果redis中有資料,則往openresty中儲存一份,並返回資料
        cache_ngx:set('content_cache_'..id, rescontent, 10*60);
        ngx.say(rescontent)
    end
    red:close()
else
    --如果openresty本地有資料的話,直接返回資料
    ngx.say(contentCache)
end    

2、修改nginx配置檔案,指定到上面編寫的lua指令碼路徑

修改/usr/local/openresty/nginx/conf/nginx.conf檔案: 新增頭資訊,和 location資訊

user   root root;
#user  nobody;
worker_processes  1;

#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#pid        logs/nginx.pid;


events {
    worker_connections  1024;
}


http {
    include       mime.types;
    default_type  application/octet-stream;

    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';

    #access_log  logs/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    #keepalive_timeout  0;
    keepalive_timeout  65;

    #gzip  on;
    
    
    #配置nginx的快取物件,分配記憶體空間(多大記憶體)快取物件名稱為dis_cache  空間大小為50M
    lua_shared_dict dis_cache 50m;
    
    #限流設定規則  用ip的方式,每次請求會儲存ip,容量為10m,儲存ip的容器名為contentRateLimit,當容量滿後,其他請求無效。速率為2r每秒
    limit_req_zone $binary_remote_addr zone=contentRateLimit:10m rate=2r/s;

    #設定客戶端ip與伺服器的連線數的計數容器(儲存ip計數) 容器名為perip
    limit_conn_zone $binary_remote_addr zone=perip:1m;
    #限制與伺服器的總連線數的計數容器為1m,容器名為perserver
    limit_conn_zone $server_name zone=perserver:1m; 
    
    server {
        listen       80;
        server_name  localhost;
        
        location / {
            limit_conn perip 10;#單個客戶端ip與伺服器的連線數.
            limit_conn perserver 100; #限制與伺服器的總連線數
            root   html;
            index  index.html index.htm;
        }
        
        
        #update_content請求過來後,由openresty接收到並轉發到指定lua指令碼處理該請求
        location /update_content{
            content_by_lua_file /root/lua/update_content.lua;
        }
        
        #read_content  另一個介面轉發
        location /read_content{
            content_by_lua_file /root/lua/read_content.lua;
            #設定該介面使用上面設定的限流規則
            limit_req zone=contentRateLimit burst=4 nodelay;
        }
        
        

    }


    # another virtual host using mix of IP-, name-, and port-based configuration
    #
    #server {
    #    listen       8000;
    #    listen       somename:8080;
    #    server_name  somename  alias  another.alias;

    #    location / {
    #        root   html;
    #        index  index.html index.htm;
    #    }
    #}


    # HTTPS server
    #
    #server {
    #    listen       443 ssl;
    #    server_name  localhost;

    #    ssl_certificate      cert.pem;
    #    ssl_certificate_key  cert.key;

    #    ssl_session_cache    shared:SSL:1m;
    #    ssl_session_timeout  5m;

    #    ssl_ciphers  HIGH:!aNULL:!MD5;
    #    ssl_prefer_server_ciphers  on;

    #    location / {
    #        root   html;
    #        index  index.html index.htm;
    #    }
    #}

}

以上配置就做完了,重新載入nginx配置檔案即可測試:

cd /usr/local/openresty/nginx/sbin
重新載入配置檔案命令:./nginx -s reload

另外,如果測試不成功,可以去看看日誌:

cd /usr/local/openresty/nginx/logs

訪問頁面:http://{ip}/read_content?id=1

上面的配置中,配置了openresty+lua的環境,以及配置了nginx轉發請求到編寫的lua指令碼執行(從nginx中獲取快取,有則返回,沒有則查redis,redis中有則返回並往nginx快取中新增資料,沒有則查mysql)

此外,還做了限流的設定,如下解釋說明:

  一般情況下,首頁的併發量是比較大的,即使 有了多級快取,當用戶不停的重新整理頁面的時候,也是沒有必要的,另外如果有惡意的請求 大量達到,也會對系統造成影響。而限流就是保護措施之一。

  nginx提供兩種限流的方式:

    •   一是控制速率

    •   二是控制併發連線數

控制速率

   控制速率的方式之一就是採用漏桶演算法。 漏桶演算法實現控制速率限流

配置示意圖如下:

 配置說明:

binary_remote_addr 是一種key,表示基於 remote_addr(客戶端IP) 來做限流,binary_ 的目的是壓縮記憶體佔用量。
zone:定義共享記憶體區來儲存訪問資訊, contentRateLimit:10m 表示一個大小為10M,名字為contentRateLimit的記憶體區域。
1M能儲存16000 IP地址的訪問資訊,10M可以儲存16W IP地址訪問資訊。 rate:用於設定最大訪問速率,rate
=10r/s 表示每秒最多處理10個請求。Nginx 實際上以毫秒為粒度來跟蹤請求資訊,
因此 10r/s 實際上是限制:每100毫秒處理一個請求。這意味著,自上一個請求處理完後,若後續100毫秒內又有請求到達,
將拒絕處理該請求.我們這裡設定成2 方便測試。

處理突發流量

  上面例子限制 2r/s,如果有時正常流量突然增大,超出的請求將被拒絕,無法處理突發流量,可以結合 burst 引數使用來解決該問題。(類似於使用者正常手抖多點選了一次訪問,就直接返回503給使用者顯然是不合理的)

burst 譯為突發、爆發,表示在超過設定的處理速率後能額外處理的請求數,當 rate=10r/s 時,將1s拆成10份,即每100ms可處理1個請求。

此處,burst=4 ,若同時有4個請求到達,Nginx 會處理第一個請求,剩餘3個請求將放入佇列,然後每隔500ms從佇列中獲取一個請求進行處理。若請求數大於4,將拒絕處理多餘的請求,直接返回503.

不過,單獨使用 burst 引數並不實用。假設 burst=50 ,rate依然為10r/s,排隊中的50個請求雖然每100ms會處理一個,但第50個請求卻需要等待 50 * 100ms即 5s,這麼長的處理時間自然難以接受。

因此,burst 往往結合 nodelay 一起使用。

例如:如下配置:

server {
    listen       80;
    server_name  localhost;
    location /update_content {
        content_by_lua_file /root/lua/update_content.lua;
    }
    location /read_content {
        limit_req zone=contentRateLimit burst=4 nodelay;
        content_by_lua_file /root/lua/read_content.lua;
    }
}

如上表示:

平均每秒允許不超過2個請求,突發不超過4個請求,並且處理突發4個請求的時候,沒有延遲,等到完成之後,按照正常的速率處理。

如上兩種配置結合就達到了速率穩定,但突然流量也能正常處理的效果。完整配置程式碼在最開始上面的配置中就有。

 

測試:如下圖 在1秒鐘之內可以重新整理4次,正常處理。

但是超過之後,連續重新整理5次,丟擲異常。

 

 

控制併發量(連線數)

ngx_http_limit_conn_module 提供了限制連線數的能力。主要是利用limit_conn_zone和limit_conn兩個指令。

利用連線數限制 某一個使用者的ip連線的數量來控制流量。

注意:並非所有連線都被計算在內 只有當伺服器正在處理請求並且已經讀取了整個請求頭時,才會計算有效連線。此處忽略測試。

配置語法:

Syntax:    limit_conn zone number;
Default: —;
Context: http, server, location;

如下,配置如下:

limit_conn_zone $binary_remote_addr zone=addr:10m;  表示限制根據使用者的IP地址來顯示,設定儲存地址為的記憶體大小10M

limit_conn addr 2;   表示 同一個地址只允許連線2次。

測試:

此時開3個執行緒,測試的時候會發生異常,開2個就不會有異常

 

(2)限制每個客戶端IP與伺服器的連線數,同時限制與虛擬伺服器的連線總數

如下配置:

limit_conn_zone $binary_remote_addr zone=perip:10m;
limit_conn_zone $server_name zone=perserver:10m; 
server {  
    listen       80;
    server_name  localhost;
    charset utf-8;
    location / {
        limit_conn perip 10;#單個客戶端ip與伺服器的連線數.
        limit_conn perserver 100; #限制與伺服器的總連線數
        root   html;
        index  index.html index.htm;
    }
}

canal同步

  canal可以用來監控資料庫資料的變化,從而獲得新增資料,或者修改的資料。

  canal是應阿里巴巴存在杭州和美國的雙機房部署,存在跨機房同步的業務需求而提出的。

  阿里系公司開始逐步的嘗試基於資料庫的日誌解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務。

 

  在上述流程中,使用openresty接收前端請求,並轉發lua處理(多級快取),但此時是有問題的,當專案的運營人員更改了資料時,redis以及openresty快取中的資料就變成垃圾資料了,也就是說,資料存在不一致的情況。

  但此時因為並沒有java程式碼去幹涉這個獲取資料的流程,無法更新資料。

  所以此時,解決方案有三,第一是設定快取的過期時間,上面的配置中,openresty就設定了過期時間,當快取時間過期後,自然會去資料庫中獲取最新的,redis也一樣,可以設定過期時間,雖然不能實時更新資料,但也勉強能做到資料更新。

  方案二則是,openresty不變,用過期時間,但在java專案中,如果改變了資料,則多一個往redis中再次儲存 一次資料,這樣簡單方便就實現了redis的同步,當openresy的快取過期後自然會去redis中獲取最新的資料。

  方案三:使用canal實現資料實時同步到redis中,雖然應用在這裡,我覺得有點多次一舉,因為饒了這麼一大圈,整了這麼多環境,其實最終用起來的實現效果跟方案二是一樣的,不過作為學習來說,多瞭解一種技術是好的,也許以後哪個地方就能用到呢。

 

canal原理相對比較簡單(基於mysql的主從):

  1. canal模擬mysql slave的互動協議,偽裝自己為mysql slave(從),向mysql master傳送dump協議

  2. mysql master(主)收到dump請求,開始推送binary log給slave(也就是canal)

  3. canal解析binary log物件(原始為byte流)

 

開啟binlog模式

   canal是基於mysql的主從模式實現的,所以必須先開啟binlog   連線到mysql中,並修改/etc/mysql/mysql.conf.d/mysqld.cnf   執行如下命令,編輯mysql配置檔案
cd /etc/mysql/mysql.conf.d
vi mysqld.cnf

修改mysqld.cnf配置檔案,新增如下配置:

 上圖配置如下:

log-bin=/var/lib/mysql/mysql-bin
server-id=12345

使用root賬號建立使用者並授予許可權(可以不賦予超級管理員許可權)

create user canal@'%' IDENTIFIED by 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

重啟mysql容器

docker restart mysql

canal容器安裝

下載映象:

docker pull docker.io/canal/canal-server

容器安裝:

docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server

進入容器,修改核心配置canal.properties 和instance.properties,canal.properties 是canal自身的配置,instance.properties是需要同步資料的資料庫連線配置。

執行命令如下:

docker exec -it canal /bin/bash
cd canal-server/conf/
vi canal.properties
cd example/
vi instance.properties

修改canal.properties的id,不能和mysql的server-id重複,如下圖:

修改instance.properties,配置資料庫連線地址:

 這裡的canal.instance.filter.regex有多種配置,如下:

mysql 資料解析關注的表,Perl正則表示式.
多個正則之間以逗號(,)分隔,轉義符需要雙斜槓(\\) 
常見例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打頭的表:canal\\.canal.*
4.  canal schema下的一張表:canal.test1
5.  多個規則組合使用:canal\\..*,mysql.test1,mysql.test2 (逗號分隔)
注意:此過濾條件只針對row模式的資料有效(ps. mixed/statement因為不解析sql,所以無法準確提取tableName進行過濾)

可以參考地址如下:

https://github.com/alibaba/canal/wiki/AdminGuide

配置完成後,設定開機啟動,並記得重啟canal。

docker update --restart=always canal
docker restart canal

canal微服務搭建(canal客戶端)

  之前搭建的是canal服務端,它會監聽到mysql的資料變化,但是它知道了我不知道,所以需要一個微服務監聽canal server,只要conal server有資料了,我們程式碼裡面也能監聽到,才能執行我們想要的邏輯。

(1)安裝輔助jar包(在自己搭建的微服務工程中)

  在canal\spring-boot-starter-canal-master中有一個工程starter-canal,它主要提供了SpringBoot環境下canal的支援,我們需要先安裝該工程,在starter-canal目錄下執行mvn install

(2)canal微服務工程搭建

canal依賴  使用別人封裝好的,但是這個地址是通過匯入本地maven的方式,如果是線上環境構建的話,需要在伺服器做配置,所以挺麻煩。可以選擇去網上看canal客戶端原始碼,然後實現,也可以去找別人封裝好的包直接使用,網上搜一搜肯定有

依賴:

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!--canal依賴-->
<dependency> <groupId>com.xpand</groupId> <artifactId>starter-canal</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> </dependencies>

application.yml配置:

server:
  port: 18083
spring:
  application:
    name: canal
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:7001/eureka
  instance:
    prefer-ip-address: true
feign:
  hystrix:
    enabled: true
#hystrix 配置
hystrix:
  command:
    default:
      execution:
        timeout:
        #如果enabled設定為false,則請求超時交給ribbon控制
          enabled: true
        isolation:
          strategy: SEMAPHORE
#canal配置
canal:
  client:
    instances:
      example:
        host: 192.168.211.132
        port: 11111

(3)監聽建立

建立一個CanalDataEventListener類,實現對錶增刪改操作的監聽,程式碼如下:

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.changgou.content.entity.Content;
import com.changgou.content.feign.ContentFeign;
import com.xpand.starter.canal.annotation.*;
import entity.Result;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.List;

@CanalEventListener
public class CanalDataEventListener {

    @Autowired
    private ContentFeign contentFeign;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /***
     * 增加資料監聽
     * @param eventType
     * @param rowData
     */
    /*@InsertListenPoint
    public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " ::   " + c.getValue()));
    }*/

    /***
     * 修改資料監聽
     * @param rowData
     */
    /*@UpdateListenPoint
    public void onEventUpdate(CanalEntry.RowData rowData) {
        System.out.println("UpdateListenPoint");
        rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " ::   " + c.getValue()));
    }*/

    /***
     * 刪除資料監聽
     * @param eventType
     */
    /*@DeleteListenPoint
    public void onEventDelete(CanalEntry.EventType eventType) {
        System.out.println("DeleteListenPoint");
    }*/

    /***
     * 自定義資料修改監聽
     * @param eventType
     * @param rowData
     * example一定要跟配置檔案中保持一致,並且跟canal中資料夾的名稱保持一致,canal配置的哪個就用哪個
     */
    @ListenPoint(destination = "example", schema = "changgou_content", table = {"tb_content_category", "tb_content"},
            eventType = {   CanalEntry.EventType.UPDATE,
                            CanalEntry.EventType.INSERT,
                            CanalEntry.EventType.DELETE})
    public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        System.out.println("型別:"+eventType.name());
        System.err.println("DeleteListenPoint");
        rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " ::   " + c.getValue()));
        //這裡能獲取到所有被修改資料的欄位名和值,然後執行業務程式碼就可以了,最後更新到redis中就實現了需求
        Long categoryId = Long.valueOf(getColumnValue(eventType, rowData));
        Result<List<Content>> byCategoryId = contentFeign.findByCategoryId(categoryId);
        String o = JSONObject.toJSONString(byCategoryId);
        stringRedisTemplate.boundValueOps("content_"+categoryId).set(o);
    }


    private String getColumnValue(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
        //判斷是delete還是update和insert
        String value = "";
        if (eventType == CanalEntry.EventType.DELETE){
            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
            for (CanalEntry.Column column : beforeColumnsList) {
                if (column.getName().equals("category_id")){
                    value = column.getValue();
                    break;
                }
            }
        }else {
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
            for (CanalEntry.Column column : afterColumnsList) {
                if (column.getName().equals("category_id")){
                    value = column.getValue();
                    break;
                }
            }
        }
        return value;
    }
}

(4)啟動類建立:

/**
 * Created by 錢溢 Luna on 2022/5/23 18:29
 * 作用:監聽canal服務端獲取資料,實現mysql資料發生變更,用canal監聽,當canal監聽到資料後,同步到redis中
 */
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@EnableEurekaClient
@EnableCanalClient
@EnableFeignClients(basePackages = "com.changgou.content.feign")//加入註解才能使用feign呼叫,指定basePackages(feign介面的包路徑)不指定預設會掃描所有
public class CanalApplication {
    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class,args);
    }
}

(5)測試

啟動canal微服務,然後修改任意資料庫的表資料,檢視canal微服務後臺輸出,再檢視redis中的資料是否實時更新