1. 程式人生 > 其它 >Flink流式處理百萬資料量CSV檔案

Flink流式處理百萬資料量CSV檔案

技術標籤:平凡人筆記

前言

最近公司讓做一個'沒有必要'的需求

需求針對的物件

這是同一個csv檔案的不同展示形式

  • Excel展示形式
  • 文字展示形式

這個csv檔案中可能有數百萬條資料

需求

將所有的異常資料檢測出來

  • 什麼樣的資料是異常資料

圈紅的資料我手動添加了一個a

原本是數字型別

現在變成了一個字串型別

那麼程式中將字串型別轉換為數字型別的話

就會報錯

那這個值就是異常資料

  • 為什麼我說是 '沒有必要的需求'

    • 百萬級別的資料量的csv一般都是由資料庫匯出來的

    • 資料庫的列欄位都是定義好的 比如是decimal型別的資料型別 匯出來的話 那麼也肯定是數字而不會是字串

    • 而出現數字成字串 是由於人工手動錄入的情況下 才會出現 而這種情況又比較少

    • 該csv資料集用於跑python演算法 比如通過pandas讀取csv 統計某一個列資料的和 若全是數字則可以統計 若某一行資料是字串 則會出現異常 那麼可以通過pandas的方式做異常值資料處理 比如剔除這一行即可

  • 綜上 花費人力物力去處理這一個沒有必要的需求真的有些'沒必要'

但領導發話了呀 這是客戶的需求

所以do it

大致實現思路

讀取該csv檔案

解析csv每一行資料

檢驗每一行資料是否是異常資料

實現方式

  • 普通方式

通過java讀取csv 然後一行一行處理

這種方式 若單機記憶體太小 很容易造成記憶體溢位

而且方式很low 沒有多大挑戰性

對個人技術能力沒有提升

所以這種方式pass

  • Flink 流式處理

剛好頭段時間 自己學習到了Flink

之前一直是紙上談兵

現在終於有了用武之地

選好了技術方案 let's do it!

業務邏輯圖

接下來簡要說說此流程上的核心技術的實現原理

rabbitmq

DEMO原始碼

https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/rabbitmq

傳送訊息

重點配置說明

  • durable

    是否持久化,是否將佇列持久化到mnesia資料庫中,有專門的表儲存佇列宣告

  • exclusive

    ①當前定義的佇列是connection的channel是共享的,其他的connection是訪問不到的

    ②當connection關閉的時候,佇列將被刪除

  • autoDelete

    當最後一個consumer(消費者)斷開之後,佇列將自動刪除

監聽訊息

方式一

重要引數說明

  • autoack
autoAck(同no-ack)為true的時候
訊息傳送到作業系統的套接字緩衝區時即任務訊息已經被消費
但如果此時套接字緩衝區崩潰
訊息在未被消費者應用程式消費的情況下就被佇列刪除

所以,如果想要保證訊息可靠的達到消費者端
建議將autoAck欄位設定為false
這樣當上面套接字緩衝區崩潰的情況同樣出現
仍然能保證訊息被重新消費

方式二 註解方式

  • 對類新增@RabbitListener(queues = "${java.flink.queue}")註解

    • 指定佇列名稱 可從配置檔案中讀取
  • 對方法新增 @RabbitHandler 註解

三個引數

  • Object message
任意型別的訊息

#解析mq訊息
StringmessageString=JsonUtils.toJson(message);
Messagemessage1=JsonUtils.fromJsonObject(messageString,Message.class);
Stringmessage2=newString(message1.getBody(),"UTF-8");
  • Message msg
手動確認

//如果手動ACK,訊息會被監聽消費,但是訊息在佇列中依舊存在,如果未配置acknowledge-mode預設是會在消費完畢後自動ACK掉
finallongdeliveryTag=msg.getMessageProperties().getDeliveryTag();

//通知MQ訊息已被成功消費,可以ACK了
channel.basicAck(deliveryTag,false);

  • Channel channel
//處理失敗,重新壓入MQ
channel.basicRecover();

執行緒池

原始碼

https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/thread

spring執行緒相關注解

  • @EnableAsync

    使用多執行緒

  • @Async

加線上程任務的方法上(需要非同步執行的任務)
定義一個執行緒任務
通過spring提供的ThreadPoolTaskExecutor就可以使用執行緒池

重要引數

  • corePoolSize

    核心執行緒數

  • maxPoolSize

    最大執行緒數

  • queueCapacity

    佇列容量

  • keepAliveSeconds

    活躍時間

  • waitForTasksToCompleteOnShutdown

    設定執行緒池關閉的時候等待所有任務都完成再繼續銷燬其他的Bean

  • rejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy())

    • setRejectedExecutionHandler:當pool已經達到max size的時候,如何處理新任務

    • CallerRunsPolicy:不在新執行緒中執行任務,而是由呼叫者所在的執行緒來執行

使用執行緒池中的執行緒去執行非同步任務

分散式記憶體檔案系統Alluxio

環境搭建

  • 自定義dokcer網路
dockernetworkcreatealluxio_nw
  • 安裝alluxio master
dockerrun-d--rm\
-p19999:19999\
-p19998:19998\
--net=alluxio_nw\
--name=alluxio-master\
-eALLUXIO_JAVA_OPTS="\
-Dalluxio.master.hostname=alluxio-master"
\
alluxio/alluxiomaster
  • 安裝alluxio worker
dockerrun-d--rm\
-p29999:29999\
-p30000:30000\
--net=alluxio_nw\
--name=alluxio-worker\
--shm-size=3971.64MB\
-eALLUXIO_JAVA_OPTS="\
-Dalluxio.worker.memory.size=3971.64MB\
-Dalluxio.master.hostname=alluxio-master\
-Dalluxio.worker.hostname=alluxio-worker"
\
alluxio/alluxioworker

域名轉發配置

sudovim/etc/hosts

127.0.0.1alluxio-worker

上傳alluxio檔案

下載alluxio檔案

將檔案流寫入本地

原始碼

https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/alluxio

Flink流式處理資料

結合當前業務梳理流程

來源資料來源:數百萬資料量的CSV檔案
結果儲存資料:CSV或Mysql

讀取目標資料

  • 略過表頭

  • 在已知幾列的情況下 執行上圖程式碼

    比如有6列
    那麼讀取csv的時候
    flink均認為是String型別去讀取(人為指定的型別)

篩選異常資料

異常資料的判斷標準

比如輸入資料來源CSV中一行資料為

若認定圈紅的那一列是數字型別

那麼此時因為是字串 無法轉換為數字型別

那麼這一行是異常資料

將異常資料儲存

根據業務靈活處理

  • 第一個全紅的 2: 表示第二行

  • 第二個圈紅的部分 表示 當前列資料應為Double型別但實際上是非數字型別 所以該行是異常資料

在方法內部對於全域性變數的使用僅限於在方法內部使用 不會對方法之後的作用域有效

比如

過濾函式

filter是過濾函式
對每一行資料進行過濾
返回false則會被過濾掉

全域性變數

List<Integer>rowList=newArrayList<>();
在filter函式作用域之外
可以在filter函式中使用
僅限於filter函式之間才有效
在filter函式之後則無法使用filter對該變數的修改
  • 儲存到CSV
  • 缺陷
需要指定Tuple類

比如生成的csv檔案一行有25列那麼就使用Tuple25

還需要定義25個泛型比如Tuple25<String,String,....>

最多可支援25列

如果是超過25列那麼就不支援

所以使用起來非常不方便而且使用範圍有限

我當時在這塊費了時間,因為csv列數超過了25列 比如26列,我想著在增加一個Tuple26或TupleN 嘗試了之後 不可以 後來找到了國內Flink釘釘群 請教了下里面的大佬 說是建議儲存到Mysql中

  • 儲存到Mysql
配置mysql資訊和要執行的sql語句

侷限性

假如我有1000個列那麼需要建立一個表有1000個列嗎

如果有5000個列呢所以這種方式也不太好

此時已經到了專案的最後期限了 很多同事在等著我的結果 我的壓力也倍增 差點準備放棄flink 用low的方式實現 最後靈機一動看到了儲存到txt文字檔案的方法

  • 儲存到Text
這種方式簡單有效

DEMO原始碼

https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/flink

Flink國內釘釘群號

群號:23138101

後記

上面這點東西忙活了我3-4天時間

自我感覺真是太笨了

國內相關的資料目前還比較少

寫下這點心得和經驗給需要的朋友們

避免走彎路