面試(涉及技術二)
六、Flume
6.1 Flume的組成
6.1.1 taildir source
1)斷點續傳
2)Apache1.7以及CDH1.6產生
3)若遇到無斷點續傳功能的source怎麼辦?
自定義
4)taildir掛了怎麼辦?
不會丟失資料,因為有斷點續傳,可能會有重複資料
5)如何處理重複資料?
(1)不處理:生產環境通常不處理,因為會影響傳輸效率
(2)處理:可以在taildir source中增加自定義事務;也可以在Hive、SparkStream、Flink中進行處理,分組去重或開窗取第一條、Redis的Set集合都能去重
6)taildir source是否支援遞迴遍歷資料夾來讀取檔案?
不支援,但是可以自定義,遞迴遍歷資料夾 + 讀取檔案
6.1.2 channel
1)file channel
資料儲存於磁碟,可以通過配置dataDirs指向多個路徑,每個路徑對應不同的硬碟,增加Flume的吞吐量,可靠性高,但傳輸速度低,預設容量100萬Event
2)memory channel
資料儲存於記憶體,傳輸速度快,但可靠性差,預設100個Event
3)Kafka channel
資料儲存於Kafka,基於磁碟,可靠性高,傳輸速度大於memory channel + kafka sink(原因是省去了sink階段)
4)kafka channel哪個版本產生的?
Flume1.6產生,但當時還沒火,還存在一些Bug,增加了資料額外的清洗工作,Flume1.7開始解決了Bug,然後就火了
5)生產環境如何選擇?
(1)若下一級是Kafka,選kafka channel
(2)若對資料要求非常準確的公司,選file channel
(3)若是普通日誌,可以選擇memory channel
6.1.3 HDFS sink
6.1.4 事務
source到channel是put事務,channel到sink是take事務
6.2 Flume攔截器
6.2.1 攔截器
1)ETL攔截器:輕度清洗,過濾出Json格式不完整的資料
2)時間攔截器:提取日誌時間作為分割槽的依據,避免零點漂移問題
6.2.2 自定義攔截器步驟
1)實現Interceptor
2)重寫方法
(1)initialize初始化方法
(2)public Event intercept(Event event) 處理單個Event
(3)public List<Event> intercept(List<Event> events) 處理多個Event,在這個方法中呼叫Event intercept(Event event)
(4)close方法
3)靜態內部類,實現Interceptor.Builder
4)打包,上傳至flume/lib目錄下
5)在配置檔案中關聯:全類名 + $builder
6.2.3 攔截器可以不用嗎?
可以不用,在hive的dwd層或sparkStream中處理即可,也可以用,但會影響效能,不推薦用在實時性高的場景
6.3 Flume Channel選擇器
1)複製(預設):將全部source中的Event資料發往所有的channel,我們公司使用預設
2)多路複用:將全部source中的Event資料有選擇的發往相應的channel
6.4 Flume監控器
1)採用ganglia監控,若監控到Flume嘗試提交的次數遠遠大於最終成功的次數,說明Flume執行比較差
2)解決辦法
(1)增加記憶體:在flume-env.sh中設定記憶體大小為4~6G,-Xmx與-Xms最好設定一致,減少記憶體抖動帶來的效能影響,不然容易導致頻繁fullgc
(2)增加伺服器數量:日誌伺服器配置8~16G記憶體、8T硬碟
6.5 Flume採集資料會丟失嗎?
file channel 不會丟失資料,使用file channel 資料可以儲存在磁碟中,並且資料傳輸自身有事務,memory channel可能會丟失資料或channel儲存的資料已滿,導致未寫入的資料丟失
七、Kafka
7.1 Kafka架構
生產者、Broker主機節點、消費者、Zookeeper儲存Broker id和消費者offsets資訊
7.2 Kafka的機器數量
kafka機器數 = 2 * (峰值生產速度 * 副本數 / 100)+ 1,峰值生產速度可以通過生產者的壓力測試得出,若峰值生產速度為50M/s,副本數為2,則Kafka機器數 = 2 * (50 * 2 / 100)+ 1 = 3臺
7.3 副本數設定
我們公司設定為2個,提高可靠性,但增加了網路IO的傳輸
7.4 Kafka壓測
kafka官方自帶壓力測試指令碼,基本上每次此時都是網路IO率先達到瓶頸
7.5 Kafka日誌儲存時間
預設儲存7天,我們公司儲存3天
7.6 Kafka每天有多少資料量?
每天大概有60G的資料量,產生6千萬條日誌資料
平均每秒產生 60000000/24/60/60 = 700多條資料
低谷期每秒產生 20多條資料,也就是凌晨3點到5點這段時間
高峰期每秒產生 1萬多條資料,也就是晚上7點到12點這段時間
每條日誌資料平均大小為1k左右,所以平均每秒大約產生1M左右的資料
7.7 Kafka的硬碟大小
60G(每天產生的資料量) * 2(副本數)* 3(天)/ 70%(預留30%的磁碟)= 500多G,我們公司設定為1T,完全夠Kafka使用了
7.8 Kafka監控
我們公司使用的是開源元件:kafkaEagle
7.9 Kafka分割槽數
建立只有一個分割槽的主題,測試這個主題的生產者和消費者的吞吐量,假設他們的值分別是Tp和Tc,單位可以是MB/s,假設總的目標吞吐量是Tt,則分割槽數= Tt / min(Tp , Tc)
例如:producer吞吐量=20m/s;consumer吞吐量=50m/s,期望吞吐量100m/s;分割槽數=100 / 20 =5分割槽,分割槽數一般設定為:3-10個
我們公司設定為3個
7.10 Kafka的Topic數量
離線數倉中只有一個Topic,實時專案是從mysql中匯入資料,所以有多少張表就有多少個Topic
7.11 Kafka的ISR副本同步佇列
ISR中包括Leader和Follower,若Leader程序掛掉,controller會在ISR佇列中選擇一個服務作為新的Leader(該佇列的作用就是選舉老大)
有兩個引數決定一臺kafka服務是否能加入ISR佇列:延遲時間和延遲條數。在0.10版本移除了延遲條數,防止kafka服務頻繁的進出ISR佇列
7.12 Kafka的分割槽分配策略
1)Range(預設):Range是對每個Topic而言的,首先GroupCoordinator組協調器對同一個Topic裡的分割槽按照序號排序,並對消費者按照字母順序排序,然後用分割槽的個數除以消費者執行緒總數求出每個消費者執行緒消費的分割槽數,若除不盡,則前幾個消費者執行緒多消費一個分割槽。這種方式在某種情況下可能導致資料傾斜
2)RoundRobin(輪詢):將Topic中的每個分割槽以輪詢的方式傳送給消費者執行緒進行消費
3)Sticky: 粘性的, 發生rebalance之後,儘量複用之前的分配關係
7.13 Kafka掛掉怎麼辦?
1)先重啟試試,然後檢視日誌
2)資料可能會丟,當ack設定為1,leader收到資料後,未來得及同步資料就掛掉,這時可能會丟失資料
3)資料也可能重複,kafka沒有事務,ack也沒有設定為-1時,資料可能重複
4)我們公司的日誌伺服器有30天的備份,所以可以重新拉取一次資料即可
7.14 Kafka丟不丟資料?
我們公司設定ack為-1,並且設定ISR佇列中必須至少有兩個kafka服務節點(預設ISR佇列只有一個Leader,因為這種情況相當於ack等於1),以此來保證可靠性
ack = 0 ,生產者傳送完資料就不管了,資料可能會丟
ack = 1,生產者傳送資料,Leader也接收到了資料,寫入了磁碟,但還未來得及同步資料到其它節點,Leader就宕機了,此時資料會丟
ack = -1,生產者傳送資料,Leader接收資料並寫入磁碟,也進行了副本同步,若此時Leader宕機,可能發生重複消費,資料可能會重複
7.15 Kafka資料重複問題
我們公司在kafka中加入了事務和冪等性,並且ack設定為-1,以此來保證資料不重複。當然,如果資料真的重複了,也可以在下游對資料進行處理,比如Hive的DWD層去重(分組或開窗取第一個值)、SparkStreaming、Flink都能去重,還有Redis的Set集合
7.16 Kafka資料積壓怎麼辦?
1)提升消費者組中的消費者數以及Topic中的分割槽數,讓二者相等,我們公司設定為3個分割槽 = 3CPU
2)提高消費者拉取資料的能力,比如Flume每次拉取的資料可以由1000條改為3000條、Spark中將限流的引數增大、Flink中保證資料的處理效率等
7.17 Kafka的引數優化
1)日誌儲存策略有預設的7天改為3天
2)副本數由預設的1個改為2個
3)增大網路延時時間,如果網路不太好的話
4)生產者生產資料預設不進行壓縮,我們可以採用snappy或lzo壓縮,減少網路IO
5)每個Kafka服務的預設記憶體是1個G,可以改為5個G
7.18 Kafka為什麼能高效的讀寫資料?
1)分散式叢集,採用分割槽技術
2)順序寫磁碟
3)零拷貝
7.19 Kafka單條日誌傳輸大小
預設是1M,我們公司的單條日誌資料不會超過1M,所以使用預設值
7.20 Kafka過期資料的清洗
有兩個方式進行處理:一種是壓縮,一種是刪除。我們公司是直接刪除過期的資料
7.21 Kafka可以按照時間消費資料嗎?
可以,有一個引數可以在消費者配置中進行設定,那個引數具體叫啥來著我記不清了
7.22 Kafka消費者角度考慮是拉取資料還是推送資料?
Kafka的消費策略我們可以稱為事件驅動型,是一種被動型的消費,有資料過來我就消費,沒有就閒著,像Flink也是一樣,來一條資料就處理一條,不來就處於空閒狀態
7.23 Kafka中的資料是有序的嗎?
單分割槽內有序,分割槽間無序。如果想讓資料有序,可以在生產者生產資料時指定key,讓相同的key進入同一個分割槽中,因為單分割槽內有序,所以我們在實際開發中用庫名+表名作為key值,以此來保證一張表中的資料有序
八、HBase
8.1 HBase的儲存結構
8.2 HBase的RowKey設計原則
三大原則:長度、雜湊、唯一 三大原則
8.3 實際生產中如何設計RowKey?
1)生產隨機數、雜湊值
2)字串反轉
8.4 Phoenix二級索引
8.4.1 Phoenix簡介
1)定義:一款附著在Hbase上的面板
2)特點:
(1)易於整合,如Spark、Hive、MR、Flume
(2)操作簡單,通過Phoenix操作HBase可以使用SQL化的語法進行操作
(3)支援HBase二級索引的建立
3)Phoenix架構
8.4.3 全域性二級索引
使用Phoenix建立,CREATE INDEX teacher_index ON teacher(name);
8.4.4 本地二級索引
直接建立即可
面試官:怎麼直接建立,在哪建立?
我們:建立表時建立,選擇表中的某個欄位,比如 CREATE LOCAL INDEX teacher_local_index ON teacher(name)
8.4.5 HBase協處理器(擴充套件)
1)案例需求:編寫協處理器,實現在往A表插入資料的同時讓HBase自身(協處理器)向B表中插入一條資料
2)實現步驟:
(1)在Shell視窗建立A表和B表
(2)建立Maven工程,引入依賴
(3)定義MyCoprocessor類並實現RegionObserver和RegionCoprocessor介面
(4)接下來可以通過重啟的或不重啟的方式載入該處理器
重啟的方式:
(1)打包放入HBase的lib目錄下
(2)分發jar包並重啟HBase
(3)建表時指定註冊協處理器即可
不重啟的方式:
(1) 給hbase-site.xml中新增配置,防止協處理器異常導致叢集停機
(2) 打成jar包並修改jar包名稱為c1.jar然後至上傳hdfs,比如存放路徑為 hdfs://hadoop102:9820/c1.jar
(3) 在hbase的shell視窗禁用表
(4) 載入協處理器:alter '表名', METHOD => 'table_att', 'Coprocessor'=>'jar所處HDFS路徑| 協處理器所在的全類名|優先順序|引數'
(5) 啟動表
(6) 向A表插入資料,觀察B表是否同步有資料插入
九、Azkaban
9.1 每天叢集多少指標?什麼時候跑,跑多久?
每天大約跑100多個指標,有時會達到200多個左右;每天00:30開始跑,平均跑3個多小時
9.2 任務掛了怎麼辦?
1)執行成功或者失敗都會發郵件、釘釘、打電話
2)主要的解決方案是嘗試重啟、然後檢視日誌定位問題
十、Spark Core & SQL
10.1 Spark有幾種部署方式?請分別簡要論述
1)Local本地模式,用於測試
2)Standalone模式,Spark自帶的一個排程系統
3)Yarn模式,Spark客戶端直接連線Yarn,不需要額外構建Spark叢集;有yarn-client和yarn-cluster兩個模式,主要區別在於Driver程式的執行節點
4)Mesos模式,國內用的少
10.2 Spark任務使用什麼進行提交?Java介面還是指令碼?
shell指令碼
10.3 Spark提交作業引數
1)executor-cores,每個executor使用的核心數,預設為1,官方建議2-5個,我們企業是4個
2)num-executors,啟動executors的數量,預設為2
3)executor-memory,executor記憶體大小,預設1G
4)driver-cores,driver使用核心數,預設為1
5)driver-memory,driver記憶體大小,預設512M
spark-submit \ --master local[5] \ --driver-cores 2 \ --driver-memory 8g \ --executor-cores 4 \ --num-executors 10 \ --executor-memory 8g \ --class PackageName.ClassName XXXX.jar \ --name "Spark Job Name" \ InputPath \ OutputPath
10.4 簡述Spark的架構與作業提交流程(畫圖講解,註明各個部分的作用)
10.4.1 SparkOnYarnClient提交流程
1)客戶端提交應用給ResourceManager進行處理,並且在Driver端啟動一個DriverEndpoint
2)ResourceManager收到請求後選擇一個NodeManager來啟動AppMaster
3)ApppMaster啟動後向ResourceManager請求註冊自己,表示自己已經啟動,並向ResourceManager申請資源執行Task任務
4)ResourceManager獲取請求後給AppMaster分配資源,AppMaster獲取資源後開始選擇一個NodeManager來啟動Container容器
5)容器啟動後有YarnCoarseGrained ExecutorBackend跟Driver端的DriverEndpoint進行反向註冊
6)DriverEndpoint收到註冊請求後返回註冊完成響應給YarnCoarseGrained ExecutorBackend
7)YarnCoarseGrained ExecutorBackend開始建立執行器,建立Executor
8)Executor建立成功後YarnCoarseGrained ExecutorBackend傳送通知給DriverEndpoint,說:Executor已經建立成功
9)DriverEndpoint隨後給YarnCoarseGrained ExecutorBackend傳送offset,執行Task任務
10.4.2 SparkOnYarnCluster提交流程
1)客戶端提交應用給ResourceManager進行處理
2)ResourceManager收到請求後選擇一個NodeManager來啟動AppMaster,並且在AppMaster中啟動Driver
3)ApppMaster啟動後向ResourceManager請求註冊自己,表示自己已經啟動,並向ResourceManager申請資源執行Task任務
4)ResourceManager獲取請求後給AppMaster分配資源,AppMaster獲取資源後開始選擇一個NodeManager來啟動Container容器
5)容器啟動後有YarnCoarseGrained ExecutorBackend跟Driver中的DriverEndpoint進行反向註冊
6)DriverEndpoint收到註冊請求後返回註冊完成響應給YarnCoarseGrained ExecutorBackend
7)YarnCoarseGrained ExecutorBackend開始建立執行器,建立Executor
8)Executor建立成功後YarnCoarseGrained ExecutorBackend傳送通知給DriverEndpoint,說:Executor已經建立成功
9)DriverEndpoint隨後給YarnCoarseGrained ExecutorBackend傳送offset,執行Task任務
10.5 如何理解Spark中血統的概念(RDD)?
RDD在Lineage依賴方面分為兩種Narrow Dependencies與Wide Dependencies用來解決資料容錯時的高效性以及劃分任務時候起到重要作用
10.6 簡述Spark的寬窄依賴,以及Spark如何劃分Stage,每個Stage又根據什麼決定Task個數
寬窄依賴可以用來解決資料容錯時的高效性以及參與任務的劃分,不同的是窄依賴沒有shuffle操作,寬依賴有shuffle操作;Spark根據RDD之間的依賴關係將不同的Job劃分為不同的Stage,一個寬依賴劃分一個Stage;每個Stage根據分割槽數劃分Task,Stage就是一個TaskSet集合
10.7 請舉例Spark的Transformation運算元和Action運算元,並簡述功能
10.7.1 Transformation運算元
1)map():輸入一個rdd,輸出一個新rdd
2)mapPartitions():以分割槽為單位進行map()操作
3)flatMap():扁平化操作
4)groupBy():分組
5)filter():過濾
6)sample():取樣
7)coalesce():加減分割槽;當進行縮減分割槽時,第二個引數為true時,會進行shuffle操作
8)repartition():重分割槽
9)sortBy():排序
10)intersection():取交集
11)union():取並集
12)subtract():取差集
13)zip():製作拉鍊表
14)partitionBy():按照key重分割槽
15)reduceByKey():按照key聚合value,溢寫磁碟時進行了類似於MR的Combiner預聚合,效率比groupBy + map高
16)gourpByKey():按照key分組
17)sortByKey():按照key排序
18)mapValues():只對V進行操作
19)join()
20)aggregateByKey():按照key處理分割槽核心分割槽間的邏輯
21)foldByKey():分割槽內和分割槽間邏輯相同的aggregateByKey()
22)combineByKey():轉換結構後分區內和分割槽間的操作
10.7.2 Action運算元
1)reduce():聚合
2)collect():以資料形式返回資料集,所有資料會被拉取到Driver端,儘量不用
3)first():返回rdd中的第一個元素
4)count():返回rdd中元素的個數
5)take():返回由rdd前n個元素組成的陣列
6)takeOrdered():返回rdd排序後前n個元素組成的陣列
7)countByKey():按key統計key的個數
8)foreach():遍歷rdd中的每一個元素
9)saveAsTextFile(path)儲存成Text檔案
10)saveAsSequenceFile(path) 儲存成Sequencefile檔案(只有kv型別RDD有該操作,單值的沒有)
11)saveAsObjectFile(path) 序列化成物件儲存到檔案
10.8 請舉例會引起Shuffle過程的Spark運算元,並簡述功能
1)groupBy():分組
2)coalesce():加減分割槽;當進行縮減分割槽時,第二個引數為true時,會進行shuffle操作
3)repartition():重分割槽
4)subtract():取差集
5)gourpByKey():按照key分組
6)reduceByKey():按照key聚合value,溢寫磁碟時進行了類似於MR的Combiner預聚合,效率比groupBy + map高
7)aggregateByKey():按照key處理分割槽核心分割槽間的邏輯
8)foldByKey():分割槽內和分割槽間邏輯相同的aggregateByKey()
9)combineByKey():轉換結構後分區內和分割槽間的操作
10)sortByKey():按照key排序
10.9 簡述Spark的兩個核心Shuffle的工作流程,包括未優化的和優化的HashShuffle,普通的SortShuffle和Bypass的SortShuffle
10.9.1未經優化的HashShuffle
不同Task中Key相同的資料不會在Executor中提前進行聚合,而是每個Task各自輸出自己的資料
10.9.2優化後的HashShuffle
同一個Executor中的不同Task產生的資料可以根據相同的Key提前進行聚合,這樣在shuffle過程中就能大大減少網路IO的壓力
10.9.3普通的SortShuffle
10.9.1bypass中的SortShuffle
當 shuffle read task 的 數 量 小 於 等 於 spark.shuffle.sort。bypassMergeThreshold 引數的值時(預設為 200),就會啟用 bypass 機制
10.10 Spark常用運算元ReduceByKey與GroupByKey的區別,哪一種更具優勢?
reduceByKey:按照key進行聚合,在shuffle之前有combine(預聚合)操作,但是需要注意是否會影響業務邏輯,效能比groupByKey高;
groupByKey:按照key進行分組,直接進行shuffle;
10.11 Repartition和Coalesce的關係與區別?
1)關係:兩者都是用來改變rdd的分割槽數的,repartition底層呼叫的就是coalesce
2)區別:repartition一定發生shuffle,coalesce根據傳入的引數判斷是否會發生shuffle;一般情況下增大rdd分割槽數使用repartition,減少rdd分割槽數使用coalesce;當然coalesce也能增大分割槽數;
10.12 分別簡述Spark中的快取機制與Checkpoint機制,並指出兩者的區別與聯絡
1)聯絡:都是做rdd持久化的
2)快取機制:不會截斷血緣關係,計算過程中優先使用資料快取
3)checkpoint:磁碟,截斷血緣關係,在ck之前必須沒有任何任務提交才會生效,ck過程會額外提交一次任務
10.13 簡述Spark中共享變數的基本原理與用途
累加器是Spark中提供的一種分散式的變數機制,其原理類似於mapreduce,即分散式的改變,然後聚合這些改變。累加器的一個常見用途是在除錯時對作業執行過程中的事件進行計數,而廣播變數用來高效分發較大的物件。
共享變量出現的原因:通常在向Spark傳遞函式時,比如使用map()函式或者filter()傳條件時,可以使用驅動器中定義的變數,但叢集中執行的每個任務都會得到這些變數的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變數。
Spark的兩個共享變數,累加器和廣播變數,分別為結果聚合與廣播這兩種常見的通訊模式突破了這一限制。
10.14 當Spark涉及到資料庫的操作時,如何減少Spark執行中的資料庫連線數?
使用foreachPartition代替foreach,在foreachPartition內獲取資料庫的連線
10.15 如何使用Spark實現TopN的獲取(描述思路或虛擬碼)?
10.15.1 方法一
1)按Key對資料進行聚合,groupByKey
2)將value轉為陣列,利用scala的sortBy或者sortWith進行排序,但是可能資料量比較大,會產生OOM
10.15.2 方法二
1)取出所有Key
2)對Key進行迭代,每次取出一個key利用spark的排序運算元進行排序
10.15.3 方法三
1)自定義分割槽器,按照key進行分割槽,使不同的key進入不同的分割槽
2)對每個分割槽運用spark的排序運算元進行排序
10.16 調優之前與調優之後的效能對比
這裡舉個例子。比如我們有幾百個檔案,會有幾百個map出現,讀取之後進行join操作,會非常的慢。這個時候我們使用coalesce減少分割槽,比如240個map,我們合成60個map,也就是窄依賴。這樣再shuffle,過程產生的檔案數會大大減少。提高join的時間效能。
10.17 簡述SparkSQL中RDD、DataFrame、DataSet三者之間的區別與聯絡?
10.18 append和overwrite的區別
append在原有分割槽上進行追加,overwrite在原有分割槽上進行全量重新整理
10.19 coalesce和repartition的區別
coalesce和repartition都用於改變分割槽,coalesce用於縮小分割槽且不會進行shuffle,repartition用於增大分割槽(提供並行度)會進行shuffle,在spark中減少檔案個數會使用coalesce來減少分割槽來到這個目的。但是如果資料量過大,分割槽數過少會出現OOM所以coalesce縮小分割槽個數也需合理
10.20 cache快取級別
DataFrame的cache預設採用 MEMORY_AND_DISK 這和RDD 的預設方式不一樣RDD cache 預設採用MEMORY_ONLY
10.21 釋放快取和快取
快取:(1)dataFrame.cache (2)sparkSession.catalog.cacheTable(“tableName”)
釋放快取:(1)dataFrame.unpersist (2)sparkSession.catalog.uncacheTable(“tableName”)
10.22 Spark Shuffle預設並行度
spark.sql.shuffle.partitions 決定 預設並行度200
10.23 kryo序列化
kryo序列化比java序列化更快更緊湊,但spark預設的序列化是java序列化並不是spark序列化,因為spark並不支援所有序列化型別,而且每次使用都必須進行註冊。註冊只針對於RDD。在DataFrames和DataSet當中自動實現了kryo序列化
10.24 建立臨時表和全域性臨時表
DataFrame.createTempView() 建立普通臨時表
DataFrame.createGlobalTempView()、DataFrame.createOrReplaceTempView() 建立全域性臨時表
10.25 BroadCast Join
原理:先將小表資料查詢出來聚合到driver端,再廣播到各個executor端,使表與表join時進行本地join,避免進行網路傳輸產生shuffle。
使用場景:大表join小表 只能廣播小表
10.26 控制Spark Reduce快取,調優shuffle
spark.reducer.maxSizeInFilght 此引數為reduce task能夠拉取多少資料量的一個引數預設48MB,當叢集資源足夠時,增大此引數可減少reduce拉取資料量的次數,從而達到優化shuffle的效果,一般調大為96MB,資源夠大可繼續往上跳。
spark.shuffle.file.buffer 此引數為每個shuffle檔案輸出流的記憶體緩衝區大小,調大此引數可以減少在建立shuffle檔案時進行磁碟搜尋和系統呼叫的次數,預設引數為32k 一般調大為64k
10.27 註冊UDF函式
SparkSession.udf.register 方法進行註冊
10.28 SparkSQL中Join操作與Left Join的區別?
join和sql中的inner join操作很相似,返回結果是前面一個集合和後面一個集合中匹配成功的,過濾掉關聯不上的。
leftJoin類似於SQL中的左外關聯left outer join,返回結果以第一個RDD為主,關聯不上的記錄為空。部分場景下可以使用left semi join替代left join:因為 left semi join 是 in(keySet) 的關係,遇到右表重複記錄,左表會跳過,效能更高,而 left join 則會一直遍歷。但是left semi join 中最後 select 的結果中只許出現左表中的列名,因為右表只有 join key 參與關聯計算了
十一、Spark Streaming
11.1 Spark Streaming第一次執行不丟失資料?
kafka引數 auto.offset.reset 引數設定成earliest 從最初始偏移量開始消費資料
11.2Spark Streaming精準一次消費
手動維護偏移量,處理完業務資料後,再進行提交偏移量操作,極端情況下,如在提交偏移量時斷網或停電會造成spark程式第二次啟動時重複消費問題,所以在涉及到金額或精確性非常高的場景會使用事物保證精準一次消費
11.3Spark Streaming控制每秒消費資料的速度
通過spark.streaming.kafka.maxRatePerPartition引數來設定Spark Streaming從kafka分割槽每秒拉取的條數
11.4Spark Streaming背壓機制
把spark.streaming.backpressure.enabled 引數設定為ture,開啟背壓機制後Spark Streaming會根據延遲動態去kafka消費資料,上限由spark.streaming.kafka.maxRatePerPartition引數控制,所以兩個引數一般會一起使用
11.5Spark Streaming 一個stage耗時
Spark Streaming stage耗時由最慢的task決定,所以資料傾斜時某個task執行慢會導致整個Spark Streaming都執行非常慢
11.6Spark Streaming 優雅關閉
把spark.streaming.stopGracefullyOnShutdown引數設定成ture,Spark會在JVM關閉時正常關閉StreamingContext,而不是立馬關閉
Kill 命令:yarn application -kill 後面跟 applicationid
11.7Spark Streaming 預設分割槽個數
Spark Streaming預設分割槽個數與所對接的kafka topic分割槽個數一致,Spark Streaming裡一般不會使用repartition運算元增大分割槽,因為repartition會進行shuffle增加耗時
11.8SparkStreaming有哪幾種方式消費Kafka中的資料,它們之間的區別是什麼?
11.8.1 基於Receiver的方式
這種方式使用Receiver來獲取資料。Receiver是使用Kafka的高層次Consumer API來實現的。receiver從Kafka中獲取的資料都是儲存在Spark Executor的記憶體中的(如果突然資料暴增,大量batch堆積,很容易出現記憶體溢位的問題),然後Spark Streaming啟動的job會去處理那些資料。然而,在預設的配置下,這種方式可能會因為底層的失敗而丟失資料。如果要啟用高可靠機制,讓資料零丟失,就必須啟用Spark Streaming的預寫日誌機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka資料寫入分散式檔案系統(比如HDFS)上的預寫日誌中。所以,即使底層節點出現了失敗,也可以使用預寫日誌中的資料進行恢復
11.8.2基於Direct的方式
這種新的不基於Receiver的直接方式,是在Spark 1.3中引入的,從而能夠確保更加健壯的機制。替代掉使用Receiver來接收資料後,這種方式會週期性地查詢Kafka,來獲得每個topic+partition的最新的offset,從而定義每個batch的offset的範圍。當處理資料的job啟動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset範圍的資料。
優點如下:
簡化並行讀取:如果要讀取多個partition,不需要建立多個輸入DStream然後對它們進行union操作。Spark會建立跟Kafka partition一樣多的RDD partition,並且會並行從Kafka中讀取資料。所以在Kafka partition和RDD partition之間,有一個一對一的對映關係。
高效能:如果要保證零資料丟失,在基於receiver的方式中,需要開啟WAL機制。這種方式其實效率低下,因為資料實際上被複制了兩份,Kafka自己本身就有高可靠的機制,會對資料複製一份,而這裡又會複製一份到WAL中。而基於direct的方式,不依賴Receiver,不需要開啟WAL機制,只要Kafka中作了資料的複製,那麼就可以通過Kafka的副本進行恢復。
一次且僅一次的事務機制
11.8.3 對比
基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中儲存消費過的offset的。這是消費Kafka資料的傳統方式。這種方式配合著WAL機制可以保證資料零丟失的高可靠性,但是卻無法保證資料被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。
基於direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,並儲存在checkpoint中。Spark自己一定是同步的,因此可以保證資料是消費一次且僅消費一次。
在實際生產環境中大都用Direct方式
11.9簡述SparkStreaming視窗函式的原理(重點)
視窗函式就是在原來定義的SparkStreaming計算批次大小的基礎上再次進行封裝,每次計算多個批次的資料,同時還需要傳遞一個滑動步長的引數,用來設定當次計算任務完成之後下一次從什麼地方開始計算。
圖中time1就是SparkStreaming計算批次大小,虛線框以及實線大框就是視窗的大小,必須為批次的整數倍。虛線框到大實線框的距離(相隔多少批次),就是滑動步長