/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.buffer;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.execution.buffer.BufferState;
import io.trino.execution.buffer.OutputBufferStateMachine;
import io.trino.execution.buffer.PageSerializer;
import io.trino.execution.buffer.PagesSerdeFactory;
import io.trino.execution.buffer.SpoolingExchangeOutputBuffer;
import io.trino.execution.buffer.SpoolingOutputBuffers;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;
import io.trino.spi.QueryId;
import io.trino.spi.block.BlockEncodingSerde;
import io.trino.spi.block.TestingBlockEncodingSerde;
import io.trino.spi.block.VariableWidthBlockBuilder;
import io.trino.spi.exchange.ExchangeSink;
import io.trino.spi.exchange.ExchangeSinkInstanceHandle;
import io.trino.spi.type.VarcharType;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestSpoolingExchangeOutputBuffer {
    @Test
    public void testIsFull() {
        TestingExchangeSink exchangeSink = new TestingExchangeSink();
        SpoolingExchangeOutputBuffer outputBuffer = TestSpoolingExchangeOutputBuffer.createSpoolingExchangeOutputBuffer(exchangeSink, 2);
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.NO_MORE_BUFFERS);
        TestSpoolingExchangeOutputBuffer.assertNotBlocked((ListenableFuture<Void>)outputBuffer.isFull());
        CompletableFuture<Void> blocked = new CompletableFuture<Void>();
        exchangeSink.setBlocked(blocked);
        ListenableFuture full = outputBuffer.isFull();
        TestSpoolingExchangeOutputBuffer.assertBlocked((ListenableFuture<Void>)full);
        blocked.complete(null);
        TestSpoolingExchangeOutputBuffer.assertNotBlocked((ListenableFuture<Void>)full);
    }

    @Test
    public void testFinishSuccess() {
        TestingExchangeSink exchangeSink = new TestingExchangeSink();
        CompletableFuture<Void> finish = new CompletableFuture<Void>();
        exchangeSink.setFinish(finish);
        SpoolingExchangeOutputBuffer outputBuffer = TestSpoolingExchangeOutputBuffer.createSpoolingExchangeOutputBuffer(exchangeSink, 2);
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.NO_MORE_BUFFERS);
        outputBuffer.setNoMorePages();
        outputBuffer.setNoMorePages();
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.FLUSHING);
        finish.complete(null);
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.FINISHED);
    }

    @Test
    public void testFinishFailure() {
        TestingExchangeSink exchangeSink = new TestingExchangeSink();
        CompletableFuture<Void> finish = new CompletableFuture<Void>();
        exchangeSink.setFinish(finish);
        SpoolingExchangeOutputBuffer outputBuffer = TestSpoolingExchangeOutputBuffer.createSpoolingExchangeOutputBuffer(exchangeSink, 2);
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.NO_MORE_BUFFERS);
        outputBuffer.setNoMorePages();
        outputBuffer.setNoMorePages();
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.FLUSHING);
        RuntimeException failure = new RuntimeException("failure");
        finish.completeExceptionally(failure);
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.FAILED);
        Assertions.assertThat((Optional)outputBuffer.getFailureCause()).isEqualTo(Optional.of(failure));
    }

    @Test
    public void testDestroyAfterFinishCompletion() {
        TestingExchangeSink exchangeSink = new TestingExchangeSink();
        CompletableFuture<Void> finish = new CompletableFuture<Void>();
        exchangeSink.setFinish(finish);
        SpoolingExchangeOutputBuffer outputBuffer = TestSpoolingExchangeOutputBuffer.createSpoolingExchangeOutputBuffer(exchangeSink, 2);
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.NO_MORE_BUFFERS);
        outputBuffer.setNoMorePages();
        outputBuffer.setNoMorePages();
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.FLUSHING);
        finish.complete(null);
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.FINISHED);
        outputBuffer.destroy();
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.FINISHED);
    }

    @Test
    public void testDestroyBeforeFinishCompletion() {
        TestingExchangeSink exchangeSink = new TestingExchangeSink();
        CompletableFuture<Void> finish = new CompletableFuture<Void>();
        exchangeSink.setFinish(finish);
        SpoolingExchangeOutputBuffer outputBuffer = TestSpoolingExchangeOutputBuffer.createSpoolingExchangeOutputBuffer(exchangeSink, 2);
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.NO_MORE_BUFFERS);
        outputBuffer.setNoMorePages();
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.FLUSHING);
        outputBuffer.destroy();
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.ABORTED);
        finish.complete(null);
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.ABORTED);
    }

    @Test
    public void testAbortBeforeNoMorePages() {
        TestingExchangeSink exchangeSink = new TestingExchangeSink();
        SpoolingExchangeOutputBuffer outputBuffer = TestSpoolingExchangeOutputBuffer.createSpoolingExchangeOutputBuffer(exchangeSink, 2);
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.NO_MORE_BUFFERS);
        outputBuffer.abort();
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.ABORTED);
        outputBuffer.setNoMorePages();
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.ABORTED);
    }

    @Test
    public void testAbortBeforeFinishCompletion() {
        TestingExchangeSink exchangeSink = new TestingExchangeSink();
        CompletableFuture<Void> finish = new CompletableFuture<Void>();
        exchangeSink.setFinish(finish);
        CompletableFuture<Void> abort = new CompletableFuture<Void>();
        exchangeSink.setAbort(abort);
        SpoolingExchangeOutputBuffer outputBuffer = TestSpoolingExchangeOutputBuffer.createSpoolingExchangeOutputBuffer(exchangeSink, 2);
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.NO_MORE_BUFFERS);
        outputBuffer.setNoMorePages();
        outputBuffer.setNoMorePages();
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.FLUSHING);
        outputBuffer.abort();
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.ABORTED);
        abort.completeExceptionally(new RuntimeException("failure"));
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.ABORTED);
    }

    @Test
    public void testAbortAfterFinishCompletion() {
        TestingExchangeSink exchangeSink = new TestingExchangeSink();
        CompletableFuture<Void> finish = new CompletableFuture<Void>();
        exchangeSink.setFinish(finish);
        CompletableFuture<Void> abort = new CompletableFuture<Void>();
        exchangeSink.setAbort(abort);
        SpoolingExchangeOutputBuffer outputBuffer = TestSpoolingExchangeOutputBuffer.createSpoolingExchangeOutputBuffer(exchangeSink, 2);
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.NO_MORE_BUFFERS);
        outputBuffer.setNoMorePages();
        outputBuffer.setNoMorePages();
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.FLUSHING);
        finish.complete(null);
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.FINISHED);
        outputBuffer.abort();
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.FINISHED);
        abort.complete(null);
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.FINISHED);
    }

    @Test
    public void testEnqueueAfterFinish() {
        TestingExchangeSink exchangeSink = new TestingExchangeSink();
        CompletableFuture<Void> finish = new CompletableFuture<Void>();
        exchangeSink.setFinish(finish);
        SpoolingExchangeOutputBuffer outputBuffer = TestSpoolingExchangeOutputBuffer.createSpoolingExchangeOutputBuffer(exchangeSink, 2);
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.NO_MORE_BUFFERS);
        outputBuffer.enqueue(0, (List)ImmutableList.of((Object)TestSpoolingExchangeOutputBuffer.createPage("page1")));
        outputBuffer.enqueue(1, (List)ImmutableList.of((Object)TestSpoolingExchangeOutputBuffer.createPage("page2"), (Object)TestSpoolingExchangeOutputBuffer.createPage("page3")));
        ImmutableListMultimap expectedDataBufferState = ImmutableListMultimap.builder().put((Object)0, (Object)TestSpoolingExchangeOutputBuffer.createPage("page1")).put((Object)1, (Object)TestSpoolingExchangeOutputBuffer.createPage("page2")).put((Object)1, (Object)TestSpoolingExchangeOutputBuffer.createPage("page3")).build();
        Assertions.assertThat(exchangeSink.getDataBuffer()).isEqualTo((Object)expectedDataBufferState);
        outputBuffer.setNoMorePages();
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.FLUSHING);
        outputBuffer.enqueue(0, (List)ImmutableList.of((Object)TestSpoolingExchangeOutputBuffer.createPage("page4")));
        Assertions.assertThat(exchangeSink.getDataBuffer()).isEqualTo((Object)expectedDataBufferState);
        finish.complete(null);
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.FINISHED);
        outputBuffer.enqueue(0, (List)ImmutableList.of((Object)TestSpoolingExchangeOutputBuffer.createPage("page5")));
        Assertions.assertThat(exchangeSink.getDataBuffer()).isEqualTo((Object)expectedDataBufferState);
    }

    @Test
    public void testEnqueueAfterAbort() {
        TestingExchangeSink exchangeSink = new TestingExchangeSink();
        CompletableFuture<Void> abort = new CompletableFuture<Void>();
        exchangeSink.setAbort(abort);
        SpoolingExchangeOutputBuffer outputBuffer = TestSpoolingExchangeOutputBuffer.createSpoolingExchangeOutputBuffer(exchangeSink, 2);
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.NO_MORE_BUFFERS);
        outputBuffer.enqueue(0, (List)ImmutableList.of((Object)TestSpoolingExchangeOutputBuffer.createPage("page1")));
        outputBuffer.enqueue(1, (List)ImmutableList.of((Object)TestSpoolingExchangeOutputBuffer.createPage("page2"), (Object)TestSpoolingExchangeOutputBuffer.createPage("page3")));
        ImmutableListMultimap expectedDataBufferState = ImmutableListMultimap.builder().put((Object)0, (Object)TestSpoolingExchangeOutputBuffer.createPage("page1")).put((Object)1, (Object)TestSpoolingExchangeOutputBuffer.createPage("page2")).put((Object)1, (Object)TestSpoolingExchangeOutputBuffer.createPage("page3")).build();
        Assertions.assertThat(exchangeSink.getDataBuffer()).isEqualTo((Object)expectedDataBufferState);
        outputBuffer.abort();
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.ABORTED);
        outputBuffer.enqueue(0, (List)ImmutableList.of((Object)TestSpoolingExchangeOutputBuffer.createPage("page4")));
        Assertions.assertThat(exchangeSink.getDataBuffer()).isEqualTo((Object)expectedDataBufferState);
        abort.complete(null);
        Assertions.assertThat((Comparable)outputBuffer.getState()).isEqualTo((Object)BufferState.ABORTED);
        outputBuffer.enqueue(0, (List)ImmutableList.of((Object)TestSpoolingExchangeOutputBuffer.createPage("page5")));
        Assertions.assertThat(exchangeSink.getDataBuffer()).isEqualTo((Object)expectedDataBufferState);
    }

    private static SpoolingExchangeOutputBuffer createSpoolingExchangeOutputBuffer(ExchangeSink exchangeSink, int outputPartitionCount) {
        return new SpoolingExchangeOutputBuffer(new OutputBufferStateMachine(new TaskId(new StageId(new QueryId("query"), 0), 0, 0), MoreExecutors.directExecutor()), SpoolingOutputBuffers.createInitial((ExchangeSinkInstanceHandle)TestingExchangeSinkInstanceHandle.INSTANCE, (int)outputPartitionCount), exchangeSink, TestingLocalMemoryContext::new);
    }

    private static void assertNotBlocked(ListenableFuture<Void> blocked) {
        Assertions.assertThat((boolean)blocked.isDone()).isTrue();
    }

    private static void assertBlocked(ListenableFuture<Void> blocked) {
        Assertions.assertThat((boolean)blocked.isDone()).isFalse();
    }

    private static Slice createPage(String value) {
        PageBuilder pageBuilder = new PageBuilder((List)ImmutableList.of((Object)VarcharType.VARCHAR));
        pageBuilder.declarePosition();
        Slice valueSlice = Slices.utf8Slice((String)value);
        VariableWidthBlockBuilder blockBuilder = (VariableWidthBlockBuilder)pageBuilder.getBlockBuilder(0);
        blockBuilder.writeEntry(valueSlice);
        Page page = pageBuilder.build();
        PageSerializer serializer = new PagesSerdeFactory((BlockEncodingSerde)new TestingBlockEncodingSerde(), false).createSerializer(Optional.empty());
        return serializer.serialize(page);
    }

    private static class TestingExchangeSink
    implements ExchangeSink {
        private final ListMultimap<Integer, Slice> dataBuffer = ArrayListMultimap.create();
        private CompletableFuture<Void> blocked = CompletableFuture.completedFuture(null);
        private CompletableFuture<Void> finish = CompletableFuture.completedFuture(null);
        private CompletableFuture<Void> abort = CompletableFuture.completedFuture(null);
        private boolean finishCalled;
        private boolean abortCalled;

        private TestingExchangeSink() {
        }

        public boolean isHandleUpdateRequired() {
            return false;
        }

        public void updateHandle(ExchangeSinkInstanceHandle handle) {
            throw new UnsupportedOperationException();
        }

        public CompletableFuture<Void> isBlocked() {
            return this.blocked;
        }

        public void setBlocked(CompletableFuture<Void> blocked) {
            this.blocked = Objects.requireNonNull(blocked, "blocked is null");
        }

        public void add(int partitionId, Slice data) {
            this.dataBuffer.put((Object)partitionId, (Object)data);
        }

        public ListMultimap<Integer, Slice> getDataBuffer() {
            return this.dataBuffer;
        }

        public long getMemoryUsage() {
            return 0L;
        }

        public CompletableFuture<Void> finish() {
            Assertions.assertThat((boolean)this.abortCalled).isFalse();
            Assertions.assertThat((boolean)this.finishCalled).isFalse();
            this.finishCalled = true;
            return this.finish;
        }

        public void setFinish(CompletableFuture<Void> finish) {
            this.finish = Objects.requireNonNull(finish, "finish is null");
        }

        public CompletableFuture<Void> abort() {
            Assertions.assertThat((boolean)this.abortCalled).isFalse();
            this.abortCalled = true;
            return this.abort;
        }

        public void setAbort(CompletableFuture<Void> abort) {
            this.abort = Objects.requireNonNull(abort, "abort is null");
        }
    }

    private static enum TestingExchangeSinkInstanceHandle implements ExchangeSinkInstanceHandle
    {
        INSTANCE;

    }

    private static class TestingLocalMemoryContext
    implements LocalMemoryContext {
        private TestingLocalMemoryContext() {
        }

        public long getBytes() {
            return 0L;
        }

        public ListenableFuture<Void> setBytes(long bytes) {
            return Futures.immediateVoidFuture();
        }

        public boolean trySetBytes(long bytes) {
            return true;
        }

        public void close() {
        }
    }
}

