1. 程式人生 > >presto原始碼分析(PartitionedOutputOperator)

presto原始碼分析(PartitionedOutputOperator)

1 addInput方法

        requireNonNull(page, "page is null");
        checkState(isBlocked().isDone(), "output is already blocked");

        if (page.getPositionCount() == 0) {
            return;
        }

        page = pagePreprocessor.apply(page);//
        blocked = partitionFunction.partitionPage
(page);//對page中的每行資料進行分割槽處理 operatorContext.recordGeneratedOutput(page.getSizeInBytes(), page.getPositionCount());

partitionPage方法

        public ListenableFuture<?> partitionPage(Page page)
        {
            requireNonNull(page, "page is null");

            Page partitionFunctionArgs = getPartitionFunctionArguments(page);//把page中的blockbuilder按照一定的順序進行調整,即把hash值所在的block放在首位
for (int position = 0; position < page.getPositionCount(); position++) { if (nullChannel.isPresent() && page.getBlock(nullChannel.getAsInt()).isNull(position)) {//如果沒有hash channel,即group by的欄位是一個bigint型別 for (PageBuilder pageBuilder : pageBuilders) { pageBuilder.declarePosition(); for
(int channel = 0; channel < sourceTypes.size(); channel++) { Type type = sourceTypes.get(channel); type.appendTo(page.getBlock(channel), position, pageBuilder.getBlockBuilder(channel)); } } } else { int partition = partitionFunction.getPartition(partitionFunctionArgs, position);//計算該行所在的partition PageBuilder pageBuilder = pageBuilders.get(partition); pageBuilder.declarePosition(); for (int channel = 0; channel < sourceTypes.size(); channel++) { Type type = sourceTypes.get(channel); type.appendTo(page.getBlock(channel), position, pageBuilder.getBlockBuilder(channel)); } } } return flush(false); }

flush方法

        public ListenableFuture<?> flush(boolean force)
        {
            // add all full pages to output buffer
            List<ListenableFuture<?>> blockedFutures = new ArrayList<>();
            for (int partition = 0; partition < pageBuilders.size(); partition++) {
                PageBuilder partitionPageBuilder = pageBuilders.get(partition);
                if (!partitionPageBuilder.isEmpty() && (force || partitionPageBuilder.isFull())) {
                    Page pagePartition = partitionPageBuilder.build();
                    partitionPageBuilder.reset();

                    blockedFutures.add(outputBuffer.enqueue(partition, pagePartition));
                    pagesAdded.incrementAndGet();
                    rowsAdded.addAndGet(pagePartition.getPositionCount());
                }
            }
            ListenableFuture<?> future = Futures.allAsList(blockedFutures);
            if (future.isDone()) {
                return NOT_BLOCKED;
            }
            return future;
        }