Flink 從 0 到 1 學習之(13)Flink 讀取 Kafka 資料寫入到 RabbitMQ FlinkKafkaRabbitMQ大資料流式計算
前言
之前有文章 《從0到1學習Flink》—— Flink 寫入資料到 Kafka寫過 Flink 將處理後的資料後發到 Kafka 訊息佇列中去,當然我們常用的訊息佇列可不止這一種,還有 RocketMQ、RabbitMQ 等,剛好 Flink 也支援將資料寫入到 RabbitMQ,所以今天我們就來寫篇文章講講如何將 Flink 處理後的資料寫入到 RabbitMQ。
前提準備
安裝 RabbitMQ
這裡我直接用 docker 命令安裝吧,先把 docker 在 mac 上啟動起來。
在命令列中執行下面的命令:
1
|
docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq rabbitmq:3-management |
對這個命令不懂的童鞋可以看看我以前的文章:http://www.54tianzhisheng.cn/2018/01/26/SpringBoot-RabbitMQ/
登入使用者名稱和密碼分別是:admin / admin ,登入進去是這個樣子就代表安裝成功了:
依賴
pom.xml 中新增 Flink connector rabbitmq 的依賴如下:
1 |
<dependency> |
生產者
這裡我們依舊自己寫一個工具類一直的往 RabbitMQ 中的某個 queue 中發資料,然後由 Flink 去消費這些資料。
注意按照我的步驟來一步步操作,否則可能會出現一些錯誤!
RabbitMQProducerUtil.java
1 |
import com.rabbitmq.client.Channel; |
Flink 主程式
1 |
import com.zhisheng.common.utils.ExecutionEnvUtil; |
執行 RabbitMQProducerUtil 類,再執行 Main 類!
注意⚠️:
1、RMQConnectionConfig 中設定的使用者名稱和密碼要設定成 admin/admin,如果你換成是 guest/guest,其實是在 RabbitMQ 裡面是沒有這個使用者名稱和密碼的,所以就會報這個錯誤:
1
|
nested exception is com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.
|
不出意外的話應該你執行 RabbitMQProducerUtil 類後,立馬兩個執行的結果都會出來,速度還是很快的。
2、如果你在 RabbitMQProducerUtil 工具類中把註釋的那行程式碼開啟的話:
1 |
// 宣告一個佇列 |
就會出現這種錯誤:
1
|
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'zhisheng' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
|
這是因為你開啟那個註釋的話,一旦你運行了該類就會建立一個叫做 zhisheng
的 Queue,當你再執行 Main 類中的時候,它又會建立這樣一個叫 zhisheng
的 Queue,然後因為已經有同名的 Queue 了,所以就有了衝突,解決方法就是把那行程式碼註釋就好了。
3、該 connector(聯結器)中提供了 RMQSource 類去消費 RabbitMQ queue 中的訊息和確認 checkpoints 上的訊息,它提供了三種不一樣的保證:
- Exactly-once(只消費一次): 前提條件有,1 是要開啟 checkpoint,因為只有在 checkpoint 完成後,才會返回確認訊息給 RabbitMQ(這時,訊息才會在 RabbitMQ 佇列中刪除);2 是要使用 Correlation ID,在將訊息發往 RabbitMQ 時,必須在訊息屬性中設定 Correlation ID。資料來源根據 Correlation ID 把從 checkpoint 恢復的資料進行去重;3 是資料來源不能並行,這種限制主要是由於 RabbitMQ 將訊息從單個佇列分派給多個消費者。
- At-least-once(至少消費一次): 開啟了 checkpoint,但未使用相 Correlation ID 或 資料來源是並行的時候,那麼就只能保證資料至少消費一次了
- No guarantees(無法保證): Flink 接收到資料就返回確認訊息給 RabbitMQ
Sink 資料到 RabbitMQ
RabbitMQ 除了可以作為資料來源,也可以當作下游,Flink 消費資料做了一些處理之後也能把資料發往 RabbitMQ,下面演示下 Flink 消費 Kafka 資料後寫入到 RabbitMQ。
1 |
public class Main1 { |
是不是很簡單?但是需要注意的是,要換一個之前不存在的 queue,否則是會報錯的。
不出意外的話,你可以看到 RabbitMQ 的監控頁面會出現新的一個 queue 出來,如下圖:
總結
本文先把 RabbitMQ 作為資料來源,寫了個 Flink 消費 RabbitMQ 佇列裡面的資料進行打印出來,然後又寫了個 Flink 消費 Kafka 資料後寫入到 RabbitMQ 的例子!