1. 程式人生 > 程式設計 >Kafka資料如何同步至MaxCompute之實踐講解

Kafka資料如何同步至MaxCompute之實踐講解

摘要:本次分享主要介紹Kafka產品的原理和使用方式,以及同步資料到MaxCompute的引數介紹、獨享整合資源組與自定義資源組的使用背景和配置方式、Kafka同步資料到MaxCompute的開發到生產的整體部署操作等內容。 演講嘉賓簡介:耿江濤,阿里雲智慧技術支援工程師 以下內容根據演講視訊以及PPT整理而成。 本次分享主要圍繞以下兩個方面: 一、背景介紹 二、具體操作流程 1.Kafka訊息佇列使用以及原理 2.資源組介紹以及配置 3.同步過程及其注意事項 4.開發測試以及生產部署 一、背景介紹 1. 實驗目的 在日常工作中,很多企業將APP或網站產生的行為日誌和業務資料通過Kafka收集之後做兩方面的處理。一方面是離線處理,一方面是實時處理。並且一般會投遞到MaxCompute中作為模型的構建,進行相關的業務處理,如使用者的特徵、銷售排名、訂單地區分佈等。這些資料形成之後會在資料報表中作為展示。 2. 方案說明 Kafka資料同步到DataWorks有兩條鏈路。一條鏈路是業務資料和行為日誌通過Kafka,再通過Flume 上傳到Datahub,以及Max Compute,最終在QuickBI進行展示。另一條鏈路是業務資料和行為日誌通過Kafka以及DataWorks,MaxCompute,最終在QuickBI當中展示。 本次展示Kafka通過DataWorks上傳到MaxCompute的流程。從DataWorks上傳到MaxCompute是通過兩種方案進行上傳資料同步的。方案一是自定義資源組,方案二是獨享資源組。自定義資源組一般適用於複雜網路的資料上雲場景。獨享資源組操作方式主要針對整合資源不足的情況。

二、具體操作流程 1.Kafka訊息佇列使用及其原理 Kafka產品概述:訊息佇列 for Apache Kafka 是阿里雲提供的分散式、高吞吐、可擴充套件的訊息佇列服務。訊息佇列for Apache Kafka一般用於日誌收集、監控資料聚合、流式資料處理、線上離線分析等大資料領域。訊息佇列 for Apache Kafka 針對開源的 Apache Kafka 提供全託管服務,徹底解決開源產品長期以來的痛點。雲上Kafka具有低成本、更彈性、更可靠的優勢,使用者只需專注於業務開發,無需部署運維。

Kafka架構介紹:如下圖所示,一個典型的Kafka叢集主要分為四部分。Producer生產資料並通過 push 模式向訊息佇列 for Apache Kafka 的 Kafka Broker 傳送訊息。傳送的訊息可以是網站的頁面訪問、伺服器日誌,也可以是 CPU 和記憶體相關的系統資源資訊。Kafka Broker用於儲存訊息的伺服器。Kafka Broker 支援水平擴充套件。 Kafka Broker 節點的數量越多,Kafka 叢集的吞吐率越高。Kafka Broker針對topic會partition一個概念,partition有leader、follower的角色分配。Consumer通過 pull 模式從訊息佇列 for Apache Kafka Broker 訂閱並消費leader的資訊資料。其中partition內部有offset作為訊息的消費點位。通過ZooKeeper管理叢集的配置、選舉 leader 分割槽,並且在Consumer Group 發生變化時,管理partition_leader的負載均衡。

Kafka訊息佇列購買以及部署:如下圖,使用者首先可以到Kafka訊息佇列產品頁面點選購買,根據個人情況選擇對應包年、包月等消費方式、地區、例項型別、磁碟、流量以及訊息存放時間。其中較為重要的一點是要選擇對應地區,如果使用者的MaxCompute在華北,那麼儘量選擇華北地區。選擇開通完成後需要進行部署。點選部署,選擇合適的VPC及其交換機進行部署。

部署完成後進入Kafka Topic管理頁面,點選建立Topic輸入自己的Topic。Topic命名下面有三條注意資訊,命名儘量跟自己的業務一致,比如是財經業務或者是商務業務,儘量進行區分。第四步進入Consumer Group管理,點選建立Consumer Group建立自己所需要的Consumer Group。Consumer Group的命名也需要規範,如果是財經或商務業務,儘量和自己的Topic相對應。

Kafka白名單配置:Kafka安裝部署完成之後確認需要訪問Kafka的伺服器或產品的白名單。下圖中的預設接入點即為訪問介面。

2.資源組介紹及其配置 自定義資源組的使用背景:自定義資源組一般針對IDC之間的網路問題。本地網路和雲上網路存在差異,如DataWorks可以通過免費傳輸能力(預設任務資源組)進行海量資料上雲,但預設資源組無法實現傳輸速度存在較高要求或複雜環境中的資料來源同步上雲的需求。此時使用者可以使用自定義資源組可實現複雜環境同步上雲的需求,解決DataWorks默 認資源組與您的資料來源不通的問題,或實現更高速度的傳輸能力。然而,自定義資源組主要解決的還是複雜網路環境上雲同步問題,打通任意網路環境之間的資料傳輸同步。

自定義資源組的配置:自定義資源組的配置需要六步操作,首先點選進入DataWorks控制檯,點開工作空間的列表,選擇使用者需要的專案空間,點選進入資料整合,即確認自己的資料整合是要在哪個空間專案下進行新增。之後,點選進入資料來源介面,點選新增自定義資源組。要注意頁面右上角的新增自定義資源組是隻有專案管理員有許可權新增。

第三步是確認Kafka與需要新增的自定義資源組屬於同一個VPC下。本次實驗是ECS向Kafka傳送訊息,二者的VPC應該一致。第四步登入ECS,即個人的自定義資源組。執行命令dmidecode|grep UUID得到ECS的UUID。

第五步是將新增伺服器UUID以及自定義資源組的IP或機器CPU和記憶體填寫進來。最後是在ECS上執行相關命令,Agent安裝共5步,做一一確認,在第4小步完成後點選重新整理檢視服務是否為可用狀態。新增完成後進行檢查連通測試,檢查是否新增成功。

獨享資源組的使用背景:一些客戶反映在Kafka同步到MaxCompute時會報資源不足的問題,可以通過新增獨享資源組的方式進行資料同步。獨享資源模式下,機器的物理資源(網路、磁碟、CPU和記憶體等)完全獨享。不僅可以隔離使用者間的資源使用,也可以隔離不同工作空間任務的資源使用。此外,獨享資源也支援靈活的擴容、縮容功能,可以滿足資源獨 享、靈活配置等需求。獨享資源組可以訪問在同一地域下的VPC資料來源,同時也可以訪問跨地域的公網RDS地址。

獨享資源組的配置:獨享資源組的配置主要需要兩步操作,首先進入DataWorks控制檯的資源列表,點選新增獨享資源組,包括獨享整合資源組和獨享排程資源組。此處選擇新增獨享整合資源組,點選購買時仍要注意選擇對應的購買方式、區域、資源、記憶體、時間期限、數量等。

購買完成後需要把獨享整合資源組繫結到與Kafka對應的VPC,點選專有網路繫結,選擇與Kafka對應的交換機(最明顯的是可用區的區別)、安全組。

3.同步過程及其注意事項 Kafka同步到MaxCompute的需要進行相關引數配置同時需要注意以下幾個事項。 DataWorks資料整合操作:進入DataWorks操作介面,點選建立業務流程,在新建的業務流程新增資料同步節點,再進行命名。

如下圖所示,進入資料同步節點,包括Reader端和Writer端,點選Reader端資料來源為Kafka,Writer端資料來源為ODPS。點選轉化為指令碼模式。下圖右上角是幫助檔案,Reader或Writer端的一些同步引數可以在此處就近點選,方便閱讀、操作和理解。

Kafka Reader的主要引數:Kafka Reader的主要引數首先server,上文所述Kafka的預設接入點就是其中一個server,ip:port。注意此處server是必填引數。topic,表示在Kafka部署完成之後,Kafka處理資料來源的topic,此處也是必填引數。下一個引數是針對列column,column支援常量列、資料列、屬性列。常量列和資料列不太重要。同步的完整訊息一般存放在屬性列 value 中,如果需要其它資訊,如partition、offset、timestamp,也可以在屬性列中篩選。column是必填引數。

keyType、valueType各有6種型別,根據使用者同步的資料,選擇相應的資訊,同步一個型別。需要注意同步方式是按訊息時間同步,還是按消費點位置同步的。按資料消費點位置同步有四個場景,beginDateTime,endDateTime,beginOffset,endOffset。 beginDateTime 和beginOffset 二選其一,作為資料消費起點。endDateTime 和endOffset 二選其一。需要注意beginDateTime、endDateTime 中需要Kafka0.10.2版本以上才支援按資料消費點位置同步功能。另外需要注意beginOffset有三個比較特殊的形式:seekToBeginning,表示從開始點位消費資料;seekToLast,表示從上次消費的偏移位置消費資料,按照beginOffset從上次偏移位置只能一次消費,如果使用beginDateTime則可以多次消費,這取決於訊息存放時間;seekToEnd,表示從最後點位消費資料,會讀取到空資料。

skipExceeedRecord沒有太大作用,是不必填項。partition對topic所有分割槽共同讀消費的,所以無需自定義一個分割槽,是非必填項。kafkaConfig,如果有其它相關配置引數可以擴充套件配置在kafkaConfig,kafkaConfig也是非必填項。

MaxCompute Writer的主要引數:dataSource是資料來源名稱,新增ODPS資料來源。tables,表示所建立的資料表的表名稱,Kafka的資料要同步到哪張表中,相應的欄位也可以建立。 partition,如果表為分割槽表,則必須配置到最後一級分割槽,確定同步位置。若為非分割槽表,則不必填。column,儘量與Kafka column中的相關欄位做一一對應的操作。同步的欄位對應,資訊同步才能確認成功。truncate,寫入時同步的資料是選擇以追加模式寫還是以覆蓋模式寫,儘量避免多個DDL同時操作一個分割槽,或者在多個併發作業啟動前提前建立分割槽。

Kafka同步資料到MaxCompute:將下圖拆分為三部分。Kafka的Reader端,MaxCompute的Writer端以及限制引數。Reader包含server、endOffset、kafkaConfig、group.id、valueType、ByteArray、column欄位、topic、beginOffset、seekToLast等。MaxCompute的Writer端包含覆蓋、追加、壓縮、檢視原始碼、同步到的表、欄位要和Kafka的Reader端做一一對應,最重要的是value資料同步。限制引數,主要有errorlimit,資料超過幾個錯誤後會進行報錯;speed,可以限制流速、併發度等。

參考Kafka生產者SDK編寫程式碼:最終生產出的資料要傳送到Kafka中,通過相關程式碼可以檢視使用者的生產資料。下圖一段程式碼表示配置資訊的讀取,協議、序列化方式以及請求的等待時間,需要傳送哪一個topic,傳送什麼樣的訊息。傳送完成後回傳一個資訊。詳細程式碼可以參考配置檔案、訊息來源、生產者消費者的程式碼模板: help.aliyun.com/document_de…

程式碼打包執行在ECS上(與Kafka同一個可用區):如下圖所示,執行crontab-e命令,每到17:00執行一次。下圖為傳送日誌完成後的訊息記錄。

在MaxCompute上建立表:進入DataWorks業務流程頁面,建立目標表,使用一個DDL語句建立同步的表,或根據使用者個人業務相應建立不同的表的欄位。

4.開發測試以及生產部署 選擇自定義資源組(或獨享整合資源組)進行同步操作:下圖所示,選擇右上角“配置任務資源組”,根據使用者個人需求選擇資源組,點選執行。執行完成後,會出現標識顯示成功,同步資料記錄以及結果是否成功。同步過程基本結束。

查詢同步的資料結果:在DataWorks臨介面檢視同步結果,在臨時節點點選查詢命令,select * from testkafka3(表),檢視資料同步結果。資料已經同步過來,證明測試成功。

設定排程引數:業務流程開發資料同步之後,會對相關模型進行一些業務處理,最後設計一些SQL節點、同步節點,進行部署。如下圖所示,在右側點選排程配置,輸入排程時間。具體操作可參考DataWorks官方檔案完善業務處理流程。

提交業務流程節點,並打包釋出:點選業務流程,選擇所需要提交的節點並提交。一些業務流程提交之後不需要放到生產環境當中。然後進入任務釋出介面,將節點新增到待發布進行任務部署。

確認業務流程釋出成功:最後在運維中心頁面,確認釋出是否在生產環境中存在。至此Kafka同步資料到MaxCompute過程結束。到了對應的排程時間,在各個節點或者右上角會有節點的日誌展示,可以檢視日誌執行情況是否正常,或是否需要進行後續操作,部署資料或是相關命令。



原文連結 本文為雲棲社群原創內容,未經允許不得轉載。