Calculating the start position of a Spark HadoopRDD partition
阿新 · Published 2018-12-10
- Calculating the data range of a HadoopRDD partition
More resources
- Spark source code analysis (bilibili video playlist): https://www.bilibili.com/video/av37442139/
- GitHub: https://github.com/opensourceteams/spark-scala-maven
- CSDN (index of the videos for online viewing): https://blog.csdn.net/thinktothings/article/details/84726769
bilibili video guide
- HadoopRDD partition start-position calculation, illustrated (bilibili video): https://www.bilibili.com/video/av37442139/?p=26
- HadoopRDD partition start-position calculation, source code analysis (bilibili video): https://www.bilibili.com/video/av37442139/?p=27
Prerequisites
- Hadoop version: Hadoop 2.6.0-cdh5.15.0
- Spark version: Spark 1.6.0-cdh5.15.0
Overview
- Source-level analysis of how Spark's HadoopRDD reads a file on HDFS
- Analysis of how HadoopRDD pre-splits partitions and how the start position of a non-first partition is calculated
HDFS data file
a b k l j
c a n m o
HDFS data file, illustrated
HDFS data file, illustrated (comparison)
Figure 1
Figure 2
HadoopRDD partition pre-split scheme (small adjustments are made in practice)
- Each partition's length = total file length / minimum number of partitions (the default is 2) // note: integer division, i.e. goalSize = totalSize / numSplits
- In this example, each partition's length = 20 / 2 = 10 // i.e. 10 bytes
- Starting from offset 0, partitions are then carved out 10 bytes at a time; the final chunk (at most 10 bytes) becomes the last partition
- So partition(0) = hdfs file (offset 0, 10 bytes), i.e. 0 <= offset < 10
- So partition(1) = hdfs file (offset 10, 10 bytes), i.e. 10 <= offset < 20
- In general, partition(i) = hdfs file (offset i * goalSize, length goalSize); see the sketch below
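A minimal sketch of this pre-split arithmetic, using the 20-byte example above. The object and method names are illustrative only; the real logic lives in Hadoop's FileInputFormat.getSplits, which also takes the HDFS block size and the minimum split size into account.

object PreSplitSketch {
  // goalSize = totalSize / numSplits (integer division); partition(i) starts at i * goalSize
  def preSplits(totalSize: Long, numSplits: Int): Seq[(Long, Long)] = {
    val goalSize = totalSize / numSplits
    (0 until numSplits).map { i =>
      val start  = i * goalSize
      val length = if (i == numSplits - 1) totalSize - start else goalSize
      (start, length)   // (pre-split start offset, pre-split length)
    }
  }

  def main(args: Array[String]): Unit = {
    // For the 20-byte example file with 2 partitions this prints (0,10) and (10,10)
    preSplits(20, 2).foreach(println)
  }
}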
How HadoopRDD partitions are actually split
- Because the order in which partitions are executed is not deterministic, each partition must be able to work out its own data range when it runs.
- Splitting strictly at the pre-split boundaries can cut a line of data in half, which is unacceptable in many scenarios (monetary amounts or words/phrases usually must not be split), so the split is generally aligned to line boundaries.
- The line is therefore used as the smallest unit when determining a partition's data range.
- HadoopRDD still pre-splits partitions as described above, but the boundaries are adjusted at compute time.
- For the first partition, partition(0), the data range simply starts at offset 0 and covers goalSize bytes.
- For any other partition, the start position is recomputed: scan forward from the pre-split start for the first newline (indexNewLine); the actual start is indexNewLine + 1, and the length is still goalSize.
- The first partition always receives data (as long as the HDFS file is not empty).
- A non-first partition may receive no data at all; this happens when its data has already been consumed entirely by the previous partition (a sketch of the adjustment rule follows).
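A minimal sketch of that adjustment rule, assuming the whole file fits in a byte array. The helper name actualStart is made up for illustration; the real logic sits in the LineRecordReader constructor analyzed below.

object SplitStartSketch {
  // Partition 0 starts at offset 0. Any other partition starts just after the first
  // '\n' found at or after its pre-split start (or at end of file if no newline is found).
  def actualStart(bytes: Array[Byte], preSplitStart: Int): Int =
    if (preSplitStart == 0) 0
    else {
      val idxNewLine = bytes.indexOf('\n'.toByte, preSplitStart)
      if (idxNewLine < 0) bytes.length else idxNewLine + 1
    }
}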
When a partition receives no data (both conditions hold at the same time)
- It is not the first partition, i.e. its index is not 0.
- Scanning forward from its pre-split start, the first newline found lies at or beyond its pre-split end position (or the partition contains no newline at all); see the worked example below.
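Applying the rule above to the example file, assuming it consists of the two 9-character lines shown earlier, each followed by a single '\n' (20 bytes in total): partition(1)'s adjusted start lands exactly on its pre-split end, so it receives no data and partition(0) ends up reading both lines.

val data      = "a b k l j\nc a n m o\n".getBytes("UTF-8") // assumed layout, 20 bytes
val goalSize  = data.length / 2                            // 10
val preStart1 = goalSize                                   // partition(1) pre-split start = 10
val preEnd1   = 2 * goalSize                               // partition(1) pre-split end   = 20
val idxNL     = data.indexOf('\n'.toByte, preStart1)       // first newline at or after 10 -> 19
val start1    = if (idxNL < 0) data.length else idxNL + 1  // adjusted start = 20
println(s"partition(1) adjusted start = $start1, empty = ${start1 >= preEnd1}") // empty = true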
Source code analysis
- HadoopRDD
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new NextIterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()
val inputMetrics = context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop)
// Sets the thread local variable for the file's name
split.inputSplit.value match {
case fs: FileSplit => SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString)
case _ => SqlNewHadoopRDDState.unsetInputFileName()
}
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
split.inputSplit.value match {
case _: FileSplit | _: CombineFileSplit =>
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
case _ => None
}
}
inputMetrics.setBytesReadCallback(bytesReadCallback)
var reader: RecordReader[K, V] = null
//returns the TextInputFormat object
val inputFormat = getInputFormat(jobConf)
HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
context.stageId, theSplit.index, context.attemptNumber, jobConf)
//instantiate org.apache.hadoop.mapred.LineRecordReader
//the LineRecordReader constructor recomputes the current partition's start position (it can differ from the pre-split start)
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener{ context => closeIfNeeded() }
val key: K = reader.createKey()
val value: V = reader.createValue()
override def getNext(): (K, V) = {
try {
//calls org.apache.hadoop.mapred.LineRecordReader.next()
finished = !reader.next(key, value)
} catch {
case _: EOFException if ignoreCorruptFiles => finished = true
}
if (!finished) {
inputMetrics.incRecordsRead(1)
}
//return the value of the current (key, value) pair
(key, value)
}
override def close() {
if (reader != null) {
SqlNewHadoopRDDState.unsetInputFileName()
// Close the reader and release it. Note: it's very important that we don't close the
// reader more than once, since that exposes us to MAPREDUCE-5918 when running against
// Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
// corruption issues when reading compressed input.
try {
reader.close()
} catch {
case e: Exception =>
if (!ShutdownHookManager.inShutdown()) {
logWarning("Exception in RecordReader.close()", e)
}
} finally {
reader = null
}
if (bytesReadCallback.isDefined) {
inputMetrics.updateBytesRead()
} else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
split.inputSplit.value.isInstanceOf[CombineFileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
inputMetrics.incBytesRead(split.inputSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
}
}
}
}
}
new InterruptibleIterator[(K, V)](context, iter)
}
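To see this compute path end to end, you can read the file with the article's default of 2 minimum partitions and count how many records each partition actually receives; a minimal sketch (the HDFS path is a placeholder):

val rdd = sc.textFile("hdfs:///tmp/sample.txt", 2)   // placeholder path, minPartitions = 2
val perPartition = rdd
  .mapPartitionsWithIndex { (idx, iter) => Iterator((idx, iter.size)) }
  .collect()
// For the 20-byte example file this prints (0,2) and (1,0): partition(1) receives no data
perPartition.foreach(println)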
- TextInputFormat
- Returns a LineRecordReader
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit genericSplit, JobConf job,
Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
String delimiter = job.get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter) {
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
}
return new LineRecordReader(job, (FileSplit) genericSplit,
recordDelimiterBytes);
}
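Note that getRecordReader reads the textinputformat.record.delimiter property, so a custom record delimiter can be supplied through the Hadoop configuration; a minimal sketch (path and delimiter are placeholders):

// With a delimiter set, LineReader.readLine() takes the readCustomLine branch instead of readDefaultLine
sc.hadoopConfiguration.set("textinputformat.record.delimiter", "|")
val records = sc.textFile("hdfs:///tmp/delimited.txt")   // placeholder path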
- LineRecordReader
- In the constructor, the start position of the current partition is re-located
- For partition(0), the start position is 0
- For any other partition, the start position is recomputed
- It calls in.readLine(), which here resolves to UncompressedSplitLineReader.readLine(); note that the maxLineLength argument passed at this point is 0
public LineRecordReader(Configuration job, FileSplit split,
byte[] recordDelimiter) throws IOException {
this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
codec = compressionCodecs.getCodec(file);
// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);
if (isCompressedInput()) {
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new CompressedSplitLineReader(cIn, job, recordDelimiter);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn; // take pos from compressed stream
} else {
in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, recordDelimiter);
filePosition = fileIn;
}
} else {
fileIn.seek(start);
//the stream has just been positioned (seek) at the current partition's pre-split start offset
in = new UncompressedSplitLineReader(
fileIn, job, recordDelimiter, split.getLength());
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
//calling in.readLine() here resolves to UncompressedSplitLineReader.readLine();
//note that the maxLineLength argument passed at this point is 0
//the actual start of this partition = the pre-split start + the number of bytes read up to and including the first newline
//in other words, everything from the pre-split start up to the first newline belongs to the previous partition; the byte right after that newline is the real start of the current partition
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}
- UncompressedSplitLineReader.readLine()
- Calls LineReader.readLine()
@Override
public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException {
int bytesRead = 0;
if (!finished) {
// only allow at most one more record to be read after the stream
// reports the split ended
if (totalBytesRead > splitLength) {
finished = true;
}
bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
}
return bytesRead;
}
- LineReader.readLine()
- Calls LineReader.readDefaultLine()
/**
* Read one line from the InputStream into the given Text.
*
* @param str the object to store the given line (without newline)
* @param maxLineLength the maximum number of bytes to store into str;
* the rest of the line is silently discarded.
* @param maxBytesToConsume the maximum number of bytes to consume
* in this call. This is only a hint, because if the line cross
* this threshold, we allow it to happen. It can overshoot
* potentially by as much as one buffer length.
*
* @return the number of bytes read including the (longest) newline
* found.
*
* @throws IOException if the underlying stream throws
*/
public int readLine(Text str, int maxLineLength,
int maxBytesToConsume) throws IOException {
if (this.recordDelimiterBytes != null) {
return readCustomLine(str, maxLineLength, maxBytesToConsume);
} else {
return readDefaultLine(str, maxLineLength, maxBytesToConsume);
}
}
- LineReader.readDefaultLine()
- The method that actually works out the partition's start position
- Note that the maxLineLength passed in here is 0, i.e. at this stage no data is actually copied into the value of the (key, value) pair
- Calls UncompressedSplitLineReader.fillBuffer(), which performs the actual read of the file on HDFS
/**
* Read a line terminated by one of CR, LF, or CRLF.
* When maxLineLength = 0, i.e. when locating the start position of a partition other than partition(0), this method consumes bytes up to and including the first newline without storing them.
*/
private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException {
/* We're reading data from in, but the head of the stream may be
* already buffered in buffer, so we have several cases:
* 1. No newline characters are in the buffer, so we need to copy
* everything and read another buffer from the stream.
* 2. An unambiguously terminated line is in buffer, so we just
* copy to str.
* 3. Ambiguously terminated line is in buffer, i.e. buffer ends
* in CR. In this case we copy everything up to CR to str, but
* we also need to see what follows CR: if it's LF, then we
* need consume LF as well, so next call to readLine will read
* from after that.
* We use a flag prevCharCR to signal if previous character was CR
* and, if it happens to be at the end of the buffer, delay
* consuming it until we have a chance to look at the char that
* follows.
*/
str.clear();
int txtLength = 0; //tracks str.getLength(), as an optimization
int newlineLength = 0; //length of terminating newline
boolean prevCharCR = false; //true of prev char was CR
long bytesConsumed = 0;
do {
int startPosn = bufferPosn; //starting from where we left off the last time
if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0;
if (prevCharCR) {
//bytesConsumed: total number of bytes consumed so far (including newlines)
++bytesConsumed; //account for CR from previous read
}
/**
* fillBuffer() performs the actual read of the HDFS file
* buffer: the read buffer
* bufferLength: the number of bytes read in this call
*/
bufferLength = fillBuffer(in, buffer, prevCharCR);
if (bufferLength <= 0) {
break; // EOF
}
}
//scan the bytes just read into buffer, looking for the first newline
// bufferPosn: the position (index) at which the newline is found; this value is preserved across calls within the same partition
for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
if (buffer[bufferPosn] == LF) {
//while debugging this, prevCharCR = false; once the newline '\n' is found, newlineLength = 1
newlineLength = (prevCharCR) ? 2 : 1;
++bufferPosn; // at next invocation proceed from following byte
break;
}
if (prevCharCR) { //CR + notLF, we are at notLF
newlineLength = 1;
break;
}
//no '\r' appeared in the Linux test data, so prevCharCR stayed false throughout debugging
prevCharCR = (buffer[bufferPosn] == CR);
}
int readLength = bufferPosn - startPosn; //bytes consumed in this pass (including the newline)
if (prevCharCR && newlineLength == 0) {
--readLength; //CR at the end of the buffer
}
//running total of bytes consumed (including newlines)
bytesConsumed += readLength;
//bytes read in this pass, excluding the newline
int appendLength = readLength - newlineLength;
if (appendLength > maxLineLength - txtLength) {
//cap the appended length at the configured maximum line length
//when maxLineLength = 0 and txtLength = 0, no data needs to be kept, so appendLength is set to 0
appendLength = maxLineLength - txtLength;
}
if (appendLength > 0) {
//when appendLength > 0, append the bytes to str, i.e. the value actually read
str.append(buffer, startPosn, appendLength);
//txtLength accumulates the bytes actually kept each time (excluding newlines)
txtLength += appendLength;
}
//loop while no newline has been found and bytesConsumed < maxBytesToConsume
} while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
if (bytesConsumed > Integer.MAX_VALUE) {
throw new IOException("Too many bytes before newline: " + bytesConsumed);
}
return (int)bytesConsumed;
}
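A worked trace of the skip-first-line call for partition(1) of the example file (pre-split start 10, maxLineLength = 0), under the same layout assumption as before; these vals simply mirror the variables in readDefaultLine above.

// fillBuffer() reads file offsets 10..19 into the buffer; the newline sits at buffer index 9
val maxLineLength = 0
val txtLength     = 0
val newlineLength = 1                                       // a single '\n', no preceding '\r'
val readLength    = 10                                      // bytes scanned, including the newline
val bytesConsumed = readLength                              // value returned to the constructor
val appendLength  = math.min(readLength - newlineLength,    // equivalent to the clamp in the code above:
                             maxLineLength - txtLength)     // min(9, 0) = 0, so nothing is kept in str
// The constructor then sets start = 10 + bytesConsumed = 20, the adjusted start shown earlier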
- UncompressedSplitLineReader.fillBuffer()
protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
throws IOException {
int maxBytesToRead = buffer.length; //buffer size, 64KB by default
//splitLength: the pre-split size (length) of the current partition
// totalBytesRead: total number of bytes read so far for the current partition
if (totalBytesRead < splitLength) {
//the pre-split length of the current partition has not been fully consumed yet; keep reading the remainder
long leftBytesForSplit = splitLength - totalBytesRead;
// check if leftBytesForSplit exceed Integer.MAX_VALUE
if (leftBytesForSplit <= Integer.MAX_VALUE) {
//if the remaining length of this split fits in an int, read the smaller of the 64KB default and the remaining length
maxBytesToRead = Math.min(maxBytesToRead, (int)leftBytesForSplit);
}
}
//number of bytes actually read
int bytesRead = in.read(buffer, 0, maxBytesToRead);
// If the split ended in the middle of a record delimiter then we need
// to read one additional record, as the consumer of the next split will
// not recognize the partial delimiter as a record.
// However if using the default delimiter and the next character is a
// linefeed then next split will treat it as a delimiter all by itself
// and the additional record read should not be performed.
if (totalBytesRead == splitLength && inDelimiter && bytesRead > 0) {
if (usingCRLF) {
needAdditionalRecord = (buffer[0] != '\n');
} else {
needAdditionalRecord = true;
}
}
if (bytesRead > 0) {
//data was read; add it to the running total read by the current partition
totalBytesRead += bytesRead;
}
return bytesRead;
}