/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.buffer;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.trino.execution.StateMachine;
import io.trino.execution.buffer.BufferResult;
import io.trino.execution.buffer.BufferState;
import io.trino.execution.buffer.OutputBuffer;
import io.trino.execution.buffer.OutputBufferInfo;
import io.trino.execution.buffer.OutputBufferStateMachine;
import io.trino.execution.buffer.OutputBufferStatus;
import io.trino.execution.buffer.OutputBuffers;
import io.trino.execution.buffer.PagesSerdeUtil;
import io.trino.execution.buffer.PipelinedOutputBuffers;
import io.trino.execution.buffer.SpoolingOutputBuffers;
import io.trino.execution.buffer.SpoolingOutputStats;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.spi.exchange.ExchangeSink;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
public class SpoolingExchangeOutputBuffer
implements OutputBuffer {
    private static final Logger log = Logger.get(SpoolingExchangeOutputBuffer.class);
    private final OutputBufferStateMachine stateMachine;
    private volatile SpoolingOutputBuffers outputBuffers;
    private ExchangeSink exchangeSink;
    private final Supplier<LocalMemoryContext> memoryContextSupplier;
    private final AtomicLong peakMemoryUsage = new AtomicLong();
    private final AtomicLong totalPagesAdded = new AtomicLong();
    private final AtomicLong totalRowsAdded = new AtomicLong();
    private final SpoolingOutputStats outputStats;

    public SpoolingExchangeOutputBuffer(OutputBufferStateMachine stateMachine, SpoolingOutputBuffers outputBuffers, ExchangeSink exchangeSink, Supplier<LocalMemoryContext> memoryContextSupplier) {
        this.stateMachine = Objects.requireNonNull(stateMachine, "stateMachine is null");
        this.outputBuffers = Objects.requireNonNull(outputBuffers, "outputBuffers is null");
        this.exchangeSink = Objects.requireNonNull(exchangeSink, "exchangeSink is null");
        this.memoryContextSupplier = Objects.requireNonNull(memoryContextSupplier, "memoryContextSupplier is null");
        stateMachine.noMoreBuffers();
        this.outputStats = new SpoolingOutputStats(outputBuffers.getOutputPartitionCount());
    }

    @Override
    public OutputBufferInfo getInfo() {
        BufferState state = this.stateMachine.getState();
        LocalMemoryContext memoryContext = this.getSystemMemoryContextOrNull();
        return new OutputBufferInfo("EXTERNAL", state, false, state.canAddPages(), memoryContext == null ? 0L : memoryContext.getBytes(), this.totalPagesAdded.get(), this.totalRowsAdded.get(), this.totalPagesAdded.get(), Optional.empty(), Optional.empty(), this.outputStats.getFinalSnapshot());
    }

    @Override
    public BufferState getState() {
        return this.stateMachine.getState();
    }

    @Override
    public double getUtilization() {
        return 0.0;
    }

    @Override
    public OutputBufferStatus getStatus() {
        OutputBufferStatus.Builder result = OutputBufferStatus.builder(this.outputBuffers.getVersion());
        ExchangeSink sink = this.exchangeSink;
        if (sink != null) {
            result.setExchangeSinkInstanceHandleUpdateRequired(sink.isHandleUpdateRequired());
        }
        return result.build();
    }

    @Override
    public void addStateChangeListener(StateMachine.StateChangeListener<BufferState> stateChangeListener) {
        this.stateMachine.addStateChangeListener(stateChangeListener);
    }

    @Override
    public synchronized void setOutputBuffers(OutputBuffers newOutputBuffers) {
        Objects.requireNonNull(newOutputBuffers, "newOutputBuffers is null");
        if (this.stateMachine.getState().isTerminal() || this.outputBuffers.getVersion() >= newOutputBuffers.getVersion()) {
            return;
        }
        this.outputBuffers.checkValidTransition(newOutputBuffers);
        ExchangeSink sink = this.exchangeSink;
        if (sink != null) {
            sink.updateHandle(((SpoolingOutputBuffers)newOutputBuffers).getExchangeSinkInstanceHandle());
        }
        this.outputBuffers = (SpoolingOutputBuffers)newOutputBuffers;
    }

    @Override
    public ListenableFuture<BufferResult> get(PipelinedOutputBuffers.OutputBufferId bufferId, long token, DataSize maxSize) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void acknowledge(PipelinedOutputBuffers.OutputBufferId bufferId, long token) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void destroy(PipelinedOutputBuffers.OutputBufferId bufferId) {
        throw new UnsupportedOperationException();
    }

    @Override
    public ListenableFuture<Void> isFull() {
        ExchangeSink sink = this.exchangeSink;
        if (sink != null) {
            return MoreFutures.toListenableFuture((CompletableFuture)sink.isBlocked());
        }
        return Futures.immediateVoidFuture();
    }

    @Override
    public void enqueue(List<Slice> pages) {
        this.enqueue(0, pages);
    }

    @Override
    public void enqueue(int partition, List<Slice> pages) {
        Objects.requireNonNull(pages, "pages is null");
        if (!this.stateMachine.getState().canAddPages()) {
            return;
        }
        ExchangeSink sink = this.exchangeSink;
        Preconditions.checkState((sink != null ? 1 : 0) != 0, (Object)"exchangeSink is null");
        long dataSizeInBytes = 0L;
        for (Slice page : pages) {
            dataSizeInBytes += (long)PagesSerdeUtil.getSerializedPageUncompressedSizeInBytes(page);
            sink.add(partition, page);
            this.totalRowsAdded.addAndGet(PagesSerdeUtil.getSerializedPagePositionCount(page));
        }
        this.updateMemoryUsage(sink.getMemoryUsage());
        this.totalPagesAdded.addAndGet(pages.size());
        this.outputStats.update(partition, dataSizeInBytes);
    }

    @Override
    public void setNoMorePages() {
        if (!this.stateMachine.noMorePages()) {
            return;
        }
        this.outputStats.finish();
        ExchangeSink sink = this.exchangeSink;
        if (sink == null) {
            return;
        }
        sink.finish().whenComplete((value, failure) -> {
            if (failure != null) {
                this.stateMachine.fail((Throwable)failure);
            } else {
                this.stateMachine.finish();
            }
            this.exchangeSink = null;
            this.forceFreeMemory();
        });
    }

    @Override
    public void destroy() {
        this.abort();
    }

    @Override
    public void abort() {
        if (!this.stateMachine.abort()) {
            return;
        }
        ExchangeSink sink = this.exchangeSink;
        if (sink == null) {
            return;
        }
        sink.abort().whenComplete((value, failure) -> {
            if (failure != null) {
                log.warn(failure, "Error aborting exchange sink");
            }
            this.exchangeSink = null;
            this.forceFreeMemory();
        });
    }

    @Override
    public long getPeakMemoryUsage() {
        return this.peakMemoryUsage.get();
    }

    @Override
    public Optional<Throwable> getFailureCause() {
        return this.stateMachine.getFailureCause();
    }

    private void updateMemoryUsage(long bytes) {
        LocalMemoryContext context = this.getSystemMemoryContextOrNull();
        if (context != null) {
            context.setBytes(bytes);
        }
        this.updatePeakMemoryUsage(bytes);
    }

    private void updatePeakMemoryUsage(long bytes) {
        long currentValue;
        do {
            if ((currentValue = this.peakMemoryUsage.get()) < bytes) continue;
            return;
        } while (!this.peakMemoryUsage.compareAndSet(currentValue, bytes));
    }

    private void forceFreeMemory() {
        LocalMemoryContext context = this.getSystemMemoryContextOrNull();
        if (context != null) {
            context.close();
        }
    }

    private LocalMemoryContext getSystemMemoryContextOrNull() {
        try {
            return this.memoryContextSupplier.get();
        }
        catch (RuntimeException ignored) {
            return null;
        }
    }
}

