/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.scheduler.faulttolerant;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.ForwardingListenableFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.concurrent.MoreFutures;
import io.trino.exchange.SpoolingExchangeInput;
import io.trino.execution.TableExecuteContext;
import io.trino.execution.TableExecuteContextManager;
import io.trino.execution.scheduler.faulttolerant.FaultTolerantPartitioningScheme;
import io.trino.execution.scheduler.faulttolerant.SplitAssigner;
import io.trino.metadata.Split;
import io.trino.operator.ExchangeOperator;
import io.trino.spi.QueryId;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.spi.exchange.ExchangeSourceHandleSource;
import io.trino.split.RemoteSplit;
import io.trino.split.SplitSource;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNodeId;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

@ThreadSafe
class EventDrivenTaskSource
implements Closeable {
    private final QueryId queryId;
    private final TableExecuteContextManager tableExecuteContextManager;
    private final Map<PlanFragmentId, Exchange> sourceExchanges;
    private final SetMultimap<PlanNodeId, PlanFragmentId> remoteSources;
    private final Supplier<Map<PlanNodeId, SplitSource>> splitSourceSupplier;
    private final SplitAssigner assigner;
    private final Executor executor;
    private final int splitBatchSize;
    private final long targetExchangeSplitSizeInBytes;
    private final FaultTolerantPartitioningScheme sourcePartitioningScheme;
    private final LongConsumer getSplitTimeRecorder;
    @GuardedBy(value="this")
    private boolean initialized;
    @GuardedBy(value="this")
    private List<IdempotentSplitSource> splitSources;
    @GuardedBy(value="this")
    private final Set<PlanFragmentId> completedFragments = new HashSet<PlanFragmentId>();
    @GuardedBy(value="this")
    private ListenableFuture<SplitAssigner.AssignmentResult> future;
    @GuardedBy(value="this")
    private boolean finished;
    @GuardedBy(value="this")
    private boolean closed;
    @GuardedBy(value="this")
    private final Closer closer = Closer.create();

    EventDrivenTaskSource(QueryId queryId, TableExecuteContextManager tableExecuteContextManager, Map<PlanFragmentId, Exchange> sourceExchanges, SetMultimap<PlanNodeId, PlanFragmentId> remoteSources, Supplier<Map<PlanNodeId, SplitSource>> splitSourceSupplier, SplitAssigner assigner, Executor executor, int splitBatchSize, long targetExchangeSplitSizeInBytes, FaultTolerantPartitioningScheme sourcePartitioningScheme, LongConsumer getSplitTimeRecorder) {
        this.queryId = Objects.requireNonNull(queryId, "queryId is null");
        this.tableExecuteContextManager = Objects.requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
        this.sourceExchanges = ImmutableMap.copyOf(Objects.requireNonNull(sourceExchanges, "sourceExchanges is null"));
        this.remoteSources = ImmutableSetMultimap.copyOf((Multimap)((Multimap)Objects.requireNonNull(remoteSources, "remoteSources is null")));
        this.splitSourceSupplier = Objects.requireNonNull(splitSourceSupplier, "splitSourceSupplier is null");
        this.assigner = Objects.requireNonNull(assigner, "assigner is null");
        this.executor = Objects.requireNonNull(executor, "executor is null");
        this.splitBatchSize = splitBatchSize;
        this.targetExchangeSplitSizeInBytes = targetExchangeSplitSizeInBytes;
        this.sourcePartitioningScheme = Objects.requireNonNull(sourcePartitioningScheme, "sourcePartitioningScheme is null");
        this.getSplitTimeRecorder = Objects.requireNonNull(getSplitTimeRecorder, "getSplitTimeRecorder is null");
    }

    public synchronized Optional<ListenableFuture<SplitAssigner.AssignmentResult>> process() {
        Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)"closed");
        Preconditions.checkState((this.future == null || this.future.isDone() ? 1 : 0) != 0, (Object)"still in process");
        if (this.finished) {
            return Optional.empty();
        }
        if (!this.initialized) {
            this.initialize();
            this.initialized = true;
        }
        this.future = this.processNext();
        return Optional.of(this.future);
    }

    @GuardedBy(value="this")
    private void initialize() {
        HashMap remoteSourceNodeIds = new HashMap();
        this.remoteSources.forEach((planNodeId, planFragmentId) -> remoteSourceNodeIds.put(planFragmentId, planNodeId));
        ImmutableList.Builder splitSources = ImmutableList.builder();
        for (Map.Entry<PlanFragmentId, Exchange> entry : this.sourceExchanges.entrySet()) {
            PlanFragmentId sourceFragmentId = entry.getKey();
            PlanNodeId remoteSourceNodeId = (PlanNodeId)remoteSourceNodeIds.get(sourceFragmentId);
            Verify.verify((remoteSourceNodeId != null ? 1 : 0) != 0, (String)"remote source not found for fragment: %s", (Object)sourceFragmentId);
            ExchangeSourceHandleSource handleSource = (ExchangeSourceHandleSource)this.closer.register((Closeable)entry.getValue().getSourceHandles());
            ExchangeSplitSource splitSource = (ExchangeSplitSource)this.closer.register((Closeable)new ExchangeSplitSource(handleSource, this.targetExchangeSplitSizeInBytes));
            splitSources.add((Object)((IdempotentSplitSource)this.closer.register((Closeable)new IdempotentSplitSource(this.queryId, this.tableExecuteContextManager, remoteSourceNodeId, Optional.of(sourceFragmentId), splitSource, this.splitBatchSize, this.getSplitTimeRecorder))));
        }
        for (Map.Entry<Object, Object> entry : this.splitSourceSupplier.get().entrySet()) {
            splitSources.add((Object)((IdempotentSplitSource)this.closer.register((Closeable)new IdempotentSplitSource(this.queryId, this.tableExecuteContextManager, (PlanNodeId)entry.getKey(), Optional.empty(), (SplitSource)this.closer.register((Closeable)((SplitSource)entry.getValue())), this.splitBatchSize, this.getSplitTimeRecorder))));
        }
        this.splitSources = splitSources.build();
    }

    @GuardedBy(value="this")
    private ListenableFuture<SplitAssigner.AssignmentResult> processNext() {
        List futures = (List)this.splitSources.stream().map(IdempotentSplitSource::getNext).filter(Optional::isPresent).map(Optional::get).collect(ImmutableList.toImmutableList());
        if (futures.isEmpty()) {
            this.finished = true;
            return Futures.immediateFuture((Object)this.assigner.finish());
        }
        ListenableFuture firstCompleted = MoreFutures.whenAnyCompleteCancelOthers((Iterable)futures);
        return Futures.transform((ListenableFuture)firstCompleted, this::process, (Executor)this.executor);
    }

    private synchronized SplitAssigner.AssignmentResult process(IdempotentSplitSource.SplitBatchReference batchReference) {
        PlanNodeId sourceNodeId = batchReference.getPlanNodeId();
        Optional<PlanFragmentId> sourceFragmentId = batchReference.getSourceFragmentId();
        SplitSource.SplitBatch splitBatch = batchReference.getSplitBatchAndAdvance();
        boolean noMoreSplits = false;
        if (splitBatch.isLastBatch()) {
            if (sourceFragmentId.isPresent()) {
                this.completedFragments.add(sourceFragmentId.get());
                noMoreSplits = this.completedFragments.containsAll(this.remoteSources.get((Object)sourceNodeId));
            } else {
                noMoreSplits = true;
            }
        }
        ListMultimap splits = (ListMultimap)splitBatch.getSplits().stream().collect(ImmutableListMultimap.toImmutableListMultimap(this::getSplitPartition, Function.identity()));
        return this.assigner.assign(sourceNodeId, (ListMultimap<Integer, Split>)splits, noMoreSplits);
    }

    private int getSplitPartition(Split split) {
        ConnectorSplit connectorSplit = split.getConnectorSplit();
        if (connectorSplit instanceof RemoteSplit) {
            RemoteSplit remoteSplit = (RemoteSplit)connectorSplit;
            SpoolingExchangeInput exchangeInput = (SpoolingExchangeInput)remoteSplit.getExchangeInput();
            List<ExchangeSourceHandle> handles = exchangeInput.getExchangeSourceHandles();
            return handles.get(0).getPartitionId();
        }
        return this.sourcePartitioningScheme.getPartition(split);
    }

    @Override
    public synchronized void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        try {
            this.closer.close();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public String getDebugInfo() {
        return MoreObjects.toStringHelper((Object)this).add("sourceExchanges", (Object)Maps.transformValues(this.sourceExchanges, Exchange::getId)).add("remoteSources", this.remoteSources).add("assigner", (Object)this.assigner).add("splitBatchSize", this.splitBatchSize).add("targetExchangeSplitSizeInBytes", this.targetExchangeSplitSizeInBytes).add("sourcePartitioningScheme", (Object)this.sourcePartitioningScheme).add("initialized", this.initialized).add("splitSources", this.splitSources).add("completedFragments", this.completedFragments).add("future", this.future).add("closed", this.closed).toString();
    }

    private static class ExchangeSplitSource
    implements SplitSource {
        private final ExchangeSourceHandleSource handleSource;
        private final long targetSplitSizeInBytes;

        private ExchangeSplitSource(ExchangeSourceHandleSource handleSource, long targetSplitSizeInBytes) {
            this.handleSource = Objects.requireNonNull(handleSource, "handleSource is null");
            this.targetSplitSizeInBytes = targetSplitSizeInBytes;
        }

        @Override
        public CatalogHandle getCatalogHandle() {
            return ExchangeOperator.REMOTE_CATALOG_HANDLE;
        }

        @Override
        public ListenableFuture<SplitSource.SplitBatch> getNextBatch(int maxSize) {
            ListenableFuture sourceHandlesFuture = MoreFutures.toListenableFuture((CompletableFuture)this.handleSource.getNextBatch());
            return Futures.transform((ListenableFuture)sourceHandlesFuture, batch -> {
                List handles = batch.handles();
                ListMultimap partitionToHandles = (ListMultimap)handles.stream().collect(ImmutableListMultimap.toImmutableListMultimap(ExchangeSourceHandle::getPartitionId, Function.identity()));
                ImmutableList.Builder splits = ImmutableList.builder();
                Iterator iterator = partitionToHandles.keySet().iterator();
                while (iterator.hasNext()) {
                    int partition = (Integer)iterator.next();
                    splits.addAll(this.createRemoteSplits(partitionToHandles.get((Object)partition)));
                }
                return new SplitSource.SplitBatch((List<Split>)splits.build(), batch.lastBatch());
            }, (Executor)MoreExecutors.directExecutor());
        }

        private List<Split> createRemoteSplits(List<ExchangeSourceHandle> handles) {
            ImmutableList.Builder result = ImmutableList.builder();
            ImmutableList.Builder currentSplitHandles = ImmutableList.builder();
            long currentSplitHandlesSize = 0L;
            long currentSplitHandlesCount = 0L;
            for (ExchangeSourceHandle handle : handles) {
                if (currentSplitHandlesCount > 0L && currentSplitHandlesSize + handle.getDataSizeInBytes() > this.targetSplitSizeInBytes) {
                    result.add((Object)ExchangeSplitSource.createRemoteSplit((List<ExchangeSourceHandle>)currentSplitHandles.build()));
                    currentSplitHandles = ImmutableList.builder();
                    currentSplitHandlesSize = 0L;
                    currentSplitHandlesCount = 0L;
                }
                currentSplitHandles.add((Object)handle);
                currentSplitHandlesSize += handle.getDataSizeInBytes();
                ++currentSplitHandlesCount;
            }
            if (currentSplitHandlesCount > 0L) {
                result.add((Object)ExchangeSplitSource.createRemoteSplit((List<ExchangeSourceHandle>)currentSplitHandles.build()));
            }
            return result.build();
        }

        private static Split createRemoteSplit(List<ExchangeSourceHandle> handles) {
            return new Split(ExchangeOperator.REMOTE_CATALOG_HANDLE, new RemoteSplit(new SpoolingExchangeInput(handles, Optional.empty())));
        }

        @Override
        public void close() {
            this.handleSource.close();
        }

        @Override
        public boolean isFinished() {
            throw new UnsupportedOperationException();
        }

        @Override
        public Optional<List<Object>> getTableExecuteSplitsInfo() {
            return Optional.empty();
        }

        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("handleSource", (Object)this.handleSource).add("targetSplitSizeInBytes", this.targetSplitSizeInBytes).toString();
        }
    }

    private static class IdempotentSplitSource
    implements Closeable {
        private final QueryId queryId;
        private final TableExecuteContextManager tableExecuteContextManager;
        private final PlanNodeId planNodeId;
        private final Optional<PlanFragmentId> sourceFragmentId;
        private final SplitSource splitSource;
        private final int splitBatchSize;
        private final LongConsumer getSplitTimeRecorder;
        @GuardedBy(value="this")
        private Optional<CallbackProxyFuture<SplitBatchReference>> future = Optional.empty();
        @GuardedBy(value="this")
        private boolean closed;
        @GuardedBy(value="this")
        private boolean finished;

        private IdempotentSplitSource(QueryId queryId, TableExecuteContextManager tableExecuteContextManager, PlanNodeId planNodeId, Optional<PlanFragmentId> sourceFragmentId, SplitSource splitSource, int splitBatchSize, LongConsumer getSplitTimeRecorder) {
            this.queryId = Objects.requireNonNull(queryId, "queryId is null");
            this.tableExecuteContextManager = Objects.requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
            this.planNodeId = Objects.requireNonNull(planNodeId, "planNodeId is null");
            this.sourceFragmentId = Objects.requireNonNull(sourceFragmentId, "sourceFragmentId is null");
            this.splitSource = Objects.requireNonNull(splitSource, "splitSource is null");
            this.splitBatchSize = splitBatchSize;
            this.getSplitTimeRecorder = Objects.requireNonNull(getSplitTimeRecorder, "getSplitTimeRecorder is null");
        }

        public synchronized Optional<ListenableFuture<SplitBatchReference>> getNext() {
            if (this.future.isEmpty() && !this.finished) {
                long start = System.nanoTime();
                this.future = Optional.of(new CallbackProxyFuture(Futures.transform(this.splitSource.getNextBatch(this.splitBatchSize), batch -> {
                    this.getSplitTimeRecorder.accept(start);
                    if (batch.isLastBatch()) {
                        Optional<List<Object>> tableExecuteSplitsInfo = this.splitSource.getTableExecuteSplitsInfo();
                        tableExecuteSplitsInfo.ifPresent(info -> {
                            TableExecuteContext tableExecuteContext = this.tableExecuteContextManager.getTableExecuteContextForQuery(this.queryId);
                            tableExecuteContext.setSplitsInfo((List<Object>)info);
                        });
                    }
                    return new SplitBatchReference((SplitSource.SplitBatch)batch);
                }, (Executor)MoreExecutors.directExecutor())));
            }
            return this.future.map(CallbackProxyFuture::addListener);
        }

        @Override
        public synchronized void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (this.future.isPresent() && !this.future.get().isDone()) {
                this.future.get().cancel(true);
            }
            this.splitSource.close();
        }

        private synchronized void advance(boolean lastBatch) {
            this.finished = lastBatch;
            this.future = Optional.empty();
        }

        public synchronized String toString() {
            return MoreObjects.toStringHelper((Object)this).add("planNodeId", (Object)this.planNodeId).add("sourceFragmentId", this.sourceFragmentId).add("splitSource", (Object)this.splitSource).add("splitBatchSize", this.splitBatchSize).add("closed", this.closed).add("finished", this.finished).toString();
        }

        public class SplitBatchReference {
            private final SplitSource.SplitBatch splitBatch;

            public SplitBatchReference(SplitSource.SplitBatch splitBatch) {
                this.splitBatch = Objects.requireNonNull(splitBatch, "splitBatch is null");
            }

            public PlanNodeId getPlanNodeId() {
                return IdempotentSplitSource.this.planNodeId;
            }

            public Optional<PlanFragmentId> getSourceFragmentId() {
                return IdempotentSplitSource.this.sourceFragmentId;
            }

            public SplitSource.SplitBatch getSplitBatchAndAdvance() {
                IdempotentSplitSource.this.advance(this.splitBatch.isLastBatch());
                return this.splitBatch;
            }
        }
    }

    private record FutureRef<V>(SettableFuture<V> future) {
        private FutureRef(SettableFuture<V> future) {
            this.future = Objects.requireNonNull(future, "future is null");
        }

        @Override
        public boolean equals(Object o) {
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            FutureRef futureRef = (FutureRef)o;
            return this.future == futureRef.future;
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(this.future);
        }
    }

    private static class CallbackProxyFuture<T>
    extends ForwardingListenableFuture<T> {
        private final ListenableFuture<T> delegate;
        @GuardedBy(value="listeners")
        private final Set<FutureRef<T>> listeners = new HashSet<FutureRef<T>>();

        private CallbackProxyFuture(ListenableFuture<T> delegate) {
            this.delegate = Objects.requireNonNull(delegate, "delegate is null");
            delegate.addListener(this::propagateIfNecessary, MoreExecutors.directExecutor());
        }

        protected ListenableFuture<T> delegate() {
            return this.delegate;
        }

        public void addListener(Runnable listener, Executor executor) {
            throw new UnsupportedOperationException();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public ListenableFuture<T> addListener() {
            SettableFuture listener = SettableFuture.create();
            Set<FutureRef<T>> set = this.listeners;
            synchronized (set) {
                this.listeners.add(new FutureRef(listener));
            }
            listener.addListener(() -> {
                if (listener.isCancelled()) {
                    Set<FutureRef<T>> set = this.listeners;
                    synchronized (set) {
                        this.listeners.remove(new FutureRef(listener));
                    }
                }
            }, MoreExecutors.directExecutor());
            this.propagateIfNecessary();
            return listener;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void propagateIfNecessary() {
            List futures;
            if (!this.delegate.isDone()) {
                return;
            }
            Set<FutureRef<T>> set = this.listeners;
            synchronized (set) {
                futures = (List)ImmutableList.copyOf(this.listeners).stream().map(FutureRef::future).collect(ImmutableList.toImmutableList());
                this.listeners.clear();
            }
            for (SettableFuture future : futures) {
                future.setFuture(this.delegate);
            }
        }
    }
}

