/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.source.coordinator.ExecutorNotifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.source.ContinuousFileSplitEnumerator;
import org.apache.paimon.flink.source.FileSplitEnumeratorTestBase;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
import org.apache.paimon.flink.source.ReaderConsumeProgressEvent;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileTestUtils;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.SnapshotNotExistPlan;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
import org.assertj.core.api.Assertions;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;

public class ContinuousFileSplitEnumeratorTest
extends FileSplitEnumeratorTestBase {
    /**
     * Verifies that a single reader receives the initial splits in order, and that splits handed
     * back through {@code addSplitsBack} are assigned again before the remaining, not yet
     * assigned splits.
     */
    @Test
    public void testSplitAllocationIsOrdered() {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = getSplitEnumeratorContext(1);

        // Four snapshot splits with ids 1..4; the second argument is 0 for all of them
        // (presumably the bucket -- confirm against createSnapshotSplit).
        List<FileStoreSourceSplit> initialSplits = new ArrayList<>();
        for (int id = 1; id <= 4; id++) {
            initialSplits.add(createSnapshotSplit(id, 0, Collections.emptyList()));
        }
        // Independent copy so the expectation does not alias the list given to the enumerator.
        List<FileStoreSourceSplit> expectedSplits = new ArrayList<>(initialSplits);

        ContinuousFileSplitEnumerator enumerator =
                new Builder()
                        .setSplitEnumeratorContext(context)
                        .setInitialSplits(initialSplits)
                        .setDiscoveryInterval(3L)
                        .build();

        // Round 1: two requests from reader 0 yield the first two splits.
        enumerator.handleSplitRequest(0, "test-host");
        enumerator.handleSplitRequest(0, "test-host");
        Map<Integer, TestingSplitEnumeratorContext.SplitAssignmentState<FileStoreSourceSplit>>
                assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0);
        List<FileStoreSourceSplit> assignedSplits = assignments.get(0).getAssignedSplits();
        Assertions.assertThat(assignedSplits).hasSameElementsAs(expectedSplits.subList(0, 2));

        // Hand the two splits back and reset the recorded assignments.
        enumerator.addSplitsBack(assignedSplits, 0);
        context.getSplitAssignments().clear();
        Assertions.assertThat(context.getSplitAssignments()).isEmpty();

        // Round 2: the returned splits are assigned again first.
        enumerator.handleSplitRequest(0, "test-host");
        enumerator.handleSplitRequest(0, "test-host");
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0);
        assignedSplits = assignments.get(0).getAssignedSplits();
        Assertions.assertThat(assignedSplits).hasSameElementsAs(expectedSplits.subList(0, 2));

        // Round 3: only now are the last two splits handed out.
        context.getSplitAssignments().clear();
        enumerator.handleSplitRequest(0, "test-host");
        enumerator.handleSplitRequest(0, "test-host");
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0);
        assignedSplits = assignments.get(0).getAssignedSplits();
        Assertions.assertThat(assignedSplits).hasSameElementsAs(expectedSplits.subList(2, 4));
    }

    /**
     * Verifies that each split request from the single reader results in exactly one additional
     * assigned split, even though 18 initial splits are pending.
     */
    @Test
    public void testSplitWithBatch() {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = getSplitEnumeratorContext(1);

        // 18 snapshot splits; both numeric arguments are the loop counter.
        List<FileStoreSourceSplit> initialSplits = new ArrayList<>();
        for (int n = 1; n <= 18; n++) {
            initialSplits.add(createSnapshotSplit(n, n, Collections.emptyList()));
        }

        ContinuousFileSplitEnumerator enumerator =
                new Builder()
                        .setSplitEnumeratorContext(context)
                        .setInitialSplits(initialSplits)
                        .setDiscoveryInterval(3L)
                        .build();

        // The assignment map is fetched once and re-inspected after every request.
        enumerator.handleSplitRequest(0, "test-host");
        Map<Integer, TestingSplitEnumeratorContext.SplitAssignmentState<FileStoreSourceSplit>>
                assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0);
        Assertions.assertThat(assignments.get(0).getAssignedSplits()).hasSize(1);

        enumerator.handleSplitRequest(0, "test-host");
        Assertions.assertThat(assignments).containsOnlyKeys(0);
        Assertions.assertThat(assignments.get(0).getAssignedSplits()).hasSize(2);

        enumerator.handleSplitRequest(0, "test-host");
        Assertions.assertThat(assignments).containsOnlyKeys(0);
        Assertions.assertThat(assignments.get(0).getAssignedSplits()).hasSize(3);
    }

    /**
     * Verifies that, with a single reader, the first two requests return the first two initial
     * splits and the next two requests return the remaining two.
     */
    @Test
    public void testSplitAllocationIsFair() {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = getSplitEnumeratorContext(1);

        // For ids 1 and 2, one split with second argument 0 and one with 1
        // (presumably the bucket -- confirm against createSnapshotSplit).
        List<FileStoreSourceSplit> initialSplits = new ArrayList<>();
        for (int id = 1; id <= 2; id++) {
            initialSplits.add(createSnapshotSplit(id, 0, Collections.emptyList()));
            initialSplits.add(createSnapshotSplit(id, 1, Collections.emptyList()));
        }
        List<FileStoreSourceSplit> expectedSplits = new ArrayList<>(initialSplits);

        ContinuousFileSplitEnumerator enumerator =
                new Builder()
                        .setSplitEnumeratorContext(context)
                        .setInitialSplits(initialSplits)
                        .setDiscoveryInterval(3L)
                        .build();

        // First pair of requests.
        enumerator.handleSplitRequest(0, "test-host");
        enumerator.handleSplitRequest(0, "test-host");
        Map<Integer, TestingSplitEnumeratorContext.SplitAssignmentState<FileStoreSourceSplit>>
                assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0);
        List<FileStoreSourceSplit> assignedSplits = assignments.get(0).getAssignedSplits();
        Assertions.assertThat(assignedSplits).hasSameElementsAs(expectedSplits.subList(0, 2));

        // Reset the recorded assignments before the second pair of requests.
        context.getSplitAssignments().clear();
        Assertions.assertThat(context.getSplitAssignments()).isEmpty();

        enumerator.handleSplitRequest(0, "test-host");
        enumerator.handleSplitRequest(0, "test-host");
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0);
        assignedSplits = assignments.get(0).getAssignedSplits();
        Assertions.assertThat(assignedSplits).hasSameElementsAs(expectedSplits.subList(2, 4));
    }

    /**
     * Verifies split discovery through a scan with two readers: reader 0 receives
     * {@code splits[0]} and {@code splits[2]}, reader 1 receives {@code splits[1]} and
     * {@code splits[3]}, and each reader gets the no-more-splits signal on its third request.
     */
    @Test
    public void testSnapshotEnumerator() {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = getSplitEnumeratorContext(2);

        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
        MockScan scan = new MockScan(results);
        ContinuousFileSplitEnumerator enumerator =
                new Builder()
                        .setSplitEnumeratorContext(context)
                        .setInitialSplits(Collections.emptyList())
                        .setDiscoveryInterval(1L)
                        .setScan(scan)
                        .build();
        enumerator.start();

        // Publish one plan with four data splits whose second argument is 0..3
        // (presumably the bucket -- confirm against createDataSplit).
        long snapshot = 0L;
        List<DataSplit> splits = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
        }
        results.put(1L, new DataFilePlan(splits));
        context.triggerAllActions();

        // Reader 0: first request.
        enumerator.handleSplitRequest(0, "test-host");
        Map<Integer, TestingSplitEnumeratorContext.SplitAssignmentState<FileStoreSourceSplit>>
                assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0);
        Assertions.assertThat(toDataSplits(assignments.get(0).getAssignedSplits()))
                .containsExactly(splits.get(0));

        // Reader 0: second request.
        enumerator.handleSplitRequest(0, "test-host");
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0);
        Assertions.assertThat(toDataSplits(assignments.get(0).getAssignedSplits()))
                .containsExactly(splits.get(0), splits.get(2));

        // Reader 0: third request, followed by running the pending actions.
        enumerator.handleSplitRequest(0, "test-host");
        context.triggerAllActions();
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0);
        Assertions.assertThat(assignments.get(0).hasReceivedNoMoreSplitsSignal()).isTrue();

        // Start from a clean assignment record for reader 1.
        assignments.clear();

        // Reader 1: first request.
        enumerator.handleSplitRequest(1, "test-host");
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(1);
        Assertions.assertThat(toDataSplits(assignments.get(1).getAssignedSplits()))
                .containsExactly(splits.get(1));

        // Reader 1: second request.
        enumerator.handleSplitRequest(1, "test-host");
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(1);
        Assertions.assertThat(toDataSplits(assignments.get(1).getAssignedSplits()))
                .containsExactly(splits.get(1), splits.get(3));

        // Reader 1: third request.
        enumerator.handleSplitRequest(1, "test-host");
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(1);
        Assertions.assertThat(assignments.get(1).hasReceivedNoMoreSplitsSignal()).isTrue();
    }

    /**
     * Verifies, in {@link BucketMode#UNAWARE} mode, that reader 0 is assigned one split from each
     * of two successively published plans, ending up with two assigned splits.
     */
    @Test
    public void testUnawareBucketEnumeratorWithBucket() {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
                getSplitEnumeratorContext(3, 1);

        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
        MockScan scan = new MockScan(results);
        ContinuousFileSplitEnumerator enumerator =
                new Builder()
                        .setSplitEnumeratorContext(context)
                        .setInitialSplits(Collections.emptyList())
                        .setDiscoveryInterval(1L)
                        .setScan(scan)
                        .withBucketMode(BucketMode.UNAWARE)
                        .build();
        enumerator.start();

        // First plan: a single data split created with second argument 1.
        long snapshot = 0L;
        List<DataSplit> splits = new ArrayList<>();
        splits.add(createDataSplit(snapshot, 1, Collections.emptyList()));
        results.put(1L, new DataFilePlan(splits));
        context.triggerAllActions();

        enumerator.handleSplitRequest(0, "test-host");
        Map<Integer, TestingSplitEnumeratorContext.SplitAssignmentState<FileStoreSourceSplit>>
                assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0);
        Assertions.assertThat(toDataSplits(assignments.get(0).getAssignedSplits())).hasSize(1);

        // Second plan: the same list object is cleared and refilled, as in the original test.
        splits.clear();
        splits.add(createDataSplit(snapshot, 2, Collections.emptyList()));
        results.put(2L, new DataFilePlan(splits));
        context.triggerAllActions();

        enumerator.handleSplitRequest(0, "test-host");
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0);
        Assertions.assertThat(toDataSplits(assignments.get(0).getAssignedSplits())).hasSize(2);
    }

    /**
     * Verifies, in {@link BucketMode#UNAWARE} mode with four readers and 100 discovered splits,
     * that every request yields exactly one split for the requesting reader, and that reader 3
     * receives the no-more-splits signal once all 100 splits have been handed out.
     */
    @Test
    public void testUnawareBucketEnumeratorLot() {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = getSplitEnumeratorContext(4);

        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
        MockScan scan = new MockScan(results);
        ContinuousFileSplitEnumerator enumerator =
                new Builder()
                        .setSplitEnumeratorContext(context)
                        .setInitialSplits(Collections.emptyList())
                        .setDiscoveryInterval(1L)
                        .setScan(scan)
                        .withBucketMode(BucketMode.UNAWARE)
                        .build();
        enumerator.start();

        // One plan with 100 data splits, all created with second argument 0.
        long snapshot = 0L;
        List<DataSplit> splits = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            splits.add(createDataSplit(snapshot, 0, Collections.emptyList()));
        }
        results.put(1L, new DataFilePlan(splits));
        context.triggerAllActions();

        // Readers 0, 1 and 2 each request once and each get a single split.
        enumerator.handleSplitRequest(0, "test-host");
        Map<Integer, TestingSplitEnumeratorContext.SplitAssignmentState<FileStoreSourceSplit>>
                assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0);
        Assertions.assertThat(toDataSplits(assignments.get(0).getAssignedSplits())).hasSize(1);

        enumerator.handleSplitRequest(1, "test-host");
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0, 1);
        Assertions.assertThat(toDataSplits(assignments.get(1).getAssignedSplits())).hasSize(1);

        enumerator.handleSplitRequest(2, "test-host");
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0, 1, 2);
        Assertions.assertThat(toDataSplits(assignments.get(2).getAssignedSplits())).hasSize(1);

        // Reader 3 drains the remaining 97 splits, one per request.
        for (int i = 0; i < 97; i++) {
            enumerator.handleSplitRequest(3, "test-host");
            assignments = context.getSplitAssignments();
            Assertions.assertThat(assignments).containsOnlyKeys(0, 1, 2, 3);
            Assertions.assertThat(toDataSplits(assignments.get(3).getAssignedSplits()))
                    .hasSize(i + 1);
        }

        // One more request after everything has been handed out.
        enumerator.handleSplitRequest(3, "test-host");
        context.triggerAllActions();
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0, 1, 2, 3);
        Assertions.assertThat(assignments.get(3).hasReceivedNoMoreSplitsSignal()).isTrue();
    }

    /**
     * Verifies, in {@link BucketMode#UNAWARE} mode, that requests made while no splits exist
     * produce no assignment, and that readers 0 and 1 each receive one split once a plan becomes
     * available, without having to request again. Readers 2 and 3 are then served on request.
     */
    @Test
    public void testUnawareBucketEnumeratorAssignLater() {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = getSplitEnumeratorContext(4);

        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
        MockScan scan = new MockScan(results);
        ContinuousFileSplitEnumerator enumerator =
                new Builder()
                        .setSplitEnumeratorContext(context)
                        .setInitialSplits(Collections.emptyList())
                        .setDiscoveryInterval(1L)
                        .setScan(scan)
                        .withBucketMode(BucketMode.UNAWARE)
                        .build();
        enumerator.start();
        // NOTE(review): presumably stops the mock scan from signalling end-of-scan while it has
        // no results -- confirm against MockScan.
        scan.allowEnd(false);

        // Requests made before any plan exists leave the assignments empty.
        enumerator.handleSplitRequest(0, "test-host");
        Map<Integer, TestingSplitEnumeratorContext.SplitAssignmentState<FileStoreSourceSplit>>
                assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).isEmpty();

        enumerator.handleSplitRequest(1, "test-host");
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).isEmpty();

        // Publish 100 splits and run the pending actions.
        long snapshot = 0L;
        List<DataSplit> splits = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            splits.add(createDataSplit(snapshot, 0, Collections.emptyList()));
        }
        results.put(1L, new DataFilePlan(splits));
        context.triggerAllActions();

        // Both earlier requesters now hold exactly one split each.
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0, 1);
        Assertions.assertThat(assignments.get(0).getAssignedSplits()).hasSize(1);
        Assertions.assertThat(assignments.get(1).getAssignedSplits()).hasSize(1);

        // Later requests are served immediately.
        enumerator.handleSplitRequest(2, "test-host");
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0, 1, 2);
        Assertions.assertThat(toDataSplits(assignments.get(2).getAssignedSplits())).hasSize(1);

        enumerator.handleSplitRequest(3, "test-host");
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0, 1, 2, 3);
        Assertions.assertThat(toDataSplits(assignments.get(3).getAssignedSplits())).hasSize(1);
    }

    /**
     * Verifies that a split request from a reader that has been removed from the context's
     * registered readers yields no assignment, while a still-registered reader is served.
     */
    @Test
    public void testEnumeratorDeregisteredByContext() {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = getSplitEnumeratorContext(2);

        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
        MockScan scan = new MockScan(results);
        ContinuousFileSplitEnumerator enumerator =
                new Builder()
                        .setSplitEnumeratorContext(context)
                        .setInitialSplits(Collections.emptyList())
                        .setDiscoveryInterval(1L)
                        .setScan(scan)
                        .withBucketMode(BucketMode.UNAWARE)
                        .build();
        enumerator.start();

        // Publish one plan with four data splits.
        long snapshot = 0L;
        List<DataSplit> splits = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
        }
        results.put(1L, new DataFilePlan(splits));
        context.triggerAllActions();

        // Deregister reader 0 behind the enumerator's back; its request must be ignored.
        context.registeredReaders().remove(0);
        enumerator.handleSplitRequest(0, "test-host");
        Map<Integer, TestingSplitEnumeratorContext.SplitAssignmentState<FileStoreSourceSplit>>
                assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).isEmpty();

        // Reader 1 is still registered and gets exactly one split.
        enumerator.handleSplitRequest(1, "test-host");
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(1);
        Assertions.assertThat(toDataSplits(assignments.get(1).getAssignedSplits())).hasSize(1);
    }

    /**
     * Verifies that handling a split request does not throw after a reader that requested a
     * split earlier has been removed from the context's registered readers.
     */
    @Test
    public void testRemoveReadersAwaitSuccessful() {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = getSplitEnumeratorContext(2);

        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
        MockScan scan = new MockScan(results);
        ContinuousFileSplitEnumerator enumerator =
                new Builder()
                        .setSplitEnumeratorContext(context)
                        .setInitialSplits(Collections.emptyList())
                        .setDiscoveryInterval(1L)
                        .setScan(scan)
                        .withBucketMode(BucketMode.UNAWARE)
                        .build();
        enumerator.start();

        // Reader 1 asks for a split before any plan has been published.
        enumerator.handleSplitRequest(1, "test-host");

        // Publish a plan with four data splits; no context actions are triggered afterwards.
        long snapshot = 0L;
        List<DataSplit> splits = new ArrayList<>();
        for (int bucket = 0; bucket < 4; bucket++) {
            splits.add(createDataSplit(snapshot, bucket, Collections.emptyList()));
        }
        results.put(1L, new DataFilePlan(splits));

        // Reader 1 disappears; a request from reader 0 must still be handled cleanly.
        context.registeredReaders().remove(1);
        Assertions.assertThatCode(() -> enumerator.handleSplitRequest(0, "test-host"))
                .doesNotThrowAnyException();
    }

    /**
     * Verifies that a split request followed by running only the non-periodic executor tasks is
     * enough for the requesting reader to be assigned a split from the published plan.
     *
     * @throws Exception declared by the original test signature
     */
    @Test
    public void testTriggerScanByTaskRequest() throws Exception {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = getSplitEnumeratorContext(2);

        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
        MockScan scan = new MockScan(results);
        scan.allowEnd(false);
        ContinuousFileSplitEnumerator enumerator =
                new Builder()
                        .setSplitEnumeratorContext(context)
                        .setInitialSplits(Collections.emptyList())
                        .setDiscoveryInterval(1L)
                        .setScan(scan)
                        .build();
        enumerator.start();

        // Publish one plan with four data splits; the periodic discovery is never triggered.
        long snapshot = 0L;
        List<DataSplit> splits = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
        }
        results.put(1L, new DataFilePlan(splits));

        // Reader 0 requests; only one-shot tasks are run.
        enumerator.handleSplitRequest(0, "test-host");
        context.getExecutorService().triggerAllNonPeriodicTasks();
        Map<Integer, TestingSplitEnumeratorContext.SplitAssignmentState<FileStoreSourceSplit>>
                assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0);
        List<FileStoreSourceSplit> assignedSplits = assignments.get(0).getAssignedSplits();
        Assertions.assertThat(toDataSplits(assignedSplits)).containsExactly(splits.get(0));

        // Reader 1 requests next and receives the second split.
        enumerator.handleSplitRequest(1, "test-host");
        context.getExecutorService().triggerAllNonPeriodicTasks();
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0, 1);
        assignedSplits = assignments.get(1).getAssignedSplits();
        Assertions.assertThat(toDataSplits(assignedSplits)).containsExactly(splits.get(1));
    }

    /**
     * Exercises when split requests do and do not lead to new assignments if only the
     * non-periodic executor tasks are run.
     *
     * <p>The asserted sequence is: a request against an empty scan, then two requests after
     * snapshot 1 is published that assign nothing until {@code triggerAllActions()} runs; two
     * requests after snapshot 2 that are served directly; a request after snapshot 3 that is not
     * served until {@code enableTriggerScan()} has been called.
     */
    @Test
    public void testNoTriggerWhenReadLatest() {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = getSplitEnumeratorContext(4);

        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
        MockScan scan = new MockScan(results);
        scan.allowEnd(false);
        ContinuousFileSplitEnumerator enumerator =
                new Builder()
                        .setSplitEnumeratorContext(context)
                        .setInitialSplits(Collections.emptyList())
                        .setDiscoveryInterval(1L)
                        .setScan(scan)
                        .build();
        enumerator.start();

        // Request while the scan has nothing to return.
        // NOTE(review): presumably this is what switches off request-triggered scanning until
        // the next periodic discovery -- confirm against ContinuousFileSplitEnumerator.
        enumerator.handleSplitRequest(0, "test-host");
        context.getExecutorService().triggerAllNonPeriodicTasks();

        // Snapshot 1: two data splits.
        long snapshot = 0L;
        List<DataSplit> splits = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
        }
        results.put(1L, new DataFilePlan(splits));

        // Requests alone do not produce assignments at this point.
        enumerator.handleSplitRequest(0, "test-host");
        context.getExecutorService().triggerAllNonPeriodicTasks();
        Map<Integer, TestingSplitEnumeratorContext.SplitAssignmentState<FileStoreSourceSplit>>
                assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).isEmpty();

        enumerator.handleSplitRequest(1, "test-host");
        context.getExecutorService().triggerAllNonPeriodicTasks();
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).isEmpty();

        // Running all context actions serves both waiting readers.
        context.triggerAllActions();
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0, 1);
        List<FileStoreSourceSplit> assignedSplits = assignments.get(0).getAssignedSplits();
        Assertions.assertThat(toDataSplits(assignedSplits)).containsExactly(splits.get(0));
        assignedSplits = assignments.get(1).getAssignedSplits();
        Assertions.assertThat(toDataSplits(assignedSplits)).containsExactly(splits.get(1));

        // Snapshot 2: the same list object is cleared and refilled, as in the original test.
        splits.clear();
        for (int i = 2; i < 4; i++) {
            splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
        }
        results.put(2L, new DataFilePlan(splits));

        // Readers 2 and 3 are served from snapshot 2 with non-periodic tasks only.
        enumerator.handleSplitRequest(2, "test-host");
        context.getExecutorService().triggerAllNonPeriodicTasks();
        enumerator.handleSplitRequest(3, "test-host");
        context.getExecutorService().triggerAllNonPeriodicTasks();
        assignments = context.getSplitAssignments();
        Assertions.assertThat(assignments).containsOnlyKeys(0, 1, 2, 3);
        assignedSplits = assignments.get(2).getAssignedSplits();
        Assertions.assertThat(toDataSplits(assignedSplits)).containsExactly(splits.get(0));
        assignedSplits = assignments.get(3).getAssignedSplits();
        Assertions.assertThat(toDataSplits(assignedSplits)).containsExactly(splits.get(1));

        // Another request from reader 3 before snapshot 3 is published.
        enumerator.handleSplitRequest(3, "test-host");
        context.getExecutorService().triggerAllNonPeriodicTasks();

        // Snapshot 3: a single data split created with second argument 7.
        splits.clear();
        splits.add(createDataSplit(snapshot, 7, Collections.emptyList()));
        results.put(3L, new DataFilePlan(splits));

        // The new split is not handed to reader 3 yet ...
        enumerator.handleSplitRequest(3, "test-host");
        context.getExecutorService().triggerAllNonPeriodicTasks();
        assignments = context.getSplitAssignments();
        assignedSplits = assignments.get(3).getAssignedSplits();
        Assertions.assertThat(toDataSplits(assignedSplits)).doesNotContain(splits.get(0));

        // ... but it is once request-triggered scanning has been re-enabled explicitly.
        enumerator.enableTriggerScan();
        enumerator.handleSplitRequest(3, "test-host");
        context.getExecutorService().triggerAllNonPeriodicTasks();
        assignments = context.getSplitAssignments();
        assignedSplits = assignments.get(3).getAssignedSplits();
        Assertions.assertThat(toDataSplits(assignedSplits)).contains(splits.get(0));
    }

    /**
     * Verifies the checkpointed enumerator state at three points: before any scan has run (no
     * snapshot id, no splits), after the first scan (snapshot id 2, splits of snapshot 1), and
     * after a split request plus another scan (snapshot id 3, splits of snapshot 2).
     */
    @Test
    public void testEnumeratorWithCheckpoint() {
        TestingAsyncSplitEnumeratorContext<FileStoreSourceSplit> context =
                new TestingAsyncSplitEnumeratorContext<>(1);
        context.registerReader(0, "test-host");

        // Four plans, keyed 1..4, each holding a single data split.
        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
        Map<Long, List<DataSplit>> expectedResults = new HashMap<>(4);
        MockScan scan = new MockScan(results);
        for (int i = 1; i <= 4; i++) {
            List<DataSplit> dataSplits =
                    Collections.singletonList(createDataSplit(i, 0, Collections.emptyList()));
            results.put((long) i, new DataFilePlan(dataSplits));
            expectedResults.put((long) i, dataSplits);
        }

        ContinuousFileSplitEnumerator enumerator =
                new Builder()
                        .setSplitEnumeratorContext(context)
                        .setInitialSplits(Collections.emptyList())
                        .setDiscoveryInterval(1L)
                        .setScan(scan)
                        .build();
        enumerator.start();

        // Typed holder for the state captured on the coordinator thread, so reading it back
        // needs no cast.
        AtomicReference<PendingSplitsCheckpoint> checkpoint = new AtomicReference<>();

        // Checkpoint 1: taken before any worker action has run.
        context.runInCoordinatorThread(
                () -> checkpoint.set(checkpointWithoutException(enumerator, 1L)));
        context.triggerAlCoordinatorAction();
        PendingSplitsCheckpoint state = checkpoint.getAndSet(null);
        Assertions.assertThat(state).isNotNull();
        Assertions.assertThat(state.currentSnapshotId()).isNull();
        Assertions.assertThat(state.splits()).isEmpty();

        // Run the pending worker and coordinator actions, then take checkpoint 2.
        context.triggerAllWorkerAction();
        context.triggerAlCoordinatorAction();
        context.runInCoordinatorThread(
                () -> checkpoint.set(checkpointWithoutException(enumerator, 2L)));
        context.triggerAlCoordinatorAction();
        state = checkpoint.getAndSet(null);
        Assertions.assertThat(state).isNotNull();
        Assertions.assertThat(state.currentSnapshotId()).isEqualTo(2L);
        Assertions.assertThat(toDataSplits(state.splits()))
                .containsExactlyElementsOf(expectedResults.get(1L));

        // Reader 0 requests a split, the next round of actions runs, then checkpoint 3.
        enumerator.handleSplitRequest(0, "test");
        context.triggerAlCoordinatorAction();
        context.triggerAllWorkerAction();
        context.runInCoordinatorThread(
                () -> checkpoint.set(checkpointWithoutException(enumerator, 3L)));
        context.triggerAllWorkerAction();
        // Exactly two coordinator actions are stepped through before the state is read.
        context.triggerNextCoordinatorAction();
        context.triggerNextCoordinatorAction();
        state = checkpoint.getAndSet(null);
        Assertions.assertThat(state).isNotNull();
        Assertions.assertThat(state.currentSnapshotId()).isEqualTo(3L);
        Assertions.assertThat(toDataSplits(state.splits()))
                .containsExactlyElementsOf(expectedResults.get(2L));
    }

    /**
     * Tracks {@code MockScan#getNextSnapshotIdForConsumer()} across a series of completed
     * checkpoints while three readers request splits and readers 0 and 2 report consume
     * progress.
     *
     * @throws Exception declared by the original test signature
     */
    @Test
    public void testEnumeratorWithConsumer() throws Exception {
        TestingAsyncSplitEnumeratorContext<FileStoreSourceSplit> context =
                new TestingAsyncSplitEnumeratorContext<>(3);
        for (int reader = 0; reader < 3; reader++) {
            context.registerReader(reader, "test-host");
        }

        // Two plans (keys 1 and 2), each with data splits created with second argument 0 and 2.
        TreeMap<Long, TableScan.Plan> plans = new TreeMap<>();
        for (int snapshotId = 1; snapshotId <= 2; snapshotId++) {
            plans.put(
                    (long) snapshotId,
                    new DataFilePlan(
                            Arrays.asList(
                                    createDataSplit(snapshotId, 0, Collections.emptyList()),
                                    createDataSplit(snapshotId, 2, Collections.emptyList()))));
        }
        MockScan scan = new MockScan(plans);

        ContinuousFileSplitEnumerator enumerator =
                new Builder()
                        .setSplitEnumeratorContext(context)
                        .setInitialSplits(Collections.emptyList())
                        .setDiscoveryInterval(1L)
                        .setScan(scan)
                        .build();
        enumerator.start();

        long checkpointId = 1L;

        // All three readers request before anything has been scanned.
        for (int reader = 0; reader < 3; reader++) {
            enumerator.handleSplitRequest(reader, "test-host");
        }
        triggerCheckpointAndComplete(enumerator, checkpointId++);
        Assertions.assertThat(scan.getNextSnapshotIdForConsumer()).isNull();

        // After the first scan the consumer position is 1.
        scanNextSnapshot(context);
        triggerCheckpointAndComplete(enumerator, checkpointId++);
        Assertions.assertThat(scan.getNextSnapshotIdForConsumer()).isEqualTo(1L);

        // Progress events from readers 0 and 2 leave the position at 1.
        enumerator.handleSourceEvent(0, new ReaderConsumeProgressEvent(1L));
        triggerCheckpointAndComplete(enumerator, checkpointId++);
        Assertions.assertThat(scan.getNextSnapshotIdForConsumer()).isEqualTo(1L);

        enumerator.handleSourceEvent(2, new ReaderConsumeProgressEvent(1L));
        triggerCheckpointAndComplete(enumerator, checkpointId++);
        Assertions.assertThat(scan.getNextSnapshotIdForConsumer()).isEqualTo(1L);

        // Once readers 0 and 2 request again, the position moves to 2.
        enumerator.handleSplitRequest(0, "test-host");
        enumerator.handleSplitRequest(2, "test-host");
        triggerCheckpointAndComplete(enumerator, checkpointId++);
        Assertions.assertThat(scan.getNextSnapshotIdForConsumer()).isEqualTo(2L);

        // A further scan without new requests or progress keeps it at 2.
        scanNextSnapshot(context);
        triggerCheckpointAndComplete(enumerator, checkpointId++);
        Assertions.assertThat(scan.getNextSnapshotIdForConsumer()).isEqualTo(2L);
    }

    /**
     * Feeds three plans of 16 splits each to an enumerator built with
     * {@code BucketMode.UNAWARE} and parallelism 2, triggering all context actions after each
     * plan, and expects exactly 32 splits (two plans' worth) to remain in the split assigner.
     */
    @Test
    public void testEnumeratorSplitMax() throws Exception {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = this.getSplitEnumeratorContext(2);
        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
        MockScan scan = new MockScan(results);
        ContinuousFileSplitEnumerator enumerator =
                new Builder()
                        .setSplitEnumeratorContext(context)
                        .setInitialSplits(Collections.emptyList())
                        .setDiscoveryInterval(1L)
                        .setScan(scan)
                        .withBucketMode(BucketMode.UNAWARE)
                        .build();
        enumerator.start();

        // The snapshot counter keeps increasing across all batches; buckets restart at 0 per batch.
        long snapshot = 0L;
        for (long planKey = 1L; planKey <= 3L; ++planKey) {
            ArrayList<DataSplit> batch = new ArrayList<>();
            for (int bucket = 0; bucket < 16; ++bucket) {
                batch.add(createDataSplit(snapshot++, bucket, Collections.emptyList()));
            }
            // The scan shares this map, so the new plan becomes visible to its next plan() call.
            results.put(planKey, new DataFilePlan(batch));
            context.triggerAllActions();
        }

        // NOTE(review): presumably the third plan is not pulled because a split cap was reached
        // (the Builder passes 10 as the last constructor argument) - confirm against the enumerator.
        Assertions.assertThat(enumerator.splitAssigner.remainingSplits().size()).isEqualTo(32);
    }

    /**
     * Snapshots the enumerator state for {@code checkpointId} and then immediately notifies the
     * enumerator that the same checkpoint has completed. The checkpoint object returned by
     * {@code snapshotState} is discarded.
     *
     * @param enumerator the enumerator under test
     * @param checkpointId the id used for both the snapshot and the completion notification
     * @throws Exception if either enumerator call throws
     */
    private void triggerCheckpointAndComplete(ContinuousFileSplitEnumerator enumerator, long checkpointId) throws Exception {
        enumerator.snapshotState(checkpointId);
        enumerator.notifyCheckpointComplete(checkpointId);
    }

    /**
     * Runs one scan round: fires the periodic tasks scheduled on the context's worker executor,
     * then runs all pending actions on the coordinator side.
     *
     * @param context the async testing context whose executors are driven manually
     */
    private void scanNextSnapshot(TestingAsyncSplitEnumeratorContext<FileStoreSourceSplit> context) {
        ManuallyTriggeredScheduledExecutorService worker = context.workerExecutor;
        worker.triggerPeriodicScheduledTasks();
        context.triggerAlCoordinatorAction();
    }

    /**
     * Calls {@code enumerator.snapshotState(checkpointId)} without exposing a checked exception,
     * so it can be used inside a lambda.
     *
     * @param enumerator the enumerator to snapshot
     * @param checkpointId the checkpoint id passed to {@code snapshotState}
     * @return the resulting checkpoint, or {@code null} if {@code snapshotState} threw
     */
    private static PendingSplitsCheckpoint checkpointWithoutException(ContinuousFileSplitEnumerator enumerator, long checkpointId) {
        PendingSplitsCheckpoint snapshot = null;
        try {
            snapshot = enumerator.snapshotState(checkpointId);
        }
        catch (Exception ignored) {
            // Deliberately swallowed: a failure is reported to the caller as a null result.
        }
        return snapshot;
    }

    /**
     * Unwraps each source split via {@code split()} and casts the result to {@link DataSplit},
     * preserving the iteration order of the input collection.
     *
     * @param splits the source splits to unwrap
     * @return a new list with one {@link DataSplit} per input element
     */
    private static List<DataSplit> toDataSplits(Collection<FileStoreSourceSplit> splits) {
        List<DataSplit> unwrapped = new ArrayList<>();
        for (FileStoreSourceSplit sourceSplit : splits) {
            unwrapped.add((DataSplit) sourceSplit.split());
        }
        return unwrapped;
    }

    /**
     * Builds a streaming {@link DataSplit} for the given snapshot, bucket and files. The
     * partition is always {@code DataFileTestUtils.row(1)}.
     *
     * @param snapshotId value passed to {@code withSnapshot}
     * @param bucket value passed to {@code withBucket}
     * @param files value passed to {@code withDataFiles}
     * @return the built split
     */
    private static DataSplit createDataSplit(long snapshotId, int bucket, List<DataFileMeta> files) {
        return DataSplit.builder()
                .withSnapshot(snapshotId)
                .withPartition(DataFileTestUtils.row(1))
                .withBucket(bucket)
                .withDataFiles(files)
                .isStreaming(true)
                .build();
    }

    /**
     * Testing context that routes {@code callAsync} through an {@link ExecutorNotifier} built from
     * a separate, manually triggered worker executor and the parent context's executor, so tests
     * can drive worker-side and coordinator-side actions independently.
     */
    private static class TestingAsyncSplitEnumeratorContext<SplitT extends SourceSplit>
            extends TestingSplitEnumeratorContext<SplitT> {
        /** Executor handed to the notifier as its worker executor; only runs when triggered. */
        private final ManuallyTriggeredScheduledExecutorService workerExecutor;
        /** Receives every {@code callAsync} request made on this context. */
        private final ExecutorNotifier notifier;

        public TestingAsyncSplitEnumeratorContext(int parallelism) {
            super(parallelism);
            workerExecutor = new ManuallyTriggeredScheduledExecutorService();
            notifier = new ExecutorNotifier(workerExecutor, super.getExecutorService());
        }

        /** Forwards a one-shot async call to the notifier. */
        public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
            notifier.notifyReadyAsync(callable, handler);
        }

        /** Forwards a periodic async call, with its initial delay and period, to the notifier. */
        public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler, long initialDelay, long period) {
            notifier.notifyReadyAsync(callable, handler, initialDelay, period);
        }

        /** Fires the periodic tasks on the worker executor, then everything else queued on it. */
        public void triggerAllWorkerAction() {
            workerExecutor.triggerPeriodicScheduledTasks();
            workerExecutor.triggerAll();
        }

        /** Runs all pending actions of the parent context (method name kept as-is for callers). */
        public void triggerAlCoordinatorAction() {
            super.triggerAllActions();
        }

        /** Triggers a single queued action on the parent context's executor. */
        public void triggerNextCoordinatorAction() {
            super.getExecutorService().trigger();
        }
    }

    /**
     * {@link StreamTableScan} stub that serves plans from a caller-supplied map in ascending key
     * order. The map is held by reference, so plans added after construction are served too.
     */
    private static class MockScan implements StreamTableScan {
        private final TreeMap<Long, TableScan.Plan> results;
        /** Key of the last served plan plus one; {@code null} until a plan has been served. */
        @Nullable
        private Long nextSnapshotId = null;
        /** Decides what {@link #plan()} does once {@code results} is empty. */
        private boolean allowEnd = true;
        /** Last value received through {@link #notifyCheckpointComplete(Long)}. */
        private Long nextSnapshotIdForConsumer = null;

        public MockScan(TreeMap<Long, TableScan.Plan> results) {
            this.results = results;
        }

        /**
         * Removes and returns the plan with the smallest key, recording key + 1 as the next
         * snapshot id. When no plan is left, throws {@link EndOfScanException} if ending is
         * allowed, otherwise returns {@code SnapshotNotExistPlan.INSTANCE}.
         */
        public TableScan.Plan plan() {
            Map.Entry<Long, TableScan.Plan> head = results.pollFirstEntry();
            if (head != null) {
                nextSnapshotId = head.getKey() + 1L;
                return head.getValue();
            }
            if (!allowEnd) {
                return SnapshotNotExistPlan.INSTANCE;
            }
            throw new EndOfScanException();
        }

        /** Not supported by this stub. */
        public List<BinaryRow> listPartitions() {
            throw new UnsupportedOperationException();
        }

        /** Returns the next snapshot id recorded by the most recent successful {@link #plan()}. */
        public Long checkpoint() {
            return nextSnapshotId;
        }

        /** Stores the given value so tests can read it back. */
        public void notifyCheckpointComplete(@Nullable Long nextSnapshot) {
            nextSnapshotIdForConsumer = nextSnapshot;
        }

        /** This stub never reports a watermark. */
        @Nullable
        public Long watermark() {
            return null;
        }

        /** Intentionally a no-op: the given state is ignored. */
        public void restore(Long state) {
        }

        /** Sets whether an exhausted scan ends with {@link EndOfScanException}. */
        public void allowEnd(boolean allowEnd) {
            this.allowEnd = allowEnd;
        }

        /** Returns the last value passed to {@link #notifyCheckpointComplete(Long)}, if any. */
        public Long getNextSnapshotIdForConsumer() {
            return nextSnapshotIdForConsumer;
        }
    }

    /**
     * Fluent test helper that collects the arguments for a {@link ContinuousFileSplitEnumerator}
     * and constructs it in {@link #build()}.
     */
    private static class Builder {
        // No default: remains null unless setSplitEnumeratorContext is called.
        private SplitEnumeratorContext<FileStoreSourceSplit> context;
        private Collection<FileStoreSourceSplit> initialSplits = Collections.emptyList();
        private long discoveryInterval = Long.MAX_VALUE;
        // No default: remains null unless setScan is called.
        private StreamTableScan scan;
        private BucketMode bucketMode = BucketMode.FIXED;

        private Builder() {
        }

        /** Sets the enumerator context passed as the first constructor argument. */
        public Builder setSplitEnumeratorContext(SplitEnumeratorContext<FileStoreSourceSplit> context) {
            this.context = context;
            return this;
        }

        /** Sets the splits handed to the enumerator at construction; defaults to an empty list. */
        public Builder setInitialSplits(Collection<FileStoreSourceSplit> initialSplits) {
            this.initialSplits = initialSplits;
            return this;
        }

        /** Sets the discovery interval; defaults to {@code Long.MAX_VALUE}. */
        public Builder setDiscoveryInterval(long discoveryInterval) {
            this.discoveryInterval = discoveryInterval;
            return this;
        }

        /** Sets the scan the enumerator will use. */
        public Builder setScan(StreamTableScan scan) {
            this.scan = scan;
            return this;
        }

        /** Sets the bucket mode; defaults to {@code BucketMode.FIXED}. */
        public Builder withBucketMode(BucketMode bucketMode) {
            this.bucketMode = bucketMode;
            return this;
        }

        /**
         * Creates the enumerator from the collected values. The third argument is always
         * {@code null} and the last is always {@code 10}.
         */
        public ContinuousFileSplitEnumerator build() {
            // NOTE(review): presumably null is the initial next-snapshot id and 10 a per-task
            // split limit - confirm against the ContinuousFileSplitEnumerator constructor.
            return new ContinuousFileSplitEnumerator(this.context, this.initialSplits, null, this.discoveryInterval, this.scan, this.bucketMode, 10);
        }
    }
}

