/*
 * Decompiled with CFR 0.152.
 */
package io.trino.operator.exchange;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.trino.operator.WorkProcessor;
import io.trino.operator.exchange.LocalExchangeBufferInfo;
import io.trino.operator.exchange.LocalExchangeMemoryManager;
import io.trino.spi.Page;
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

/**
 * In-memory queue of {@link Page}s with a blocking-future based reader API.
 *
 * <p>Pages are appended with {@link #addPage(Page)} and taken with {@link #removePage()} or
 * through the {@link #pages()} work processor. The queue itself and the pending reader future
 * are only touched while holding this object's monitor; the byte/page counters are atomics and
 * the {@code finishing} flag is volatile so that they can also be read without the monitor.
 *
 * <p>Reader futures and the {@code onFinish} callback are always invoked after the monitor has
 * been released (every public mutator asserts that it is entered without the monitor held).
 */
@ThreadSafe
public class LocalExchangeSource {
    private static final ListenableFuture<Void> NOT_BLOCKED = Futures.immediateVoidFuture();

    private final LocalExchangeMemoryManager memoryManager;
    private final Consumer<LocalExchangeSource> onFinish;

    @GuardedBy(value="this")
    private final Queue<Page> buffer = new ArrayDeque<>();

    private final AtomicLong bufferedBytes = new AtomicLong();
    private final AtomicInteger bufferedPages = new AtomicInteger();

    // null means that no reader is currently waiting for a page
    @Nullable
    @GuardedBy(value="this")
    private SettableFuture<Void> notEmptyFuture;

    private volatile boolean finishing;

    /**
     * Creates an empty, not yet finishing source.
     *
     * @param memoryManager receives negative memory updates whenever a page leaves (or is refused by) this source
     * @param onFinish callback invoked with this source each time a finished state is observed
     *                 after a removal, {@link #finish()} or {@link #close()}; it may therefore be
     *                 invoked more than once
     * @throws NullPointerException if either argument is null
     */
    public LocalExchangeSource(LocalExchangeMemoryManager memoryManager, Consumer<LocalExchangeSource> onFinish) {
        this.memoryManager = Objects.requireNonNull(memoryManager, "memoryManager is null");
        this.onFinish = Objects.requireNonNull(onFinish, "onFinish is null");
    }

    /**
     * Returns a snapshot of the current counters.
     *
     * <p>The two counters are read one after the other without locking, so the pair is not
     * guaranteed to be mutually consistent under concurrent modification.
     */
    public LocalExchangeBufferInfo getBufferInfo() {
        long bytes = bufferedBytes.get();
        int pages = bufferedPages.get();
        return new LocalExchangeBufferInfo(bytes, pages);
    }

    /**
     * Appends a page unless the source is already finishing, and wakes up a waiting reader.
     *
     * <p>A page offered after finishing has started is not buffered; its retained size is
     * subtracted from the memory manager instead (presumably the producer has already
     * accounted for it there — confirm against the caller).
     *
     * @param page page to enqueue; must not be null
     */
    void addPage(Page page) {
        assertNotHoldsLock();

        final long pageBytes = page.getRetainedSizeInBytes();
        final boolean accepted;
        final SettableFuture<Void> waiter;
        synchronized (this) {
            accepted = !finishing;
            if (accepted) {
                // counters are bumped before the page becomes visible in the queue
                bufferedBytes.addAndGet(pageBytes);
                bufferedPages.incrementAndGet();
                buffer.add(page);
            }
            // whether the page was accepted or refused, a waiting reader has to re-check
            waiter = detachWaiter();
        }

        if (!accepted) {
            memoryManager.updateMemoryUsage(-pageBytes);
        }
        complete(waiter);
    }

    /**
     * Exposes this source as a {@link WorkProcessor}.
     *
     * <p>Each step yields the next buffered page if there is one; otherwise it reports
     * finished, blocked on {@link #waitForReading()}, or yielded when that future is already done.
     */
    public WorkProcessor<Page> pages() {
        return WorkProcessor.create(() -> {
            Page next = removePage();
            if (next != null) {
                return WorkProcessor.ProcessState.ofResult(next);
            }
            if (isFinished()) {
                return WorkProcessor.ProcessState.finished();
            }
            ListenableFuture<Void> readable = waitForReading();
            if (readable.isDone()) {
                return WorkProcessor.ProcessState.yielded();
            }
            return WorkProcessor.ProcessState.blocked(readable);
        });
    }

    /**
     * Takes the oldest buffered page, if any.
     *
     * <p>On success the page's retained size is released from the memory manager, the counters
     * are decremented, and the finish callback is fired if this removal completed the source.
     *
     * @return the removed page, or {@code null} when the buffer is empty
     */
    public Page removePage() {
        assertNotHoldsLock();

        final Page head;
        synchronized (this) {
            head = buffer.poll();
        }

        if (head != null) {
            long releasedBytes = head.getRetainedSizeInBytes();
            memoryManager.updateMemoryUsage(-releasedBytes);
            bufferedBytes.addAndGet(-releasedBytes);
            bufferedPages.decrementAndGet();
            checkFinished();
        }
        return head;
    }

    /**
     * Returns a future that is done once a reader should try again.
     *
     * @return an already completed future when pages are counted as buffered or finishing has
     *         started; otherwise a shared pending future that is completed by the next
     *         {@link #addPage(Page)}, {@link #finish()} or {@link #close()}
     */
    public ListenableFuture<Void> waitForReading() {
        assertNotHoldsLock();

        // cheap check without the monitor first
        if (hasPagesOrFinishing()) {
            return NOT_BLOCKED;
        }

        synchronized (this) {
            // state may have changed before the monitor was acquired
            if (hasPagesOrFinishing()) {
                return NOT_BLOCKED;
            }
            if (notEmptyFuture == null) {
                notEmptyFuture = SettableFuture.create();
            }
            return notEmptyFuture;
        }
    }

    /**
     * Tells whether finishing has started and the page counter has dropped to zero.
     */
    public boolean isFinished() {
        if (finishing) {
            synchronized (this) {
                return finishing && bufferedPages.get() == 0;
            }
        }
        return false;
    }

    /**
     * Marks the source as finishing; already buffered pages stay readable.
     *
     * <p>Only the first call has an effect: it wakes up a waiting reader and fires the finish
     * callback if nothing is buffered. Later calls return immediately.
     */
    public void finish() {
        assertNotHoldsLock();

        final SettableFuture<Void> waiter;
        synchronized (this) {
            if (finishing) {
                return;
            }
            finishing = true;
            waiter = detachWaiter();
        }

        complete(waiter);
        checkFinished();
    }

    /**
     * Marks the source as finishing and discards every page still buffered.
     *
     * <p>The retained size of the discarded pages is released from the memory manager, a waiting
     * reader is woken up and the finish callback is fired.
     *
     * @throws IllegalStateException if the source does not report finished afterwards
     */
    public void close() {
        assertNotHoldsLock();

        final int droppedPages;
        long droppedBytes = 0L;
        final SettableFuture<Void> waiter;
        synchronized (this) {
            finishing = true;

            droppedPages = buffer.size();
            for (Page dropped : buffer) {
                droppedBytes += dropped.getRetainedSizeInBytes();
            }
            buffer.clear();
            bufferedBytes.addAndGet(-droppedBytes);
            bufferedPages.addAndGet(-droppedPages);

            waiter = detachWaiter();
        }

        memoryManager.updateMemoryUsage(-droppedBytes);
        complete(waiter);

        Preconditions.checkState(isFinished(), "Expected buffer to be finished");
        checkFinished();
    }

    /** Fires {@code onFinish} when {@link #isFinished()} currently holds. */
    private void checkFinished() {
        assertNotHoldsLock();
        if (isFinished()) {
            onFinish.accept(this);
        }
    }

    /** Reader fast-path condition: finishing has started or the page counter is positive. */
    private boolean hasPagesOrFinishing() {
        return finishing || bufferedPages.get() > 0;
    }

    /** Clears and returns the pending reader future (possibly null); call with the monitor held. */
    @Nullable
    @GuardedBy(value="this")
    private SettableFuture<Void> detachWaiter() {
        SettableFuture<Void> waiter = notEmptyFuture;
        notEmptyFuture = null;
        return waiter;
    }

    /** Completes a previously detached reader future, if there was one. */
    private static void complete(@Nullable SettableFuture<Void> waiter) {
        if (waiter != null) {
            waiter.set(null);
        }
    }

    private void assertNotHoldsLock() {
        assert (!Thread.holdsLock(this)) : "Cannot execute this method while holding the lock";
    }
}

