/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.SetMultimap;
import com.google.common.io.Closer;
import com.google.common.primitives.ImmutableIntArray;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.concurrent.MoreFutures;
import io.trino.connector.CatalogHandle;
import io.trino.exchange.SpoolingExchangeInput;
import io.trino.execution.TableExecuteContext;
import io.trino.execution.TableExecuteContextManager;
import io.trino.execution.scheduler.FaultTolerantPartitioningScheme;
import io.trino.execution.scheduler.NodeRequirements;
import io.trino.execution.scheduler.SplitAssigner;
import io.trino.metadata.Split;
import io.trino.operator.ExchangeOperator;
import io.trino.spi.QueryId;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.spi.exchange.ExchangeSourceHandleSource;
import io.trino.split.RemoteSplit;
import io.trino.split.SplitSource;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNodeId;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
class EventDrivenTaskSource
implements Closeable {
    // Query this task source belongs to; used to look up the TableExecuteContext
    // when a table scan source reports table-execute splits info.
    private final QueryId queryId;
    // Provides the per-query TableExecuteContext (see createTableScanSplitLoader).
    private final TableExecuteContextManager tableExecuteContextManager;
    // Exchanges to read from, keyed by fragment id. The constructor requires this
    // map to have exactly the same key set as remoteSources.
    private final Map<PlanFragmentId, Exchange> sourceExchanges;
    // Fragment id -> id of the remote source plan node associated with that fragment.
    private final Map<PlanFragmentId, PlanNodeId> remoteSources;
    // Invoked once in start() to obtain the non-exchange split sources, keyed by plan node id.
    private final Supplier<Map<PlanNodeId, SplitSource>> splitSourceSupplier;
    // Receives the loaded splits; the results it returns are applied to the callback.
    @GuardedBy(value="assignerLock")
    private final SplitAssigner assigner;
    // Listener that receives assignment results and failures.
    @GuardedBy(value="assignerLock")
    private final Callback callback;
    // Executor handed to the split loaders for their future callbacks; also runs
    // the "no sources at all" finish task scheduled by start().
    private final Executor executor;
    // Batch size passed to SplitSource.getNextBatch by each SplitLoader.
    private final int splitBatchSize;
    // Size threshold used by ExchangeSplitSource when grouping exchange source handles into splits.
    private final long targetExchangeSplitSizeInBytes;
    // Maps a table scan split to its partition (see getSplitPartition).
    private final FaultTolerantPartitioningScheme sourcePartitioningScheme;
    // Receives the System.nanoTime() value captured just before each getNextBatch call.
    private final LongConsumer getSplitTimeRecorder;
    // Inverse of remoteSources: remote source plan node id -> fragments associated with it.
    private final SetMultimap<PlanNodeId, PlanFragmentId> remoteSourceFragments;
    // Lifecycle flags; start() and close() are synchronized on this instance.
    @GuardedBy(value="this")
    private boolean started;
    @GuardedBy(value="this")
    private boolean closed;
    // Collects everything registered in start() so that close() can release it.
    @GuardedBy(value="this")
    private final Closer closer = Closer.create();
    // Lock protecting the assigner, the callback and the bookkeeping sets below.
    private final Object assignerLock = new Object();
    // Fragments whose exchange split loader has reported that there are no more splits.
    @GuardedBy(value="assignerLock")
    private final Set<PlanFragmentId> finishedFragments = new HashSet<PlanFragmentId>();
    // Every source plan node id (remote sources and table scans) registered in start().
    @GuardedBy(value="assignerLock")
    private final Set<PlanNodeId> allSources = new HashSet<PlanNodeId>();
    // Sources that have delivered all of their splits; once this covers allSources
    // the assigner is finished.
    @GuardedBy(value="assignerLock")
    private final Set<PlanNodeId> finishedSources = new HashSet<PlanNodeId>();
    EventDrivenTaskSource(QueryId queryId, TableExecuteContextManager tableExecuteContextManager, Map<PlanFragmentId, Exchange> sourceExchanges, Map<PlanFragmentId, PlanNodeId> remoteSources, Supplier<Map<PlanNodeId, SplitSource>> splitSourceSupplier, SplitAssigner assigner, Callback callback, Executor executor, int splitBatchSize, long targetExchangeSplitSizeInBytes, FaultTolerantPartitioningScheme sourcePartitioningScheme, LongConsumer getSplitTimeRecorder) {
        this.queryId = Objects.requireNonNull(queryId, "queryId is null");
        this.tableExecuteContextManager = Objects.requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
        this.sourceExchanges = ImmutableMap.copyOf(Objects.requireNonNull(sourceExchanges, "sourceExchanges is null"));
        this.remoteSources = ImmutableMap.copyOf(Objects.requireNonNull(remoteSources, "remoteSources is null"));
        Preconditions.checkArgument((boolean)sourceExchanges.keySet().equals(remoteSources.keySet()), (String)"sourceExchanges and remoteSources are expected to contain the same set of keys: %s != %s", sourceExchanges.keySet(), remoteSources.keySet());
        this.splitSourceSupplier = Objects.requireNonNull(splitSourceSupplier, "splitSourceSupplier is null");
        this.assigner = Objects.requireNonNull(assigner, "assigner is null");
        this.callback = Objects.requireNonNull(callback, "callback is null");
        this.executor = Objects.requireNonNull(executor, "executor is null");
        this.splitBatchSize = splitBatchSize;
        this.targetExchangeSplitSizeInBytes = targetExchangeSplitSizeInBytes;
        this.sourcePartitioningScheme = Objects.requireNonNull(sourcePartitioningScheme, "sourcePartitioningScheme is null");
        this.getSplitTimeRecorder = Objects.requireNonNull(getSplitTimeRecorder, "getSplitTimeRecorder is null");
        this.remoteSourceFragments = (SetMultimap)remoteSources.entrySet().stream().collect(ImmutableSetMultimap.toImmutableSetMultimap(Map.Entry::getValue, Map.Entry::getKey));
    }

    /**
     * Creates one split loader per source exchange and one per split source returned by
     * {@code splitSourceSupplier}, registers every created resource with the closer, and
     * then starts all loaders.
     *
     * <p>If there are no sources at all, a task is submitted to the executor that finishes
     * the assigner and forwards the result to the callback.
     *
     * <p>If anything fails during setup, everything registered so far is closed and the
     * original failure is rethrown; a failure raised while closing is attached to it as a
     * suppressed exception.
     *
     * @throws IllegalStateException if this task source was already started or closed
     */
    public synchronized void start() {
        Preconditions.checkState(!started, "already started");
        Preconditions.checkState(!closed, "already closed");
        started = true;
        try {
            List<SplitLoader> splitLoaders = new ArrayList<>();
            for (Map.Entry<PlanFragmentId, Exchange> entry : sourceExchanges.entrySet()) {
                PlanFragmentId fragmentId = entry.getKey();
                PlanNodeId remoteSourceNodeId = getRemoteSourceNode(fragmentId);
                // No loader has been started yet at this point, so nothing else can be
                // touching allSources; assignerLock is not taken here.
                allSources.add(remoteSourceNodeId);
                ExchangeSourceHandleSource handleSource = closer.register(entry.getValue().getSourceHandles());
                ExchangeSplitSource splitSource = closer.register(new ExchangeSplitSource(handleSource, targetExchangeSplitSizeInBytes));
                SplitLoader splitLoader = closer.register(createExchangeSplitLoader(fragmentId, remoteSourceNodeId, splitSource));
                splitLoaders.add(splitLoader);
            }
            for (Map.Entry<PlanNodeId, SplitSource> entry : splitSourceSupplier.get().entrySet()) {
                PlanNodeId planNodeId = entry.getKey();
                allSources.add(planNodeId);
                SplitLoader splitLoader = closer.register(createTableScanSplitLoader(planNodeId, entry.getValue()));
                splitLoaders.add(splitLoader);
            }
            if (splitLoaders.isEmpty()) {
                // Nothing to load: finish the assigner asynchronously so the callback is
                // not invoked from within start().
                executor.execute(() -> {
                    try {
                        synchronized (assignerLock) {
                            assigner.finish().update(callback);
                        }
                    }
                    catch (Throwable t) {
                        fail(t);
                    }
                });
            } else {
                splitLoaders.forEach(SplitLoader::start);
            }
        }
        catch (Throwable t) {
            try {
                closer.close();
            }
            catch (Throwable closerFailure) {
                // Closer may rethrow the very same exception; never suppress an exception in itself.
                if (closerFailure != t) {
                    t.addSuppressed(closerFailure);
                }
            }
            throw t;
        }
    }

    /**
     * Builds the loader that reads splits of one source exchange (one fragment) and feeds
     * them to the assigner under the id of the fragment's remote source node.
     *
     * <p>A remote source node is reported as having no more splits only once every fragment
     * associated with it has finished. When all known sources have finished, the assigner
     * is finished as well. Any failure is routed to {@link #fail(Throwable)}.
     */
    private SplitLoader createExchangeSplitLoader(final PlanFragmentId fragmentId, final PlanNodeId remoteSourceNodeId, ExchangeSplitSource splitSource) {
        SplitLoader.Callback loaderCallback = new SplitLoader.Callback() {
            @Override
            public void update(ListMultimap<Integer, Split> splits, boolean noMoreSplitsForFragment) {
                try {
                    synchronized (assignerLock) {
                        if (noMoreSplitsForFragment) {
                            finishedFragments.add(fragmentId);
                        }
                        // The remote source is complete only when all of its fragments are.
                        boolean noMoreSplitsForRemoteSource = finishedFragments.containsAll(remoteSourceFragments.get(remoteSourceNodeId));
                        assigner.assign(remoteSourceNodeId, splits, noMoreSplitsForRemoteSource).update(callback);
                        if (noMoreSplitsForRemoteSource) {
                            finishedSources.add(remoteSourceNodeId);
                        }
                        if (finishedSources.containsAll(allSources)) {
                            assigner.finish().update(callback);
                        }
                    }
                }
                catch (Throwable t) {
                    fail(t);
                }
            }

            @Override
            public void failed(Throwable t) {
                fail(t);
            }
        };
        return new SplitLoader(
                splitSource,
                executor,
                ExchangeSplitSource::getSplitPartition,
                loaderCallback,
                splitBatchSize,
                getSplitTimeRecorder);
    }

    /**
     * Builds the loader that reads splits from a non-exchange split source and feeds them
     * to the assigner under the given plan node id.
     *
     * <p>When the source is exhausted it is marked finished and, if the split source
     * exposes table-execute splits info, that info is stored in the query's
     * {@code TableExecuteContext}. When all known sources have finished, the assigner is
     * finished as well. Any failure is routed to {@link #fail(Throwable)}.
     */
    private SplitLoader createTableScanSplitLoader(final PlanNodeId planNodeId, final SplitSource splitSource) {
        SplitLoader.Callback loaderCallback = new SplitLoader.Callback() {
            @Override
            public void update(ListMultimap<Integer, Split> splits, boolean noMoreSplits) {
                try {
                    synchronized (assignerLock) {
                        assigner.assign(planNodeId, splits, noMoreSplits).update(callback);
                        if (noMoreSplits) {
                            finishedSources.add(planNodeId);
                            // The context is looked up only when the source actually provides info.
                            splitSource.getTableExecuteSplitsInfo().ifPresent(info ->
                                    tableExecuteContextManager.getTableExecuteContextForQuery(queryId).setSplitsInfo(info));
                        }
                        if (finishedSources.containsAll(allSources)) {
                            assigner.finish().update(callback);
                        }
                    }
                }
                catch (Throwable t) {
                    fail(t);
                }
            }

            @Override
            public void failed(Throwable t) {
                fail(t);
            }
        };
        return new SplitLoader(
                splitSource,
                executor,
                this::getSplitPartition,
                loaderCallback,
                splitBatchSize,
                getSplitTimeRecorder);
    }

    /**
     * Returns the remote source plan node id registered for the given fragment.
     * Verification fails if the fragment has no entry in {@code remoteSources}.
     */
    private PlanNodeId getRemoteSourceNode(PlanFragmentId fragmentId) {
        PlanNodeId remoteSourceNodeId = remoteSources.get(fragmentId);
        Verify.verify(remoteSourceNodeId != null, "remote source not found for fragment: %s", fragmentId);
        return remoteSourceNodeId;
    }

    /**
     * Resolves the partition of a table scan split by delegating to the source
     * partitioning scheme.
     */
    private int getSplitPartition(Split split) {
        int partition = sourcePartitioningScheme.getPartition(split);
        return partition;
    }

    /**
     * Reports the failure to the callback and then closes this task source.
     *
     * <p>The callback is notified while holding {@code assignerLock}; the lock is released
     * before {@link #close()} is invoked.
     */
    private void fail(Throwable failure) {
        synchronized (assignerLock) {
            callback.failed(failure);
        }
        close();
    }

    /**
     * Closes every resource registered with the closer. Only the first call has any
     * effect; later calls return immediately.
     *
     * @throws RuntimeException wrapping the {@link IOException} thrown while closing
     */
    @Override
    public synchronized void close() {
        if (!closed) {
            // Flip the flag first so a failing close is never retried.
            closed = true;
            try {
                closer.close();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Listener for the output of this task source. Within this class only
     * {@link #failed(Throwable)} is called directly; the remaining methods are driven by
     * the results the assigner returns, which are applied to the callback.
     */
    interface Callback {
        /** Announces newly created partitions. */
        void partitionsAdded(List<Partition> partitions);

        /** Signals that no further partitions will be added. */
        void noMorePartitions();

        /** Delivers split updates for partitions. */
        void partitionsUpdated(List<PartitionUpdate> partitionUpdates);

        /** Signals that partitions were sealed; the array presumably holds their ids. */
        void partitionsSealed(ImmutableIntArray partitionIds);

        /** Reports a failure of the task source. */
        void failed(Throwable failure);
    }

    /**
     * {@link SplitSource} adapter over an {@link ExchangeSourceHandleSource}: each batch of
     * exchange source handles is grouped by partition and packed into remote splits.
     */
    private static class ExchangeSplitSource
            implements SplitSource {
        private final ExchangeSourceHandleSource handleSource;
        // Handles of one partition are packed into a split until adding the next handle
        // would exceed this size.
        private final long targetSplitSizeInBytes;
        // Set once the underlying handle source has delivered its last batch.
        private final AtomicBoolean finished = new AtomicBoolean();

        private ExchangeSplitSource(ExchangeSourceHandleSource handleSource, long targetSplitSizeInBytes) {
            this.handleSource = Objects.requireNonNull(handleSource, "handleSource is null");
            this.targetSplitSizeInBytes = targetSplitSizeInBytes;
        }

        @Override
        public CatalogHandle getCatalogHandle() {
            return ExchangeOperator.REMOTE_CATALOG_HANDLE;
        }

        /**
         * Requests the next batch of handles and converts it into splits.
         *
         * @param maxSize not used; the batch size is whatever the handle source returns
         */
        @Override
        public ListenableFuture<SplitSource.SplitBatch> getNextBatch(int maxSize) {
            return Futures.transform(
                    MoreFutures.toListenableFuture(handleSource.getNextBatch()),
                    batch -> {
                        // Handles of different partitions must never share a split.
                        ListMultimap<Integer, ExchangeSourceHandle> partitionToHandles = batch.handles().stream()
                                .collect(ImmutableListMultimap.toImmutableListMultimap(ExchangeSourceHandle::getPartitionId, Function.identity()));
                        ImmutableList.Builder<Split> splits = ImmutableList.builder();
                        for (Integer partition : partitionToHandles.keySet()) {
                            splits.addAll(createRemoteSplits(partitionToHandles.get(partition)));
                        }
                        if (batch.lastBatch()) {
                            finished.set(true);
                        }
                        return new SplitSource.SplitBatch(splits.build(), batch.lastBatch());
                    },
                    MoreExecutors.directExecutor());
        }

        /**
         * Packs the handles (all of one partition) into splits, in order. A new split is
         * started when the current one is non-empty and adding the next handle would push
         * it over the target size, so a single oversized handle still gets its own split.
         */
        private List<Split> createRemoteSplits(List<ExchangeSourceHandle> handles) {
            ImmutableList.Builder<Split> result = ImmutableList.builder();
            ImmutableList.Builder<ExchangeSourceHandle> currentSplitHandles = ImmutableList.builder();
            long currentSplitHandlesSize = 0L;
            long currentSplitHandlesCount = 0L;
            for (ExchangeSourceHandle handle : handles) {
                if (currentSplitHandlesCount > 0L && currentSplitHandlesSize + handle.getDataSizeInBytes() > targetSplitSizeInBytes) {
                    result.add(createRemoteSplit(currentSplitHandles.build()));
                    currentSplitHandles = ImmutableList.builder();
                    currentSplitHandlesSize = 0L;
                    currentSplitHandlesCount = 0L;
                }
                currentSplitHandles.add(handle);
                currentSplitHandlesSize += handle.getDataSizeInBytes();
                ++currentSplitHandlesCount;
            }
            // Flush the trailing, partially filled split.
            if (currentSplitHandlesCount > 0L) {
                result.add(createRemoteSplit(currentSplitHandles.build()));
            }
            return result.build();
        }

        /** Wraps the handles into a remote split backed by a spooling exchange input. */
        private static Split createRemoteSplit(List<ExchangeSourceHandle> handles) {
            return new Split(ExchangeOperator.REMOTE_CATALOG_HANDLE, new RemoteSplit(new SpoolingExchangeInput(handles, Optional.empty())));
        }

        /**
         * Returns the partition of a split created by this source. All handles in such a
         * split share one partition, so the first handle is representative.
         */
        private static int getSplitPartition(Split split) {
            RemoteSplit remoteSplit = (RemoteSplit) split.getConnectorSplit();
            SpoolingExchangeInput exchangeInput = (SpoolingExchangeInput) remoteSplit.getExchangeInput();
            List<ExchangeSourceHandle> handles = exchangeInput.getExchangeSourceHandles();
            return handles.get(0).getPartitionId();
        }

        @Override
        public void close() {
            handleSource.close();
        }

        @Override
        public boolean isFinished() {
            return finished.get();
        }

        @Override
        public Optional<List<Object>> getTableExecuteSplitsInfo() {
            // Exchange sources never carry table-execute information.
            return Optional.empty();
        }
    }

    private static class SplitLoader
    implements Closeable {
        private final SplitSource splitSource;
        private final Executor executor;
        private final ToIntFunction<Split> splitToPartition;
        private final Callback callback;
        private final int splitBatchSize;
        private final LongConsumer getSplitTimeRecorder;
        @GuardedBy(value="this")
        private boolean started;
        @GuardedBy(value="this")
        private boolean closed;
        @GuardedBy(value="this")
        private ListenableFuture<SplitSource.SplitBatch> splitLoadingFuture;

        public SplitLoader(SplitSource splitSource, Executor executor, ToIntFunction<Split> splitToPartition, Callback callback, int splitBatchSize, LongConsumer getSplitTimeRecorder) {
            this.splitSource = Objects.requireNonNull(splitSource, "splitSource is null");
            this.executor = Objects.requireNonNull(executor, "executor is null");
            this.splitToPartition = Objects.requireNonNull(splitToPartition, "splitToPartition is null");
            this.callback = Objects.requireNonNull(callback, "callback is null");
            this.splitBatchSize = splitBatchSize;
            this.getSplitTimeRecorder = Objects.requireNonNull(getSplitTimeRecorder, "getSplitTimeRecorder is null");
        }

        public synchronized void start() {
            Preconditions.checkState((!this.started ? 1 : 0) != 0, (Object)"already started");
            Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)"already closed");
            this.started = true;
            this.processNext();
        }

        private synchronized void processNext() {
            if (this.closed) {
                return;
            }
            Verify.verify((this.splitLoadingFuture == null || this.splitLoadingFuture.isDone() ? 1 : 0) != 0, (String)"splitLoadingFuture is still running", (Object[])new Object[0]);
            final long start = System.nanoTime();
            this.splitLoadingFuture = this.splitSource.getNextBatch(this.splitBatchSize);
            Futures.addCallback(this.splitLoadingFuture, (FutureCallback)new FutureCallback<SplitSource.SplitBatch>(){

                public void onSuccess(SplitSource.SplitBatch result) {
                    try {
                        getSplitTimeRecorder.accept(start);
                        ListMultimap splits = (ListMultimap)result.getSplits().stream().collect(ImmutableListMultimap.toImmutableListMultimap(splitToPartition::applyAsInt, Function.identity()));
                        boolean finished = result.isLastBatch() && splitSource.isFinished();
                        callback.update((ListMultimap<Integer, Split>)splits, finished);
                        if (!finished) {
                            this.processNext();
                        }
                    }
                    catch (Throwable t) {
                        callback.failed(t);
                    }
                }

                public void onFailure(Throwable t) {
                    callback.failed(t);
                }
            }, (Executor)this.executor);
        }

        @Override
        public synchronized void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (this.splitLoadingFuture != null) {
                this.splitLoadingFuture.cancel(true);
                this.splitLoadingFuture = null;
            }
            this.splitSource.close();
        }

        public static interface Callback {
            public void update(ListMultimap<Integer, Split> var1, boolean var2);

            public void failed(Throwable var1);
        }
    }

    /**
     * Splits to deliver to one partition for one plan node. The split list is copied into
     * an immutable list on construction.
     *
     * @throws NullPointerException if {@code planNodeId} or {@code splits} is null
     */
    record PartitionUpdate(int partitionId, PlanNodeId planNodeId, List<Split> splits, boolean noMoreSplits) {
        public PartitionUpdate {
            Objects.requireNonNull(planNodeId, "planNodeId is null");
            Objects.requireNonNull(splits, "splits is null");
            splits = ImmutableList.copyOf(splits);
        }
    }

    /**
     * A partition id together with its node requirements.
     *
     * @throws NullPointerException if {@code nodeRequirements} is null
     */
    record Partition(int partitionId, NodeRequirements nodeRequirements) {
        public Partition {
            Objects.requireNonNull(nodeRequirements, "nodeRequirements is null");
        }
    }
}

