1. 程式人生 > >MongoDB和資料流:使用MongoDB作為Kafka消費者

MongoDB和資料流:使用MongoDB作為Kafka消費者

640?wx_fmt=jpeg&wxfrom=5&wx_lazy=1

資料流 在當今的資料環境中,沒有一個系統可以提供所有必需的觀點來提供真正的洞察力。從資料中獲取完整含義需要混合來自多個來源的大量資訊。 與此同時,我們不耐煩地立即獲得答案;如果洞察時間超過10毫秒,那麼該值就會丟失 - 高頻交易,欺詐檢測和推薦引擎等應用程式不能等待。這通常意味著在資料進入記錄資料庫之前分析資料的流入。為資料丟失增加零容忍,挑戰變得更加艱鉅。 Kafka和資料流專注於從多個消防軟管攝取大量資料,然後將其路由到需要它的系統 - 過濾,彙總和分析途中。 本文介紹了Apache Kafka,然後演示瞭如何使用MongoDB作為流資料的源(生產者)和目標(消費者)。有關此主題的更完整的研究可以在使用Kafka和MongoDB白皮書的Data Streaming中找到。

Apache Kafka

Kafka提供了一種靈活,可擴充套件且可靠的方法,用於將來自一個或多個生產者的事件資料流傳達給一個或多個消費者。事件的例子包括:

     定期感測器讀數,例如當前溫度      使用者在網上商店中將商品新增到購物車中      正在傳送帶有特定主題標籤的Tweet

Kafka事件流被組織成主題。生產者選擇一個主題來發送給定的事件,而消費者則選擇他們從哪個主題中提取事件。例如,金融應用程式可以從一個主題中提取紐約證券交易所股票交易,並從另一個主題中提取公司財務公告,以尋找交易機會。

在Kafka中,話題被進一步分成多個分割槽來支援擴充套件。每個Kafka節點(代理)負責接收,儲存和傳遞來自一個或多個分割槽的針對給定主題的所有事件。這樣,一個主題的處理和儲存可以在許多Broker中線性擴充套件。類似地,應用程式可以通過針對給定主題使用許多消費者來擴充套件,每個拉事件來自離散的一組分割槽。

640?wx_fmt=png

圖1:Kafka生產者,消費者,主題和分割槽

MongoDB作為Kafka消費者的一個Java示例 為了將MongoDB作為Kafka消費者使用,接收到的事件必須先轉換為BSON文件,然後再儲存到資料庫中。在這個例子中,事件是代表JSON文件的字串。這些字串被轉換為Java物件,以便Java開發人員可以輕鬆使用;那些物件然後被轉換成BSON文件。

完整的原始碼,Maven配置和測試資料可以在下面找到,但這裡有一些亮點;從用於接收和處理來自Kafka主題的事件訊息的主迴圈開始:

640?wx_fmt=png

Fish類包含輔助方法以隱藏物件如何轉換為BSON文件:

640?wx_fmt=png

640?wx_fmt=png

在實際的應用程式中,接收到的訊息可能會更多 - 它們可以與從MongoDB讀取的參考資料結合使用,然後通過釋出到其他主題來處理並傳遞。在這個例子中,最後一步是從mongo shell確認資料已經新增到資料庫中:


640?wx_fmt=png

MongoDB Kafka Consumer的完整Java程式碼 業務物件 -  Fish.java

640?wx_fmt=png

640?wx_fmt=png

640?wx_fmt=png

MongoDB的Kafka使用者 -  MongoDBSimpleConsumer.java 

請注意,此示例消費者是使用Kafka Simple Consumer API編寫的 - 還有一個Kafka高階消費者API,它隱藏了很多複雜性 - 包括管理偏移量。 Simple API為應用程式提供了更多控制權,但需要花費額外的程式碼。

640?wx_fmt=png

640?wx_fmt=png

640?wx_fmt=png

640?wx_fmt=png

640?wx_fmt=png

640?wx_fmt=png

640?wx_fmt=png

640?wx_fmt=png

Maven依賴- pom.xml

640?wx_fmt=png

640?wx_fmt=png

640?wx_fmt=png

測試資料 -  Fish.json注入Kafka的測試資料示例如下所示:

640?wx_fmt=png

640?wx_fmt=png

640?wx_fmt=png

640?wx_fmt=png

對於簡單測試,可以使用kafka-console-producer.sh命令將此資料注入到clusterdb-topic1主題中。

公眾號推薦:

公眾號:VOA英語每日一聽

微訊號: voahk01

可長按掃碼關注,謝謝

640?wx_fmt=jpeg