/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution.buffer;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.execution.buffer.BufferInfo;
import com.facebook.presto.execution.buffer.BufferResult;
import com.facebook.presto.execution.buffer.BufferTestUtils;
import com.facebook.presto.execution.buffer.ClientBuffer;
import com.facebook.presto.execution.buffer.OutputBuffers;
import com.facebook.presto.execution.buffer.PageBufferInfo;
import com.facebook.presto.execution.buffer.SerializedPageReference;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.testng.Assert;
import org.testng.annotations.Test;

public class TestClientBuffer {
    private static final String TASK_INSTANCE_ID = "task-instance-id";
    private static final ImmutableList<BigintType> TYPES = ImmutableList.of((Object)BigintType.BIGINT);
    private static final OutputBuffers.OutputBufferId BUFFER_ID = new OutputBuffers.OutputBufferId(33);
    private static final String INVALID_SEQUENCE_ID = "Invalid sequence id";
    private static final SerializedPageReference.PagesReleasedListener NOOP_RELEASE_LISTENER = (lifespan, releasedPagesCount, releasedSizeInBytes) -> {};

    @Test
    public void testSimplePushBuffer() {
        int i;
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, NOOP_RELEASE_LISTENER);
        for (i = 0; i < 3; ++i) {
            TestClientBuffer.addPage(buffer, BufferTestUtils.createPage(i));
        }
        TestClientBuffer.assertBufferInfo(buffer, 3, 0);
        BufferTestUtils.assertBufferResultEquals(TYPES, TestClientBuffer.getBufferResult(buffer, 0L, BufferTestUtils.sizeOfPages(10).toBytes(), BufferTestUtils.NO_WAIT), TestClientBuffer.bufferResult(0L, BufferTestUtils.createPage(0), BufferTestUtils.createPage(1), BufferTestUtils.createPage(2)));
        TestClientBuffer.assertBufferInfo(buffer, 3, 0);
        buffer.getPages(3L, BufferTestUtils.sizeOfPages(10).toBytes()).cancel(true);
        TestClientBuffer.assertBufferInfo(buffer, 0, 3);
        for (i = 3; i < 6; ++i) {
            TestClientBuffer.addPage(buffer, BufferTestUtils.createPage(i));
        }
        TestClientBuffer.assertBufferInfo(buffer, 3, 3);
        BufferTestUtils.assertBufferResultEquals(TYPES, TestClientBuffer.getBufferResult(buffer, 3L, BufferTestUtils.sizeOfPages(1).toBytes(), BufferTestUtils.NO_WAIT), TestClientBuffer.bufferResult(3L, BufferTestUtils.createPage(3), new Page[0]));
        TestClientBuffer.assertBufferInfo(buffer, 3, 3);
        buffer.setNoMorePages();
        TestClientBuffer.assertBufferInfo(buffer, 3, 3);
        BufferTestUtils.assertBufferResultEquals(TYPES, TestClientBuffer.getBufferResult(buffer, 4L, BufferTestUtils.sizeOfPages(1).toBytes(), BufferTestUtils.NO_WAIT), TestClientBuffer.bufferResult(4L, BufferTestUtils.createPage(4), new Page[0]));
        TestClientBuffer.assertBufferInfo(buffer, 2, 4);
        BufferTestUtils.assertBufferResultEquals(TYPES, TestClientBuffer.getBufferResult(buffer, 5L, BufferTestUtils.sizeOfPages(30).toBytes(), BufferTestUtils.NO_WAIT), TestClientBuffer.bufferResult(5L, BufferTestUtils.createPage(5), new Page[0]));
        TestClientBuffer.assertBufferInfo(buffer, 1, 5);
        BufferTestUtils.assertBufferResultEquals(TYPES, TestClientBuffer.getBufferResult(buffer, 6L, BufferTestUtils.sizeOfPages(10).toBytes(), BufferTestUtils.NO_WAIT), BufferResult.emptyResults((String)TASK_INSTANCE_ID, (long)6L, (boolean)true));
        TestClientBuffer.assertBufferInfo(buffer, 0, 6);
        buffer.destroy();
        TestClientBuffer.assertBufferDestroyed(buffer, 6);
    }

    @Test
    public void testSimplePullBuffer() {
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, NOOP_RELEASE_LISTENER);
        TestingPagesSupplier supplier = new TestingPagesSupplier();
        for (int i = 0; i < 3; ++i) {
            supplier.addPage(BufferTestUtils.createPage(i));
        }
        Assert.assertEquals((int)supplier.getBufferedPages(), (int)3);
        BufferTestUtils.assertBufferResultEquals(TYPES, TestClientBuffer.getBufferResult(buffer, supplier, 0L, BufferTestUtils.sizeOfPages(10).toBytes(), BufferTestUtils.NO_WAIT), TestClientBuffer.bufferResult(0L, BufferTestUtils.createPage(0), BufferTestUtils.createPage(1), BufferTestUtils.createPage(2)));
        Assert.assertEquals((int)supplier.getBufferedPages(), (int)0);
        TestClientBuffer.assertBufferInfo(buffer, 3, 0);
        ListenableFuture pendingRead = buffer.getPages(3L, BufferTestUtils.sizeOfPages(1).toBytes());
        Assert.assertEquals((int)supplier.getBufferedPages(), (int)0);
        TestClientBuffer.assertBufferInfo(buffer, 0, 3);
        Assert.assertFalse((boolean)pendingRead.isDone());
        for (int i = 3; i < 6; ++i) {
            supplier.addPage(BufferTestUtils.createPage(i));
        }
        Assert.assertEquals((int)supplier.getBufferedPages(), (int)3);
        buffer.loadPagesIfNecessary((ClientBuffer.PagesSupplier)supplier);
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture((ListenableFuture<BufferResult>)pendingRead, BufferTestUtils.NO_WAIT), TestClientBuffer.bufferResult(3L, BufferTestUtils.createPage(3), new Page[0]));
        Assert.assertEquals((int)supplier.getBufferedPages(), (int)2);
        TestClientBuffer.assertBufferInfo(buffer, 1, 3);
        supplier.setNoMorePages();
        Assert.assertEquals((int)supplier.getBufferedPages(), (int)2);
        TestClientBuffer.assertBufferInfo(buffer, 1, 3);
        BufferTestUtils.assertBufferResultEquals(TYPES, TestClientBuffer.getBufferResult(buffer, supplier, 4L, BufferTestUtils.sizeOfPages(1).toBytes(), BufferTestUtils.NO_WAIT), TestClientBuffer.bufferResult(4L, BufferTestUtils.createPage(4), new Page[0]));
        TestClientBuffer.assertBufferInfo(buffer, 1, 4);
        Assert.assertEquals((int)supplier.getBufferedPages(), (int)1);
        BufferTestUtils.assertBufferResultEquals(TYPES, TestClientBuffer.getBufferResult(buffer, supplier, 5L, BufferTestUtils.sizeOfPages(30).toBytes(), BufferTestUtils.NO_WAIT), TestClientBuffer.bufferResult(5L, BufferTestUtils.createPage(5), new Page[0]));
        TestClientBuffer.assertBufferInfo(buffer, 1, 5);
        Assert.assertEquals((int)supplier.getBufferedPages(), (int)0);
        BufferTestUtils.assertBufferResultEquals(TYPES, TestClientBuffer.getBufferResult(buffer, supplier, 6L, BufferTestUtils.sizeOfPages(10).toBytes(), BufferTestUtils.NO_WAIT), BufferResult.emptyResults((String)TASK_INSTANCE_ID, (long)6L, (boolean)true));
        TestClientBuffer.assertBufferInfo(buffer, 0, 6);
        Assert.assertEquals((int)supplier.getBufferedPages(), (int)0);
        buffer.destroy();
        TestClientBuffer.assertBufferDestroyed(buffer, 6);
    }

    @Test
    public void testBufferResults() {
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, NOOP_RELEASE_LISTENER);
        long totalSizeOfPagesInBytes = 0L;
        for (int i = 0; i < 3; ++i) {
            Page page = BufferTestUtils.createPage(i);
            SerializedPageReference pageReference = new SerializedPageReference(BufferTestUtils.PAGES_SERDE.serialize(page), 1, Lifespan.taskWide());
            totalSizeOfPagesInBytes += pageReference.getSerializedPage().getRetainedSizeInBytes();
            TestClientBuffer.addPage(buffer, page);
        }
        TestClientBuffer.assertBufferInfo(buffer, 3, 0, totalSizeOfPagesInBytes);
        BufferResult bufferResult = TestClientBuffer.getBufferResult(buffer, 0L, BufferTestUtils.sizeOfPages(1).toBytes(), BufferTestUtils.NO_WAIT);
        long remainingBytes = totalSizeOfPagesInBytes - bufferResult.getBufferedBytes();
        Assert.assertEquals((long)bufferResult.getBufferedBytes(), (long)BufferTestUtils.sizeOfPages(1).toBytes());
        TestClientBuffer.assertBufferInfo(buffer, 3, 0, totalSizeOfPagesInBytes);
        buffer.acknowledgePages(bufferResult.getNextToken());
        TestClientBuffer.assertBufferInfo(buffer, 2, 1, remainingBytes);
    }

    @Test
    public void testDuplicateRequests() {
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, NOOP_RELEASE_LISTENER);
        for (int i = 0; i < 3; ++i) {
            TestClientBuffer.addPage(buffer, BufferTestUtils.createPage(i));
        }
        TestClientBuffer.assertBufferInfo(buffer, 3, 0);
        BufferTestUtils.assertBufferResultEquals(TYPES, TestClientBuffer.getBufferResult(buffer, 0L, BufferTestUtils.sizeOfPages(10).toBytes(), BufferTestUtils.NO_WAIT), TestClientBuffer.bufferResult(0L, BufferTestUtils.createPage(0), BufferTestUtils.createPage(1), BufferTestUtils.createPage(2)));
        TestClientBuffer.assertBufferInfo(buffer, 3, 0);
        BufferTestUtils.assertBufferResultEquals(TYPES, TestClientBuffer.getBufferResult(buffer, 0L, BufferTestUtils.sizeOfPages(10).toBytes(), BufferTestUtils.NO_WAIT), TestClientBuffer.bufferResult(0L, BufferTestUtils.createPage(0), BufferTestUtils.createPage(1), BufferTestUtils.createPage(2)));
        TestClientBuffer.assertBufferInfo(buffer, 3, 0);
        buffer.getPages(3L, 10L).cancel(true);
        BufferTestUtils.assertBufferResultEquals(TYPES, TestClientBuffer.getBufferResult(buffer, 0L, BufferTestUtils.sizeOfPages(10).toBytes(), BufferTestUtils.NO_WAIT), BufferResult.emptyResults((String)TASK_INSTANCE_ID, (long)0L, (boolean)false));
        TestClientBuffer.assertBufferInfo(buffer, 0, 3);
    }

    @Test
    public void testAddAfterNoMorePages() {
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, NOOP_RELEASE_LISTENER);
        buffer.setNoMorePages();
        TestClientBuffer.addPage(buffer, BufferTestUtils.createPage(0));
        TestClientBuffer.addPage(buffer, BufferTestUtils.createPage(0));
        TestClientBuffer.assertBufferInfo(buffer, 0, 0);
    }

    @Test
    public void testAddAfterDestroy() {
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, NOOP_RELEASE_LISTENER);
        buffer.destroy();
        TestClientBuffer.addPage(buffer, BufferTestUtils.createPage(0));
        TestClientBuffer.addPage(buffer, BufferTestUtils.createPage(0));
        TestClientBuffer.assertBufferDestroyed(buffer, 0);
    }

    @Test
    public void testDestroy() {
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, NOOP_RELEASE_LISTENER);
        for (int i = 0; i < 5; ++i) {
            TestClientBuffer.addPage(buffer, BufferTestUtils.createPage(i));
        }
        buffer.setNoMorePages();
        BufferTestUtils.assertBufferResultEquals(TYPES, TestClientBuffer.getBufferResult(buffer, 0L, BufferTestUtils.sizeOfPages(1).toBytes(), BufferTestUtils.NO_WAIT), TestClientBuffer.bufferResult(0L, BufferTestUtils.createPage(0), new Page[0]));
        buffer.destroy();
        TestClientBuffer.assertBufferDestroyed(buffer, 0);
        BufferTestUtils.assertBufferResultEquals(TYPES, TestClientBuffer.getBufferResult(buffer, 1L, BufferTestUtils.sizeOfPages(1).toBytes(), BufferTestUtils.NO_WAIT), BufferResult.emptyResults((String)TASK_INSTANCE_ID, (long)0L, (boolean)true));
    }

    @Test
    public void testNoMorePagesFreesReader() {
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, NOOP_RELEASE_LISTENER);
        ListenableFuture future = buffer.getPages(0L, BufferTestUtils.sizeOfPages(10).toBytes());
        Assert.assertFalse((boolean)future.isDone());
        TestClientBuffer.addPage(buffer, BufferTestUtils.createPage(0));
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture((ListenableFuture<BufferResult>)future, BufferTestUtils.NO_WAIT), TestClientBuffer.bufferResult(0L, BufferTestUtils.createPage(0), new Page[0]));
        future = buffer.getPages(1L, BufferTestUtils.sizeOfPages(10).toBytes());
        Assert.assertFalse((boolean)future.isDone());
        buffer.setNoMorePages();
        TestClientBuffer.assertBufferInfo(buffer, 0, 1);
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture((ListenableFuture<BufferResult>)future, BufferTestUtils.NO_WAIT), BufferResult.emptyResults((String)TASK_INSTANCE_ID, (long)1L, (boolean)true));
    }

    @Test
    public void testDestroyFreesReader() {
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, NOOP_RELEASE_LISTENER);
        ListenableFuture future = buffer.getPages(0L, BufferTestUtils.sizeOfPages(10).toBytes());
        Assert.assertFalse((boolean)future.isDone());
        TestClientBuffer.addPage(buffer, BufferTestUtils.createPage(0));
        Assert.assertTrue((boolean)future.isDone());
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture((ListenableFuture<BufferResult>)future, BufferTestUtils.NO_WAIT), TestClientBuffer.bufferResult(0L, BufferTestUtils.createPage(0), new Page[0]));
        future = buffer.getPages(1L, BufferTestUtils.sizeOfPages(10).toBytes());
        Assert.assertFalse((boolean)future.isDone());
        buffer.destroy();
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture((ListenableFuture<BufferResult>)future, BufferTestUtils.NO_WAIT), BufferResult.emptyResults((String)TASK_INSTANCE_ID, (long)1L, (boolean)false));
        TestClientBuffer.assertBufferDestroyed(buffer, 1);
    }

    @Test
    public void testInvalidTokenFails() {
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, NOOP_RELEASE_LISTENER);
        TestClientBuffer.addPage(buffer, BufferTestUtils.createPage(0));
        TestClientBuffer.addPage(buffer, BufferTestUtils.createPage(1));
        buffer.getPages(1L, BufferTestUtils.sizeOfPages(10).toBytes()).cancel(true);
        TestClientBuffer.assertBufferInfo(buffer, 1, 1);
        TestClientBuffer.assertInvalidSequenceId(buffer, -1);
        TestClientBuffer.assertBufferInfo(buffer, 1, 1);
        TestClientBuffer.assertInvalidSequenceId(buffer, 10);
        TestClientBuffer.assertBufferInfo(buffer, 1, 1);
    }

    @Test
    public void testReferenceCount() {
        AtomicInteger releasedPages = new AtomicInteger(0);
        SerializedPageReference.PagesReleasedListener onPagesReleased = (lifespan, releasedPagesCount, releasedSizeInBytes) -> releasedPages.addAndGet(releasedPagesCount);
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, onPagesReleased);
        TestClientBuffer.addPage(buffer, BufferTestUtils.createPage(0));
        TestClientBuffer.addPage(buffer, BufferTestUtils.createPage(1));
        Assert.assertEquals((int)releasedPages.get(), (int)0);
        TestClientBuffer.assertBufferInfo(buffer, 2, 0);
        BufferTestUtils.assertBufferResultEquals(TYPES, TestClientBuffer.getBufferResult(buffer, 0L, BufferTestUtils.sizeOfPages(0).toBytes(), BufferTestUtils.NO_WAIT), TestClientBuffer.bufferResult(0L, BufferTestUtils.createPage(0), new Page[0]));
        Assert.assertEquals((int)releasedPages.get(), (int)0);
        TestClientBuffer.assertBufferInfo(buffer, 2, 0);
        BufferTestUtils.assertBufferResultEquals(TYPES, TestClientBuffer.getBufferResult(buffer, 1L, BufferTestUtils.sizeOfPages(1).toBytes(), BufferTestUtils.NO_WAIT), TestClientBuffer.bufferResult(1L, BufferTestUtils.createPage(1), new Page[0]));
        Assert.assertEquals((int)releasedPages.get(), (int)1);
        TestClientBuffer.assertBufferInfo(buffer, 1, 1);
        buffer.destroy();
        Assert.assertEquals((int)releasedPages.get(), (int)2);
        TestClientBuffer.assertBufferDestroyed(buffer, 1);
    }

    private static void assertInvalidSequenceId(ClientBuffer buffer, int sequenceId) {
        try {
            buffer.getPages((long)sequenceId, BufferTestUtils.sizeOfPages(10).toBytes());
            Assert.fail((String)"Expected Invalid sequence id");
        }
        catch (IllegalArgumentException e) {
            Assert.assertEquals((String)e.getMessage(), (String)INVALID_SEQUENCE_ID);
        }
    }

    private static BufferResult getBufferResult(ClientBuffer buffer, long sequenceId, long maxSizeInBytes, Duration maxWait) {
        ListenableFuture future = buffer.getPages(sequenceId, maxSizeInBytes);
        return BufferTestUtils.getFuture((ListenableFuture<BufferResult>)future, maxWait);
    }

    private static BufferResult getBufferResult(ClientBuffer buffer, ClientBuffer.PagesSupplier supplier, long sequenceId, long maxSizeInBytes, Duration maxWait) {
        ListenableFuture future = buffer.getPages(sequenceId, maxSizeInBytes, Optional.of(supplier));
        return BufferTestUtils.getFuture((ListenableFuture<BufferResult>)future, maxWait);
    }

    private static void addPage(ClientBuffer buffer, Page page) {
        TestClientBuffer.addPage(buffer, page, NOOP_RELEASE_LISTENER);
    }

    private static void addPage(ClientBuffer buffer, Page page, SerializedPageReference.PagesReleasedListener onPagesReleased) {
        SerializedPageReference serializedPageReference = new SerializedPageReference(BufferTestUtils.PAGES_SERDE.serialize(page), 1, Lifespan.taskWide());
        TestClientBuffer.addPage(buffer, serializedPageReference, onPagesReleased);
    }

    private static void addPage(ClientBuffer buffer, SerializedPageReference page, SerializedPageReference.PagesReleasedListener onPagesReleased) {
        buffer.enqueuePages((Collection)ImmutableList.of((Object)page));
        SerializedPageReference.dereferencePages((List)ImmutableList.of((Object)page), (SerializedPageReference.PagesReleasedListener)onPagesReleased);
    }

    private static void assertBufferInfo(ClientBuffer buffer, int bufferedPages, int pagesSent) {
        Assert.assertEquals((Object)buffer.getInfo(), (Object)new BufferInfo(BUFFER_ID, false, bufferedPages, (long)pagesSent, new PageBufferInfo(BUFFER_ID.getId(), (long)bufferedPages, BufferTestUtils.sizeOfPages(bufferedPages).toBytes(), (long)(bufferedPages + pagesSent), (long)(bufferedPages + pagesSent))));
        Assert.assertFalse((boolean)buffer.isDestroyed());
    }

    private static void assertBufferInfo(ClientBuffer buffer, int bufferedPages, int pagesSent, long bufferedBytes) {
        Assert.assertEquals((Object)buffer.getInfo(), (Object)new BufferInfo(BUFFER_ID, false, bufferedPages, (long)pagesSent, new PageBufferInfo(BUFFER_ID.getId(), (long)bufferedPages, BufferTestUtils.sizeOfPages(bufferedPages).toBytes(), (long)(bufferedPages + pagesSent), (long)(bufferedPages + pagesSent))));
        Assert.assertFalse((boolean)buffer.isDestroyed());
    }

    private static BufferResult bufferResult(long token, Page firstPage, Page ... otherPages) {
        ImmutableList pages = ImmutableList.builder().add((Object)firstPage).add((Object[])otherPages).build();
        return BufferTestUtils.createBufferResult(TASK_INSTANCE_ID, token, (List<Page>)pages);
    }

    private static void assertBufferDestroyed(ClientBuffer buffer, int pagesSent) {
        BufferInfo bufferInfo = buffer.getInfo();
        Assert.assertEquals((int)bufferInfo.getBufferedPages(), (int)0);
        Assert.assertEquals((long)bufferInfo.getPagesSent(), (long)pagesSent);
        Assert.assertTrue((boolean)bufferInfo.isFinished());
        Assert.assertTrue((boolean)buffer.isDestroyed());
    }

    @ThreadSafe
    private static class TestingPagesSupplier
    implements ClientBuffer.PagesSupplier {
        @GuardedBy(value="this")
        private final Deque<SerializedPageReference> buffer = new ArrayDeque<SerializedPageReference>();
        @GuardedBy(value="this")
        private boolean noMorePages;

        private TestingPagesSupplier() {
        }

        public synchronized boolean mayHaveMorePages() {
            return !this.noMorePages || !this.buffer.isEmpty();
        }

        synchronized void setNoMorePages() {
            this.noMorePages = true;
        }

        synchronized int getBufferedPages() {
            return this.buffer.size();
        }

        public synchronized void addPage(Page page) {
            Objects.requireNonNull(page, "page is null");
            Preconditions.checkState((!this.noMorePages ? 1 : 0) != 0);
            this.buffer.add(new SerializedPageReference(BufferTestUtils.PAGES_SERDE.serialize(page), 1, Lifespan.taskWide()));
        }

        public synchronized List<SerializedPageReference> getPages(long maxSizeInBytes) {
            SerializedPageReference page;
            ArrayList<SerializedPageReference> pages = new ArrayList<SerializedPageReference>();
            long bytesRemoved = 0L;
            while ((page = this.buffer.peek()) != null && (pages.isEmpty() || (bytesRemoved += page.getRetainedSizeInBytes()) <= maxSizeInBytes)) {
                Preconditions.checkState((this.buffer.poll() == page ? 1 : 0) != 0, (Object)"Buffer corrupted");
                pages.add(page);
            }
            return ImmutableList.copyOf(pages);
        }
    }
}

