MapReduce原始碼分析之MapTask分析(二)
SpillThread分析
為什麼需要Spill
記憶體大小總是有效,因此在Mapper在處理過程中,資料持續輸出到記憶體中時,必然需要有機制能將記憶體中的資料換出,合理的刷出到磁碟上。SpillThread就是用來完成這部分工作。
SpillThread的執行緒處理函式只是做一層封裝,當索引表中的kvstart和kvend指向一樣的索引位置時,會持續處於等待過程,等待外部通知需要觸發spill動作,當有spill請求時,會觸發StartSpill來喚醒SpillThread執行緒,進入到sortAndSpill。
下面就是SpillThread執行緒體函式。
protected class SpillThread extends Thread { @Override public void run() { spillLock.lock(); spillThreadRunning = true; try { while (true) { spillDone.signal(); while (kvstart == kvend) { // 等待被喚醒 spillReady.await(); } try { spillLock.unlock(); // spill處理 sortAndSpill(); } catch (...) { ... } finally { spillLock.lock(); // 重置索引區,更新buf緩衝區的尾部位置資訊 if (bufend < bufindex && bufindex < bufstart) { bufvoid = kvbuffer.length; } kvstart = kvend; bufstart = bufend; } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { spillLock.unlock(); spillThreadRunning = false; } } }
執行緒函式內的處理邏輯比較簡單,主要分為三個步驟:
1.等待喚醒
2.對記憶體中的資料進行排序並將資料溢位寫入到磁碟,這部分內部分析見下文。
3.重置索引區和快取區的end標記
sortAndSpill
記憶體資料的溢位處理是有此函式進行封裝,下面我們將該函式按塊進行詳細分析。
private void sortAndSpill() throws IOException, ClassNotFoundException, InterruptedException { //approximate the length of the output file to be the length of the //buffer + header lengths for the partitions long size = (bufend >= bufstart ? bufend - bufstart : (bufvoid - bufend) + bufstart) + partitions * APPROX_HEADER_LENGTH; FSDataOutputStream out = null; try { // part1 // create spill file final SpillRecord spillRec = new SpillRecord(partitions); final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, size); out = rfs.create(filename); // part2 final int endPosition = (kvend > kvstart) ? kvend : kvoffsets.length + kvend; sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter); int spindex = kvstart; IndexRecord rec = new IndexRecord(); InMemValBytes value = new InMemValBytes(); for (int i = 0; i < partitions; ++i) { IFile.Writer<K, V> writer = null; try { // part3 long segmentStart = out.getPos(); writer = new Writer<K, V>(job, out, keyClass, valClass, codec, spilledRecordsCounter); // part4 if (combinerRunner == null) { // spill directly DataInputBuffer key = new DataInputBuffer(); while (spindex < endPosition && kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) { final int kvoff = kvoffsets[spindex % kvoffsets.length]; getVBytesForOffset(kvoff, value); key.reset(kvbuffer, kvindices[kvoff + KEYSTART], (kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART])); writer.append(key, value); ++spindex; } } else { // part5 int spstart = spindex; while (spindex < endPosition && kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) { ++spindex; } // Note: we would like to avoid the combiner if we've fewer // than some threshold of records for a partition if (spstart != spindex) { combineCollector.setWriter(writer); RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex); combinerRunner.combine(kvIter, combineCollector); } } // part6 // close the writer writer.close(); // record offsets rec.startOffset = segmentStart; rec.rawLength = writer.getRawLength(); rec.partLength = writer.getCompressedLength(); spillRec.putIndex(rec, i); writer = null; } finally { if (null != writer) writer.close(); } } // part7 if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) { // create spill index file Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH); spillRec.writeToFile(indexFilename, job); } else { indexCacheList.add(spillRec); totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; } LOG.info("Finished spill " + numSpills); ++numSpills; } finally { if (out != null) out.close(); } }
part1:建立SpillRecord,建立檔案流
SpillRecord是一個記錄集,用於記錄分割槽在資料檔案中的檔案起始位置,原始長度,壓縮後的長度資訊。
SpillRecord的成員只有兩個。一個是buf,長度為分割槽個數*每條分割槽索引資訊佔用的長度,另一個是為記錄方便轉換成的LogBuffer。
每條分割槽索引資訊佔用的長度由MAP_OUTPUT_INDEX_RECORD_LENGTH來表示,佔用24個位元組,即3個Long。
public SpillRecord(int numPartitions) {
buf = ByteBuffer.allocate(
numPartitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
entries = buf.asLongBuffer();
}
建立檔案流,檔案的路徑是根據numspill來產生的,第一個溢位的檔案就是spill0.out,以此類推後續的溢位的檔案就是spill0.out,spill1.out ...
在每次滿足溢位的時候,都會產生一個溢位的檔案,這些溢位的檔案最後會在處理完Mapper在最後的flush階段觸發merge動作,將所有溢位的檔案進行合併為一個檔案。
part2:資料排序
獲取溢位的處理的索引區間的尾部位置,這個索引區間是有kvstart,kvend所標識出來,kvstart記錄了索引區開始使用的起始位置,kvend記錄了索引區使用的結束位置。這一段索引區所指向的資料緩衝區就是需要被處理刷入到檔案的。在上文,我們提到了因為是迴圈緩衝區,索引在沒有到緩衝區尾部時是kvstart<kvend,當kvend走到尾迴圈回來,kvstart>kvend。
在排序時,為處理簡單,指定出一個統一的區間,使用endpostion表示出尾部位置。當kvend在前,endposition為kvoffsets的長度+kvend。
MapReduce的核心是對資料排序,在MapTask需要對每次溢位的資料按分割槽進行排序,保證分割槽內的資料是有序的,分割槽從小到大遞增。排序的工作是由sorter完成,排序在記憶體中排列完成。
sorter是一個IndexedSorter型別,在MapOutputBuffer初始化時從conf中獲取map.sort.class所指定的sort類,預設是使用QuickSort。我們擷取部分排序函式的部分程式碼,來分析排序過程。
private static void sortInternal(final IndexedSortable s, int p, int r,
final Progressable rep, int depth) {
if (null != rep) {
rep.progress();
}
while (true) {
if (r-p < 13) {
//p為其實位置,r為結束位置,s為MapOutputBuffer
for (int i = p; i < r; ++i) {
for (int j = i; j > p && s.compare(j-1, j) > 0; --j) {
s.swap(j, j-1);
}
}
return;
}
....
}
sort的關鍵兩步就是key之間比較,和交換。compare使用的和swap呼叫的都是MapOutputBuffer中的兩個函式,先看compare函式,comapre傳入的是兩個kvoffsets索引區的兩個index,因為endposition有可能是大於kevoffsets的長度,因此在取真實index的時候,需要對kvoffsets的長度進行取餘。比較會先取出kvoffsets中的值,再通過該值定位到k,v在二級索引區kvindices中記錄的k,v所屬的分割槽,在kvbuffer的位置,長度。排序優先順序為,低分割槽->高分割槽,分割槽一樣則根據key排序。
當符合條件,使用swap函式,交換kvoffsets中記錄kvindices的索引值,因此排序的開銷很小,不需要每次移動key,僅通過kvoffsets就完成比較排序。
public int compare(int i, int j) {
final int ii = kvoffsets[i % kvoffsets.length];
final int ij = kvoffsets[j % kvoffsets.length];
// sort by partition
if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {
return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
}
// sort by key
return comparator.compare(kvbuffer,
kvindices[ii + KEYSTART],
kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
kvbuffer,
kvindices[ij + KEYSTART],
kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
}
public void swap(int i, int j) {
i %= kvoffsets.length;
j %= kvoffsets.length;
int tmp = kvoffsets[i];
kvoffsets[i] = kvoffsets[j];
kvoffsets[j] = tmp;
}
上圖就是排序前後的變化過程。排序前kvbuffer中有key為字串,value為int值。
第一個item的key為"ba",第二個item的key為"aa",所屬分割槽都為分割槽1,按照字典序排序"aa","ba"。排序後,二級索引kvindices和kvbuffer都沒有變動,只是在一級索引kvoffsets中交換指向,在kvoffsets[0]=1指向"aa",kvoffsets[1]=0指向"ba"。
part3: IFile資料格式
IFile是一種儲存格式,用於表示MapTask在處理資料溢位到磁碟檔案,資料在磁碟檔案中以什麼形式組織。儲存形式為如下
KeyLength |
valueLength |
key |
Value |
KeyLength |
valueLength |
key |
Value |
EOF_MARKER |
EOF_MARKER |
4位元組CRC |
每個key,value輸出到檔案中,都會以上述keylength,valuelength,key,value的形式逐個排列,在close時,會輸出兩個標記,非別是key,value長度的標記,標記為-1表示key,value輸出結束,在尾部會後一個針對整個檔案的crc校驗碼。
IFile類內有兩個子類,分別是Reader,Writer用於讀取和寫入IFile檔案。
Writer的內部成員:
//用於io操作的輸出流,基於checksum的流產生
FSDataOutputStream out;
//記錄原始的輸出流,也就是第一部分中產生的檔案流
FSDataOutputStream rawOut;
//基於檔案流產生的checksum輸出流,特點是write時內部會做crc
IFileOutputStream checksumOut;
//key,value的序列化,和"核心成員變數中的key,value序列化類一樣的功能"
Class<K> keyClass;
Class<V> valueClass;
Serializer<K> keySerializer;
Serializer<V> valueSerializer;
public Writer(Configuration conf, FSDataOutputStream out,
Class<K> keyClass, Class<V> valueClass,
CompressionCodec codec, Counters.Counter writesCounter)
throws IOException {
//根據檔案流封了一層可以在輸出時做crc
this.checksumOut = new IFileOutputStream(out);
this.rawOut = out;
this.start = this.rawOut.getPos();
if (codec != null) {
...
} else {
//writer內部用於io輸出的流是基於checksumOut產生的。
this.out = new FSDataOutputStream(checksumOut,null);
}
// key,value序列化類,是輸出key,value到buffer中,真正寫的時候從buffer中取出
this.keyClass = keyClass;
this.valueClass = valueClass;
SerializationFactory serializationFactory = new SerializationFactory(conf);
this.keySerializer = serializationFactory.getSerializer(keyClass);
this.keySerializer.open(buffer);
this.valueSerializer = serializationFactory.getSerializer(valueClass);
this.valueSerializer.open(buffer);
}
part4:無Combiner處理
當用戶沒有指定commbiner,就不需要做combiner處理,可以通過IFile.Writer直接將已排序好的資料逐個按分割槽輸出到磁碟檔案。區分是否是同一個分割槽的資料,是根據當前spindex所指向的一級索引kvoffsets所標識的資料是否屬於當前分割槽號,如果是同一個分割槽,就使用writer進行輸出,否則切換到處理下一個分割槽。
這裡有一點需要注意的是,二級索引kvindices中每一項(分割槽號,keyOffset,valOffset)標識一對key,value,key的長度可以根據valOffset-keyOffset獲取到key的長度,而value的長度需要通過先取得kvindices中的下一項,通過下一個項中的key的偏移-當前的val的偏移獲取到val的長度。這部分的程式碼會封裝在getVBytesForOffset
writer的輸出比較簡單,輸出key,value之前,先輸出key長度,value長度。
public void append(DataInputBuffer key, DataInputBuffer value)
throws IOException {
int keyLength = key.getLength() - key.getPosition();
if (keyLength < 0) {
throw new IOException("Negative key-length not allowed: " + keyLength +
" for " + key);
}
int valueLength = value.getLength() - value.getPosition();
if (valueLength < 0) {
throw new IOException("Negative value-length not allowed: " +
valueLength + " for " + value);
}
WritableUtils.writeVInt(out, keyLength);
WritableUtils.writeVInt(out, valueLength);
out.write(key.getData(), key.getPosition(), keyLength);
out.write(value.getData(), value.getPosition(), valueLength);
// Update bytes written
decompressedBytesWritten += keyLength + valueLength +
WritableUtils.getVIntSize(keyLength) +
WritableUtils.getVIntSize(valueLength);
++numRecordsWritten;
}
part5: Combiner處理
如果使用者指定過Combiner,那麼處理和無Combiner有一點小差別。需要在輸出的時候,針對同一分割槽內的資料做一次過濾。同一分割槽的資料區間通過spstart,spindex標識出來。
combineCollector.setWriter(writer);這裡將IFile.Writer設定進去,在combiner處理中呼叫collect將會呼叫到CombineOutputCollector.collect,這一步就是和無Combiner一樣將資料輸出到IFile.Writer中。
public synchronized void collect(K key, V value)
throws IOException {
outCounter.increment(1);
writer.append(key, value);
if ((outCounter.getValue() % progressBar) == 0) {
progressable.progress();
}
}
combinerRunner.combine(kvIter, combineCollector);是如何執行的呢,這裡會因為combinerRunner的不同而不同,我們關注的是舊的MR處理,因此我們跟蹤到OldCombinerRunner.combiner,可以看到流程實際上非常簡單,整個迭代的過程是判斷是否還有資料沒有被處理掉,有則一直迴圈,依次呼叫reduce函式,每處理一次相同的key的資料後,通過nextKey切換到下一個不同的key再次重複。在使用者的reduce函式內,因為collector是CombineOutputCollector,因此使用者在collector.collect輸出key,value,實際上是輸出到IFile.Writer流中。
protected void combine(RawKeyValueIterator kvIter,
OutputCollector<K,V> combineCollector
) throws IOException {
//combiner是一個Reduer
Reducer<K,V,K,V> combiner =
ReflectionUtils.newInstance(combinerClass, job);
try {
//取得value的迭代器
CombineValuesIterator<K,V> values =
new CombineValuesIterator<K,V>(kvIter, comparator, keyClass,
valueClass, job, Reporter.NULL,
inputCounter);
//判斷依據是spstart是否走到spindex
while (values.more()) {
combiner.reduce(values.getKey(), values, combineCollector,
Reporter.NULL);
//跳過相同key直到讀取到下一個不相同的key
values.nextKey();
}
} finally {
combiner.close();
}
}
Reducer的處理函式reduce大家都知道入參是key和一串value,這裡的一串value通過Iterator來表示。void reduce(K2 key,Iterator<V2> values, OutputCollector<K3, V3> output, Reporterreporter) throws IOException;
那這裡的Iterator<V2>values是如何從kvoffsets中將已經排序過的,相鄰的相同的key的value放在一起的呢,這部分功能是有CombineValuesIterator的父類ValuesIterator來實現的,ValuesIterator的基類是Iterator,Iterator的介面hasNext和next都有實現。
ValuesIterator有一個一直被呼叫到的方法,是readNextKey用來獲取下一個key並判斷是否後續還有資料(more標識)以及是否還有相同的key(hasNext標識)。
private void readNextKey() throws IOException {
//根據spstart是否到達spindex指向的區間尾部
more = in.next();
if (more) {
DataInputBuffer nextKeyBytes = in.getKey();
keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(),
nextKeyBytes.getLength() - nextKeyBytes.getPosition());
//將keyIn中的key反序列化到nextKey變數中
nextKey = keyDeserializer.deserialize(nextKey);
//判斷是否還有相同的key存在
hasNext = key != null && (comparator.compare(key, nextKey) == 0);
} else {
hasNext = false;
}
}
public boolean hasNext() { return hasNext; }
private int ctr = 0;
public VALUE next() {
if (!hasNext) {
throw new NoSuchElementException("iterate past last value");
}
try {
//返回相同的key的value,讀取下一個key,如果key相同,
//下一次仍會進入到next函式中,讀取到相同key的value
readNextValue();
readNextKey();
} catch (IOException ie) {
throw new RuntimeException("problem advancing post rec#"+ctr, ie);
}
reporter.progress();
return value;
}
//在每呼叫一次reduce處理完相同key所對應的一串value,
//會通過nextKey函式取得下一個不同的key,重新進入到reduce函式。
/** Start processing next unique key. */
void nextKey() throws IOException {
//讀取到下一個不同的key,實際上combiner的處理,是不會進入到while迴圈內
while (hasNext) {
readNextKey();
}
++ctr;
// move the next key to the current one
KEY tmpKey = key;
key = nextKey;
nextKey = tmpKey;
hasNext = more;
}
part6:關閉流,記錄索引資訊
在輸出輸出完成後,會呼叫IFile.Writer的close函式,插入兩個EOF_MARKER,並寫入checksum.
public void close() throws IOException {
// Close the serializers
keySerializer.close();
valueSerializer.close();
// Write EOF_MARKER for key/value length
WritableUtils.writeVInt(out, EOF_MARKER);
WritableUtils.writeVInt(out, EOF_MARKER);
decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
//Flush the stream
out.flush();
if (compressOutput) {
// Flush
compressedOut.finish();
compressedOut.resetState();
}
// Close the underlying stream iff we own it...
if (ownOutputStream) {
out.close();
}
else {
//寫入checksum
checksumOut.finish();
}
compressedBytesWritten = rawOut.getPos() - start;
if (compressOutput) {
// Return back the compressor
CodecPool.returnCompressor(compressor);
compressor = null;
}
out = null;
if(writtenRecordsCounter != null) {
writtenRecordsCounter.increment(numRecordsWritten);
}
}
checkSum的寫入可以看到IFileOutputStream的finish函式,會從DataChecksum取出4個位元組的checksum值寫入到檔案尾部。
public void finish() throws IOException {
if (finished) {
return;
}
finished = true;
sum.writeValue(barray, 0, false);
out.write (barray, 0, sum.getChecksumSize());
out.flush();
}
關閉掉輸出流後,會將當前分割槽在磁碟檔案中的起始位置,結束位置資訊記錄到索引資訊中。索引資訊在記憶體中是存放在SpillRecord中。
part7: IFile的索引檔案
每個IFile資料檔案就有一個對應的索引檔案和它一一對應,這個索引檔案有可能在記憶體,也有可能在磁碟上真實存在的索引檔案。IFile檔案對應的的索引資訊會在滿足條件的情況下記憶體中快取著,一個IFile對應的索引資訊封裝在SpillRecord中,這個索引資訊SpillRecord儲存在indexCacheList中,當索引的cache超過1M大小後,那麼會將後來產生的索引資訊輸出到磁碟上形成一個索引檔案。這個索引檔案的檔名為"spill"+ spillNumber +".out.index",spillNumber就是:numSpills變數所記錄的當前進行到第幾次spill。
以每個檔案使用者設定了兩個ReduceTask那麼paritition個數為2,那麼IFile的索引檔案在磁碟中的形式為:
索引對應的資料 |
索引檔案儲存內容 |
||
Spill0的partition0 |
startOffset |
rawLength |
partLength |
Spill0的partition1 |
startOffset |
rawLength |
partLength |
Spill1的partition0 |
startOffset |
rawLength |
partLength |
Spill1的partition1 |
startOffset |
rawLength |
partLength |
8位元組的crc |
Merge
sortAndSpill已經將記憶體中的資料寫成一個個IFile資料檔案,這些檔案最終會被合併為一個數據檔案以及該資料檔案對應的索引檔案。Merge這部分將會分析資料檔案是如何被merge成單個檔案。
先回到runOldMapper中,在前面我們介紹過這部分程式碼了,再次重新看看這部分。collector.flush將會觸發將MapOutputBuffer中的剩餘資料flush到磁碟上,並最終將已經存在磁碟上的資料檔案合併為一個檔案。
runOldMapper:
runner.run(in, new OldOutputCollector(collector, conf), reporter);
collector.flush();
// MapOutputBuffer.flush
public synchronized void flush() throws IOException, ClassNotFoundException,
InterruptedException {
LOG.info("Starting flush of map output");
spillLock.lock();
try {
//等待正在進行中的spill動作完成
while (kvstart != kvend) {
reporter.progress();
spillDone.await();
}
if (sortSpillException != null) {
throw (IOException)new IOException("Spill failed"
).initCause(sortSpillException);
}
//快取中剩餘的資料,需要觸發一次spill動作將剩餘資料spill到磁碟上
if (kvend != kvindex) {
kvend = kvindex;
bufend = bufmark;
sortAndSpill();
}
} catch (InterruptedException e) {
throw (IOException)new IOException(
"Buffer interrupted while waiting for the writer"
).initCause(e);
} finally {
spillLock.unlock();
}
assert !spillLock.isHeldByCurrentThread();
// shut down spill thread and wait for it to exit. Since the preceding
// ensures that it is finished with its work (and sortAndSpill did not
// throw), we elect to use an interrupt instead of setting a flag.
// Spilling simultaneously from this thread while the spill thread
// finishes its work might be both a useful way to extend this and also
// sufficient motivation for the latter approach.
try {
spillThread.interrupt();
spillThread.join();
} catch (InterruptedException e) {
throw (IOException)new IOException("Spill failed"
).initCause(e);
}
// release sort buffer before the merge
kvbuffer = null;
//觸發將小的spill檔案合併為大的spill檔案。
mergeParts();
Path outputPath = mapOutputFile.getOutputFile();
fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
}
merge的動作會有mergeParts函式觸發,先看該函式的實現:
private void mergeParts() throws IOException, InterruptedException,
ClassNotFoundException {
// get the approximate size of the final output/index files
long finalOutFileSize = 0;
long finalIndexFileSize = 0;
final Path[] filename = new Path[numSpills];
final TaskAttemptID mapId = getTaskID();
//獲取磁碟上的所有spill檔案的檔名
for(int i = 0; i < numSpills; i++) {
filename[i] = mapOutputFile.getSpillFile(i);
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}
//只有一個spill檔案,那麼只需要將資料檔案和索引檔案rename即可
if (numSpills == 1) { //the spill is the final output
rfs.rename(filename[0],
new Path(filename[0].getParent(), "file.out"));
if (indexCacheList.size() == 0) {
rfs.rename(mapOutputFile.getSpillIndexFile(0),
new Path(filename[0].getParent(),"file.out.index"));
} else {
indexCacheList.get(0).writeToFile(
new Path(filename[0].getParent(),"file.out.index"), job);
}
return;
}
// read in paged indices
//載入因記憶體中快取不下而刷出到磁碟上的索引檔案到記憶體中
for (int i = indexCacheList.size(); i < numSpills; ++i) {
Path indexFileName = mapOutputFile.getSpillIndexFile(i);
indexCacheList.add(new SpillRecord(indexFileName, job, null));
}
//make correction in the length to include the sequence file header
//lengths for each partition
finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
//獲取最終的spill檔案和index檔案的路徑名
Path finalOutputFile =
mapOutputFile.getOutputFileForWrite(finalOutFileSize);
Path finalIndexFile =
mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
//The output stream for the final single output file
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
//沒有觸發過spill動作,則形成一個空的spill檔案和index檔案。
if (numSpills == 0) {
//create dummy files
IndexRecord rec = new IndexRecord();
SpillRecord sr = new SpillRecord(partitions);
try {
for (int i = 0; i < partitions; i++) {
long segmentStart = finalOut.getPos();
Writer<K, V> writer =
new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
writer.close();
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength();
rec.partLength = writer.getCompressedLength();
sr.putIndex(rec, i);
}
sr.writeToFile(finalIndexFile, job);
} finally {
finalOut.close();
}
return;
}
{
IndexRecord rec = new IndexRecord();
final SpillRecord spillRec = new SpillRecord(partitions);
for (int parts = 0; parts < partitions; parts++) {
//create the segments to be merged
//抽取spill檔案中屬於該paritition的索引形成一個segment
//所有屬於同一個分割槽的資訊效能一個segment列表
List<Segment<K,V>> segmentList =
new ArrayList<Segment<K, V>>(numSpills);
for(int i = 0; i < numSpills; i++) {
IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
Segment<K,V> s =
new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
indexRecord.partLength, codec, true);
segmentList.add(i, s);
if (LOG.isDebugEnabled()) {
LOG.debug("MapId=" + mapId + " Reducer=" + parts +
"Spill =" + i + "(" + indexRecord.startOffset + "," +
indexRecord.rawLength + ", " + indexRecord.partLength + ")");
}
}
//將屬於同一個分割槽的資料進行merge
//merge
@SuppressWarnings("unchecked")
RawKeyValueIterator kvIter = Merger.merge(job, rfs,
keyClass, valClass, codec,
segmentList, job.getInt("io.sort.factor", 100),
new Path(mapId.toString()),
job.getOutputKeyComparator(), reporter,
null, spilledRecordsCounter);
//write merged output to disk
//將merge後的資料寫入到最終的spill檔案中
long segmentStart = finalOut.getPos();
Writer<K, V> writer =
new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
spilledRecordsCounter);
if (combinerRunner == null || numSpills < minSpillsForCombine) {
Merger.writeFile(kvIter, writer, reporter, job);
} else {
combineCollector.setWriter(writer);
combinerRunner.combine(kvIter, combineCollector);
}
//close
writer.close();
// record offsets
//記錄當前分割槽在spill.out檔案中的索引資訊
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength();
rec.partLength = writer.getCompressedLength();
spillRec.putIndex(rec, parts);
}
//將索引資訊寫入到spill.index.out
spillRec.writeToFile(finalIndexFile, job);
finalOut.close();
//將每次觸發spill而產生的spill檔案刪除
for(int i = 0; i < numSpills; i++) {
rfs.delete(filename[i],true);
}
}
}
mergeParts的處理流程主要分為幾個步驟:
1.獲取到磁碟上屬於這個Task的所有spill檔名
2.整個MapTask執行過程中只是觸發過一次spill動作,那麼只需要做一下rename那mergeParts就算完成了。rename的過程是將spill0.out重新命名為spill.out,將索引檔案spill0.out.index重新命名為spill.out.index。如果spill0.out的索引檔案還在快取中,則只要將快取的索引寫入到spill.out.index。
3.前面觸發產生的spill檔案的索引會快取在cache中也就是在indexCacheList,因為cache大小有限,因此後面spill產生的索引資訊會落到索引檔案中,這裡需要載入因記憶體中快取不下而刷出到磁碟上的索引檔案。
4.獲取最終的spill檔案的路徑名:spill.out和索引檔案的路徑名:spill.out.index,並建立spill.out檔案的輸出流。
5.如果傳遞給這個MapTask的一個空的InputSplit,那麼就沒有後續的merge動作,只要在spill.out檔案中只是輸出兩個end標記和一個4個位元組的crc,在spill.out.index中記錄下索引資訊。
6.首先,先介紹下兩個變數型別,IndexRecord和Segment。
IndexRecord:是記錄一分割槽的索引資訊,一個spill檔案的索引資訊是由n個partition的索引IndexRecord組成一個Spill檔案的索引SpillRecord。
Segment:類似IndexRecord,但是還多一些資訊,表示這個分割槽的索引是對應的那個spill檔案。
1)在這部分處理中,標明瞭磁碟上必然有有多個spill檔案,需要將這些spill檔案屬於同一個partition的索引資訊封裝在segment列表中。
2)Merge.merge需要根據segment 列表將不同spill檔案中同一個parition的資料進行merge。
3)在merge完成後,如果沒有指定combiner那麼直接通過IFile.Writer將資料寫入到檔案中,如果有則呼叫使用者指定的Combiner,對同一個key的資料進行過濾,combiner的處理在前面已經分析過了,不再累贅。
4)在IndexRecord中記錄合併後屬於這個partition的索引資訊,將該索引資訊記錄到SpillRecord中。
5)重複1)到4)直至對所有partition處理完畢。
7.將spill.out的索引檔案寫入到spill.out.index中。
8.刪除spill檔案:spill0.out,...spilln.out,這裡有一點奇怪的是沒有刪除spill檔案對應的索引檔案。我看到在hadoop2.4.0中也沒有刪除,這個還不清楚是否故意而為之。
總結
至此,整個詳細的MapTask的分析就此為完成了,在分析過程中我們知道了MapTask是如何使用迴圈快取區管理資料,知道了資料在快取不下是如何做spill處理的,spill輸出的資料格式,combiner如何處理,如何將多一個檔案merge為一個等等。也希望通過閱讀這部分原始碼能學習到部分設計思路,能在未來的設計中提供多一種思路。