/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.SetMultimap;
import com.google.inject.Inject;
import io.opentelemetry.api.trace.Span;
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.execution.ForQueryExecution;
import io.trino.execution.QueryManagerConfig;
import io.trino.execution.TableExecuteContextManager;
import io.trino.execution.scheduler.ArbitraryDistributionSplitAssigner;
import io.trino.execution.scheduler.EventDrivenTaskSource;
import io.trino.execution.scheduler.FaultTolerantPartitioningScheme;
import io.trino.execution.scheduler.HashDistributionSplitAssigner;
import io.trino.execution.scheduler.OutputDataSizeEstimate;
import io.trino.execution.scheduler.SingleDistributionSplitAssigner;
import io.trino.execution.scheduler.SplitAssigner;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.spi.HostAddress;
import io.trino.spi.exchange.Exchange;
import io.trino.sql.planner.MergePartitioningHandle;
import io.trino.sql.planner.PartitioningHandle;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.SplitSourceFactory;
import io.trino.sql.planner.SystemPartitioningHandle;
import io.trino.sql.planner.plan.ExchangeNode;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.RemoteSourceNode;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.LongConsumer;

public class EventDrivenTaskSourceFactory {
    private final SplitSourceFactory splitSourceFactory;
    private final Executor executor;
    private final InternalNodeManager nodeManager;
    private final TableExecuteContextManager tableExecuteContextManager;
    private final int splitBatchSize;

    @Inject
    public EventDrivenTaskSourceFactory(SplitSourceFactory splitSourceFactory, @ForQueryExecution ExecutorService executor, InternalNodeManager nodeManager, TableExecuteContextManager tableExecuteContextManager, QueryManagerConfig queryManagerConfig) {
        this(splitSourceFactory, (Executor)executor, nodeManager, tableExecuteContextManager, Objects.requireNonNull(queryManagerConfig, "queryManagerConfig is null").getScheduleSplitBatchSize());
    }

    public EventDrivenTaskSourceFactory(SplitSourceFactory splitSourceFactory, Executor executor, InternalNodeManager nodeManager, TableExecuteContextManager tableExecuteContextManager, int splitBatchSize) {
        this.splitSourceFactory = Objects.requireNonNull(splitSourceFactory, "splitSourceFactory is null");
        this.executor = Objects.requireNonNull(executor, "executor is null");
        this.nodeManager = Objects.requireNonNull(nodeManager, "nodeManager is null");
        this.tableExecuteContextManager = Objects.requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
        this.splitBatchSize = splitBatchSize;
    }

    public EventDrivenTaskSource create(Session session, Span stageSpan, PlanFragment fragment, Map<PlanFragmentId, Exchange> sourceExchanges, FaultTolerantPartitioningScheme sourcePartitioningScheme, LongConsumer getSplitTimeRecorder, Map<PlanNodeId, OutputDataSizeEstimate> outputDataSizeEstimates) {
        ImmutableSetMultimap.Builder remoteSources = ImmutableSetMultimap.builder();
        for (RemoteSourceNode remoteSource : fragment.getRemoteSourceNodes()) {
            for (PlanFragmentId sourceFragment : remoteSource.getSourceFragmentIds()) {
                remoteSources.put((Object)remoteSource.getId(), (Object)sourceFragment);
            }
        }
        long standardSplitSizeInBytes = SystemSessionProperties.getFaultTolerantExecutionStandardSplitSize(session).toBytes();
        int maxTaskSplitCount = SystemSessionProperties.getFaultTolerantExecutionMaxTaskSplitCount(session);
        return new EventDrivenTaskSource(session.getQueryId(), this.tableExecuteContextManager, sourceExchanges, (SetMultimap<PlanNodeId, PlanFragmentId>)remoteSources.build(), () -> this.splitSourceFactory.createSplitSources(session, stageSpan, fragment), this.createSplitAssigner(session, fragment, outputDataSizeEstimates, sourcePartitioningScheme, standardSplitSizeInBytes, maxTaskSplitCount), this.executor, this.splitBatchSize, standardSplitSizeInBytes, sourcePartitioningScheme, getSplitTimeRecorder);
    }

    private SplitAssigner createSplitAssigner(Session session, PlanFragment fragment, Map<PlanNodeId, OutputDataSizeEstimate> outputDataSizeEstimates, FaultTolerantPartitioningScheme sourcePartitioningScheme, long standardSplitSizeInBytes, int maxArbitraryDistributionTaskSplitCount) {
        PartitioningHandle partitioning = fragment.getPartitioning();
        Set partitionedRemoteSources = (Set)fragment.getRemoteSourceNodes().stream().filter(node -> node.getExchangeType() != ExchangeNode.Type.REPLICATE).map(PlanNode::getId).collect(ImmutableSet.toImmutableSet());
        ImmutableSet partitionedSources = ImmutableSet.builder().addAll((Iterable)partitionedRemoteSources).addAll(fragment.getPartitionedSources()).build();
        Set replicatedSources = (Set)fragment.getRemoteSourceNodes().stream().filter(node -> node.getExchangeType() == ExchangeNode.Type.REPLICATE).map(PlanNode::getId).collect(ImmutableSet.toImmutableSet());
        boolean coordinatorOnly = partitioning.equals(SystemPartitioningHandle.COORDINATOR_DISTRIBUTION);
        if (partitioning.equals(SystemPartitioningHandle.SINGLE_DISTRIBUTION) || coordinatorOnly) {
            ImmutableSet hostRequirement = ImmutableSet.of();
            if (coordinatorOnly) {
                InternalNode currentNode = this.nodeManager.getCurrentNode();
                Verify.verify((boolean)currentNode.isCoordinator(), (String)"current node is expected to be a coordinator", (Object[])new Object[0]);
                hostRequirement = ImmutableSet.of((Object)currentNode.getHostAndPort());
            }
            return new SingleDistributionSplitAssigner((Set<HostAddress>)hostRequirement, (Set<PlanNodeId>)ImmutableSet.builder().addAll((Iterable)partitionedSources).addAll((Iterable)replicatedSources).build());
        }
        int arbitraryDistributionComputeTaskTargetSizeGrowthPeriod = SystemSessionProperties.getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod(session);
        double arbitraryDistributionComputeTaskTargetSizeGrowthFactor = SystemSessionProperties.getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthFactor(session);
        long arbitraryDistributionComputeTaskTargetSizeInBytesMin = SystemSessionProperties.getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMin(session).toBytes();
        long arbitraryDistributionComputeTaskTargetSizeInBytesMax = SystemSessionProperties.getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMax(session).toBytes();
        Preconditions.checkArgument((arbitraryDistributionComputeTaskTargetSizeInBytesMax >= arbitraryDistributionComputeTaskTargetSizeInBytesMin ? 1 : 0) != 0, (String)"arbitraryDistributionComputeTaskTargetSizeInBytesMax %s should be no smaller than arbitraryDistributionComputeTaskTargetSizeInBytesMin %s", (long)arbitraryDistributionComputeTaskTargetSizeInBytesMax, (long)arbitraryDistributionComputeTaskTargetSizeInBytesMin);
        int arbitraryDistributionWriteTaskTargetSizeGrowthPeriod = SystemSessionProperties.getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod(session);
        double arbitraryDistributionWriteTaskTargetSizeGrowthFactor = SystemSessionProperties.getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthFactor(session);
        long arbitraryDistributionWriteTaskTargetSizeInBytesMin = SystemSessionProperties.getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMin(session).toBytes();
        long arbitraryDistributionWriteTaskTargetSizeInBytesMax = SystemSessionProperties.getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMax(session).toBytes();
        Preconditions.checkArgument((arbitraryDistributionWriteTaskTargetSizeInBytesMax >= arbitraryDistributionWriteTaskTargetSizeInBytesMin ? 1 : 0) != 0, (String)"arbitraryDistributionWriteTaskTargetSizeInBytesMax %s should be larger than arbitraryDistributionWriteTaskTargetSizeInBytesMin %s", (long)arbitraryDistributionWriteTaskTargetSizeInBytesMax, (long)arbitraryDistributionWriteTaskTargetSizeInBytesMin);
        if (partitioning.equals(SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION) || partitioning.equals(SystemPartitioningHandle.SOURCE_DISTRIBUTION)) {
            return new ArbitraryDistributionSplitAssigner(partitioning.getCatalogHandle(), (Set<PlanNodeId>)partitionedSources, replicatedSources, arbitraryDistributionComputeTaskTargetSizeGrowthPeriod, arbitraryDistributionComputeTaskTargetSizeGrowthFactor, arbitraryDistributionComputeTaskTargetSizeInBytesMin, arbitraryDistributionComputeTaskTargetSizeInBytesMax, standardSplitSizeInBytes, maxArbitraryDistributionTaskSplitCount);
        }
        if (partitioning.equals(SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION)) {
            return new ArbitraryDistributionSplitAssigner(partitioning.getCatalogHandle(), (Set<PlanNodeId>)partitionedSources, replicatedSources, arbitraryDistributionWriteTaskTargetSizeGrowthPeriod, arbitraryDistributionWriteTaskTargetSizeGrowthFactor, arbitraryDistributionWriteTaskTargetSizeInBytesMin, arbitraryDistributionWriteTaskTargetSizeInBytesMax, standardSplitSizeInBytes, maxArbitraryDistributionTaskSplitCount);
        }
        if (partitioning.equals(SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION) || partitioning.getCatalogHandle().isPresent() || partitioning.getConnectorHandle() instanceof MergePartitioningHandle) {
            return HashDistributionSplitAssigner.create(partitioning.getCatalogHandle(), (Set<PlanNodeId>)partitionedSources, replicatedSources, sourcePartitioningScheme, outputDataSizeEstimates, fragment, SystemSessionProperties.getFaultTolerantExecutionHashDistributionComputeTaskTargetSize(session).toBytes(), Integer.MAX_VALUE);
        }
        if (partitioning.equals(SystemPartitioningHandle.SCALED_WRITER_HASH_DISTRIBUTION)) {
            return HashDistributionSplitAssigner.create(partitioning.getCatalogHandle(), (Set<PlanNodeId>)partitionedSources, replicatedSources, sourcePartitioningScheme, outputDataSizeEstimates, fragment, SystemSessionProperties.getFaultTolerantExecutionHashDistributionWriteTaskTargetSize(session).toBytes(), SystemSessionProperties.getFaultTolerantExecutionHashDistributionWriteTaskTargetMaxCount(session));
        }
        throw new IllegalArgumentException("Unexpected partitioning: " + partitioning);
    }
}

