/*
 * Decompiled with CFR 0.152.
 */
package io.trino.split;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.trino.metadata.Split;
import io.trino.spi.connector.CatalogHandle;
import io.trino.split.SplitSource;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;

public class BufferingSplitSource
implements SplitSource {
    private final int bufferSize;
    private final SplitSource source;
    private final Executor executor;

    public BufferingSplitSource(SplitSource source, Executor executor, int bufferSize) {
        this.source = Objects.requireNonNull(source, "source is null");
        this.executor = Objects.requireNonNull(executor, "executor is null");
        this.bufferSize = bufferSize;
    }

    @Override
    public CatalogHandle getCatalogHandle() {
        return this.source.getCatalogHandle();
    }

    @Override
    public ListenableFuture<SplitSource.SplitBatch> getNextBatch(int maxSize) {
        Preconditions.checkArgument((maxSize > 0 ? 1 : 0) != 0, (Object)"Cannot fetch a batch of zero size");
        return GetNextBatch.fetchNextBatchAsync(this.source, this.executor, Math.min(this.bufferSize, maxSize), maxSize);
    }

    @Override
    public void close() {
        this.source.close();
    }

    @Override
    public boolean isFinished() {
        return this.source.isFinished();
    }

    @Override
    public Optional<List<Object>> getTableExecuteSplitsInfo() {
        return this.source.getTableExecuteSplitsInfo();
    }

    public String toString() {
        return MoreObjects.toStringHelper((Object)this).add("bufferSize", this.bufferSize).add("source", (Object)this.source).toString();
    }

    private static class GetNextBatch
    extends AbstractFuture<SplitSource.SplitBatch> {
        private final Context context = Context.current();
        private final SplitSource splitSource;
        private final Executor executor;
        private final int min;
        private final int max;
        @GuardedBy(value="this")
        private final List<Split> splits = new ArrayList<Split>();
        @GuardedBy(value="this")
        private ListenableFuture<SplitSource.SplitBatch> nextBatchFuture;

        public static ListenableFuture<SplitSource.SplitBatch> fetchNextBatchAsync(SplitSource splitSource, Executor executor, int min, int max) {
            GetNextBatch getNextBatch = new GetNextBatch(splitSource, executor, min, max);
            getNextBatch.fetchSplits();
            return getNextBatch;
        }

        private GetNextBatch(SplitSource splitSource, Executor executor, int min, int max) {
            this.splitSource = Objects.requireNonNull(splitSource, "splitSource is null");
            this.executor = Objects.requireNonNull(executor, "executor is null");
            Preconditions.checkArgument((min <= max ? 1 : 0) != 0, (Object)"Min splits greater than max splits");
            this.min = min;
            this.max = max;
        }

        private synchronized void fetchSplits() {
            Preconditions.checkState((this.nextBatchFuture == null || this.nextBatchFuture.isDone() ? 1 : 0) != 0, (Object)"nextBatchFuture is expected to be done");
            try (Scope scope = this.context.makeCurrent();){
                this.nextBatchFuture = this.splitSource.getNextBatch(this.max - this.splits.size());
                while (this.nextBatchFuture.isDone()) {
                    Futures.addCallback(this.nextBatchFuture, (FutureCallback)new FutureCallback<SplitSource.SplitBatch>(){

                        public void onSuccess(SplitSource.SplitBatch splitBatch) {
                            this.processBatch(splitBatch);
                        }

                        public void onFailure(Throwable throwable) {
                            this.setException(throwable);
                        }
                    }, (Executor)MoreExecutors.directExecutor());
                    if (this.isDone()) {
                        return;
                    }
                    this.nextBatchFuture = this.splitSource.getNextBatch(this.max - this.splits.size());
                }
            }
            Futures.addCallback(this.nextBatchFuture, (FutureCallback)new FutureCallback<SplitSource.SplitBatch>(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void onSuccess(SplitSource.SplitBatch splitBatch) {
                    GetNextBatch getNextBatch = this;
                    synchronized (getNextBatch) {
                        if (this.processBatch(splitBatch)) {
                            return;
                        }
                        this.fetchSplits();
                    }
                }

                public void onFailure(Throwable throwable) {
                    this.setException(throwable);
                }
            }, (Executor)this.executor);
        }

        private synchronized boolean processBatch(SplitSource.SplitBatch splitBatch) {
            this.splits.addAll(splitBatch.getSplits());
            boolean isLastBatch = splitBatch.isLastBatch();
            if (this.splits.size() >= this.min || isLastBatch) {
                this.set(new SplitSource.SplitBatch((List<Split>)ImmutableList.copyOf(this.splits), isLastBatch));
                this.splits.clear();
                return true;
            }
            return false;
        }
    }
}

