1. 程式人生 > 實用技巧 >canal實現當資料庫改變時,同步資料到redis

canal實現當資料庫改變時,同步資料到redis

思路

canal感知sql的改變,作為訊息的提供者將訊息(圖片的postion屬性,指圖片位於網頁的位置)放到rabbitmq的佇列,nginx作為訊息的消費者,獲取訊息,並通過Lua指令碼更新資料

第一步,將訊息放到訊息佇列

  啟動類上加上 @EnableCanalClient //聲明當前服務是canal的客戶端

   配置檔案

canal.client.instances.example.host=192.168.200.128
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
spring.rabbitmq.host=192.168.200.128

  編寫rabbitmq的配置類  

@Configuration
public class RabbitMQConfig {
//定義佇列的名稱
public static final String AD_UPDATE_QUEUE = "ad_update_queue";
//宣告佇列
@Bean
public Queue queue(){
return new Queue(AD_UPDATE_QUEUE);
}
}

注意Queue的包時springframework的amqp.core裡的

  編寫監聽類,監聽canal的訊息,併發送到mq

 1 @CanalEventListener
//聲明當前的類是canal的監聽類 2 public class BusinessListener { 3 4 @Autowired 5 private RabbitTemplate rabbitTemplate; 6 7 /** 8 * 9 * @param eventType 當前操作資料庫的型別 10 * @param rowData 當前操作資料庫的資料 changgou_business 11 */ 12 @ListenPoint(schema = "changgou_business", table = {"tb_ad"})//
宣告監聽哪個庫的哪個表 13 public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { 14 System.err.println("廣告資料發生變化"); 15 16 //修改前資料 17 //rowData.getBeforeColumnsList().forEach((c)-> System.out.println("改變前的資料:"+c.getName()+"::"+c.getValue())); 18 for(CanalEntry.Column column: rowData.getBeforeColumnsList()) { 19 if(column.getName().equals("position")){ 20 System.out.println("傳送訊息到mq ad_update_queue:"+column.getValue()); 21 rabbitTemplate.convertAndSend("","ad_update_queue",column.getValue()); //傳送訊息到mq 22 break; 23 } 24 } 25 26 //修改後資料 27 //rowData.getAfterColumnsList().forEach((c) -> System.out.println("改變後的資料"+c.getName()+"::"+c.getValue())); 28 for(CanalEntry.Column column: rowData.getAfterColumnsList()) { 29 if(column.getName().equals("position")){ 30 System.out.println("傳送訊息到mq ad_update_queue:"+column.getValue()); 31 //傳送訊息到mq 沒有交換機,路由Key直接寫佇列名 32 rabbitTemplate.convertAndSend("","ad_update_queue",column.getValue()); 33 break; 34 } 35 } 36 } 37 }

  這樣,當資料庫發生改變,canal就會通知這個類,這個類就會把position這個訊息放到訊息佇列裡

第二步,監聽訊息佇列,如果有訊息,就通知nginx呼叫lua更新redis

  在訊息的消費者模組引入座標

 <!--用於從訊息佇列獲取資料-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--用於遠端呼叫nginx-->
        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>3.9.0</version>
        </dependency>

  建立監聽者,程式碼如下

@Component
public class AdListener {

    @RabbitListener(queues = "ad_update_queue")
    public void receiveMessage(String message){
        System.out.println("接收到的訊息未:" + message);

        //發起遠端呼叫nginx,進行更新
        OkHttpClient okHttpClient = new OkHttpClient();
        String url = "http://192.168.200.128/ad_update?position"+message;
        Request request = new Request.Builder().url(url).build();
        Call call = okHttpClient.newCall(request);
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                //如果請求失敗
                e.printStackTrace();
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                //如果請求成功
                System.out.println("請求成功:"+ response.message());
            }
        });

    }
}

商城專案中還有商品行家同步到es的例項