/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.buffer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.trino.execution.StateMachine;
import io.trino.execution.buffer.BufferResult;
import io.trino.execution.buffer.BufferState;
import io.trino.execution.buffer.ClientBuffer;
import io.trino.execution.buffer.OutputBuffer;
import io.trino.execution.buffer.OutputBufferInfo;
import io.trino.execution.buffer.OutputBufferMemoryManager;
import io.trino.execution.buffer.OutputBufferStateMachine;
import io.trino.execution.buffer.OutputBufferStatus;
import io.trino.execution.buffer.OutputBuffers;
import io.trino.execution.buffer.PagesSerdeUtil;
import io.trino.execution.buffer.PipelinedBufferInfo;
import io.trino.execution.buffer.PipelinedOutputBuffers;
import io.trino.execution.buffer.SerializedPageReference;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.plugin.base.metrics.TDigestHistogram;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/**
 * {@link OutputBuffer} implementation backed by a fixed list of {@link ClientBuffer}s,
 * one per buffer id declared in the {@link PipelinedOutputBuffers} descriptor supplied
 * at construction time.
 *
 * <p>The set of partitions is built once in the constructor and never changes afterwards;
 * {@link #setOutputBuffers(OutputBuffers)} only validates newer descriptors. Buffered
 * bytes are tracked through a shared {@link OutputBufferMemoryManager}.
 */
public class PartitionedOutputBuffer
        implements OutputBuffer {
    private final OutputBufferStateMachine stateMachine;
    private final PipelinedOutputBuffers outputBuffers;
    private final OutputBufferMemoryManager memoryManager;
    private final SerializedPageReference.PagesReleasedListener onPagesReleased;
    // Indexed by partition number in enqueue(), and by OutputBufferId.getId() in get/acknowledge/destroy.
    private final List<ClientBuffer> partitions;
    private final AtomicLong totalPagesAdded = new AtomicLong();
    private final AtomicLong totalRowsAdded = new AtomicLong();

    /**
     * Creates the buffer and one {@link ClientBuffer} per buffer id in {@code outputBuffers}.
     *
     * @param taskInstanceId passed through to every created {@link ClientBuffer}
     * @param stateMachine state machine tracking this buffer's lifecycle; moved to
     *        "no more buffers" before the constructor returns
     * @param outputBuffers descriptor; must be of type {@code PARTITIONED} and must report
     *        {@code isNoMoreBufferIds()}
     * @param maxBufferSize size handed (in bytes) to the {@link OutputBufferMemoryManager}
     * @param memoryContextSupplier handed to the {@link OutputBufferMemoryManager}
     * @param notificationExecutor handed to the {@link OutputBufferMemoryManager}
     * @throws NullPointerException if {@code stateMachine}, {@code outputBuffers},
     *         {@code memoryContextSupplier} or {@code notificationExecutor} is null
     * @throws IllegalArgumentException if the descriptor is not a final PARTITIONED descriptor
     */
    public PartitionedOutputBuffer(String taskInstanceId, OutputBufferStateMachine stateMachine, PipelinedOutputBuffers outputBuffers, DataSize maxBufferSize, Supplier<LocalMemoryContext> memoryContextSupplier, Executor notificationExecutor) {
        this.stateMachine = Objects.requireNonNull(stateMachine, "stateMachine is null");

        Objects.requireNonNull(outputBuffers, "outputBuffers is null");
        Preconditions.checkArgument(outputBuffers.getType() == PipelinedOutputBuffers.BufferType.PARTITIONED, "Expected a PARTITIONED output buffer descriptor");
        Preconditions.checkArgument(outputBuffers.isNoMoreBufferIds(), "Expected a final output buffer descriptor");
        this.outputBuffers = outputBuffers;

        this.memoryManager = new OutputBufferMemoryManager(
                maxBufferSize.toBytes(),
                Objects.requireNonNull(memoryContextSupplier, "memoryContextSupplier is null"),
                Objects.requireNonNull(notificationExecutor, "notificationExecutor is null"));
        this.onPagesReleased = SerializedPageReference.PagesReleasedListener.forOutputBufferMemoryManager(this.memoryManager);

        ImmutableList.Builder<ClientBuffer> partitions = ImmutableList.builderWithExpectedSize(outputBuffers.getBuffers().keySet().size());
        for (PipelinedOutputBuffers.OutputBufferId bufferId : outputBuffers.getBuffers().keySet()) {
            ClientBuffer partition = new ClientBuffer(taskInstanceId, bufferId, this.onPagesReleased);
            partitions.add(partition);
        }
        this.partitions = partitions.build();

        stateMachine.noMoreBuffers();
        // With zero partitions the "all destroyed" condition already holds, so this may finish the buffer right away.
        this.checkFlushComplete();
    }

    /** Registers {@code stateChangeListener} with the underlying state machine. */
    @Override
    public void addStateChangeListener(StateMachine.StateChangeListener<BufferState> stateChangeListener) {
        this.stateMachine.addStateChangeListener(stateChangeListener);
    }

    /** Returns the utilization reported by the memory manager. */
    @Override
    public double getUtilization() {
        return this.memoryManager.getUtilization();
    }

    /** Builds a status from the descriptor version and the memory manager's over-utilization flag. */
    @Override
    public OutputBufferStatus getStatus() {
        return OutputBufferStatus.builder(this.outputBuffers.getVersion())
                .setOverutilized(this.memoryManager.isOverutilized())
                .build();
    }

    /**
     * Collects a snapshot of this buffer: the current state, per-partition info, the sum of
     * buffered pages across partitions, and the running row/page totals.
     */
    @Override
    public OutputBufferInfo getInfo() {
        // Read the state once so that canAddBuffers/canAddPages are reported for the same state value.
        BufferState state = this.stateMachine.getState();

        int totalBufferedPages = 0;
        ImmutableList.Builder<PipelinedBufferInfo> infos = ImmutableList.builderWithExpectedSize(this.partitions.size());
        for (ClientBuffer partition : this.partitions) {
            PipelinedBufferInfo bufferInfo = partition.getInfo();
            infos.add(bufferInfo);
            totalBufferedPages += bufferInfo.getBufferedPages();
        }

        return new OutputBufferInfo(
                "PARTITIONED",
                state,
                state.canAddBuffers(),
                state.canAddPages(),
                this.memoryManager.getBufferedBytes(),
                totalBufferedPages,
                this.totalRowsAdded.get(),
                this.totalPagesAdded.get(),
                Optional.of(infos.build()),
                Optional.of(new TDigestHistogram(this.memoryManager.getUtilizationHistogram())),
                Optional.empty(),
                Optional.empty());
    }

    /** Returns the current state of the underlying state machine. */
    @Override
    public BufferState getState() {
        return this.stateMachine.getState();
    }

    /**
     * Validates a newer descriptor against the one supplied at construction. The stored
     * descriptor is never replaced.
     *
     * <p>The call is a no-op when the buffer is in a terminal state or when
     * {@code newOutputBuffers} does not have a higher version than the current descriptor.
     *
     * @throws NullPointerException if {@code newOutputBuffers} is null
     */
    @Override
    public void setOutputBuffers(OutputBuffers newOutputBuffers) {
        Objects.requireNonNull(newOutputBuffers, "newOutputBuffers is null");

        if (this.stateMachine.getState().isTerminal() || this.outputBuffers.getVersion() >= newOutputBuffers.getVersion()) {
            return;
        }

        // The partition list is fixed, so only verify that the transition is a legal one.
        this.outputBuffers.checkValidTransition(newOutputBuffers);
    }

    /** Returns the memory manager's buffer-blocked future. */
    @Override
    public ListenableFuture<Void> isFull() {
        return this.memoryManager.getBufferBlockedFuture();
    }

    /**
     * Enqueues {@code pages} into the single partition of this buffer.
     *
     * @throws IllegalStateException if this buffer does not have exactly one partition
     */
    @Override
    public void enqueue(List<Slice> pages) {
        Preconditions.checkState(this.partitions.size() == 1, "Expected exactly one partition");
        this.enqueue(0, pages);
    }

    /**
     * Enqueues serialized {@code pages} into the partition at index {@code partitionNumber}.
     * Pages are silently dropped when the current state does not allow adding pages.
     *
     * @param partitionNumber index into the partition list
     * @param pages serialized pages to add
     * @throws NullPointerException if {@code pages} is null
     */
    @Override
    public void enqueue(int partitionNumber, List<Slice> pages) {
        Objects.requireNonNull(pages, "pages is null");

        if (!this.stateMachine.getState().canAddPages()) {
            return;
        }

        ImmutableList.Builder<SerializedPageReference> references = ImmutableList.builderWithExpectedSize(pages.size());
        long bytesAdded = 0L;
        long rowCount = 0L;
        for (Slice page : pages) {
            bytesAdded += page.getRetainedSize();
            int positionCount = PagesSerdeUtil.getSerializedPagePositionCount(page);
            rowCount += positionCount;
            // NOTE(review): the trailing 1 is presumably the initial reference count held by this method - confirm against SerializedPageReference.
            references.add(new SerializedPageReference(page, positionCount, 1));
        }
        List<SerializedPageReference> serializedPageReferences = references.build();

        this.totalRowsAdded.addAndGet(rowCount);
        this.totalPagesAdded.addAndGet(serializedPageReferences.size());

        this.memoryManager.updateMemoryUsage(bytesAdded);

        this.partitions.get(partitionNumber).enqueuePages(serializedPageReferences);

        // Release this method's hold on the pages now that the partition has been handed them;
        // onPagesReleased is the listener wired to the memory manager in the constructor.
        SerializedPageReference.dereferencePages(serializedPageReferences, this.onPagesReleased);
    }

    /**
     * Requests pages from the partition identified by {@code outputBufferId}.
     *
     * @param outputBufferId buffer whose id selects the partition
     * @param startingSequenceId forwarded to the partition's {@code getPages}
     * @param maxSize forwarded to the partition's {@code getPages}; must be at least one byte
     * @throws NullPointerException if {@code outputBufferId} or {@code maxSize} is null
     * @throws IllegalArgumentException if {@code maxSize} is not positive
     */
    @Override
    public ListenableFuture<BufferResult> get(PipelinedOutputBuffers.OutputBufferId outputBufferId, long startingSequenceId, DataSize maxSize) {
        Objects.requireNonNull(outputBufferId, "outputBufferId is null");
        Objects.requireNonNull(maxSize, "maxSize is null");
        Preconditions.checkArgument(maxSize.toBytes() > 0L, "maxSize must be at least 1 byte");

        return this.partitions.get(outputBufferId.getId()).getPages(startingSequenceId, maxSize);
    }

    /**
     * Forwards an acknowledgement for {@code sequenceId} to the partition identified by
     * {@code outputBufferId}.
     *
     * @throws NullPointerException if {@code outputBufferId} is null
     */
    @Override
    public void acknowledge(PipelinedOutputBuffers.OutputBufferId outputBufferId, long sequenceId) {
        Objects.requireNonNull(outputBufferId, "outputBufferId is null");

        this.partitions.get(outputBufferId.getId()).acknowledgePages(sequenceId);
    }

    /**
     * Destroys the partition identified by {@code bufferId}, then finishes the whole buffer
     * if that was the last live partition and the buffer is flushing.
     *
     * @throws NullPointerException if {@code bufferId} is null
     */
    @Override
    public void destroy(PipelinedOutputBuffers.OutputBufferId bufferId) {
        Objects.requireNonNull(bufferId, "bufferId is null");

        this.partitions.get(bufferId.getId()).destroy();

        this.checkFlushComplete();
    }

    /**
     * Signals that no further pages will be added: updates the state machine, tells the
     * memory manager not to block on full, and notifies every partition.
     */
    @Override
    public void setNoMorePages() {
        this.stateMachine.noMorePages();
        this.memoryManager.setNoBlockOnFull();

        this.partitions.forEach(ClientBuffer::setNoMorePages);

        this.checkFlushComplete();
    }

    /**
     * Finishes the buffer. Only when the state machine reports that the finish transition
     * happened are the partitions destroyed and the memory manager closed.
     */
    @Override
    public void destroy() {
        if (this.stateMachine.finish()) {
            this.partitions.forEach(ClientBuffer::destroy);
            this.memoryManager.setNoBlockOnFull();
            this.forceFreeMemory();
        }
    }

    /**
     * Aborts the buffer. Only when the state machine reports that the abort transition
     * happened is the memory manager unblocked and closed. Unlike {@link #destroy()},
     * the partitions themselves are not destroyed here.
     */
    @Override
    public void abort() {
        if (this.stateMachine.abort()) {
            this.memoryManager.setNoBlockOnFull();
            this.forceFreeMemory();
        }
    }

    /** Returns the peak memory usage reported by the memory manager. */
    @Override
    public long getPeakMemoryUsage() {
        return this.memoryManager.getPeakMemoryUsage();
    }

    /** Returns the failure cause recorded by the state machine, if any. */
    @Override
    public Optional<Throwable> getFailureCause() {
        return this.stateMachine.getFailureCause();
    }

    /** Closes the memory manager. */
    @VisibleForTesting
    void forceFreeMemory() {
        this.memoryManager.close();
    }

    /**
     * Finishes the buffer via {@link #destroy()} when the state is {@code FLUSHING} or
     * {@code NO_MORE_BUFFERS} and every partition reports being destroyed.
     */
    private void checkFlushComplete() {
        BufferState state = this.stateMachine.getState();
        if (state != BufferState.FLUSHING && state != BufferState.NO_MORE_BUFFERS) {
            return;
        }

        if (this.partitions.stream().allMatch(ClientBuffer::isDestroyed)) {
            this.destroy();
        }
    }

    /** Exposes the memory manager for tests. */
    @VisibleForTesting
    OutputBufferMemoryManager getMemoryManager() {
        return this.memoryManager;
    }
}

