/*
 * Decompiled with CFR 0.152.
 */
package io.trino.spiller;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.Closer;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import io.airlift.concurrent.MoreFutures;
import io.trino.FeaturesConfig;
import io.trino.RowPagesBuilder;
import io.trino.SequencePageBuilder;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.operator.PageAssertions;
import io.trino.operator.PartitionFunction;
import io.trino.operator.SpillContext;
import io.trino.operator.TestingOperatorContext;
import io.trino.spi.Page;
import io.trino.spi.block.BlockEncodingSerde;
import io.trino.spi.block.TestingBlockEncodingSerde;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;
import io.trino.spiller.FileSingleStreamSpillerFactory;
import io.trino.spiller.GenericPartitioningSpiller;
import io.trino.spiller.GenericPartitioningSpillerFactory;
import io.trino.spiller.NodeSpillConfig;
import io.trino.spiller.PartitioningSpiller;
import io.trino.spiller.SingleStreamSpillerFactory;
import io.trino.spiller.SpillerStats;
import java.io.UncheckedIOException;
import java.nio.channels.ClosedChannelException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.IntPredicate;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

/**
 * Tests for {@link GenericPartitioningSpiller} created through
 * {@link GenericPartitioningSpillerFactory} on top of a
 * {@link FileSingleStreamSpillerFactory} that spills into a temporary directory.
 *
 * <p>A single test instance is shared by all test methods ({@code PER_CLASS}), so the
 * factory, the temporary directory and the executor are created once in {@link #setUp()}
 * and released in {@link #tearDown()}.
 */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@Execution(ExecutionMode.CONCURRENT)
public class TestGenericPartitioningSpiller {
    // Lower bounds (inclusive) of the value ranges that FourFixedPartitionsPartitionFunction
    // maps to partitions 0, 1, 2 and 3 respectively.
    private static final int FIRST_PARTITION_START = -10;
    private static final int SECOND_PARTITION_START = 0;
    private static final int THIRD_PARTITION_START = 10;
    private static final int FOURTH_PARTITION_START = 20;

    private static final List<Type> TYPES = ImmutableList.of(BigintType.BIGINT, VarcharType.VARCHAR, DoubleType.DOUBLE, BigintType.BIGINT);

    private Path tempDirectory;
    private GenericPartitioningSpillerFactory factory;
    private ScheduledExecutorService scheduledExecutor;

    /**
     * Creates the temporary spill directory, the spiller factory under test and the
     * executor handed to the testing operator context.
     */
    @BeforeAll
    public void setUp() throws Exception {
        tempDirectory = Files.createTempDirectory(getClass().getSimpleName());

        FeaturesConfig featuresConfig = new FeaturesConfig();
        featuresConfig.setSpillerSpillPaths(ImmutableList.of(tempDirectory.toString()));
        featuresConfig.setSpillerThreads("8");
        featuresConfig.setSpillMaxUsedSpaceThreshold(1.0);

        SingleStreamSpillerFactory singleStreamSpillerFactory = new FileSingleStreamSpillerFactory(
                new TestingBlockEncodingSerde(),
                new SpillerStats(),
                featuresConfig,
                new NodeSpillConfig());
        factory = new GenericPartitioningSpillerFactory(singleStreamSpillerFactory);
        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
    }

    /**
     * Stops the executor and removes the temporary spill directory.
     *
     * <p>Both steps are registered with a {@link Closer} so that each one is attempted even
     * if the other throws. The null guards keep cleanup from failing with a
     * {@link NullPointerException} when {@link #setUp()} did not run to completion.
     */
    @AfterAll
    public void tearDown() throws Exception {
        try (Closer closer = Closer.create()) {
            closer.register(() -> {
                if (scheduledExecutor != null) {
                    scheduledExecutor.shutdownNow();
                }
            });
            closer.register(() -> {
                if (tempDirectory != null) {
                    MoreFiles.deleteRecursively(tempDirectory, RecursiveDeleteOption.ALLOW_INSECURE);
                }
            });
        }
    }

    /**
     * Spills four pages, each falling entirely into one of the four fixed partitions, with a
     * mask selecting partitions 1 and 2. Pages for masked partitions must be spilled (nothing
     * retained); pages for the other partitions must be retained in full. Finally the spilled
     * content of every partition is read back and compared with the expected pages.
     */
    @Test
    public void testFileSpiller() throws Exception {
        try (PartitioningSpiller spiller = factory.create(
                TYPES,
                new FourFixedPartitionsPartitionFunction(0),
                mockSpillContext(),
                mockMemoryContext(scheduledExecutor))) {
            RowPagesBuilder builder = RowPagesBuilder.rowPagesBuilder(TYPES);
            builder.addSequencePage(10, SECOND_PARTITION_START, 5, 10, 15);
            builder.addSequencePage(10, FIRST_PARTITION_START, -5, 0, 5);
            List<Page> firstSpill = builder.build();

            builder = RowPagesBuilder.rowPagesBuilder(TYPES);
            builder.addSequencePage(10, THIRD_PARTITION_START, 15, 20, 25);
            builder.addSequencePage(10, FOURTH_PARTITION_START, 25, 30, 35);
            List<Page> secondSpill = builder.build();

            // Bound method reference: the set is created once, not on every predicate call.
            IntPredicate spillPartitionMask = ImmutableSet.of(1, 2)::contains;

            // Values 0..9 -> partition 1 (masked): everything is spilled.
            PartitioningSpiller.PartitioningSpillResult result = spiller.partitionAndSpill(firstSpill.get(0), spillPartitionMask);
            result.getSpillingFuture().get();
            Assertions.assertThat(result.getRetained().getPositionCount()).isEqualTo(0);

            // Values -10..-1 -> partition 0 (not masked): everything is retained.
            result = spiller.partitionAndSpill(firstSpill.get(1), spillPartitionMask);
            result.getSpillingFuture().get();
            Assertions.assertThat(result.getRetained().getPositionCount()).isEqualTo(10);

            // Values 10..19 -> partition 2 (masked): everything is spilled.
            result = spiller.partitionAndSpill(secondSpill.get(0), spillPartitionMask);
            result.getSpillingFuture().get();
            Assertions.assertThat(result.getRetained().getPositionCount()).isEqualTo(0);

            // Values 20..29 -> partition 3 (not masked): everything is retained.
            result = spiller.partitionAndSpill(secondSpill.get(1), spillPartitionMask);
            result.getSpillingFuture().get();
            Assertions.assertThat(result.getRetained().getPositionCount()).isEqualTo(10);

            builder = RowPagesBuilder.rowPagesBuilder(TYPES);
            builder.addSequencePage(10, SECOND_PARTITION_START, 5, 10, 15);
            List<Page> secondPartition = builder.build();

            builder = RowPagesBuilder.rowPagesBuilder(TYPES);
            builder.addSequencePage(10, THIRD_PARTITION_START, 15, 20, 25);
            List<Page> thirdPartition = builder.build();

            assertSpilledPages(
                    TYPES,
                    spiller,
                    ImmutableList.of(ImmutableList.of(), secondPartition, thirdPartition, ImmutableList.of()));
        }
    }

    /**
     * Obtains an iterator over spilled pages, closes the spiller, and then verifies that
     * advancing the iterator fails with an {@link UncheckedIOException} caused by a
     * {@link ClosedChannelException}.
     */
    @Test
    public void testCloseDuringReading() throws Exception {
        Iterator<Page> readingInProgress;
        try (PartitioningSpiller spiller = factory.create(
                TYPES,
                new ModuloPartitionFunction(0, 4),
                mockSpillContext(),
                mockMemoryContext(scheduledExecutor))) {
            Page page = SequencePageBuilder.createSequencePage(TYPES, 10, FIRST_PARTITION_START, 5, 10, 15);
            PartitioningSpiller.PartitioningSpillResult spillResult = spiller.partitionAndSpill(page, partition -> true);
            Assertions.assertThat(spillResult.getRetained().getPositionCount()).isEqualTo(0);
            MoreFutures.getFutureValue(spillResult.getSpillingFuture());

            // The iterator is deliberately used only after the spiller has been closed.
            readingInProgress = spiller.getSpilledPages(0);
        }

        Assertions.assertThatThrownBy(readingInProgress::hasNext)
                .isInstanceOf(UncheckedIOException.class)
                .hasCauseInstanceOf(ClosedChannelException.class);
    }

    /**
     * Spills and flushes 50000 small pages spread over all partitions, then checks that the
     * memory context reports zero reserved bytes once the spiller has been closed.
     */
    @Test
    public void testWriteManyPartitions() throws Exception {
        List<Type> types = ImmutableList.of(BigintType.BIGINT);
        int partitionCount = 4;
        AggregatedMemoryContext memoryContext = mockMemoryContext(scheduledExecutor);

        try (GenericPartitioningSpiller spiller = (GenericPartitioningSpiller) factory.create(
                types,
                new ModuloPartitionFunction(0, partitionCount),
                mockSpillContext(),
                memoryContext)) {
            for (int i = 0; i < 50000; i++) {
                // One row per partition: values 0..partitionCount-1.
                Page page = SequencePageBuilder.createSequencePage(types, partitionCount, 0);
                PartitioningSpiller.PartitioningSpillResult spillResult = spiller.partitionAndSpill(page, partition -> true);
                Assertions.assertThat(spillResult.getRetained().getPositionCount()).isEqualTo(0);
                MoreFutures.getFutureValue(spillResult.getSpillingFuture());
                MoreFutures.getFutureValue(spiller.flush());
            }
        }

        Assertions.assertThat(memoryContext.getBytes())
                .describedAs("Reserved bytes should be zeroed after spiller is closed")
                .isEqualTo(0L);
    }

    /**
     * Reads back every partition from {@code spiller} and asserts that it contains exactly the
     * expected pages, in order.
     *
     * @param types column types of the pages
     * @param spiller spiller to read from
     * @param expectedPartitions expected pages, indexed by partition number
     */
    private void assertSpilledPages(List<Type> types, PartitioningSpiller spiller, List<List<Page>> expectedPartitions) {
        for (int partition = 0; partition < expectedPartitions.size(); partition++) {
            List<Page> actualSpill = ImmutableList.copyOf(spiller.getSpilledPages(partition));
            List<Page> expectedSpill = expectedPartitions.get(partition);

            Assertions.assertThat(actualSpill).hasSize(expectedSpill.size());
            for (int j = 0; j < actualSpill.size(); j++) {
                PageAssertions.assertPageEquals(types, actualSpill.get(j), expectedSpill.get(j));
            }
        }
    }

    /** Returns a fresh aggregated user memory context from a testing operator context. */
    private static AggregatedMemoryContext mockMemoryContext(ScheduledExecutorService scheduledExecutor) {
        return TestingOperatorContext.create(scheduledExecutor).newAggregateUserMemoryContext();
    }

    /** Returns a {@link SpillContext} that ignores all byte updates. */
    private static SpillContext mockSpillContext() {
        return bytes -> {};
    }

    /**
     * Partition function with four partitions chosen by fixed value ranges of a BIGINT channel:
     * values below {@code SECOND_PARTITION_START} go to partition 0, then each following
     * {@code *_PARTITION_START} threshold opens the next partition.
     */
    private static class FourFixedPartitionsPartitionFunction
            implements PartitionFunction {
        private final int valueChannel;

        FourFixedPartitionsPartitionFunction(int valueChannel) {
            this.valueChannel = valueChannel;
        }

        @Override
        public int partitionCount() {
            return 4;
        }

        @Override
        public int getPartition(Page page, int position) {
            long value = BigintType.BIGINT.getLong(page.getBlock(valueChannel), position);
            if (value >= FOURTH_PARTITION_START) {
                return 3;
            }
            if (value >= THIRD_PARTITION_START) {
                return 2;
            }
            if (value >= SECOND_PARTITION_START) {
                return 1;
            }
            return 0;
        }
    }

    /**
     * Partition function that assigns a row to {@code |value| mod partitionCount}, where
     * {@code value} is read from a BIGINT channel.
     */
    private static class ModuloPartitionFunction
            implements PartitionFunction {
        private final int valueChannel;
        private final int partitionCount;

        /**
         * @param valueChannel channel holding the BIGINT value to partition on
         * @param partitionCount number of partitions; must be positive
         * @throws IllegalArgumentException if {@code partitionCount} is not positive
         */
        ModuloPartitionFunction(int valueChannel, int partitionCount) {
            this.valueChannel = valueChannel;
            Preconditions.checkArgument(partitionCount > 0, "partitionCount must be positive: %s", partitionCount);
            this.partitionCount = partitionCount;
        }

        @Override
        public int partitionCount() {
            return partitionCount;
        }

        @Override
        public int getPartition(Page page, int position) {
            long value = BigintType.BIGINT.getLong(page.getBlock(valueChannel), position);
            // Take the remainder before the absolute value: Math.abs(Long.MIN_VALUE) is still
            // negative, whereas the remainder always lies in (-partitionCount, partitionCount),
            // so its absolute value is a valid partition index. For every other input this is
            // equal to Math.abs(value) % partitionCount.
            return Math.toIntExact(Math.abs(value % partitionCount));
        }
    }
}

