/*
 * Decompiled with CFR 0.152.
 */
package io.trino.operator;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.operator.StreamingDirectExchangeBuffer;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.QueryId;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.testng.Assert;

/**
 * Tests for {@link StreamingDirectExchangeBuffer}: page delivery, blocked-future
 * signalling, finish/close transitions and task failure handling.
 *
 * <p>Every buffer under test is created with a direct executor and a capacity of
 * one kilobyte (see {@link #createBuffer()}).
 */
public class TestStreamingDirectExchangeBuffer {
    private static final StageId STAGE_ID = new StageId(new QueryId("query"), 0);
    private static final TaskId TASK_0 = new TaskId(STAGE_ID, 0, 0);
    private static final TaskId TASK_1 = new TaskId(STAGE_ID, 1, 0);
    // The three pages deliberately have different lengths so that size accounting
    // mistakes (e.g. counting the wrong page) show up in the assertions.
    private static final Slice PAGE_0 = Slices.utf8Slice("page0");
    private static final Slice PAGE_1 = Slices.utf8Slice("page-1");
    private static final Slice PAGE_2 = Slices.utf8Slice("page-_2");
    /** Capacity given to every buffer created by {@link #createBuffer()}. */
    private static final DataSize BUFFER_CAPACITY = DataSize.of(1, DataSize.Unit.KILOBYTE);

    /**
     * Creates a fresh buffer backed by a direct executor with {@link #BUFFER_CAPACITY}.
     *
     * @return a new, empty buffer; the caller is responsible for closing it
     */
    private static StreamingDirectExchangeBuffer createBuffer() {
        return new StreamingDirectExchangeBuffer(MoreExecutors.directExecutor(), BUFFER_CAPACITY);
    }

    /**
     * Walks two tasks through the full lifecycle: registration, page delivery,
     * polling and completion, checking the blocked future and the size accounting
     * after every step.
     */
    @Test
    public void testHappyPath() {
        long capacityInBytes = BUFFER_CAPACITY.toBytes();
        try (StreamingDirectExchangeBuffer buffer = createBuffer()) {
            // Nothing registered yet: not finished, blocked, no pages.
            Assert.assertFalse(buffer.isFinished());
            ListenableFuture<?> blocked = buffer.isBlocked();
            Assert.assertFalse(blocked.isDone());
            Assert.assertNull(buffer.pollPage());

            // Registering tasks alone must not unblock the reader.
            buffer.addTask(TASK_0);
            Assert.assertFalse(buffer.isFinished());
            Assert.assertFalse(blocked.isDone());
            Assert.assertNull(buffer.pollPage());

            buffer.addTask(TASK_1);
            Assert.assertFalse(buffer.isFinished());
            Assert.assertFalse(blocked.isDone());
            Assert.assertNull(buffer.pollPage());

            buffer.noMoreTasks();
            Assert.assertFalse(buffer.isFinished());
            Assert.assertFalse(blocked.isDone());
            Assert.assertNull(buffer.pollPage());

            // First page arrives from TASK_0.
            buffer.addPages(TASK_0, ImmutableList.of(PAGE_0));
            Assert.assertEquals(buffer.getBufferedPageCount(), 1);
            Assert.assertEquals(buffer.getRetainedSizeInBytes(), PAGE_0.getRetainedSize());
            Assert.assertEquals(buffer.getMaxRetainedSizeInBytes(), PAGE_0.getRetainedSize());
            Assert.assertEquals(buffer.getRemainingCapacityInBytes(), capacityInBytes - PAGE_0.getRetainedSize());
            Assert.assertFalse(buffer.isFinished());
            Assert.assertTrue(blocked.isDone());

            // Draining the page releases its bytes but keeps the high-water mark.
            Assert.assertEquals(buffer.pollPage(), PAGE_0);
            blocked = buffer.isBlocked();
            Assert.assertEquals(buffer.getRetainedSizeInBytes(), 0L);
            Assert.assertEquals(buffer.getMaxRetainedSizeInBytes(), PAGE_0.getRetainedSize());
            Assert.assertEquals(buffer.getRemainingCapacityInBytes(), capacityInBytes);
            Assert.assertFalse(buffer.isFinished());
            Assert.assertFalse(blocked.isDone());

            // TASK_0 finishing is not enough while TASK_1 is still outstanding.
            buffer.taskFinished(TASK_0);
            Assert.assertFalse(buffer.isFinished());
            Assert.assertFalse(buffer.isBlocked().isDone());

            // Two pages arrive from TASK_1 in a single batch.
            long batchSize = PAGE_1.getRetainedSize() + PAGE_2.getRetainedSize();
            buffer.addPages(TASK_1, ImmutableList.of(PAGE_1, PAGE_2));
            Assert.assertEquals(buffer.getBufferedPageCount(), 2);
            Assert.assertEquals(buffer.getRetainedSizeInBytes(), batchSize);
            Assert.assertEquals(buffer.getMaxRetainedSizeInBytes(), batchSize);
            Assert.assertEquals(buffer.getRemainingCapacityInBytes(), capacityInBytes - PAGE_1.getRetainedSize() - PAGE_2.getRetainedSize());
            Assert.assertFalse(buffer.isFinished());
            Assert.assertTrue(buffer.isBlocked().isDone());

            // Pages come back in the order they were added.
            Assert.assertEquals(buffer.pollPage(), PAGE_1);
            Assert.assertEquals(buffer.pollPage(), PAGE_2);
            Assert.assertFalse(buffer.isFinished());
            Assert.assertFalse(buffer.isBlocked().isDone());
            Assert.assertEquals(buffer.getRetainedSizeInBytes(), 0L);
            Assert.assertEquals(buffer.getMaxRetainedSizeInBytes(), batchSize);
            Assert.assertEquals(buffer.getRemainingCapacityInBytes(), capacityInBytes);

            // Last task finishing completes the buffer and the future taken after the first poll.
            buffer.taskFinished(TASK_1);
            Assert.assertTrue(buffer.isFinished());
            Assert.assertTrue(blocked.isDone());
        }
    }

    /**
     * Closing a buffer that still has unfinished tasks must mark it finished and
     * unblocked, with no pages to poll.
     */
    @Test
    public void testClose() {
        // Not wrapped in try-with-resources: close() is the operation under test
        // and the buffer is inspected after it has been called.
        StreamingDirectExchangeBuffer buffer = createBuffer();
        buffer.addTask(TASK_0);
        buffer.addTask(TASK_1);
        Assert.assertFalse(buffer.isFinished());
        Assert.assertFalse(buffer.isBlocked().isDone());
        Assert.assertNull(buffer.pollPage());

        buffer.close();

        Assert.assertTrue(buffer.isFinished());
        Assert.assertTrue(buffer.isBlocked().isDone());
        Assert.assertNull(buffer.pollPage());
    }

    /**
     * Checks {@code isFinished()} in three scenarios: no tasks at all, a single task
     * that finishes, and a single task that fails.
     */
    @Test
    public void testIsFinished() {
        // Scenario 1: noMoreTasks() with zero tasks finishes the buffer immediately.
        try (StreamingDirectExchangeBuffer buffer = createBuffer()) {
            Assert.assertFalse(buffer.isFinished());
            ListenableFuture<?> blocked = buffer.isBlocked();
            Assert.assertFalse(blocked.isDone());

            buffer.noMoreTasks();

            Assert.assertTrue(buffer.isFinished());
            Assert.assertTrue(blocked.isDone());
        }

        // Scenario 2: the buffer finishes only once its single task has finished.
        try (StreamingDirectExchangeBuffer buffer = createBuffer()) {
            Assert.assertFalse(buffer.isFinished());
            ListenableFuture<?> blocked = buffer.isBlocked();
            Assert.assertFalse(blocked.isDone());

            buffer.addTask(TASK_0);
            buffer.noMoreTasks();
            Assert.assertFalse(buffer.isFinished());
            Assert.assertFalse(blocked.isDone());

            buffer.taskFinished(TASK_0);
            Assert.assertTrue(buffer.isFinished());
            Assert.assertTrue(blocked.isDone());
        }

        // Scenario 3: a task failure unblocks the reader, marks the buffer failed
        // (not finished) and makes pollPage() rethrow the very same exception.
        try (StreamingDirectExchangeBuffer buffer = createBuffer()) {
            Assert.assertFalse(buffer.isFinished());
            ListenableFuture<?> blocked = buffer.isBlocked();
            Assert.assertFalse(blocked.isDone());

            buffer.addTask(TASK_0);
            Assert.assertFalse(buffer.isFinished());
            Assert.assertFalse(blocked.isDone());

            RuntimeException error = new RuntimeException();
            buffer.taskFailed(TASK_0, error);
            Assert.assertFalse(buffer.isFinished());
            Assert.assertTrue(buffer.isFailed());
            Assert.assertTrue(blocked.isDone());
            Assertions.assertThatThrownBy(buffer::pollPage).isEqualTo(error);
        }
    }

    /**
     * Cancelling one future returned by {@code isBlocked()} must leave the other
     * outstanding futures pending until the buffer actually makes progress.
     */
    @Test
    public void testFutureCancellationDoesNotAffectOtherFutures() {
        try (StreamingDirectExchangeBuffer buffer = createBuffer()) {
            Assert.assertFalse(buffer.isFinished());

            ListenableFuture<?> blocked1 = buffer.isBlocked();
            ListenableFuture<?> blocked2 = buffer.isBlocked();
            ListenableFuture<?> blocked3 = buffer.isBlocked();
            Assert.assertFalse(blocked1.isDone());
            Assert.assertFalse(blocked2.isDone());
            Assert.assertFalse(blocked3.isDone());

            blocked3.cancel(true);
            Assert.assertFalse(blocked1.isDone());
            Assert.assertFalse(blocked2.isDone());

            buffer.noMoreTasks();
            Assert.assertTrue(buffer.isFinished());
            Assert.assertTrue(blocked1.isDone());
            Assert.assertTrue(blocked2.isDone());
        }
    }

    /**
     * A task failure carrying {@code REMOTE_TASK_FAILED} is expected to leave the
     * buffer neither finished nor failed, regardless of whether the failure is
     * reported before or after {@code noMoreTasks()}.
     */
    @Test
    public void testRemoteTaskFailedError() {
        // Failure reported before noMoreTasks().
        try (StreamingDirectExchangeBuffer buffer = createBuffer()) {
            buffer.addTask(TASK_0);
            buffer.taskFailed(TASK_0, new TrinoException(StandardErrorCode.REMOTE_TASK_FAILED, "Remote task failed"));
            buffer.noMoreTasks();

            Assert.assertFalse(buffer.isFinished());
            Assert.assertFalse(buffer.isFailed());
            Assert.assertNull(buffer.pollPage());
        }

        // Failure reported after noMoreTasks().
        try (StreamingDirectExchangeBuffer buffer = createBuffer()) {
            buffer.addTask(TASK_0);
            buffer.noMoreTasks();
            buffer.taskFailed(TASK_0, new TrinoException(StandardErrorCode.REMOTE_TASK_FAILED, "Remote task failed"));

            Assert.assertFalse(buffer.isFinished());
            Assert.assertFalse(buffer.isFailed());
            Assert.assertNull(buffer.pollPage());
        }
    }

    /**
     * With two outstanding blocked futures, each add-then-poll round is expected to
     * complete exactly one of them: the first round completes {@code blocked1}
     * only, the second completes {@code blocked2}.
     */
    @Test
    public void testSingleWakeUp() {
        try (StreamingDirectExchangeBuffer buffer = createBuffer()) {
            Assert.assertFalse(buffer.isFinished());

            ListenableFuture<?> blocked1 = buffer.isBlocked();
            ListenableFuture<?> blocked2 = buffer.isBlocked();
            Assert.assertFalse(blocked1.isDone());
            Assert.assertFalse(blocked2.isDone());

            buffer.addTask(TASK_0);

            // First round wakes only the first waiter.
            buffer.addPages(TASK_0, ImmutableList.of(PAGE_0));
            buffer.pollPage();
            Assert.assertTrue(blocked1.isDone());
            Assert.assertFalse(blocked2.isDone());

            // Second round wakes the remaining waiter.
            buffer.addPages(TASK_0, ImmutableList.of(PAGE_0));
            buffer.pollPage();
            Assert.assertTrue(blocked2.isDone());
        }
    }
}

