/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.enumerator;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.flink.HadoopTableExtension;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;
import org.apache.iceberg.flink.source.enumerator.ContinuousEnumerationResult;
import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorPosition;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.assertj.core.api.AbstractCharSequenceAssert;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

public class TestContinuousSplitPlannerImpl {
    // Per-test temporary directory injected by JUnit; handed to the data appender in before().
    @TempDir
    protected Path temporaryFolder;
    // File format passed to the GenericAppenderHelper that writes every data file in this test.
    private static final FileFormat FILE_FORMAT = FileFormat.PARQUET;
    // Incremented by verifyOneCycle() so each cycle generates its batch from a new seed.
    private static final AtomicLong RANDOM_SEED = new AtomicLong();
    // JUnit extension that supplies the table and table loader used by all tests
    // (presumably database "default", table "t" -- confirm against HadoopTableExtension).
    @RegisterExtension
    private static final HadoopTableExtension TABLE_RESOURCE = new HadoopTableExtension("default", "t", TestFixtures.SCHEMA);
    // Recreated in before(); writes data files and appends them to the table.
    private GenericAppenderHelper dataAppender;
    // First data file and the table's current snapshot after appending it; set by appendTwoSnapshots().
    private DataFile dataFile1;
    private Snapshot snapshot1;
    // Second data file and the table's current snapshot after appending it; set by appendTwoSnapshots().
    private DataFile dataFile2;
    private Snapshot snapshot2;

    /**
     * Builds a new appender helper for the shared test table before every test, passing it
     * {@code FILE_FORMAT} and the per-test temporary folder.
     */
    @BeforeEach
    public void before() throws IOException {
        dataAppender =
            new GenericAppenderHelper(TABLE_RESOURCE.table(), FILE_FORMAT, temporaryFolder);
    }

    /**
     * Writes two data files of two generated records each (seeds 0 and 1) and appends them to the
     * table one at a time. After each append the written file and the table's current snapshot are
     * stored in {@code dataFile1}/{@code snapshot1} and {@code dataFile2}/{@code snapshot2}.
     */
    private void appendTwoSnapshots() throws IOException {
        // First append -> snapshot1.
        dataFile1 = dataAppender.writeFile(null, RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L));
        dataAppender.appendToTable(new DataFile[] {dataFile1});
        snapshot1 = TABLE_RESOURCE.table().currentSnapshot();

        // Second append -> snapshot2.
        dataFile2 = dataAppender.writeFile(null, RandomGenericData.generate(TestFixtures.SCHEMA, 2, 1L));
        dataAppender.appendToTable(new DataFile[] {dataFile2});
        snapshot2 = TABLE_RESOURCE.table().currentSnapshot();
    }

    /**
     * Appends one new snapshot holding a single freshly generated data file, runs one planning
     * cycle starting at {@code lastPosition}, and verifies the outcome.
     *
     * <p>The assertions check that the result starts at {@code lastPosition}, ends at the newly
     * appended snapshot, and contains exactly one split whose single file is the file just written.
     *
     * @param splitPlanner planner under test
     * @param lastPosition position returned by the previous planning cycle
     * @return the new end position together with the discovered split
     * @throws Exception if writing the data file or planning fails
     */
    private CycleResult verifyOneCycle(ContinuousSplitPlannerImpl splitPlanner, IcebergEnumeratorPosition lastPosition) throws Exception {
        // Each cycle uses the next seed so the generated batch differs between cycles.
        DataFile dataFile =
            dataAppender.writeFile(
                null, RandomGenericData.generate(TestFixtures.SCHEMA, 2, RANDOM_SEED.incrementAndGet()));
        dataAppender.appendToTable(new DataFile[] {dataFile});
        Snapshot snapshot = TABLE_RESOURCE.table().currentSnapshot();

        ContinuousEnumerationResult result = splitPlanner.planSplits(lastPosition);

        // The cycle must start where the previous one ended (ids may be null for an empty start).
        Assertions.assertThat(result.fromPosition().snapshotId()).isEqualTo(lastPosition.snapshotId());
        Assertions.assertThat(result.fromPosition().snapshotTimestampMs())
            .isEqualTo(lastPosition.snapshotTimestampMs());

        // ... and end at the snapshot that was just appended.
        Assertions.assertThat(result.toPosition().snapshotId().longValue())
            .isEqualTo(snapshot.snapshotId());
        Assertions.assertThat(result.toPosition().snapshotTimestampMs().longValue())
            .isEqualTo(snapshot.timestampMillis());

        Assertions.assertThat(result.splits()).hasSize(1);
        IcebergSourceSplit split = Iterables.getOnlyElement(result.splits());

        // Keep the assertion chain generically typed: with a raw ThrowingConsumer[] the lambda
        // parameter would be Object and fileScanTask.file() would not resolve.
        Assertions.assertThat(split.task().files())
            .hasSize(1)
            .first()
            .satisfies(
                fileScanTask ->
                    Assertions.assertThat(fileScanTask.file().path()).isEqualTo(dataFile.path()));

        return new CycleResult(result.toPosition(), split);
    }

    /**
     * TABLE_SCAN_THEN_INCREMENTAL on a table without snapshots: the first two planning rounds are
     * expected to return no splits and an empty end position; afterwards three append-and-plan
     * cycles are verified.
     */
    @Test
    public void testTableScanThenIncrementalWithEmptyTable() throws Exception {
        ScanContext context =
            ScanContext.builder()
                .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
                .build();
        ContinuousSplitPlannerImpl planner =
            new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), context, null);

        // First discovery: nothing to plan, no start position, empty end position.
        ContinuousEnumerationResult firstDiscovery = planner.planSplits(null);
        Assertions.assertThat(firstDiscovery.splits()).isEmpty();
        Assertions.assertThat(firstDiscovery.fromPosition()).isNull();
        Assertions.assertThat(firstDiscovery.toPosition().isEmpty()).isTrue();
        Assertions.assertThat(firstDiscovery.toPosition().snapshotTimestampMs()).isNull();

        // Second discovery from the empty position: still nothing to plan.
        ContinuousEnumerationResult secondDiscovery = planner.planSplits(firstDiscovery.toPosition());
        Assertions.assertThat(secondDiscovery.splits()).isEmpty();
        Assertions.assertThat(secondDiscovery.fromPosition().isEmpty()).isTrue();
        Assertions.assertThat(secondDiscovery.fromPosition().snapshotTimestampMs()).isNull();
        Assertions.assertThat(secondDiscovery.toPosition().isEmpty()).isTrue();
        Assertions.assertThat(secondDiscovery.toPosition().snapshotTimestampMs()).isNull();

        // Three incremental cycles, each appending one snapshot.
        IcebergEnumeratorPosition position = secondDiscovery.toPosition();
        for (int cycle = 0; cycle < 3; cycle++) {
            position = verifyOneCycle(planner, position).lastPosition;
        }
    }

    /**
     * TABLE_SCAN_THEN_INCREMENTAL on a table with two snapshots: the initial round is expected to
     * return one split containing both data files and to end at snapshot2; afterwards three
     * append-and-plan cycles are verified.
     */
    @Test
    public void testTableScanThenIncrementalWithNonEmptyTable() throws Exception {
        appendTwoSnapshots();

        ScanContext context =
            ScanContext.builder()
                .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
                .build();
        ContinuousSplitPlannerImpl planner =
            new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), context, null);

        ContinuousEnumerationResult tableScan = planner.planSplits(null);
        Assertions.assertThat(tableScan.fromPosition()).isNull();
        IcebergEnumeratorPosition scanEnd = tableScan.toPosition();
        Assertions.assertThat(scanEnd.snapshotId().longValue()).isEqualTo(snapshot2.snapshotId());
        Assertions.assertThat(scanEnd.snapshotTimestampMs().longValue())
            .isEqualTo(snapshot2.timestampMillis());
        Assertions.assertThat(tableScan.splits()).hasSize(1);

        // The single split must cover the files of both snapshots.
        IcebergSourceSplit onlySplit = Iterables.getOnlyElement(tableScan.splits());
        Assertions.assertThat(onlySplit.task().files()).hasSize(2);
        Set<String> actualPaths =
            onlySplit.task().files().stream()
                .map(fileScanTask -> fileScanTask.file().path().toString())
                .collect(Collectors.toSet());
        Set<String> expectedPaths =
            ImmutableSet.of(dataFile1.path().toString(), dataFile2.path().toString());
        Assertions.assertThat(actualPaths).containsExactlyInAnyOrderElementsOf(expectedPaths);

        IcebergEnumeratorPosition position = tableScan.toPosition();
        for (int cycle = 0; cycle < 3; cycle++) {
            position = verifyOneCycle(planner, position).lastPosition;
        }
    }

    /**
     * INCREMENTAL_FROM_LATEST_SNAPSHOT (split size 1) on a table without snapshots: two planning
     * rounds are expected to yield nothing; after two snapshots are appended the next round is
     * expected to return two splits, followed by three verified append-and-plan cycles.
     */
    @Test
    public void testIncrementalFromLatestSnapshotWithEmptyTable() throws Exception {
        ScanContext context =
            ScanContext.builder()
                .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
                .splitSize(Long.valueOf(1L))
                .build();
        ContinuousSplitPlannerImpl planner =
            new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), context, null);

        // First discovery on the empty table.
        ContinuousEnumerationResult firstDiscovery = planner.planSplits(null);
        Assertions.assertThat(firstDiscovery.splits()).isEmpty();
        Assertions.assertThat(firstDiscovery.fromPosition()).isNull();
        Assertions.assertThat(firstDiscovery.toPosition().isEmpty()).isTrue();
        Assertions.assertThat(firstDiscovery.toPosition().snapshotTimestampMs()).isNull();

        // Second discovery, still on the empty table.
        ContinuousEnumerationResult secondDiscovery = planner.planSplits(firstDiscovery.toPosition());
        Assertions.assertThat(secondDiscovery.splits()).isEmpty();
        Assertions.assertThat(secondDiscovery.fromPosition().isEmpty()).isTrue();
        Assertions.assertThat(secondDiscovery.fromPosition().snapshotTimestampMs()).isNull();
        Assertions.assertThat(secondDiscovery.toPosition().isEmpty()).isTrue();
        Assertions.assertThat(secondDiscovery.toPosition().snapshotTimestampMs()).isNull();

        // Both snapshots appended after the planner started are expected to be discovered.
        appendTwoSnapshots();
        ContinuousEnumerationResult afterAppends = planner.planSplits(secondDiscovery.toPosition());
        Assertions.assertThat(afterAppends.splits()).hasSize(2);

        IcebergEnumeratorPosition position = afterAppends.toPosition();
        for (int cycle = 0; cycle < 3; cycle++) {
            position = verifyOneCycle(planner, position).lastPosition;
        }
    }

    /**
     * INCREMENTAL_FROM_LATEST_SNAPSHOT on a table with two snapshots: the initial round is expected
     * to return no splits and end at snapshot1, so that the following round plans exactly the file
     * of snapshot2; afterwards three append-and-plan cycles are verified.
     */
    @Test
    public void testIncrementalFromLatestSnapshotWithNonEmptyTable() throws Exception {
        appendTwoSnapshots();

        ScanContext context =
            ScanContext.builder()
                .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
                .build();
        ContinuousSplitPlannerImpl planner =
            new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), context, null);

        // Initial round: no splits, end position at snapshot1.
        ContinuousEnumerationResult firstRound = planner.planSplits(null);
        Assertions.assertThat(firstRound.fromPosition()).isNull();
        IcebergEnumeratorPosition firstEnd = firstRound.toPosition();
        Assertions.assertThat(firstEnd.snapshotId().longValue()).isEqualTo(snapshot1.snapshotId());
        Assertions.assertThat(firstEnd.snapshotTimestampMs().longValue())
            .isEqualTo(snapshot1.timestampMillis());
        Assertions.assertThat(firstRound.splits()).isEmpty();

        // Second round: from snapshot1 to snapshot2.
        ContinuousEnumerationResult secondRound = planner.planSplits(firstRound.toPosition());
        IcebergEnumeratorPosition secondStart = secondRound.fromPosition();
        Assertions.assertThat(secondStart.snapshotId().longValue()).isEqualTo(snapshot1.snapshotId());
        Assertions.assertThat(secondStart.snapshotTimestampMs().longValue())
            .isEqualTo(snapshot1.timestampMillis());
        IcebergEnumeratorPosition secondEnd = secondRound.toPosition();
        Assertions.assertThat(secondEnd.snapshotId().longValue()).isEqualTo(snapshot2.snapshotId());
        Assertions.assertThat(secondEnd.snapshotTimestampMs().longValue())
            .isEqualTo(snapshot2.timestampMillis());

        IcebergSourceSplit onlySplit = Iterables.getOnlyElement(secondRound.splits());
        Assertions.assertThat(onlySplit.task().files()).hasSize(1);
        Set<String> actualPaths =
            onlySplit.task().files().stream()
                .map(fileScanTask -> fileScanTask.file().path().toString())
                .collect(Collectors.toSet());
        Set<String> expectedPaths = ImmutableSet.of(dataFile2.path().toString());
        Assertions.assertThat(actualPaths).containsExactlyElementsOf(expectedPaths);

        IcebergEnumeratorPosition position = secondRound.toPosition();
        for (int cycle = 0; cycle < 3; cycle++) {
            position = verifyOneCycle(planner, position).lastPosition;
        }
    }

    /**
     * INCREMENTAL_FROM_EARLIEST_SNAPSHOT on a table without snapshots: the first two planning
     * rounds are expected to return no splits and positions with null snapshot id and timestamp;
     * afterwards three append-and-plan cycles are verified.
     */
    @Test
    public void testIncrementalFromEarliestSnapshotWithEmptyTable() throws Exception {
        ScanContext context =
            ScanContext.builder()
                .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
                .build();
        ContinuousSplitPlannerImpl planner =
            new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), context, null);

        // First discovery on the empty table.
        ContinuousEnumerationResult firstDiscovery = planner.planSplits(null);
        Assertions.assertThat(firstDiscovery.splits()).isEmpty();
        Assertions.assertThat(firstDiscovery.fromPosition()).isNull();
        Assertions.assertThat(firstDiscovery.toPosition().snapshotId()).isNull();
        Assertions.assertThat(firstDiscovery.toPosition().snapshotTimestampMs()).isNull();

        // Second discovery, still on the empty table.
        ContinuousEnumerationResult secondDiscovery = planner.planSplits(firstDiscovery.toPosition());
        Assertions.assertThat(secondDiscovery.splits()).isEmpty();
        Assertions.assertThat(secondDiscovery.fromPosition().snapshotId()).isNull();
        Assertions.assertThat(secondDiscovery.fromPosition().snapshotTimestampMs()).isNull();
        Assertions.assertThat(secondDiscovery.toPosition().snapshotId()).isNull();
        Assertions.assertThat(secondDiscovery.toPosition().snapshotTimestampMs()).isNull();

        IcebergEnumeratorPosition position = secondDiscovery.toPosition();
        for (int cycle = 0; cycle < 3; cycle++) {
            position = verifyOneCycle(planner, position).lastPosition;
        }
    }

    /**
     * INCREMENTAL_FROM_EARLIEST_SNAPSHOT on a table with two snapshots: the initial round is
     * expected to return no splits and a position with null id/timestamp; the second round is
     * expected to plan both data files and end at snapshot2; afterwards three append-and-plan
     * cycles are verified.
     */
    @Test
    public void testIncrementalFromEarliestSnapshotWithNonEmptyTable() throws Exception {
        appendTwoSnapshots();

        ScanContext context =
            ScanContext.builder()
                .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
                .build();
        ContinuousSplitPlannerImpl planner =
            new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), context, null);

        // Initial round: nothing planned, end position carries no snapshot.
        ContinuousEnumerationResult firstRound = planner.planSplits(null);
        Assertions.assertThat(firstRound.fromPosition()).isNull();
        Assertions.assertThat(firstRound.toPosition().snapshotId()).isNull();
        Assertions.assertThat(firstRound.toPosition().snapshotTimestampMs()).isNull();
        Assertions.assertThat(firstRound.splits()).isEmpty();

        // Second round: everything up to snapshot2.
        ContinuousEnumerationResult secondRound = planner.planSplits(firstRound.toPosition());
        Assertions.assertThat(secondRound.fromPosition().snapshotId()).isNull();
        Assertions.assertThat(secondRound.fromPosition().snapshotTimestampMs()).isNull();
        IcebergEnumeratorPosition secondEnd = secondRound.toPosition();
        Assertions.assertThat(secondEnd.snapshotId().longValue()).isEqualTo(snapshot2.snapshotId());
        Assertions.assertThat(secondEnd.snapshotTimestampMs().longValue())
            .isEqualTo(snapshot2.timestampMillis());

        IcebergSourceSplit onlySplit = Iterables.getOnlyElement(secondRound.splits());
        Assertions.assertThat(onlySplit.task().files()).hasSize(2);
        Set<String> actualPaths =
            onlySplit.task().files().stream()
                .map(fileScanTask -> fileScanTask.file().path().toString())
                .collect(Collectors.toSet());
        Set<String> expectedPaths =
            ImmutableSet.of(dataFile1.path().toString(), dataFile2.path().toString());
        Assertions.assertThat(actualPaths).containsExactlyInAnyOrderElementsOf(expectedPaths);

        IcebergEnumeratorPosition position = secondRound.toPosition();
        for (int cycle = 0; cycle < 3; cycle++) {
            position = verifyOneCycle(planner, position).lastPosition;
        }
    }

    /**
     * INCREMENTAL_FROM_SNAPSHOT_ID with start snapshot id 1 on a table without snapshots: planning
     * is expected to fail with an IllegalArgumentException naming the missing id.
     */
    @Test
    public void testIncrementalFromSnapshotIdWithEmptyTable() {
        ScanContext contextWithUnknownId =
            ScanContext.builder()
                .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
                .startSnapshotId(Long.valueOf(1L))
                .build();
        ContinuousSplitPlannerImpl planner =
            new ContinuousSplitPlannerImpl(
                TABLE_RESOURCE.tableLoader().clone(), contextWithUnknownId, null);

        Assertions.assertThatThrownBy(() -> planner.planSplits(null))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("Start snapshot id not found in history: 1");
    }

    /**
     * INCREMENTAL_FROM_SNAPSHOT_ID with an id that matches neither existing snapshot: planning is
     * expected to fail with an IllegalArgumentException naming that id.
     */
    @Test
    public void testIncrementalFromSnapshotIdWithInvalidIds() throws Exception {
        appendTwoSnapshots();

        // Pick the smallest non-negative id that is not used by either snapshot.
        long invalidSnapshotId = 0L;
        while (invalidSnapshotId == snapshot1.snapshotId()
            || invalidSnapshotId == snapshot2.snapshotId()) {
            invalidSnapshotId++;
        }

        ScanContext contextWithUnknownId =
            ScanContext.builder()
                .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
                .startSnapshotId(Long.valueOf(invalidSnapshotId))
                .build();
        ContinuousSplitPlannerImpl planner =
            new ContinuousSplitPlannerImpl(
                TABLE_RESOURCE.tableLoader().clone(), contextWithUnknownId, null);

        Assertions.assertThatThrownBy(() -> planner.planSplits(null))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("Start snapshot id not found in history: " + invalidSnapshotId);
    }

    /**
     * INCREMENTAL_FROM_SNAPSHOT_ID starting at snapshot2: the initial round is expected to return
     * no splits and end at snapshot1, so that the following round plans exactly the file of
     * snapshot2; afterwards three append-and-plan cycles are verified.
     */
    @Test
    public void testIncrementalFromSnapshotId() throws Exception {
        appendTwoSnapshots();

        ScanContext context =
            ScanContext.builder()
                .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
                .startSnapshotId(Long.valueOf(snapshot2.snapshotId()))
                .build();
        ContinuousSplitPlannerImpl planner =
            new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), context, null);

        // Initial round: no splits, end position at snapshot1.
        ContinuousEnumerationResult firstRound = planner.planSplits(null);
        Assertions.assertThat(firstRound.fromPosition()).isNull();
        IcebergEnumeratorPosition firstEnd = firstRound.toPosition();
        Assertions.assertThat(firstEnd.snapshotId().longValue()).isEqualTo(snapshot1.snapshotId());
        Assertions.assertThat(firstEnd.snapshotTimestampMs().longValue())
            .isEqualTo(snapshot1.timestampMillis());
        Assertions.assertThat(firstRound.splits()).isEmpty();

        // Second round: from snapshot1 to snapshot2.
        ContinuousEnumerationResult secondRound = planner.planSplits(firstRound.toPosition());
        IcebergEnumeratorPosition secondStart = secondRound.fromPosition();
        Assertions.assertThat(secondStart.snapshotId().longValue()).isEqualTo(snapshot1.snapshotId());
        Assertions.assertThat(secondStart.snapshotTimestampMs().longValue())
            .isEqualTo(snapshot1.timestampMillis());
        IcebergEnumeratorPosition secondEnd = secondRound.toPosition();
        Assertions.assertThat(secondEnd.snapshotId().longValue()).isEqualTo(snapshot2.snapshotId());
        Assertions.assertThat(secondEnd.snapshotTimestampMs().longValue())
            .isEqualTo(snapshot2.timestampMillis());

        IcebergSourceSplit onlySplit = Iterables.getOnlyElement(secondRound.splits());
        Assertions.assertThat(onlySplit.task().files()).hasSize(1);
        Set<String> actualPaths =
            onlySplit.task().files().stream()
                .map(fileScanTask -> fileScanTask.file().path().toString())
                .collect(Collectors.toSet());
        Set<String> expectedPaths = ImmutableSet.of(dataFile2.path().toString());
        Assertions.assertThat(actualPaths).containsExactlyElementsOf(expectedPaths);

        IcebergEnumeratorPosition position = secondRound.toPosition();
        for (int cycle = 0; cycle < 3; cycle++) {
            position = verifyOneCycle(planner, position).lastPosition;
        }
    }

    /**
     * INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP with start timestamp 1 on a table without snapshots:
     * planning is expected to fail with an IllegalArgumentException.
     */
    @Test
    public void testIncrementalFromSnapshotTimestampWithEmptyTable() {
        ScanContext contextWithUnmatchedTimestamp =
            ScanContext.builder()
                .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
                .startSnapshotTimestamp(Long.valueOf(1L))
                .build();
        ContinuousSplitPlannerImpl planner =
            new ContinuousSplitPlannerImpl(
                TABLE_RESOURCE.tableLoader().clone(), contextWithUnmatchedTimestamp, null);

        Assertions.assertThatThrownBy(() -> planner.planSplits(null))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("Cannot find a snapshot after: 1");
    }

    /**
     * INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP with a start timestamp one second past snapshot2:
     * planning is expected to fail with an IllegalArgumentException.
     */
    @Test
    public void testIncrementalFromSnapshotTimestampWithInvalidIds() throws Exception {
        appendTwoSnapshots();

        // A timestamp later than the newest snapshot, so no snapshot can match it.
        long timestampAfterLastSnapshot = snapshot2.timestampMillis() + 1000L;

        ScanContext contextWithUnmatchedTimestamp =
            ScanContext.builder()
                .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
                .startSnapshotTimestamp(Long.valueOf(timestampAfterLastSnapshot))
                .build();
        ContinuousSplitPlannerImpl planner =
            new ContinuousSplitPlannerImpl(
                TABLE_RESOURCE.tableLoader().clone(), contextWithUnmatchedTimestamp, null);

        Assertions.assertThatThrownBy(() -> planner.planSplits(null))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessageStartingWith("Cannot find a snapshot after:");
    }

    /**
     * INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP starting at snapshot2's timestamp: the initial round is
     * expected to return no splits and end at snapshot1, so that the following round plans exactly
     * the file of snapshot2; afterwards three append-and-plan cycles are verified.
     */
    @Test
    public void testIncrementalFromSnapshotTimestamp() throws Exception {
        appendTwoSnapshots();

        ScanContext context =
            ScanContext.builder()
                .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
                .startSnapshotTimestamp(Long.valueOf(snapshot2.timestampMillis()))
                .build();
        ContinuousSplitPlannerImpl planner =
            new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), context, null);

        // Initial round: no splits, end position at snapshot1.
        ContinuousEnumerationResult firstRound = planner.planSplits(null);
        Assertions.assertThat(firstRound.fromPosition()).isNull();
        IcebergEnumeratorPosition firstEnd = firstRound.toPosition();
        Assertions.assertThat(firstEnd.snapshotId().longValue()).isEqualTo(snapshot1.snapshotId());
        Assertions.assertThat(firstEnd.snapshotTimestampMs().longValue())
            .isEqualTo(snapshot1.timestampMillis());
        Assertions.assertThat(firstRound.splits()).isEmpty();

        // Second round: from snapshot1 to snapshot2.
        ContinuousEnumerationResult secondRound = planner.planSplits(firstRound.toPosition());
        IcebergEnumeratorPosition secondStart = secondRound.fromPosition();
        Assertions.assertThat(secondStart.snapshotId().longValue()).isEqualTo(snapshot1.snapshotId());
        Assertions.assertThat(secondStart.snapshotTimestampMs().longValue())
            .isEqualTo(snapshot1.timestampMillis());
        IcebergEnumeratorPosition secondEnd = secondRound.toPosition();
        Assertions.assertThat(secondEnd.snapshotId().longValue()).isEqualTo(snapshot2.snapshotId());
        Assertions.assertThat(secondEnd.snapshotTimestampMs().longValue())
            .isEqualTo(snapshot2.timestampMillis());

        IcebergSourceSplit onlySplit = Iterables.getOnlyElement(secondRound.splits());
        Assertions.assertThat(onlySplit.task().files()).hasSize(1);
        Set<String> actualPaths =
            onlySplit.task().files().stream()
                .map(fileScanTask -> fileScanTask.file().path().toString())
                .collect(Collectors.toSet());
        Set<String> expectedPaths = ImmutableSet.of(dataFile2.path().toString());
        Assertions.assertThat(actualPaths).containsExactlyElementsOf(expectedPaths);

        IcebergEnumeratorPosition position = secondRound.toPosition();
        for (int cycle = 0; cycle < 3; cycle++) {
            position = verifyOneCycle(planner, position).lastPosition;
        }
    }

    /**
     * INCREMENTAL_FROM_EARLIEST_SNAPSHOT with {@code maxPlanningSnapshotCount(1)} on a table with
     * five snapshots: after an empty initial round, each following round is expected to advance by
     * exactly one snapshot and plan only that snapshot's file.
     */
    @Test
    public void testMaxPlanningSnapshotCount() throws Exception {
        appendTwoSnapshots();
        // Three more snapshots (seeds 2..4) so more snapshots exist than a single round may plan.
        for (int i = 2; i < 5; ++i) {
            appendSnapshot(i, 2);
        }

        ScanContext scanContext =
            ScanContext.builder()
                .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
                .maxPlanningSnapshotCount(1)
                .build();
        ContinuousSplitPlannerImpl splitPlanner =
            new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), scanContext, null);

        // Initial round: nothing planned, end position carries no snapshot.
        ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
        Assertions.assertThat(initialResult.fromPosition()).isNull();
        Assertions.assertThat(initialResult.toPosition().snapshotId()).isNull();
        Assertions.assertThat(initialResult.toPosition().snapshotTimestampMs()).isNull();
        Assertions.assertThat(initialResult.splits()).isEmpty();

        // The expected-file sets are built as ImmutableSet<String> directly; casting an
        // ImmutableSet<Object> to Set<String> is not a legal conversion.
        ContinuousEnumerationResult secondResult = splitPlanner.planSplits(initialResult.toPosition());
        verifyMaxPlanningSnapshotCountResult(
            secondResult, null, snapshot1, ImmutableSet.of(dataFile1.path().toString()));

        ContinuousEnumerationResult thirdResult = splitPlanner.planSplits(secondResult.toPosition());
        verifyMaxPlanningSnapshotCountResult(
            thirdResult, snapshot1, snapshot2, ImmutableSet.of(dataFile2.path().toString()));
    }

    /**
     * TABLE_SCAN_THEN_INCREMENTAL with column stats disabled: the initial split (two files) and the
     * split of each of three following cycles are expected to carry no column stats.
     */
    @Test
    public void testTableScanNoStats() throws Exception {
        appendTwoSnapshots();

        ScanContext context =
            ScanContext.builder()
                .includeColumnStats(false)
                .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
                .build();
        ContinuousSplitPlannerImpl planner =
            new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), context, null);

        ContinuousEnumerationResult tableScan = planner.planSplits(null);
        Assertions.assertThat(tableScan.splits()).hasSize(1);
        IcebergSourceSplit onlySplit = Iterables.getOnlyElement(tableScan.splits());
        Assertions.assertThat(onlySplit.task().files()).hasSize(2);
        verifyStatCount(onlySplit, 0);

        IcebergEnumeratorPosition position = tableScan.toPosition();
        for (int cycle = 0; cycle < 3; cycle++) {
            CycleResult cycleResult = verifyOneCycle(planner, position);
            verifyStatCount(cycleResult.split, 0);
            position = cycleResult.lastPosition;
        }
    }

    /**
     * TABLE_SCAN_THEN_INCREMENTAL with column stats enabled: the initial split (two files) and the
     * split of each of three following cycles are expected to carry stats for three columns.
     */
    @Test
    public void testTableScanAllStats() throws Exception {
        appendTwoSnapshots();

        ScanContext context =
            ScanContext.builder()
                .includeColumnStats(true)
                .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
                .build();
        ContinuousSplitPlannerImpl planner =
            new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), context, null);

        ContinuousEnumerationResult tableScan = planner.planSplits(null);
        Assertions.assertThat(tableScan.splits()).hasSize(1);
        IcebergSourceSplit onlySplit = Iterables.getOnlyElement(tableScan.splits());
        Assertions.assertThat(onlySplit.task().files()).hasSize(2);
        verifyStatCount(onlySplit, 3);

        IcebergEnumeratorPosition position = tableScan.toPosition();
        for (int cycle = 0; cycle < 3; cycle++) {
            CycleResult cycleResult = verifyOneCycle(planner, position);
            verifyStatCount(cycleResult.split, 3);
            position = cycleResult.lastPosition;
        }
    }

    /**
     * TABLE_SCAN_THEN_INCREMENTAL with column stats requested only for column {@code "data"}: the
     * initial split (two files) and the split of each of three following cycles are expected to
     * carry stats for exactly one column.
     */
    @Test
    public void testTableScanSingleStat() throws Exception {
        appendTwoSnapshots();

        ScanContext context =
            ScanContext.builder()
                .includeColumnStats(ImmutableSet.of("data"))
                .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
                .build();
        ContinuousSplitPlannerImpl planner =
            new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), context, null);

        ContinuousEnumerationResult tableScan = planner.planSplits(null);
        Assertions.assertThat(tableScan.splits()).hasSize(1);
        IcebergSourceSplit onlySplit = Iterables.getOnlyElement(tableScan.splits());
        Assertions.assertThat(onlySplit.task().files()).hasSize(2);
        verifyStatCount(onlySplit, 1);

        IcebergEnumeratorPosition position = tableScan.toPosition();
        for (int cycle = 0; cycle < 3; cycle++) {
            CycleResult cycleResult = verifyOneCycle(planner, position);
            verifyStatCount(cycleResult.split, 1);
            position = cycleResult.lastPosition;
        }
    }

    /**
     * Checks the column statistics on every file of the split.
     *
     * <p>With {@code expected == 0} all six stat maps must be null. Otherwise value counts, column
     * sizes, lower/upper bounds and null-value counts must each have {@code expected} entries and
     * the NaN-value counts must be empty.
     *
     * @param split split whose files are inspected
     * @param expected number of columns expected to have stats
     */
    private void verifyStatCount(IcebergSourceSplit split, int expected) {
        split.task().files().forEach(scanTask -> {
            DataFile file = scanTask.file();
            if (expected == 0) {
                // Stats were dropped: none of the maps may be present.
                Assertions.assertThat(file.valueCounts()).isNull();
                Assertions.assertThat(file.columnSizes()).isNull();
                Assertions.assertThat(file.lowerBounds()).isNull();
                Assertions.assertThat(file.upperBounds()).isNull();
                Assertions.assertThat(file.nanValueCounts()).isNull();
                Assertions.assertThat(file.nullValueCounts()).isNull();
                return;
            }
            Assertions.assertThat(file.valueCounts()).hasSize(expected);
            Assertions.assertThat(file.columnSizes()).hasSize(expected);
            Assertions.assertThat(file.lowerBounds()).hasSize(expected);
            Assertions.assertThat(file.upperBounds()).hasSize(expected);
            Assertions.assertThat(file.nullValueCounts()).hasSize(expected);
            Assertions.assertThat(file.nanValueCounts()).isEmpty();
        });
    }

    /**
     * Verifies one planning round of the max-planning-snapshot-count test.
     *
     * @param result planning result to check
     * @param fromSnapshotExclusive snapshot the round must start from, or {@code null} when the
     *     start position is expected to carry no snapshot id/timestamp
     * @param toSnapshotInclusive snapshot the round must end at
     * @param expectedFiles paths of the data files the single split must contain
     */
    private void verifyMaxPlanningSnapshotCountResult(ContinuousEnumerationResult result, Snapshot fromSnapshotExclusive, Snapshot toSnapshotInclusive, Set<String> expectedFiles) {
        IcebergEnumeratorPosition start = result.fromPosition();
        if (fromSnapshotExclusive != null) {
            Assertions.assertThat(start.snapshotId().longValue())
                .isEqualTo(fromSnapshotExclusive.snapshotId());
            Assertions.assertThat(start.snapshotTimestampMs().longValue())
                .isEqualTo(fromSnapshotExclusive.timestampMillis());
        } else {
            Assertions.assertThat(start.snapshotId()).isNull();
            Assertions.assertThat(start.snapshotTimestampMs()).isNull();
        }

        IcebergEnumeratorPosition end = result.toPosition();
        Assertions.assertThat(end.snapshotId().longValue()).isEqualTo(toSnapshotInclusive.snapshotId());
        Assertions.assertThat(end.snapshotTimestampMs().longValue())
            .isEqualTo(toSnapshotInclusive.timestampMillis());

        // Exactly one split with exactly one file, matching the expected paths.
        IcebergSourceSplit onlySplit = Iterables.getOnlyElement(result.splits());
        Assertions.assertThat(onlySplit.task().files()).hasSize(1);
        Set<String> actualPaths =
            onlySplit.task().files().stream()
                .map(fileScanTask -> fileScanTask.file().path().toString())
                .collect(Collectors.toSet());
        Assertions.assertThat(actualPaths).containsExactlyElementsOf(expectedFiles);
    }

    /**
     * Writes one data file with {@code numRecords} records generated from {@code seed}, appends it
     * to the table and returns the table's current snapshot afterwards.
     *
     * @param seed seed for the generated records
     * @param numRecords number of records to generate
     * @return the table's current snapshot after the append
     */
    private Snapshot appendSnapshot(long seed, int numRecords) throws Exception {
        DataFile written =
            dataAppender.writeFile(null, RandomGenericData.generate(TestFixtures.SCHEMA, numRecords, seed));
        dataAppender.appendToTable(new DataFile[] {written});
        return TABLE_RESOURCE.table().currentSnapshot();
    }

    /**
     * Immutable pair returned by {@code verifyOneCycle}: the end position of the planning cycle
     * and the single split it discovered.
     */
    private static class CycleResult {
        // Assigned once in the constructor and only read afterwards, hence final.
        final IcebergEnumeratorPosition lastPosition;
        final IcebergSourceSplit split;

        CycleResult(IcebergEnumeratorPosition lastPosition, IcebergSourceSplit split) {
            this.lastPosition = lastPosition;
            this.split = split;
        }
    }
}

