presto原始碼分析(PartitionedOutputOperator)
阿新 • • 發佈:2019-01-22
1 addInput方法
requireNonNull(page, "page is null");
checkState(isBlocked().isDone(), "output is already blocked");
if (page.getPositionCount() == 0) {
return;
}
page = pagePreprocessor.apply(page);//
blocked = partitionFunction.partitionPage (page);//對page中的每行資料進行分割槽處理
operatorContext.recordGeneratedOutput(page.getSizeInBytes(), page.getPositionCount());
partitionPage方法
public ListenableFuture<?> partitionPage(Page page)
{
requireNonNull(page, "page is null");
Page partitionFunctionArgs = getPartitionFunctionArguments(page);//把page中的blockbuilder按照一定的順序進行調整,即把hash值所在的block放在首位
for (int position = 0; position < page.getPositionCount(); position++) {
if (nullChannel.isPresent() && page.getBlock(nullChannel.getAsInt()).isNull(position)) {//如果沒有hash channel,即group by的欄位是一個bigint型別
for (PageBuilder pageBuilder : pageBuilders) {
pageBuilder.declarePosition();
for (int channel = 0; channel < sourceTypes.size(); channel++) {
Type type = sourceTypes.get(channel);
type.appendTo(page.getBlock(channel), position, pageBuilder.getBlockBuilder(channel));
}
}
}
else {
int partition = partitionFunction.getPartition(partitionFunctionArgs, position);//計算該行所在的partition
PageBuilder pageBuilder = pageBuilders.get(partition);
pageBuilder.declarePosition();
for (int channel = 0; channel < sourceTypes.size(); channel++) {
Type type = sourceTypes.get(channel);
type.appendTo(page.getBlock(channel), position, pageBuilder.getBlockBuilder(channel));
}
}
}
return flush(false);
}
flush方法
public ListenableFuture<?> flush(boolean force)
{
// add all full pages to output buffer
List<ListenableFuture<?>> blockedFutures = new ArrayList<>();
for (int partition = 0; partition < pageBuilders.size(); partition++) {
PageBuilder partitionPageBuilder = pageBuilders.get(partition);
if (!partitionPageBuilder.isEmpty() && (force || partitionPageBuilder.isFull())) {
Page pagePartition = partitionPageBuilder.build();
partitionPageBuilder.reset();
blockedFutures.add(outputBuffer.enqueue(partition, pagePartition));
pagesAdded.incrementAndGet();
rowsAdded.addAndGet(pagePartition.getPositionCount());
}
}
ListenableFuture<?> future = Futures.allAsList(blockedFutures);
if (future.isDone()) {
return NOT_BLOCKED;
}
return future;
}