/*
 * Decompiled with CFR 0.152.
 */
package io.trino.operator;

import com.google.common.collect.ImmutableList;
import io.airlift.concurrent.Threads;
import io.airlift.slice.Slice;
import io.trino.RowPagesBuilder;
import io.trino.Session;
import io.trino.SessionTestUtils;
import io.trino.connector.CatalogName;
import io.trino.connector.CatalogServiceProvider;
import io.trino.memory.context.MemoryTrackingContext;
import io.trino.metadata.OutputTableHandle;
import io.trino.metadata.TestingFunctionResolution;
import io.trino.operator.AggregationOperator;
import io.trino.operator.DevNullOperator;
import io.trino.operator.DriverContext;
import io.trino.operator.Operator;
import io.trino.operator.OperatorContext;
import io.trino.operator.OperatorFactory;
import io.trino.operator.PageAssertions;
import io.trino.operator.TableWriterOperator;
import io.trino.operator.aggregation.TestingAggregationFunction;
import io.trino.spi.Page;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import io.trino.split.PageSinkManager;
import io.trino.sql.analyzer.TypeSignatureProvider;
import io.trino.sql.planner.plan.AggregationNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.TableWriterNode;
import io.trino.sql.tree.QualifiedName;
import io.trino.testing.TestingSession;
import io.trino.testing.TestingTaskContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class TestTableWriterOperator {
    private static final CatalogName CONNECTOR_ID = new CatalogName("testConnectorId");
    private static final TestingAggregationFunction LONG_MAX = new TestingFunctionResolution().getAggregateFunction(QualifiedName.of((String)"max"), TypeSignatureProvider.fromTypes((Type[])new Type[]{BigintType.BIGINT}));
    private ExecutorService executor;
    private ScheduledExecutorService scheduledExecutor;

    @BeforeClass
    public void setUp() {
        this.executor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed((String)(this.getClass().getSimpleName() + "-%s")));
        this.scheduledExecutor = Executors.newScheduledThreadPool(2, Threads.daemonThreadsNamed((String)(this.getClass().getSimpleName() + "-scheduledExecutor-%s")));
    }

    @AfterClass(alwaysRun=true)
    public void tearDown() {
        this.executor.shutdownNow();
        this.scheduledExecutor.shutdownNow();
    }

    @Test
    public void testBlockedPageSink() {
        BlockingPageSink blockingPageSink = new BlockingPageSink();
        Operator operator = this.createTableWriterOperator(blockingPageSink);
        Assert.assertTrue((boolean)operator.isBlocked().isDone());
        Assert.assertFalse((boolean)operator.isFinished());
        Assert.assertTrue((boolean)operator.needsInput());
        operator.addInput(RowPagesBuilder.rowPagesBuilder(new Type[]{BigintType.BIGINT}).row(42).build().get(0));
        Assert.assertFalse((boolean)operator.isBlocked().isDone());
        Assert.assertFalse((boolean)operator.isFinished());
        Assert.assertFalse((boolean)operator.needsInput());
        Assert.assertNull((Object)operator.getOutput());
        blockingPageSink.complete();
        Assert.assertTrue((boolean)operator.isBlocked().isDone());
        Assert.assertFalse((boolean)operator.isFinished());
        Assert.assertTrue((boolean)operator.needsInput());
        operator.addInput(RowPagesBuilder.rowPagesBuilder(new Type[]{BigintType.BIGINT}).row(44).build().get(0));
        Assert.assertFalse((boolean)operator.isBlocked().isDone());
        Assert.assertFalse((boolean)operator.isFinished());
        Assert.assertFalse((boolean)operator.needsInput());
        operator.finish();
        Assert.assertFalse((boolean)operator.isBlocked().isDone());
        Assert.assertFalse((boolean)operator.isFinished());
        Assert.assertFalse((boolean)operator.needsInput());
        blockingPageSink.complete();
        ImmutableList expectedTypes = ImmutableList.of((Object)BigintType.BIGINT, (Object)VarbinaryType.VARBINARY);
        PageAssertions.assertPageEquals((List<? extends Type>)expectedTypes, operator.getOutput(), RowPagesBuilder.rowPagesBuilder((Iterable<Type>)expectedTypes).row(2, null).build().get(0));
        Assert.assertTrue((boolean)operator.isBlocked().isDone());
        Assert.assertTrue((boolean)operator.isFinished());
        Assert.assertFalse((boolean)operator.needsInput());
    }

    @Test
    public void addInputFailsOnBlockedOperator() {
        Operator operator = this.createTableWriterOperator(new BlockingPageSink());
        operator.addInput(RowPagesBuilder.rowPagesBuilder(new Type[]{BigintType.BIGINT}).row(42).build().get(0));
        Assert.assertFalse((boolean)operator.isBlocked().isDone());
        Assert.assertFalse((boolean)operator.needsInput());
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> operator.addInput(RowPagesBuilder.rowPagesBuilder(new Type[]{BigintType.BIGINT}).row(42).build().get(0))).isInstanceOf(IllegalStateException.class)).hasMessage("Operator does not need input");
    }

    @Test
    public void testTableWriterInfo() {
        PageSinkManager pageSinkManager = new PageSinkManager(CatalogServiceProvider.singleton((CatalogName)CONNECTOR_ID, (Object)new ConstantPageSinkProvider(new TableWriteInfoTestPageSink())));
        TableWriterOperator tableWriterOperator = (TableWriterOperator)this.createTableWriterOperator(pageSinkManager, (OperatorFactory)new DevNullOperator.DevNullOperatorFactory(1, new PlanNodeId("test")), (List<Type>)ImmutableList.of((Object)BigintType.BIGINT, (Object)VarbinaryType.VARBINARY));
        RowPagesBuilder rowPagesBuilder = RowPagesBuilder.rowPagesBuilder(new Type[]{BigintType.BIGINT});
        for (int i = 0; i < 100; ++i) {
            rowPagesBuilder.addSequencePage(100, 0);
        }
        List<Page> pages = rowPagesBuilder.build();
        long peakMemoryUsage = 0L;
        long validationCpuNanos = 0L;
        for (int i = 0; i < pages.size(); ++i) {
            Page page = pages.get(i);
            tableWriterOperator.addInput(page);
            TableWriterOperator.TableWriterInfo info = tableWriterOperator.getInfo();
            Assert.assertEquals((long)info.getPageSinkPeakMemoryUsage(), (long)(peakMemoryUsage += page.getRetainedSizeInBytes()));
            Assert.assertEquals((long)((long)info.getValidationCpuTime().getValue(TimeUnit.NANOSECONDS)), (long)(validationCpuNanos += (long)page.getPositionCount()));
        }
    }

    @Test
    public void testStatisticsAggregation() throws Exception {
        PageSinkManager pageSinkManager = new PageSinkManager(CatalogServiceProvider.singleton((CatalogName)CONNECTOR_ID, (Object)new ConstantPageSinkProvider(new TableWriteInfoTestPageSink())));
        ImmutableList outputTypes = ImmutableList.of((Object)BigintType.BIGINT, (Object)VarbinaryType.VARBINARY, (Object)BigintType.BIGINT);
        Session session = TestingSession.testSessionBuilder().setSystemProperty("statistics_cpu_timer_enabled", "true").build();
        DriverContext driverContext = TestingTaskContext.createTaskContext((Executor)this.executor, (ScheduledExecutorService)this.scheduledExecutor, (Session)session).addPipelineContext(0, true, true, false).addDriverContext();
        TableWriterOperator operator = (TableWriterOperator)this.createTableWriterOperator(pageSinkManager, (OperatorFactory)new AggregationOperator.AggregationOperatorFactory(1, new PlanNodeId("test"), (List)ImmutableList.of((Object)LONG_MAX.createAggregatorFactory(AggregationNode.Step.SINGLE, (List<Integer>)ImmutableList.of((Object)0), OptionalInt.empty()))), (List<Type>)outputTypes, session, driverContext);
        operator.addInput(RowPagesBuilder.rowPagesBuilder(new Type[]{BigintType.BIGINT}).row(42).build().get(0));
        operator.addInput(RowPagesBuilder.rowPagesBuilder(new Type[]{BigintType.BIGINT}).row(43).build().get(0));
        Assert.assertTrue((boolean)operator.isBlocked().isDone());
        Assert.assertTrue((boolean)operator.needsInput());
        Assertions.assertThat((long)driverContext.getMemoryUsage()).isGreaterThan(0L);
        operator.finish();
        Assert.assertFalse((boolean)operator.isFinished());
        PageAssertions.assertPageEquals((List<? extends Type>)outputTypes, operator.getOutput(), RowPagesBuilder.rowPagesBuilder((Iterable<Type>)outputTypes).row(null, null, 43).build().get(0));
        PageAssertions.assertPageEquals((List<? extends Type>)outputTypes, operator.getOutput(), RowPagesBuilder.rowPagesBuilder((Iterable<Type>)outputTypes).row(2, null, null).build().get(0));
        Assert.assertTrue((boolean)operator.isBlocked().isDone());
        Assert.assertFalse((boolean)operator.needsInput());
        Assert.assertTrue((boolean)operator.isFinished());
        operator.close();
        this.assertMemoryIsReleased(operator);
        TableWriterOperator.TableWriterInfo info = operator.getInfo();
        Assertions.assertThat((double)info.getStatisticsWallTime().getValue(TimeUnit.NANOSECONDS)).isGreaterThan(0.0);
        Assertions.assertThat((double)info.getStatisticsCpuTime().getValue(TimeUnit.NANOSECONDS)).isGreaterThan(0.0);
    }

    private void assertMemoryIsReleased(TableWriterOperator tableWriterOperator) {
        OperatorContext tableWriterOperatorOperatorContext = tableWriterOperator.getOperatorContext();
        MemoryTrackingContext tableWriterMemoryContext = tableWriterOperatorOperatorContext.getOperatorMemoryContext();
        Assert.assertEquals((long)tableWriterMemoryContext.getUserMemory(), (long)0L);
        Assert.assertEquals((long)tableWriterMemoryContext.getRevocableMemory(), (long)0L);
        Operator statisticAggregationOperator = tableWriterOperator.getStatisticAggregationOperator();
        Assert.assertTrue((boolean)(statisticAggregationOperator instanceof AggregationOperator));
        AggregationOperator aggregationOperator = (AggregationOperator)statisticAggregationOperator;
        OperatorContext aggregationOperatorOperatorContext = aggregationOperator.getOperatorContext();
        MemoryTrackingContext aggregationOperatorMemoryContext = aggregationOperatorOperatorContext.getOperatorMemoryContext();
        Assert.assertEquals((long)aggregationOperatorMemoryContext.getUserMemory(), (long)0L);
        Assert.assertEquals((long)aggregationOperatorMemoryContext.getRevocableMemory(), (long)0L);
    }

    private Operator createTableWriterOperator(BlockingPageSink blockingPageSink) {
        PageSinkManager pageSinkManager = new PageSinkManager(CatalogServiceProvider.singleton((CatalogName)CONNECTOR_ID, (Object)new ConstantPageSinkProvider(blockingPageSink)));
        return this.createTableWriterOperator(pageSinkManager, (OperatorFactory)new DevNullOperator.DevNullOperatorFactory(1, new PlanNodeId("test")), (List<Type>)ImmutableList.of((Object)BigintType.BIGINT, (Object)VarbinaryType.VARBINARY));
    }

    private Operator createTableWriterOperator(PageSinkManager pageSinkManager, OperatorFactory statisticsAggregation, List<Type> outputTypes) {
        return this.createTableWriterOperator(pageSinkManager, statisticsAggregation, outputTypes, SessionTestUtils.TEST_SESSION);
    }

    private Operator createTableWriterOperator(PageSinkManager pageSinkManager, OperatorFactory statisticsAggregation, List<Type> outputTypes, Session session) {
        DriverContext driverContext = TestingTaskContext.createTaskContext((Executor)this.executor, (ScheduledExecutorService)this.scheduledExecutor, (Session)session).addPipelineContext(0, true, true, false).addDriverContext();
        return this.createTableWriterOperator(pageSinkManager, statisticsAggregation, outputTypes, session, driverContext);
    }

    private Operator createTableWriterOperator(PageSinkManager pageSinkManager, OperatorFactory statisticsAggregation, List<Type> outputTypes, Session session, DriverContext driverContext) {
        ArrayList<Object> notNullColumnNames = new ArrayList<Object>(1);
        notNullColumnNames.add(null);
        SchemaTableName schemaTableName = new SchemaTableName("testSchema", "testTable");
        TableWriterOperator.TableWriterOperatorFactory factory = new TableWriterOperator.TableWriterOperatorFactory(0, new PlanNodeId("test"), pageSinkManager, (TableWriterNode.WriterTarget)new TableWriterNode.CreateTarget(new OutputTableHandle(CONNECTOR_ID, schemaTableName, new ConnectorTransactionHandle(){}, new ConnectorOutputTableHandle(){}), schemaTableName, false), (List)ImmutableList.of((Object)0), notNullColumnNames, session, statisticsAggregation, outputTypes);
        return factory.createOperator(driverContext);
    }

    private static class TableWriteInfoTestPageSink
    implements ConnectorPageSink {
        private final List<Page> pages = new ArrayList<Page>();

        private TableWriteInfoTestPageSink() {
        }

        public CompletableFuture<?> appendPage(Page page) {
            this.pages.add(page);
            return NOT_BLOCKED;
        }

        public CompletableFuture<Collection<Slice>> finish() {
            return CompletableFuture.completedFuture(ImmutableList.of());
        }

        public long getMemoryUsage() {
            long memoryUsage = 0L;
            for (Page page : this.pages) {
                memoryUsage += page.getRetainedSizeInBytes();
            }
            return memoryUsage;
        }

        public long getValidationCpuNanos() {
            long validationCpuNanos = 0L;
            for (Page page : this.pages) {
                validationCpuNanos += (long)page.getPositionCount();
            }
            return validationCpuNanos;
        }

        public void abort() {
        }
    }

    private static class BlockingPageSink
    implements ConnectorPageSink {
        private CompletableFuture<?> future = new CompletableFuture();
        private CompletableFuture<Collection<Slice>> finishFuture = new CompletableFuture();

        private BlockingPageSink() {
        }

        public CompletableFuture<?> appendPage(Page page) {
            this.future = new CompletableFuture();
            return this.future;
        }

        public CompletableFuture<Collection<Slice>> finish() {
            this.finishFuture = new CompletableFuture();
            return this.finishFuture;
        }

        public void abort() {
        }

        void complete() {
            this.future.complete(null);
            this.finishFuture.complete((Collection<Slice>)ImmutableList.of());
        }
    }

    private static class ConstantPageSinkProvider
    implements ConnectorPageSinkProvider {
        private final ConnectorPageSink pageSink;

        private ConstantPageSinkProvider(ConnectorPageSink pageSink) {
            this.pageSink = pageSink;
        }

        public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle outputTableHandle) {
            return this.pageSink;
        }

        public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertTableHandle) {
            return this.pageSink;
        }
    }
}

