Flink流式處理百萬資料量CSV檔案
技術標籤:平凡人筆記
前言
最近公司讓做一個'沒有必要'的需求
需求針對的物件
這是同一個csv檔案的不同展示形式
-
Excel展示形式
-
文字展示形式
這個csv檔案中可能有數百萬條資料
需求
將所有的異常資料檢測出來
-
什麼樣的資料是異常資料
圈紅的資料我手動添加了一個a
原本是數字型別
現在變成了一個字串型別
那麼程式中將字串型別轉換為數字型別的話
就會報錯
那這個值就是異常資料
-
為什麼我說是 '沒有必要的需求'
-
百萬級別的資料量的csv一般都是由資料庫匯出來的
-
資料庫的列欄位都是定義好的 比如是decimal型別的資料型別 匯出來的話 那麼也肯定是數字而不會是字串
-
而出現數字成字串 是由於人工手動錄入的情況下 才會出現 而這種情況又比較少
-
該csv資料集用於跑python演算法 比如通過pandas讀取csv 統計某一個列資料的和 若全是數字則可以統計 若某一行資料是字串 則會出現異常 那麼可以通過pandas的方式做異常值資料處理 比如剔除這一行即可
-
-
綜上 花費人力物力去處理這一個沒有必要的需求真的有些'沒必要'
但領導發話了呀 這是客戶的需求
所以do it
大致實現思路
讀取該csv檔案
解析csv每一行資料
檢驗每一行資料是否是異常資料
實現方式
-
普通方式
通過java讀取csv 然後一行一行處理
這種方式 若單機記憶體太小 很容易造成記憶體溢位
而且方式很low 沒有多大挑戰性
對個人技術能力沒有提升
所以這種方式pass
-
Flink 流式處理
剛好頭段時間 自己學習到了Flink
之前一直是紙上談兵
現在終於有了用武之地
選好了技術方案 let's do it!
業務邏輯圖
接下來簡要說說此流程上的核心技術的實現原理
rabbitmq
DEMO原始碼
https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/rabbitmq
傳送訊息
重點配置說明
-
durable
是否持久化,是否將佇列持久化到mnesia資料庫中,有專門的表儲存佇列宣告
-
exclusive
①當前定義的佇列是connection的channel是共享的,其他的connection是訪問不到的
②當connection關閉的時候,佇列將被刪除
-
autoDelete
當最後一個consumer(消費者)斷開之後,佇列將自動刪除
監聽訊息
方式一
重要引數說明
-
autoack
autoAck(同no-ack)為true的時候
訊息傳送到作業系統的套接字緩衝區時即任務訊息已經被消費
但如果此時套接字緩衝區崩潰
訊息在未被消費者應用程式消費的情況下就被佇列刪除
所以,如果想要保證訊息可靠的達到消費者端
建議將autoAck欄位設定為false
這樣當上面套接字緩衝區崩潰的情況同樣出現
仍然能保證訊息被重新消費
方式二 註解方式
-
對類新增@RabbitListener(queues = "${java.flink.queue}")註解
-
指定佇列名稱 可從配置檔案中讀取
-
-
對方法新增 @RabbitHandler 註解
三個引數
-
Object message
任意型別的訊息
#解析mq訊息
StringmessageString=JsonUtils.toJson(message);
Messagemessage1=JsonUtils.fromJsonObject(messageString,Message.class);
Stringmessage2=newString(message1.getBody(),"UTF-8");
-
Message msg
手動確認
//如果手動ACK,訊息會被監聽消費,但是訊息在佇列中依舊存在,如果未配置acknowledge-mode預設是會在消費完畢後自動ACK掉
finallongdeliveryTag=msg.getMessageProperties().getDeliveryTag();
//通知MQ訊息已被成功消費,可以ACK了
channel.basicAck(deliveryTag,false);
-
Channel channel
//處理失敗,重新壓入MQ
channel.basicRecover();
執行緒池
原始碼
https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/thread
spring執行緒相關注解
-
@EnableAsync
使用多執行緒
-
@Async
加線上程任務的方法上(需要非同步執行的任務)
定義一個執行緒任務
通過spring提供的ThreadPoolTaskExecutor就可以使用執行緒池
重要引數
-
corePoolSize
核心執行緒數
-
maxPoolSize
最大執行緒數
-
queueCapacity
佇列容量
-
keepAliveSeconds
活躍時間
-
waitForTasksToCompleteOnShutdown
設定執行緒池關閉的時候等待所有任務都完成再繼續銷燬其他的Bean
-
rejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy())
-
setRejectedExecutionHandler:當pool已經達到max size的時候,如何處理新任務
-
CallerRunsPolicy:不在新執行緒中執行任務,而是由呼叫者所在的執行緒來執行
-
使用執行緒池中的執行緒去執行非同步任務
分散式記憶體檔案系統Alluxio
環境搭建
-
自定義dokcer網路
dockernetworkcreatealluxio_nw
-
安裝alluxio master
dockerrun-d--rm\
-p19999:19999\
-p19998:19998\
--net=alluxio_nw\
--name=alluxio-master\
-eALLUXIO_JAVA_OPTS="\
-Dalluxio.master.hostname=alluxio-master"\
alluxio/alluxiomaster
-
安裝alluxio worker
dockerrun-d--rm\
-p29999:29999\
-p30000:30000\
--net=alluxio_nw\
--name=alluxio-worker\
--shm-size=3971.64MB\
-eALLUXIO_JAVA_OPTS="\
-Dalluxio.worker.memory.size=3971.64MB\
-Dalluxio.master.hostname=alluxio-master\
-Dalluxio.worker.hostname=alluxio-worker"\
alluxio/alluxioworker
域名轉發配置
sudovim/etc/hosts
127.0.0.1alluxio-worker
上傳alluxio檔案
下載alluxio檔案
將檔案流寫入本地
原始碼
https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/alluxio
Flink流式處理資料
結合當前業務梳理流程
來源資料來源:數百萬資料量的CSV檔案
結果儲存資料:CSV或Mysql
讀取目標資料
-
略過表頭
-
在已知幾列的情況下 執行上圖程式碼
比如有6列
那麼讀取csv的時候
flink均認為是String型別去讀取(人為指定的型別)
篩選異常資料
異常資料的判斷標準
比如輸入資料來源CSV中一行資料為
若認定圈紅的那一列是數字型別
那麼此時因為是字串 無法轉換為數字型別
那麼這一行是異常資料
將異常資料儲存
根據業務靈活處理
-
第一個全紅的 2: 表示第二行
-
第二個圈紅的部分 表示 當前列資料應為Double型別但實際上是非數字型別 所以該行是異常資料
在方法內部對於全域性變數的使用僅限於在方法內部使用 不會對方法之後的作用域有效
比如
過濾函式
filter是過濾函式
對每一行資料進行過濾
返回false則會被過濾掉
全域性變數
List<Integer>rowList=newArrayList<>();
在filter函式作用域之外
可以在filter函式中使用
僅限於filter函式之間才有效
在filter函式之後則無法使用filter對該變數的修改
-
儲存到CSV
-
缺陷
需要指定Tuple類
比如生成的csv檔案一行有25列那麼就使用Tuple25
還需要定義25個泛型比如Tuple25<String,String,....>
最多可支援25列
如果是超過25列那麼就不支援
所以使用起來非常不方便而且使用範圍有限
我當時在這塊費了時間,因為csv列數超過了25列 比如26列,我想著在增加一個Tuple26或TupleN 嘗試了之後 不可以 後來找到了國內Flink釘釘群 請教了下里面的大佬 說是建議儲存到Mysql中
-
儲存到Mysql
配置mysql資訊和要執行的sql語句
侷限性
假如我有1000個列那麼需要建立一個表有1000個列嗎
如果有5000個列呢所以這種方式也不太好
此時已經到了專案的最後期限了 很多同事在等著我的結果 我的壓力也倍增 差點準備放棄flink 用low的方式實現 最後靈機一動看到了儲存到txt文字檔案的方法
-
儲存到Text
這種方式簡單有效
DEMO原始碼
https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/flink
Flink國內釘釘群號
群號:23138101
後記
上面這點東西忙活了我3-4天時間
自我感覺真是太笨了
國內相關的資料目前還比較少
寫下這點心得和經驗給需要的朋友們
避免走彎路