Spark如何實現MapReduce中的setup和cleanup方法
在MapReduce中,Mapper和Reducer可以宣告一個setup方法,在處理一個split輸入之前執行,來進行分配資料庫連線等昂貴資源,同時可以用cleanup函式可以釋放資源。
public class SetupCleanupMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private Connection dbConnection; @Override protected void setup(Context context) { dbConnection = ...; } ... @Override protected void cleanup(Context context) { dbConnection.close(); } }
Spark中的map和flatMap等方法每次只能在一個input(一行)上操作,而且沒有提供在轉換大批值前後執行程式碼的方法。
但是可以用mapPartitions或mapPartitionsToPair方法來實現類似setup的目的。
mapPartitions方法和map方法類似,只不過對映函式的引數由RDD中的每一個元素變成了RDD中每一個分割槽的迭代器。如果在對映的過程中需要頻繁建立額外的物件,使用mapPartitions要比map高效的過。
比如,將RDD中的所有資料通過JDBC連線寫入資料庫,如果使用map函式,可能要為每一個元素都建立一個connection,這樣開銷很大,如果使用mapPartitions,那麼只需要針對每一個分割槽建立一個connection。
JavaRDD<Integer> mapOrder = sc.textFile(logFile,3).map(new Function<String,Integer>(){//讀取文字,分成3個分割槽 public Integer call(String v1) throws Exception { // TODO Auto-generated method stub return Integer.parseInt(v1); } }); TTLPartition ttl=new TTLPartition(); // ttl.setup(); JavaPairRDD<Integer, String> res1Pair= mapOrder.mapPartitionsToPair( ttl).partitionBy(new HashPartitioner(1)).//ttl物件是你要處理資料的邏輯 reduceByKey(new Function2<String,String,String>(){ public String call(String v1, String v2) throws Exception { // TODO Auto-generated method stub return v1+v2; }}).sortByKey();
mapPartitionsToPair的call方法實現如下:
public Iterator<Tuple2<Integer, String>> call(Iterator<Integer> t) throws Exception {
// TODO Auto-generated method stub
setup();//這樣就實現mapreduce中對每一個split做預處理,之後才是該split中每一個數據的處理邏輯:迭代器t遍歷split,每個資料執行一次map方法
int vi=0;
while(t.hasNext())
{
vi=t.next();
map(vi);
}
cleanup();
相關推薦
Spark如何實現MapReduce中的setup和cleanup方法
在MapReduce中,Mapper和Reducer可以宣告一個setup方法,在處理一個split輸入之前執行,來進行分配資料庫連線等昂貴資源,同時可以用cleanup函式可以釋放資源。 public class SetupCleanupMapper extend
JUnit測試中setup()和teardown()方法
這幾天做Junit測試接觸到了setup和teardown兩個方法,簡單的可以這樣理解它們,setup主要實現測試前的初始化工作,而teardown則主要實現測試完成後的垃圾回收等工作。
Spark中DenseMatrix中values()和toArray方法的區別
之前一直以為DenseMatrix中的values()和toArray方法獲取到的矩陣的資料是一樣的,結果今日一次矩陣轉置測試時發現兩者獲取到的資料是不一樣的,values()獲取到的資料是將DenseMatrix中的資料以行優先的形式將矩陣中的資料儲存到陣列中,而
MapReduce階段map的setup() 和cleanup()
setup() 此方法被MapReduce框架僅且執行一次,在執行Map任務前,進行相關變數或者資源的集中初始化工作。若是將資源初始化工作放在方法map()中,導致Mapper任務在解析每一行輸入時都會進行資源初始化工作,導致重複,程式執行效率不高! c
Spark機器學習中ml和mllib中矩陣、向量
int reg index mac matrix 對比 判斷 bsp ive 1:Spark ML與Spark MLLIB區別? Spark MLlib是面向RDD數據抽象的編程工具類庫,現在已經逐漸不再被Spark團隊支持,逐漸轉向Spark ML庫,Spark ML是面
servlet表單中get和post方法的區別
pos span 轉化 不可見 上傳文件 post div font 支持 Form中的get和post方法,在數據傳輸過程中分別對應了HTTP協議中的GET和POST方法。二者主要區別如下:1、Get是用來從服務器上獲得數據,而Post是用來向服務器上傳遞數據。2、Get
Java中wait和sleep方法的區別
lee join 告訴 inter art 過程 lam 兩個 一次 1、兩者的區別 這兩個方法來自不同的類分別是Thread和Object 最主要是sleep方法沒有釋放鎖,而wait方法釋放了鎖,使得其他線程可以使用同步控制塊或者方法(鎖代碼塊和方法鎖)。 w
淺析c#中==操作符和equals方法
邏輯 mce 需求 ram margin width 通過 否則 可用 在之前的文章中,我們講到了使用C#中提供的Object類的虛Equals方法來判斷Equality,但實際上它還提供了另外一種判斷Equality的方法,那就是使用==運算符。許多童鞋也許會想當然的
java 中sendredirect()和forward()方法的區別
rect 次數 報錯 nec 重定向 web服務 單獨 exception aca 一.文章1 HttpServletResponse.sendRedirect與RequestDispatcher.forward方法都可以實現獲取相應URL資源。 sendRedirect
mapreduce中map和reduce個數
case when 生成 task 輸入 slots align reducer 進行 很多 一、 控制hive任務中的map數: 1. 通常情況下,作業會通過input的目錄產生一個或者多個map任務。 主要的決定因素有: input的文件總個數,input的
javascript中encodeURI和decodeURI方法
分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!  
Java 中hashCode()和equals()方法
Java中,涉及到兩個物件的比較時,我們會用到hashCode()和equals()。這兩個方法是Object類中定義的方法。 1. api中的描述 (1)hashCode() hashCode()方法給物件返回一個hash code值。這個方法被用於hash tables,
Java中synchronized和同步方法
在多執行緒中,有一個經典問題:存票售票問題 如果只用兩個Thread子類則容易陷入死迴圈。 有一個很好的解決辦法就是synchronized。 方法一:在thread子類的run中直接通過synchronized來申請物件的鎖旗標,即用synchronized把存售票程式碼框起來。 方法二:在票類中直
面試官問:能否模擬實現JS的call和apply方法
之前寫過兩篇《面試官問:能否模擬實現JS的new操作符》和《面試官問:能否模擬實現JS的bind方法》 其中模擬bind方法時是使用的call和apply修改this指向。但面試官可能問:能否不用call和apply來實現呢。意思也就是需要模擬實現call和apply的了。 附上之前寫文章寫過的一段
關於python中loc和iloc方法
pandas以類似字典的方式來獲取某一列的值 import pandas as pd import numpy as np table = pd.DataFrame(np.zeros((4,2)), index=['a','b','c','d'], columns=['left', 'right'])
java中set()和get()方法的理解
1.名詞理解 從名字看set是設定的意思而get是獲取的意思,所以顧名思義這兩個方法是對資料進行設定和獲取操作的,我們往往不會單獨的使用它們而是用一些修飾詞配合使用,比如setname(), getname() ,setage(), getage(),等等 2.使用場景 JAVA
JavaScript中call和apply方法的使用
acvaScript中的call()方法和apply()方法,在某些時候這兩個方法還確實是十分重要的。1. 每個函式都包含兩個非繼承而來的方法:call()方法和apply()方法。2. 相同點:這兩個方法的作用是一樣的。都是在特定的作用域中呼叫函式,等於設定函式體內this物件的值,以擴充函式賴以執行的作用
HTTP中GET和POST方法的區別
HTTP請求的方法有很多:GET、POST、HEAD、TRACE、OPTIONS等,但是GET和POST是兩個最常用的方法。 GET是最簡單的一種請求方法,其主要功能是從伺服器端獲取使用者所需資源,並將其作為響應返回給客戶端,需要注意的是:GET方法的作用主要用來獲取伺
java中compareTo和compare方法之比較
這兩個方法經常搞混淆,現對其進行總結以加深記憶。 compareTo(Object o)方法是java.lang.Comparable介面中的方法,當需要對某個類的物件進行排序時,該類需要實現Comparable介面的,必須重寫public int compar
pandas中apply和transform方法的效能比較
1. apply與transform 首先講一下apply() 與transform()的相同點與不同點 相同點: 都能針對dataframe完成特徵的計算,並且常常與groupby()方法一起使用。 不同點: apply()裡面可以跟自定義的函式,包括簡單的求和函式以及複雜的特徵間的差值函式等(注:appl