/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark.execution;

import com.facebook.airlift.json.Codec;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.cost.HistoryBasedPlanStatisticsTracker;
import com.facebook.presto.cost.PlanNodeStatsEstimate;
import com.facebook.presto.cost.StatsAndCosts;
import com.facebook.presto.event.QueryMonitor;
import com.facebook.presto.execution.QueryManagerConfig;
import com.facebook.presto.execution.QueryStateTimer;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.scheduler.TableWriteInfo;
import com.facebook.presto.memory.NodeMemoryConfig;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spark.ErrorClassifier;
import com.facebook.presto.spark.PrestoSparkMetadataStorage;
import com.facebook.presto.spark.PrestoSparkQueryData;
import com.facebook.presto.spark.PrestoSparkQueryStatusInfo;
import com.facebook.presto.spark.PrestoSparkServiceWaitTimeMetrics;
import com.facebook.presto.spark.PrestoSparkTaskDescriptor;
import com.facebook.presto.spark.RddAndMore;
import com.facebook.presto.spark.classloader_interface.MutablePartitionId;
import com.facebook.presto.spark.classloader_interface.PrestoSparkSerializedPage;
import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleStats;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskExecutorFactoryProvider;
import com.facebook.presto.spark.classloader_interface.SerializedTaskInfo;
import com.facebook.presto.spark.execution.AbstractPrestoSparkQueryExecution;
import com.facebook.presto.spark.execution.FragmentExecutionResult;
import com.facebook.presto.spark.execution.PrestoSparkExecutionExceptionFactory;
import com.facebook.presto.spark.execution.PrestoSparkTaskExecutorFactory;
import com.facebook.presto.spark.execution.RuntimeStatistics;
import com.facebook.presto.spark.node.PrestoSparkNodePartitioningManager;
import com.facebook.presto.spark.planner.IterativePlanFragmenter;
import com.facebook.presto.spark.planner.PrestoSparkPlanFragmenter;
import com.facebook.presto.spark.planner.PrestoSparkQueryPlanner;
import com.facebook.presto.spark.planner.PrestoSparkRddFactory;
import com.facebook.presto.spark.util.PrestoSparkUtils;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.plan.OutputNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.storage.TempStorage;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.planner.PartitioningProviderManager;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.PlanFragmenterUtils;
import com.facebook.presto.sql.planner.SubPlan;
import com.facebook.presto.sql.planner.SystemPartitioningHandle;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher;
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;
import com.facebook.presto.sql.planner.planPrinter.PlanPrinter;
import com.facebook.presto.sql.planner.sanity.PlanChecker;
import com.facebook.presto.transaction.TransactionManager;
import com.google.common.base.Throwables;
import com.google.common.base.Verify;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.airlift.units.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.spark.MapOutputStatistics;
import org.apache.spark.SimpleFutureAction;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.CollectionAccumulator;
import org.apache.spark.util.ThreadUtils;
import scala.Function1;
import scala.Tuple2;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContextExecutorService;
import scala.concurrent.impl.ExecutionContextImpl;
import scala.runtime.AbstractFunction1;
import scala.util.Try;

/**
 * Query execution that runs the plan in rounds: it repeatedly asks an
 * {@link IterativePlanFragmenter} for the fragments that are ready, submits them,
 * waits for one fragment completion event, records the finished fragment and then
 * asks the fragmenter again with the remaining plan. When no plan remains, the single
 * remaining fragment is executed and its pages are returned.
 */
public class PrestoSparkAdaptiveQueryExecution
        extends AbstractPrestoSparkQueryExecution {
    private static final Logger log = Logger.get(PrestoSparkAdaptiveQueryExecution.class);
    private final IterativePlanFragmenter iterativePlanFragmenter;
    // Fragments whose completion has been observed; consulted by the fragmenter through isFragmentFinished.
    private final Set<PlanFragmentId> executedFragments = ConcurrentHashMap.newKeySet();
    // Completion events are published by future callbacks (or inline) and consumed by doExecute().
    private final BlockingQueue<FragmentCompletionEvent> fragmentEventQueue = new LinkedBlockingQueue<>();

    public PrestoSparkAdaptiveQueryExecution(JavaSparkContext sparkContext, Session session, QueryMonitor queryMonitor, CollectionAccumulator<SerializedTaskInfo> taskInfoCollector, CollectionAccumulator<PrestoSparkShuffleStats> shuffleStatsCollector, PrestoSparkTaskExecutorFactory taskExecutorFactory, PrestoSparkTaskExecutorFactoryProvider taskExecutorFactoryProvider, QueryStateTimer queryStateTimer, WarningCollector warningCollector, String query, PrestoSparkQueryPlanner.PlanAndMore planAndMore, Optional<String> sparkQueueName, Codec<TaskInfo> taskInfoCodec, JsonCodec<PrestoSparkTaskDescriptor> sparkTaskDescriptorJsonCodec, JsonCodec<PrestoSparkQueryStatusInfo> queryStatusInfoJsonCodec, JsonCodec<PrestoSparkQueryData> queryDataJsonCodec, PrestoSparkRddFactory rddFactory, TransactionManager transactionManager, PagesSerde pagesSerde, PrestoSparkExecutionExceptionFactory executionExceptionFactory, Duration queryTimeout, long queryCompletionDeadline, PrestoSparkMetadataStorage metadataStorage, Optional<String> queryStatusInfoOutputLocation, Optional<String> queryDataOutputLocation, TempStorage tempStorage, NodeMemoryConfig nodeMemoryConfig, FeaturesConfig featuresConfig, QueryManagerConfig queryManagerConfig, Set<PrestoSparkServiceWaitTimeMetrics> waitTimeMetrics, Optional<ErrorClassifier> errorClassifier, PrestoSparkPlanFragmenter planFragmenter, Metadata metadata, PartitioningProviderManager partitioningProviderManager, HistoryBasedPlanStatisticsTracker historyBasedPlanStatisticsTracker, Optional<CollectionAccumulator<Map<String, Long>>> bootstrapMetricsCollector) {
        super(sparkContext, session, queryMonitor, taskInfoCollector, shuffleStatsCollector, taskExecutorFactory, taskExecutorFactoryProvider, queryStateTimer, warningCollector, query, planAndMore, sparkQueueName, taskInfoCodec, sparkTaskDescriptorJsonCodec, queryStatusInfoJsonCodec, queryDataJsonCodec, rddFactory, transactionManager, pagesSerde, executionExceptionFactory, queryTimeout, queryCompletionDeadline, metadataStorage, queryStatusInfoOutputLocation, queryDataOutputLocation, tempStorage, nodeMemoryConfig, featuresConfig, queryManagerConfig, waitTimeMetrics, errorClassifier, planFragmenter, metadata, partitioningProviderManager, historyBasedPlanStatisticsTracker, bootstrapMetricsCollector);
        this.iterativePlanFragmenter = this.createIterativePlanFragmenter();
    }

    /**
     * Builds the fragmenter for this query's plan. The fragmenter decides whether a
     * fragment is finished by checking membership in {@link #executedFragments}.
     */
    private IterativePlanFragmenter createIterativePlanFragmenter() {
        boolean forceSingleNode = false;
        Function<PlanFragmentId, Boolean> isFragmentFinished = this.executedFragments::contains;
        return new IterativePlanFragmenter(this.planAndMore.getPlan(), isFragmentFinished, this.metadata, new PlanChecker(this.featuresConfig, forceSingleNode), new SqlParser(), new PlanNodeIdAllocator(), new PrestoSparkNodePartitioningManager(this.partitioningProviderManager), this.queryManagerConfig, this.session, this.warningCollector, forceSingleNode);
    }

    /**
     * Runs the query round by round until the fragmenter reports no remaining plan,
     * then executes the last fragment and returns its pages.
     *
     * @return the pages produced by the final fragment
     * @throws SparkException if a fragment fails with a {@code SparkException}
     * @throws TimeoutException declared because the timeout computation and fragment execution are invoked here
     */
    @Override
    protected List<Tuple2<MutablePartitionId, PrestoSparkSerializedPage>> doExecute() throws SparkException, TimeoutException {
        this.queryStateTimer.beginRunning();
        log.info("Using AdaptiveQueryExecutor");
        Plan plan = this.planAndMore.getPlan();
        log.info("Logical plan : %s", PlanPrinter.textLogicalPlan(plan.getRoot(), plan.getTypes(), plan.getStatsAndCosts(), this.metadata.getFunctionAndTypeManager(), this.session, 0));

        IterativePlanFragmenter.PlanAndFragments planAndFragments = this.iterativePlanFragmenter.createReadySubPlans(plan.getRoot());

        // The callback execution context is only needed when at least one round will run.
        // NOTE(review): this executor is never shut down in this class — confirm that is intended.
        ExecutionContextExecutorService executorService = !planAndFragments.hasRemainingPlan() ? null : (ExecutionContextExecutorService)ExecutionContextImpl.fromExecutorService((ExecutorService)ThreadUtils.newDaemonCachedThreadPool("AdaptiveExecution", 16, 60), null);
        TableWriteInfo tableWriteInfo = this.getTableWriteInfo(this.session, plan.getRoot());

        while (planAndFragments.hasRemainingPlan()) {
            List<SubPlan> readyFragments = planAndFragments.getReadyFragments();
            Set<PlanFragmentId> rootChildren = PrestoSparkAdaptiveQueryExecution.getRootChildNodeFragmentIDs(planAndFragments.getRemainingPlan().get());
            for (SubPlan fragment : readyFragments) {
                this.submitFragment(fragment, rootChildren, tableWriteInfo, executorService);
            }

            // Exactly one event is consumed per round before the plan is fragmented again.
            FragmentCompletionEvent fragmentEvent = this.awaitNextFragmentCompletionEvent();
            if (fragmentEvent instanceof FragmentCompletionFailureEvent) {
                Throwable executionError = ((FragmentCompletionFailureEvent)fragmentEvent).getExecutionError();
                Throwables.propagateIfPossible(executionError, SparkException.class);
                Throwables.propagateIfPossible(executionError, RuntimeException.class);
                throw new UncheckedExecutionException(executionError);
            }
            Verify.verify(fragmentEvent instanceof FragmentCompletionSuccessEvent, "Unexpected FragmentCompletionEvent type: %s", fragmentEvent.getClass().getSimpleName());
            FragmentCompletionSuccessEvent successEvent = (FragmentCompletionSuccessEvent)fragmentEvent;
            this.executedFragments.add(successEvent.getFragmentId());
            // NOTE(review): the runtime stats are computed but not consumed anywhere in this method.
            Optional<PlanNodeStatsEstimate> runtimeStats = RuntimeStatistics.createRuntimeStats(successEvent.getMapOutputStats());
            planAndFragments = this.iterativePlanFragmenter.createReadySubPlans(planAndFragments.getRemainingPlan().get());
        }

        Verify.verify(planAndFragments.getReadyFragments().size() == 1, "The last step of the adaptive execution is expected to have a single fragment remaining.");
        SubPlan finalFragment = planAndFragments.getReadyFragments().get(0);
        this.setFinalFragmentedPlan(finalFragment);
        return this.executeFinalFragment(this.session, finalFragment, tableWriteInfo);
    }

    /**
     * Submits one ready fragment and arranges for a completion event to be published:
     * from a callback when the execution result carries a map-output-statistics future,
     * or immediately as a success otherwise.
     *
     * @param fragment the ready fragment to execute
     * @param rootChildren fragment ids feeding the root of the remaining plan
     * @param tableWriteInfo table write information passed through to fragment execution
     * @param callbackContext execution context on which the completion callback is registered
     */
    private void submitFragment(SubPlan fragment, Set<PlanFragmentId> rootChildren, TableWriteInfo tableWriteInfo, ExecutionContext callbackContext) throws SparkException, TimeoutException {
        log.info("Executing fragment : %s", PlanPrinter.textPlanFragment(fragment.getFragment(), this.metadata.getFunctionAndTypeManager(), this.session, true));

        // Only fragments feeding the root of a coordinator-only plan get an explicit output type.
        Optional<Class<?>> outputType = Optional.empty();
        if (this.isCoordinatorOnly(this.planAndMore.getPlan()) && rootChildren.contains(fragment.getFragment().getId())) {
            outputType = Optional.of(PrestoSparkSerializedPage.class);
        }

        SubPlan currentFragment = this.configureOutputPartitioning(this.session, fragment, this.planAndMore.getPhysicalResourceSettings().getHashPartitionCount());
        FragmentExecutionResult fragmentExecutionResult = this.executeFragment(currentFragment, tableWriteInfo, outputType);
        Optional<SimpleFutureAction<MapOutputStatistics>> fragmentFuture = fragmentExecutionResult.getMapOutputStatisticsFutureAction();

        if (!fragmentFuture.isPresent()) {
            log.info("Fragment %s will not get executed now either because there was no exchange involved (a broadcast is present) or because of an unknown issue.", fragment.getFragment().getId());
            this.publishFragmentCompletionEvent(new FragmentCompletionSuccessEvent(currentFragment.getFragment().getId(), Optional.empty()));
            return;
        }

        fragmentFuture.get().onComplete(new AbstractFunction1<Try<MapOutputStatistics>, Void>() {
            @Override
            public Void apply(Try<MapOutputStatistics> result) {
                PlanFragmentId fragmentId = currentFragment.getFragment().getId();
                if (result.isSuccess()) {
                    Optional<MapOutputStatistics> mapOutputStats = Optional.ofNullable(result.get());
                    PrestoSparkAdaptiveQueryExecution.this.publishFragmentCompletionEvent(new FragmentCompletionSuccessEvent(fragmentId, mapOutputStats));
                } else {
                    Throwable throwable = result.failed().get();
                    PrestoSparkAdaptiveQueryExecution.this.publishFragmentCompletionEvent(new FragmentCompletionFailureEvent(fragmentId, throwable));
                }
                return null;
            }
        }, callbackContext);
    }

    /**
     * Blocks until the next fragment completion event arrives or the timeout derived
     * from the query completion deadline elapses.
     *
     * @return the next event, never {@code null}
     * @throws RuntimeException wrapping {@link InterruptedException} if the wait is interrupted
     *         (the thread's interrupt status is restored first)
     */
    private FragmentCompletionEvent awaitNextFragmentCompletionEvent() throws SparkException, TimeoutException {
        FragmentCompletionEvent fragmentEvent;
        try {
            fragmentEvent = this.fragmentEventQueue.poll(PrestoSparkUtils.computeNextTimeout(this.queryCompletionDeadline), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (fragmentEvent == null) {
            // poll() returned without an element, i.e. the wait timed out.
            throw this.executionExceptionFactory.toPrestoSparkExecutionException(new RuntimeException("Adaptive query execution failed due to timeout."));
        }
        return fragmentEvent;
    }

    /**
     * Collects the source fragment ids of every {@link RemoteSourceNode} reachable from
     * the given node without descending below an {@link ExchangeNode} whose scope is
     * {@code REMOTE_STREAMING}.
     */
    private static Set<PlanFragmentId> getRootChildNodeFragmentIDs(PlanNode rootPlanNode) {
        return PlanNodeSearcher.searchFrom(rootPlanNode)
                .recurseOnlyWhen(node -> !(node instanceof ExchangeNode) || ((ExchangeNode)node).getScope() != ExchangeNode.Scope.REMOTE_STREAMING)
                .where(node -> node instanceof RemoteSourceNode)
                .findAll()
                .stream()
                .flatMap(node -> ((RemoteSourceNode)node).getSourceFragmentIds().stream())
                .collect(Collectors.toSet());
    }

    /**
     * Returns whether the plan's root is an {@link OutputNode} whose source is reported
     * as coordinator-only by {@link PlanFragmenterUtils#isCoordinatorOnlyDistribution}.
     */
    private boolean isCoordinatorOnly(Plan plan) {
        if (!(plan.getRoot() instanceof OutputNode)) {
            return false;
        }
        PlanNode outputSourceNode = ((OutputNode)plan.getRoot()).getSource();
        return PlanFragmenterUtils.isCoordinatorOnlyDistribution(outputSourceNode);
    }

    /**
     * Enqueues a completion event for {@link #doExecute()} to consume.
     *
     * @throws RuntimeException wrapping {@link InterruptedException} if interrupted while
     *         enqueueing (the thread's interrupt status is restored first)
     */
    private void publishFragmentCompletionEvent(FragmentCompletionEvent fragmentCompletionEvent) {
        try {
            this.fragmentEventQueue.put(fragmentCompletionEvent);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag before translating to an unchecked exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Executes the last remaining fragment. A fragment with
     * {@code COORDINATOR_DISTRIBUTION} partitioning is collected from the RDDs of its
     * children; any other fragment is turned into an RDD and collected with a timeout.
     */
    private List<Tuple2<MutablePartitionId, PrestoSparkSerializedPage>> executeFinalFragment(Session session, SubPlan finalFragment, TableWriteInfo tableWriteInfo) throws SparkException, TimeoutException {
        if (finalFragment.getFragment().getPartitioning().equals(SystemPartitioningHandle.COORDINATOR_DISTRIBUTION)) {
            Map<PlanFragmentId, RddAndMore<PrestoSparkSerializedPage>> inputRdds = new HashMap<>();
            for (SubPlan child : finalFragment.getChildren()) {
                PlanFragmentId childId = child.getFragment().getId();
                inputRdds.put(childId, this.getRdd(childId).get());
            }
            return this.collectPages(tableWriteInfo, finalFragment.getFragment(), inputRdds);
        }
        RddAndMore<PrestoSparkSerializedPage> rddAndMore = this.createRddForSubPlan(finalFragment, tableWriteInfo, Optional.of(PrestoSparkSerializedPage.class));
        return rddAndMore.collectAndDestroyDependenciesWithTimeout(PrestoSparkUtils.computeNextTimeout(this.queryCompletionDeadline), TimeUnit.MILLISECONDS, this.waitTimeMetrics);
    }

    /** Base type of the events exchanged through the fragment event queue; carries the fragment id. */
    private abstract static class FragmentCompletionEvent {
        protected final PlanFragmentId fragmentId;

        private FragmentCompletionEvent(PlanFragmentId fragmentId) {
            this.fragmentId = fragmentId;
        }

        public PlanFragmentId getFragmentId() {
            return this.fragmentId;
        }
    }

    /** Event published when a fragment's future completes with a failure. */
    private static final class FragmentCompletionFailureEvent
            extends FragmentCompletionEvent {
        private final Throwable executionError;

        private FragmentCompletionFailureEvent(PlanFragmentId fragmentId, Throwable executionError) {
            super(fragmentId);
            this.executionError = executionError;
        }

        public Throwable getExecutionError() {
            return this.executionError;
        }
    }

    /** Event published when a fragment completes; the map output statistics may be absent. */
    private static final class FragmentCompletionSuccessEvent
            extends FragmentCompletionEvent {
        private final Optional<MapOutputStatistics> mapOutputStats;

        private FragmentCompletionSuccessEvent(PlanFragmentId fragmentId, Optional<MapOutputStatistics> mapOutputStats) {
            super(fragmentId);
            this.mapOutputStats = mapOutputStats;
        }

        public Optional<MapOutputStatistics> getMapOutputStats() {
            return this.mapOutputStats;
        }
    }
}

