canal實現當資料庫改變時,同步資料到redis
阿新 • • 發佈:2021-01-12
思路
canal感知sql的改變,作為訊息的提供者將訊息(圖片的postion屬性,指圖片位於網頁的位置)放到rabbitmq的佇列,nginx作為訊息的消費者,獲取訊息,並通過Lua指令碼更新資料
第一步,將訊息放到訊息佇列
啟動類上加上 @EnableCanalClient //聲明當前服務是canal的客戶端
配置檔案
canal.client.instances.example.host=192.168.200.128
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
spring.rabbitmq.host=192.168.200.128
編寫rabbitmq的配置類
@Configuration
public class RabbitMQConfig {
//定義佇列的名稱
public static final String AD_UPDATE_QUEUE = "ad_update_queue";
//宣告佇列
@Bean
public Queue queue(){
return new Queue(AD_UPDATE_QUEUE);
}
}
注意Queue的包時springframework的amqp.core裡的
編寫監聽類,監聽canal的訊息,併發送到mq
1 @CanalEventListener//聲明當前的類是canal的監聽類 2 public class BusinessListener { 3 4 @Autowired 5 private RabbitTemplate rabbitTemplate; 6 7 /** 8 * 9 * @param eventType 當前操作資料庫的型別 10 * @param rowData 當前操作資料庫的資料 changgou_business 11 */ 12 @ListenPoint(schema = "changgou_business", table = {"tb_ad"})//宣告監聽哪個庫的哪個表 13 public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { 14 System.err.println("廣告資料發生變化"); 15 16 //修改前資料 17 //rowData.getBeforeColumnsList().forEach((c)-> System.out.println("改變前的資料:"+c.getName()+"::"+c.getValue())); 18 for(CanalEntry.Column column: rowData.getBeforeColumnsList()) { 19 if(column.getName().equals("position")){ 20 System.out.println("傳送訊息到mq ad_update_queue:"+column.getValue()); 21 rabbitTemplate.convertAndSend("","ad_update_queue",column.getValue()); //傳送訊息到mq 22 break; 23 } 24 } 25 26 //修改後資料 27 //rowData.getAfterColumnsList().forEach((c) -> System.out.println("改變後的資料"+c.getName()+"::"+c.getValue())); 28 for(CanalEntry.Column column: rowData.getAfterColumnsList()) { 29 if(column.getName().equals("position")){ 30 System.out.println("傳送訊息到mq ad_update_queue:"+column.getValue()); 31 //傳送訊息到mq 沒有交換機,路由Key直接寫佇列名 32 rabbitTemplate.convertAndSend("","ad_update_queue",column.getValue()); 33 break; 34 } 35 } 36 } 37 }
這樣,當資料庫發生改變,canal就會通知這個類,這個類就會把position這個訊息放到訊息佇列裡
第二步,監聽訊息佇列,如果有訊息,就通知nginx呼叫lua更新redis
在訊息的消費者模組引入座標
<!--用於從訊息佇列獲取資料--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--用於遠端呼叫nginx--> <dependency> <groupId>com.squareup.okhttp3</groupId> <artifactId>okhttp</artifactId> <version>3.9.0</version> </dependency>
建立監聽者,程式碼如下
@Component public class AdListener { @RabbitListener(queues = "ad_update_queue") public void receiveMessage(String message){ System.out.println("接收到的訊息未:" + message); //發起遠端呼叫nginx,進行更新 OkHttpClient okHttpClient = new OkHttpClient(); String url = "http://192.168.200.128/ad_update?position"+message; Request request = new Request.Builder().url(url).build(); Call call = okHttpClient.newCall(request); call.enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { //如果請求失敗 e.printStackTrace(); } @Override public void onResponse(Call call, Response response) throws IOException { //如果請求成功 System.out.println("請求成功:"+ response.message()); } }); } }
商城專案中還有商品行家同步到es的例項