/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark.planner;

import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.execution.ScheduledSplit;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.spark.planner.PrestoSparkSourceDistributionSplitAssigner;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.connector.NotPartitionedPartitionHandle;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.facebook.presto.split.SplitSource;
import com.facebook.presto.testing.TestingTransactionHandle;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.Test;

public class TestPrestoSparkSourceDistributionSplitAssigner {
    @Test
    public void testSplitAssignmentWithAutoTuneEnabled() {
        TestPrestoSparkSourceDistributionSplitAssigner.assertSplitAssignmentWithAutoTuneEnabled(new DataSize(10.0, DataSize.Unit.BYTE), 2, 4, (List<Long>)ImmutableList.of(), (Map<Integer, List<Long>>)ImmutableMap.of());
        TestPrestoSparkSourceDistributionSplitAssigner.assertSplitAssignmentWithAutoTuneEnabled(new DataSize(10.0, DataSize.Unit.BYTE), 2, 4, (List<Long>)ImmutableList.of((Object)1L), (Map<Integer, List<Long>>)ImmutableMap.of((Object)0, (Object)ImmutableList.of((Object)1L)));
        TestPrestoSparkSourceDistributionSplitAssigner.assertSplitAssignmentWithAutoTuneEnabled(new DataSize(10.0, DataSize.Unit.BYTE), 2, 4, (List<Long>)ImmutableList.of((Object)1L, (Object)1L), (Map<Integer, List<Long>>)ImmutableMap.of((Object)0, (Object)ImmutableList.of((Object)1L), (Object)1, (Object)ImmutableList.of((Object)1L)));
        TestPrestoSparkSourceDistributionSplitAssigner.assertSplitAssignmentWithAutoTuneEnabled(new DataSize(10.0, DataSize.Unit.BYTE), 2, 4, (List<Long>)ImmutableList.of((Object)10L, (Object)11L, (Object)12L, (Object)13L, (Object)9L), (Map<Integer, List<Long>>)ImmutableMap.of((Object)0, (Object)ImmutableList.of((Object)13L), (Object)1, (Object)ImmutableList.of((Object)12L), (Object)2, (Object)ImmutableList.of((Object)11L), (Object)3, (Object)ImmutableList.of((Object)10L, (Object)9L)));
        TestPrestoSparkSourceDistributionSplitAssigner.assertSplitAssignmentWithAutoTuneEnabled(new DataSize(10.0, DataSize.Unit.BYTE), 1, 4, (List<Long>)ImmutableList.of((Object)3L, (Object)4L, (Object)5L), (Map<Integer, List<Long>>)ImmutableMap.of((Object)0, (Object)ImmutableList.of((Object)5L, (Object)4L), (Object)1, (Object)ImmutableList.of((Object)3L)));
        TestPrestoSparkSourceDistributionSplitAssigner.assertSplitAssignmentWithAutoTuneEnabled(new DataSize(10.0, DataSize.Unit.BYTE), 1, 10, (List<Long>)ImmutableList.of((Object)1L, (Object)2L, (Object)3L, (Object)4L, (Object)5L, (Object)6L, (Object)7L, (Object)8L, (Object)9L, (Object)10L, (Object)11L), (Map<Integer, List<Long>>)ImmutableMap.builder().put((Object)0, (Object)ImmutableList.of((Object)11L)).put((Object)1, (Object)ImmutableList.of((Object)10L)).put((Object)2, (Object)ImmutableList.of((Object)9L)).put((Object)3, (Object)ImmutableList.of((Object)8L, (Object)1L)).put((Object)4, (Object)ImmutableList.of((Object)7L, (Object)2L)).put((Object)5, (Object)ImmutableList.of((Object)6L, (Object)3L)).put((Object)6, (Object)ImmutableList.of((Object)5L, (Object)4L)).build());
        TestPrestoSparkSourceDistributionSplitAssigner.assertSplitAssignmentWithAutoTuneEnabled(new DataSize(10.0, DataSize.Unit.BYTE), 1, 10, (List<Long>)ImmutableList.of((Object)1L, (Object)2L, (Object)3L, (Object)4L, (Object)5L, (Object)6L), (Map<Integer, List<Long>>)ImmutableMap.builder().put((Object)0, (Object)ImmutableList.of((Object)6L, (Object)3L)).put((Object)1, (Object)ImmutableList.of((Object)5L, (Object)4L)).put((Object)2, (Object)ImmutableList.of((Object)2L, (Object)1L)).build());
    }

    private static void assertSplitAssignmentWithAutoTuneEnabled(DataSize maxSplitsDataSizePerSparkPartition, int minSparkInputPartitionCountForAutoTune, int maxSparkInputPartitionCountForAutoTune, List<Long> splitSizes, Map<Integer, List<Long>> expectedAssignment) {
        TestPrestoSparkSourceDistributionSplitAssigner.assertSplitAssignment(true, maxSplitsDataSizePerSparkPartition, 1, minSparkInputPartitionCountForAutoTune, maxSparkInputPartitionCountForAutoTune, splitSizes, expectedAssignment);
    }

    @Test
    public void testSplitAssignmentWithAutoTuneDisabled() {
        TestPrestoSparkSourceDistributionSplitAssigner.assertSplitAssignmentWithAutoTuneDisabled(1, (List<Long>)ImmutableList.of(), (Map<Integer, List<Long>>)ImmutableMap.of());
        TestPrestoSparkSourceDistributionSplitAssigner.assertSplitAssignmentWithAutoTuneDisabled(1, (List<Long>)ImmutableList.of((Object)1L), (Map<Integer, List<Long>>)ImmutableMap.of((Object)0, (Object)ImmutableList.of((Object)1L)));
        TestPrestoSparkSourceDistributionSplitAssigner.assertSplitAssignmentWithAutoTuneDisabled(1, (List<Long>)ImmutableList.of((Object)1L, (Object)1L), (Map<Integer, List<Long>>)ImmutableMap.of((Object)0, (Object)ImmutableList.of((Object)1L, (Object)1L)));
        TestPrestoSparkSourceDistributionSplitAssigner.assertSplitAssignmentWithAutoTuneDisabled(2, (List<Long>)ImmutableList.of((Object)1L, (Object)1L), (Map<Integer, List<Long>>)ImmutableMap.of((Object)0, (Object)ImmutableList.of((Object)1L), (Object)1, (Object)ImmutableList.of((Object)1L)));
        TestPrestoSparkSourceDistributionSplitAssigner.assertSplitAssignmentWithAutoTuneDisabled(2, (List<Long>)ImmutableList.of((Object)1L, (Object)1L, (Object)2L), (Map<Integer, List<Long>>)ImmutableMap.of((Object)0, (Object)ImmutableList.of((Object)2L), (Object)1, (Object)ImmutableList.of((Object)1L, (Object)1L)));
        TestPrestoSparkSourceDistributionSplitAssigner.assertSplitAssignmentWithAutoTuneDisabled(2, (List<Long>)ImmutableList.of((Object)2L, (Object)1L, (Object)1L, (Object)1L), (Map<Integer, List<Long>>)ImmutableMap.of((Object)0, (Object)ImmutableList.of((Object)2L, (Object)1L), (Object)1, (Object)ImmutableList.of((Object)1L, (Object)1L)));
        TestPrestoSparkSourceDistributionSplitAssigner.assertSplitAssignmentWithAutoTuneDisabled(2, (List<Long>)ImmutableList.of((Object)2L, (Object)1L, (Object)1L, (Object)1L, (Object)3L), (Map<Integer, List<Long>>)ImmutableMap.of((Object)0, (Object)ImmutableList.of((Object)3L, (Object)1L), (Object)1, (Object)ImmutableList.of((Object)2L, (Object)1L, (Object)1L)));
        TestPrestoSparkSourceDistributionSplitAssigner.assertSplitAssignmentWithAutoTuneDisabled(3, (List<Long>)ImmutableList.of((Object)1L, (Object)2L, (Object)3L, (Object)4L, (Object)5L, (Object)6L), (Map<Integer, List<Long>>)ImmutableMap.of((Object)0, (Object)ImmutableList.of((Object)6L, (Object)1L), (Object)1, (Object)ImmutableList.of((Object)5L, (Object)2L), (Object)2, (Object)ImmutableList.of((Object)4L, (Object)3L)));
        TestPrestoSparkSourceDistributionSplitAssigner.assertSplitAssignmentWithAutoTuneDisabled(3, (List<Long>)ImmutableList.of((Object)5L, (Object)6L, (Object)7L, (Object)8L, (Object)9L, (Object)10L), (Map<Integer, List<Long>>)ImmutableMap.of((Object)0, (Object)ImmutableList.of((Object)10L, (Object)5L), (Object)1, (Object)ImmutableList.of((Object)9L, (Object)6L), (Object)2, (Object)ImmutableList.of((Object)8L, (Object)7L)));
    }

    private static void assertSplitAssignmentWithAutoTuneDisabled(int initialPartitionCount, List<Long> splitSizes, Map<Integer, List<Long>> expectedAssignment) {
        TestPrestoSparkSourceDistributionSplitAssigner.assertSplitAssignment(false, new DataSize(1.0, DataSize.Unit.BYTE), initialPartitionCount, 1, 2, splitSizes, expectedAssignment);
    }

    private static void assertSplitAssignment(boolean autoTuneEnabled, DataSize maxSplitsDataSizePerSparkPartition, int initialPartitionCount, int minSparkInputPartitionCountForAutoTune, int maxSparkInputPartitionCountForAutoTune, List<Long> splitSizes, Map<Integer, List<Long>> expectedAssignment) {
        PrestoSparkSourceDistributionSplitAssigner assigner = new PrestoSparkSourceDistributionSplitAssigner(new PlanNodeId("test"), TestPrestoSparkSourceDistributionSplitAssigner.createSplitSource(splitSizes), Integer.MAX_VALUE, maxSplitsDataSizePerSparkPartition.toBytes(), initialPartitionCount, autoTuneEnabled, minSparkInputPartitionCountForAutoTune, maxSparkInputPartitionCountForAutoTune, 0);
        Optional actualAssignment = assigner.getNextBatch();
        if (!splitSizes.isEmpty()) {
            Assertions.assertThat((Optional)actualAssignment).isPresent();
            TestPrestoSparkSourceDistributionSplitAssigner.assertAssignedSplits((SetMultimap<Integer, ScheduledSplit>)((SetMultimap)actualAssignment.get()), expectedAssignment);
        } else {
            Assertions.assertThat((Optional)actualAssignment).isNotPresent();
        }
        for (int splitBatchSize = 1; splitBatchSize < splitSizes.size(); splitBatchSize *= 2) {
            Optional assignment;
            actualAssignment = HashMultimap.create();
            ArrayList<Long> sortedSplits = new ArrayList<Long>(splitSizes);
            sortedSplits.sort(Comparator.naturalOrder().reversed());
            PrestoSparkSourceDistributionSplitAssigner assigner2 = new PrestoSparkSourceDistributionSplitAssigner(new PlanNodeId("test"), TestPrestoSparkSourceDistributionSplitAssigner.createSplitSource(sortedSplits), splitBatchSize, maxSplitsDataSizePerSparkPartition.toBytes(), initialPartitionCount, autoTuneEnabled, minSparkInputPartitionCountForAutoTune, maxSparkInputPartitionCountForAutoTune, 0);
            while ((assignment = assigner2.getNextBatch()).isPresent()) {
                actualAssignment.putAll((Multimap)assignment.get());
            }
            TestPrestoSparkSourceDistributionSplitAssigner.assertAssignedSplits((SetMultimap<Integer, ScheduledSplit>)actualAssignment, expectedAssignment);
        }
    }

    @Test
    public void testAssignSplitsToPartitionWithRandomSplitSizes() {
        DataSize maxSplitDataSizePerPartition = new DataSize(2048.0, DataSize.Unit.BYTE);
        int initialPartitionCount = 3;
        int minSparkInputPartitionCountForAutoTune = 2;
        int maxSparkInputPartitionCountForAutoTune = 5;
        int maxSplitSizeInBytes = 2048;
        AtomicInteger sequenceId = new AtomicInteger();
        for (int i = 0; i < 3; ++i) {
            Optional assignment;
            ArrayList<Long> splitSizes = new ArrayList<Long>(1000);
            for (int j = 0; j < 1000; ++j) {
                splitSizes.add(ThreadLocalRandom.current().nextLong((long)((double)maxSplitSizeInBytes * 1.2)));
            }
            PrestoSparkSourceDistributionSplitAssigner assigner = new PrestoSparkSourceDistributionSplitAssigner(new PlanNodeId("test"), TestPrestoSparkSourceDistributionSplitAssigner.createSplitSource(splitSizes), 333, maxSplitDataSizePerPartition.toBytes(), initialPartitionCount, true, minSparkInputPartitionCountForAutoTune, maxSparkInputPartitionCountForAutoTune, 0);
            HashMultimap actualAssignment = HashMultimap.create();
            while ((assignment = assigner.getNextBatch()).isPresent()) {
                actualAssignment.putAll((Multimap)assignment.get());
            }
            long expectedSizeInBytes = splitSizes.stream().mapToLong(Long::longValue).sum();
            long actualTotalSizeInBytes = actualAssignment.values().stream().mapToLong(split -> split.getSplit().getConnectorSplit().getSplitSizeInBytes().orElseThrow(() -> new IllegalArgumentException("split size is expected to be present"))).sum();
            Assert.assertEquals((long)expectedSizeInBytes, (long)actualTotalSizeInBytes);
        }
    }

    private static void assertAssignedSplits(SetMultimap<Integer, ScheduledSplit> actual, Map<Integer, List<Long>> expected) {
        Map<Integer, List<Long>> actualAssignment = TestPrestoSparkSourceDistributionSplitAssigner.getAssignedSplitSizes(actual);
        Assertions.assertThat(actualAssignment.keySet()).isEqualTo(expected.keySet());
        for (Integer partition : actualAssignment.keySet()) {
            Assertions.assertThat(actualAssignment.get(partition)).containsExactlyInAnyOrder((Object[])expected.get(partition).toArray(new Long[0]));
        }
    }

    private static Map<Integer, List<Long>> getAssignedSplitSizes(SetMultimap<Integer, ScheduledSplit> assignedSplits) {
        return (Map)assignedSplits.asMap().entrySet().stream().collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, entry -> (ImmutableList)((Collection)entry.getValue()).stream().map(split -> split.getSplit().getConnectorSplit().getSplitSizeInBytes().orElseThrow(() -> new IllegalArgumentException("split size is expected to be present"))).collect(ImmutableList.toImmutableList())));
    }

    private static SplitSource createSplitSource(List<Long> splitSizes) {
        List splits = (List)splitSizes.stream().map(size -> new Split(new ConnectorId("test"), (ConnectorTransactionHandle)TestingTransactionHandle.create(), (ConnectorSplit)new MockSplit((long)size))).collect(ImmutableList.toImmutableList());
        return new MockSplitSource(splits);
    }

    private static class MockSplitSource
    implements SplitSource {
        private final List<Split> splits;
        private int position;
        private boolean closed;

        private MockSplitSource(List<Split> splits) {
            this.splits = ImmutableList.copyOf((Collection)Objects.requireNonNull(splits, "splits is null"));
        }

        public ConnectorId getConnectorId() {
            throw new UnsupportedOperationException();
        }

        public ConnectorTransactionHandle getTransactionHandle() {
            throw new UnsupportedOperationException();
        }

        public ListenableFuture<SplitSource.SplitBatch> getNextBatch(ConnectorPartitionHandle partitionHandle, Lifespan lifespan, int maxSize) {
            Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)"split source is closed");
            Preconditions.checkState((!this.isFinished() ? 1 : 0) != 0, (Object)"split source is finished");
            Preconditions.checkArgument((boolean)partitionHandle.equals((Object)NotPartitionedPartitionHandle.NOT_PARTITIONED), (String)"unexpected partition handle: %s", (Object)partitionHandle);
            Preconditions.checkArgument((boolean)lifespan.equals((Object)Lifespan.taskWide()), (String)"unexpected lifespan: %s", (Object)lifespan);
            int remaining = this.splits.size() - this.position;
            int batchSize = Ints.min((int[])new int[]{remaining, maxSize});
            ImmutableList batch = ImmutableList.copyOf(this.splits.subList(this.position, this.position + batchSize));
            this.position += batchSize;
            return Futures.immediateFuture((Object)new SplitSource.SplitBatch((List)batch, this.isFinished()));
        }

        public void rewind(ConnectorPartitionHandle partitionHandle) {
            throw new UnsupportedOperationException();
        }

        public void close() {
            this.closed = true;
        }

        public boolean isFinished() {
            return this.position >= this.splits.size();
        }
    }

    private static class MockSplit
    implements ConnectorSplit {
        private final long splitSizeInBytes;

        public MockSplit(long splitSizeInBytes) {
            this.splitSizeInBytes = splitSizeInBytes;
        }

        public NodeSelectionStrategy getNodeSelectionStrategy() {
            throw new UnsupportedOperationException();
        }

        public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider) {
            throw new UnsupportedOperationException();
        }

        public Object getInfo() {
            throw new UnsupportedOperationException();
        }

        public OptionalLong getSplitSizeInBytes() {
            return OptionalLong.of(this.splitSizeInBytes);
        }
    }
}

