/*
 * Decompiled with CFR 0.152.
 */
package io.trino.operator.join;

import com.google.common.collect.Iterators;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.concurrent.MoreFutures;
import io.trino.operator.WorkProcessor;
import io.trino.operator.join.DefaultPageJoiner;
import io.trino.operator.join.LookupSource;
import io.trino.operator.join.LookupSourceFactory;
import io.trino.operator.join.LookupSourceProvider;
import io.trino.operator.join.PageJoiner;
import io.trino.operator.join.PartitionedConsumption;
import io.trino.operator.join.StaticLookupSourceProvider;
import io.trino.spi.Page;
import io.trino.spiller.PartitioningSpillerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import javax.annotation.Nullable;

public class SpillingJoinProcessor
implements WorkProcessor.Process<WorkProcessor<Page>> {
    private final Runnable afterClose;
    private final OptionalInt lookupJoinsCount;
    private final boolean waitForBuild;
    private final LookupSourceFactory lookupSourceFactory;
    private final ListenableFuture<LookupSourceProvider> lookupSourceProvider;
    private final PageJoiner.PageJoinerFactory pageJoinerFactory;
    private final PageJoiner sourcePagesJoiner;
    private final WorkProcessor<Page> joinedSourcePages;
    private boolean closed;
    @Nullable
    private ListenableFuture<PartitionedConsumption<Supplier<LookupSource>>> partitionedConsumption;
    @Nullable
    private Iterator<PartitionedConsumption.Partition<Supplier<LookupSource>>> lookupPartitions;
    @Nullable
    private PartitionedConsumption.Partition<Supplier<LookupSource>> previousPartition;
    @Nullable
    private ListenableFuture<Supplier<LookupSource>> previousPartitionLookupSource;

    public SpillingJoinProcessor(Runnable afterClose, OptionalInt lookupJoinsCount, boolean waitForBuild, LookupSourceFactory lookupSourceFactory, ListenableFuture<LookupSourceProvider> lookupSourceProvider, PartitioningSpillerFactory partitioningSpillerFactory, PageJoiner.PageJoinerFactory pageJoinerFactory, WorkProcessor<Page> sourcePages) {
        this.afterClose = Objects.requireNonNull(afterClose, "afterClose is null");
        this.lookupJoinsCount = Objects.requireNonNull(lookupJoinsCount, "lookupJoinsCount is null");
        this.waitForBuild = waitForBuild;
        this.lookupSourceFactory = Objects.requireNonNull(lookupSourceFactory, "lookupSourceFactory is null");
        this.lookupSourceProvider = Objects.requireNonNull(lookupSourceProvider, "lookupSourceProvider is null");
        this.pageJoinerFactory = Objects.requireNonNull(pageJoinerFactory, "pageJoinerFactory is null");
        this.sourcePagesJoiner = pageJoinerFactory.getPageJoiner(lookupSourceProvider, Optional.of(partitioningSpillerFactory), Collections.emptyIterator());
        this.joinedSourcePages = sourcePages.transform(this.sourcePagesJoiner);
    }

    public void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        try (Closer closer = Closer.create();){
            closer.register(this.afterClose::run);
            closer.register((Closeable)this.sourcePagesJoiner);
            this.sourcePagesJoiner.getSpiller().ifPresent(arg_0 -> ((Closer)closer).register(arg_0));
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public WorkProcessor.ProcessState<WorkProcessor<Page>> process() {
        if (this.waitForBuild && !this.lookupSourceProvider.isDone()) {
            return WorkProcessor.ProcessState.blocked(SpillingJoinProcessor.asVoid(this.lookupSourceProvider));
        }
        if (!this.joinedSourcePages.isFinished()) {
            return WorkProcessor.ProcessState.ofResult(this.joinedSourcePages);
        }
        if (this.partitionedConsumption == null) {
            this.partitionedConsumption = this.lookupSourceFactory.finishProbeOperator(this.lookupJoinsCount);
            return WorkProcessor.ProcessState.blocked(SpillingJoinProcessor.asVoid(this.partitionedConsumption));
        }
        if (this.lookupPartitions == null) {
            this.lookupPartitions = ((PartitionedConsumption)MoreFutures.getDone(this.partitionedConsumption)).beginConsumption();
        }
        if (this.previousPartition != null) {
            if (!this.previousPartitionLookupSource.isDone()) {
                return WorkProcessor.ProcessState.blocked(SpillingJoinProcessor.asVoid(this.previousPartitionLookupSource));
            }
            this.previousPartition.release();
            this.previousPartition = null;
            this.previousPartitionLookupSource = null;
        }
        if (!this.lookupPartitions.hasNext()) {
            this.close();
            return WorkProcessor.ProcessState.finished();
        }
        PartitionedConsumption.Partition<Supplier<LookupSource>> partition = this.lookupPartitions.next();
        this.previousPartition = partition;
        this.previousPartitionLookupSource = partition.load();
        return WorkProcessor.ProcessState.ofResult(this.joinUnspilledPages(partition));
    }

    private static <T> ListenableFuture<Void> asVoid(ListenableFuture<T> future) {
        return Futures.transform(future, v -> null, (Executor)MoreExecutors.directExecutor());
    }

    private WorkProcessor<Page> joinUnspilledPages(PartitionedConsumption.Partition<Supplier<LookupSource>> partition) {
        int partitionNumber = partition.number();
        WorkProcessor<Page> unspilledInputPages = WorkProcessor.fromIterator(this.sourcePagesJoiner.getSpiller().map(spiller -> spiller.getSpilledPages(partitionNumber)).orElse(Collections.emptyIterator()));
        Iterator<DefaultPageJoiner.SavedRow> savedRow = Optional.ofNullable(this.sourcePagesJoiner.getSpilledRows().remove(partitionNumber)).map(row -> Iterators.singletonIterator((Object)row)).orElse(Collections.emptyIterator());
        ListenableFuture unspilledLookupSourceProvider = Futures.transform(partition.load(), supplier -> new StaticLookupSourceProvider((LookupSource)supplier.get()), (Executor)MoreExecutors.directExecutor());
        return unspilledInputPages.transform(this.pageJoinerFactory.getPageJoiner((ListenableFuture<LookupSourceProvider>)unspilledLookupSourceProvider, Optional.empty(), savedRow));
    }
}

