/*
 * Decompiled with CFR 0.152.
 */
package io.trino.operator;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.operator.StreamingDirectExchangeBuffer;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.QueryId;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestStreamingDirectExchangeBuffer {
    private static final StageId STAGE_ID = new StageId(new QueryId("query"), 0);
    private static final TaskId TASK_0 = new TaskId(STAGE_ID, 0, 0);
    private static final TaskId TASK_1 = new TaskId(STAGE_ID, 1, 0);
    private static final Slice PAGE_0 = Slices.utf8Slice((String)"page0");
    private static final Slice PAGE_1 = Slices.utf8Slice((String)"page-1");
    private static final Slice PAGE_2 = Slices.utf8Slice((String)"page-_2");

    @Test
    public void testHappyPath() {
        try (StreamingDirectExchangeBuffer buffer = new StreamingDirectExchangeBuffer(MoreExecutors.directExecutor(), DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.KILOBYTE));){
            Assertions.assertThat((boolean)buffer.isFinished()).isFalse();
            ListenableFuture blocked = buffer.isBlocked();
            Assertions.assertThat((boolean)blocked.isDone()).isFalse();
            Assertions.assertThat((Comparable)buffer.pollPage()).isNull();
            buffer.addTask(TASK_0);
            Assertions.assertThat((boolean)buffer.isFinished()).isFalse();
            Assertions.assertThat((boolean)blocked.isDone()).isFalse();
            Assertions.assertThat((Comparable)buffer.pollPage()).isNull();
            buffer.addTask(TASK_1);
            Assertions.assertThat((boolean)buffer.isFinished()).isFalse();
            Assertions.assertThat((boolean)blocked.isDone()).isFalse();
            Assertions.assertThat((Comparable)buffer.pollPage()).isNull();
            buffer.noMoreTasks();
            Assertions.assertThat((boolean)buffer.isFinished()).isFalse();
            Assertions.assertThat((boolean)blocked.isDone()).isFalse();
            Assertions.assertThat((Comparable)buffer.pollPage()).isNull();
            buffer.addPages(TASK_0, (List)ImmutableList.of((Object)PAGE_0));
            Assertions.assertThat((int)buffer.getBufferedPageCount()).isEqualTo(1);
            Assertions.assertThat((long)buffer.getRetainedSizeInBytes()).isEqualTo(PAGE_0.getRetainedSize());
            Assertions.assertThat((long)buffer.getMaxRetainedSizeInBytes()).isEqualTo(PAGE_0.getRetainedSize());
            Assertions.assertThat((long)buffer.getRemainingCapacityInBytes()).isEqualTo(DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.KILOBYTE).toBytes() - PAGE_0.getRetainedSize());
            Assertions.assertThat((boolean)buffer.isFinished()).isFalse();
            Assertions.assertThat((boolean)blocked.isDone()).isTrue();
            Assertions.assertThat((Comparable)buffer.pollPage()).isEqualTo((Object)PAGE_0);
            blocked = buffer.isBlocked();
            Assertions.assertThat((long)buffer.getRetainedSizeInBytes()).isEqualTo(0L);
            Assertions.assertThat((long)buffer.getMaxRetainedSizeInBytes()).isEqualTo(PAGE_0.getRetainedSize());
            Assertions.assertThat((long)buffer.getRemainingCapacityInBytes()).isEqualTo(DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.KILOBYTE).toBytes());
            Assertions.assertThat((boolean)buffer.isFinished()).isFalse();
            Assertions.assertThat((boolean)blocked.isDone()).isFalse();
            buffer.taskFinished(TASK_0);
            Assertions.assertThat((boolean)buffer.isFinished()).isFalse();
            Assertions.assertThat((boolean)buffer.isBlocked().isDone()).isFalse();
            buffer.addPages(TASK_1, (List)ImmutableList.of((Object)PAGE_1, (Object)PAGE_2));
            Assertions.assertThat((int)buffer.getBufferedPageCount()).isEqualTo(2);
            Assertions.assertThat((long)buffer.getRetainedSizeInBytes()).isEqualTo(PAGE_1.getRetainedSize() + PAGE_2.getRetainedSize());
            Assertions.assertThat((long)buffer.getMaxRetainedSizeInBytes()).isEqualTo(PAGE_1.getRetainedSize() + PAGE_2.getRetainedSize());
            Assertions.assertThat((long)buffer.getRemainingCapacityInBytes()).isEqualTo(DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.KILOBYTE).toBytes() - PAGE_1.getRetainedSize() - PAGE_2.getRetainedSize());
            Assertions.assertThat((boolean)buffer.isFinished()).isFalse();
            Assertions.assertThat((boolean)buffer.isBlocked().isDone()).isTrue();
            Assertions.assertThat((Comparable)buffer.pollPage()).isEqualTo((Object)PAGE_1);
            Assertions.assertThat((Comparable)buffer.pollPage()).isEqualTo((Object)PAGE_2);
            Assertions.assertThat((boolean)buffer.isFinished()).isFalse();
            Assertions.assertThat((boolean)buffer.isBlocked().isDone()).isFalse();
            Assertions.assertThat((long)buffer.getRetainedSizeInBytes()).isEqualTo(0L);
            Assertions.assertThat((long)buffer.getMaxRetainedSizeInBytes()).isEqualTo(PAGE_1.getRetainedSize() + PAGE_2.getRetainedSize());
            Assertions.assertThat((long)buffer.getRemainingCapacityInBytes()).isEqualTo(DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.KILOBYTE).toBytes());
            buffer.taskFinished(TASK_1);
            Assertions.assertThat((boolean)buffer.isFinished()).isTrue();
            Assertions.assertThat((boolean)blocked.isDone()).isTrue();
        }
    }

    @Test
    public void testClose() {
        StreamingDirectExchangeBuffer buffer = new StreamingDirectExchangeBuffer(MoreExecutors.directExecutor(), DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.KILOBYTE));
        buffer.addTask(TASK_0);
        buffer.addTask(TASK_1);
        Assertions.assertThat((boolean)buffer.isFinished()).isFalse();
        Assertions.assertThat((boolean)buffer.isBlocked().isDone()).isFalse();
        Assertions.assertThat((Comparable)buffer.pollPage()).isNull();
        buffer.close();
        Assertions.assertThat((boolean)buffer.isFinished()).isTrue();
        Assertions.assertThat((boolean)buffer.isBlocked().isDone()).isTrue();
        Assertions.assertThat((Comparable)buffer.pollPage()).isNull();
    }

    @Test
    public void testIsFinished() {
        ListenableFuture blocked;
        try (StreamingDirectExchangeBuffer buffer = new StreamingDirectExchangeBuffer(MoreExecutors.directExecutor(), DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.KILOBYTE));){
            Assertions.assertThat((boolean)buffer.isFinished()).isFalse();
            blocked = buffer.isBlocked();
            Assertions.assertThat((boolean)blocked.isDone()).isFalse();
            buffer.noMoreTasks();
            Assertions.assertThat((boolean)buffer.isFinished()).isTrue();
            Assertions.assertThat((boolean)blocked.isDone()).isTrue();
        }
        buffer = new StreamingDirectExchangeBuffer(MoreExecutors.directExecutor(), DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.KILOBYTE));
        try {
            Assertions.assertThat((boolean)buffer.isFinished()).isFalse();
            blocked = buffer.isBlocked();
            Assertions.assertThat((boolean)blocked.isDone()).isFalse();
            buffer.addTask(TASK_0);
            buffer.noMoreTasks();
            Assertions.assertThat((boolean)buffer.isFinished()).isFalse();
            Assertions.assertThat((boolean)blocked.isDone()).isFalse();
            buffer.taskFinished(TASK_0);
            Assertions.assertThat((boolean)buffer.isFinished()).isTrue();
            Assertions.assertThat((boolean)blocked.isDone()).isTrue();
        }
        finally {
            buffer.close();
        }
        buffer = new StreamingDirectExchangeBuffer(MoreExecutors.directExecutor(), DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.KILOBYTE));
        try {
            Assertions.assertThat((boolean)buffer.isFinished()).isFalse();
            blocked = buffer.isBlocked();
            Assertions.assertThat((boolean)blocked.isDone()).isFalse();
            buffer.addTask(TASK_0);
            Assertions.assertThat((boolean)buffer.isFinished()).isFalse();
            Assertions.assertThat((boolean)blocked.isDone()).isFalse();
            RuntimeException error = new RuntimeException();
            buffer.taskFailed(TASK_0, (Throwable)error);
            Assertions.assertThat((boolean)buffer.isFinished()).isFalse();
            Assertions.assertThat((boolean)buffer.isFailed()).isTrue();
            Assertions.assertThat((boolean)blocked.isDone()).isTrue();
            Assertions.assertThatThrownBy(() -> ((StreamingDirectExchangeBuffer)buffer).pollPage()).isEqualTo((Object)error);
        }
        finally {
            buffer.close();
        }
    }

    @Test
    public void testFutureCancellationDoesNotAffectOtherFutures() {
        try (StreamingDirectExchangeBuffer buffer = new StreamingDirectExchangeBuffer(MoreExecutors.directExecutor(), DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.KILOBYTE));){
            Assertions.assertThat((boolean)buffer.isFinished()).isFalse();
            ListenableFuture blocked1 = buffer.isBlocked();
            ListenableFuture blocked2 = buffer.isBlocked();
            ListenableFuture blocked3 = buffer.isBlocked();
            Assertions.assertThat((boolean)blocked1.isDone()).isFalse();
            Assertions.assertThat((boolean)blocked2.isDone()).isFalse();
            Assertions.assertThat((boolean)blocked3.isDone()).isFalse();
            blocked3.cancel(true);
            Assertions.assertThat((boolean)blocked1.isDone()).isFalse();
            Assertions.assertThat((boolean)blocked2.isDone()).isFalse();
            buffer.noMoreTasks();
            Assertions.assertThat((boolean)buffer.isFinished()).isTrue();
            Assertions.assertThat((boolean)blocked1.isDone()).isTrue();
            Assertions.assertThat((boolean)blocked2.isDone()).isTrue();
        }
    }

    @Test
    public void testRemoteTaskFailedError() {
        try (StreamingDirectExchangeBuffer buffer = new StreamingDirectExchangeBuffer(MoreExecutors.directExecutor(), DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.KILOBYTE));){
            buffer.addTask(TASK_0);
            buffer.taskFailed(TASK_0, (Throwable)new TrinoException((ErrorCodeSupplier)StandardErrorCode.REMOTE_TASK_FAILED, "Remote task failed"));
            buffer.noMoreTasks();
            Assertions.assertThat((boolean)buffer.isFinished()).isFalse();
            Assertions.assertThat((boolean)buffer.isFailed()).isFalse();
            Assertions.assertThat((Comparable)buffer.pollPage()).isNull();
        }
        buffer = new StreamingDirectExchangeBuffer(MoreExecutors.directExecutor(), DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.KILOBYTE));
        try {
            buffer.addTask(TASK_0);
            buffer.noMoreTasks();
            buffer.taskFailed(TASK_0, (Throwable)new TrinoException((ErrorCodeSupplier)StandardErrorCode.REMOTE_TASK_FAILED, "Remote task failed"));
            Assertions.assertThat((boolean)buffer.isFinished()).isFalse();
            Assertions.assertThat((boolean)buffer.isFailed()).isFalse();
            Assertions.assertThat((Comparable)buffer.pollPage()).isNull();
        }
        finally {
            buffer.close();
        }
    }

    @Test
    public void testSingleWakeUp() {
        try (StreamingDirectExchangeBuffer buffer = new StreamingDirectExchangeBuffer(MoreExecutors.directExecutor(), DataSize.of((long)1L, (DataSize.Unit)DataSize.Unit.KILOBYTE));){
            Assertions.assertThat((boolean)buffer.isFinished()).isFalse();
            ListenableFuture blocked1 = buffer.isBlocked();
            ListenableFuture blocked2 = buffer.isBlocked();
            Assertions.assertThat((boolean)blocked1.isDone()).isFalse();
            Assertions.assertThat((boolean)blocked2.isDone()).isFalse();
            buffer.addTask(TASK_0);
            buffer.addPages(TASK_0, (List)ImmutableList.of((Object)PAGE_0));
            buffer.pollPage();
            Assertions.assertThat((boolean)blocked1.isDone()).isTrue();
            Assertions.assertThat((boolean)blocked2.isDone()).isFalse();
            buffer.addPages(TASK_0, (List)ImmutableList.of((Object)PAGE_0));
            buffer.pollPage();
            Assertions.assertThat((boolean)blocked2.isDone()).isTrue();
        }
    }
}

