/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.enumerator;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.SplitHelpers;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;
import org.apache.iceberg.flink.source.assigner.DefaultSplitAssigner;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator;
import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlanner;
import org.apache.iceberg.flink.source.enumerator.ManualContinuousSplitPlanner;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
import org.apache.iceberg.flink.source.split.SplitRequestEvent;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TestContinuousIcebergEnumerator {
    /**
     * Class-level JUnit temporary folder rule. Every test in this class passes it to
     * {@code SplitHelpers.createSplitsFromTransientHadoopTable} when creating test splits.
     */
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    /**
     * Checks that a split discovered while no reader is registered is kept as pending state.
     *
     * <p>The enumerator's snapshot must be empty before anything is planned. After one split is
     * handed to the manual planner and the context's actions are triggered, the snapshot must hold
     * exactly that split with status {@code UNASSIGNED}.
     *
     * @throws Exception if split creation or enumerator interaction fails
     */
    @Test
    public void testDiscoverSplitWhenNoReaderRegistered() throws Exception {
        TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
            new TestingSplitEnumeratorContext<>(4);
        ScanContext scanContext = ScanContext.builder()
            .streaming(true)
            .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
            .build();
        // NOTE(review): the second planner argument is presumably the number of planning
        // failures to inject (see testPlanningIgnoringErrors) -- confirm against the planner.
        ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
        ContinuousIcebergEnumerator enumerator =
            createEnumerator(enumeratorContext, scanContext, splitPlanner);

        // Nothing has been handed to the planner yet, so no split may be pending.
        Collection<IcebergSourceSplitState> pendingSplitsEmpty =
            enumerator.snapshotState(1L).pendingSplits();
        Assert.assertEquals(0, pendingSplitsEmpty.size());

        // Make one split available and run the context's queued actions.
        List<IcebergSourceSplit> splits =
            SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
        splitPlanner.addSplits(splits);
        enumeratorContext.triggerAllActions();

        // No reader was registered, so the split is expected to remain pending and unassigned.
        Collection<IcebergSourceSplitState> pendingSplits =
            enumerator.snapshotState(2L).pendingSplits();
        Assert.assertEquals(1, pendingSplits.size());
        IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
        Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
        Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
    }

    /**
     * Checks that a discovered split goes straight to a reader that is registered and has already
     * requested a split.
     *
     * <p>Reader 2 registers and sends a {@link SplitRequestEvent} before any split exists. Once a
     * split is added to the planner and the context's actions are triggered, the snapshot must hold
     * no pending splits and the assignments recorded for reader 2 must contain the split.
     *
     * @throws Exception if split creation or enumerator interaction fails
     */
    @Test
    public void testDiscoverWhenReaderRegistered() throws Exception {
        TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
            new TestingSplitEnumeratorContext<>(4);
        ScanContext scanContext = ScanContext.builder()
            .streaming(true)
            .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
            .build();
        ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
        ContinuousIcebergEnumerator enumerator =
            createEnumerator(enumeratorContext, scanContext, splitPlanner);

        // Reader 2 registers and asks for a split before any split has been discovered.
        enumeratorContext.registerReader(2, "localhost");
        enumerator.addReader(2);
        enumerator.handleSourceEvent(2, new SplitRequestEvent());

        // Make one split available and run the context's queued actions.
        List<IcebergSourceSplit> splits =
            SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
        splitPlanner.addSplits(splits);
        enumeratorContext.triggerAllActions();

        // The split must have been assigned to reader 2 instead of staying pending.
        Assert.assertTrue(enumerator.snapshotState(1L).pendingSplits().isEmpty());
        Assertions.assertThat(enumeratorContext.getSplitAssignments().get(2).getAssignedSplits())
            .contains(splits.get(0));
    }

    /**
     * Checks what happens when a reader requests a split and then disappears from the context
     * before the split is discovered.
     *
     * <p>Reader 2 requests a split and is then removed from the context's registered readers. The
     * split discovered afterwards must not be assigned to reader 2 and must show up as pending.
     * After reader 2 registers again and sends a new request, the pending split must be assigned
     * to it.
     *
     * @throws Exception if split creation or enumerator interaction fails
     */
    @Test
    public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exception {
        TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
            new TestingSplitEnumeratorContext<>(4);
        ScanContext scanContext = ScanContext.builder()
            .streaming(true)
            .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
            .build();
        ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
        ContinuousIcebergEnumerator enumerator =
            createEnumerator(enumeratorContext, scanContext, splitPlanner);

        // Reader 2 registers and requests a split ...
        enumeratorContext.registerReader(2, "localhost");
        enumerator.addReader(2);
        enumerator.handleSourceEvent(2, new SplitRequestEvent());

        // ... and is then dropped from the context's registered readers before discovery runs.
        enumeratorContext.registeredReaders().remove(2);

        List<IcebergSourceSplit> splits =
            SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
        Assert.assertEquals(1, splits.size());
        splitPlanner.addSplits(splits);
        enumeratorContext.triggerAllActions();

        // Nothing may be assigned to the missing reader; the split must be pending instead.
        Assert.assertFalse(enumeratorContext.getSplitAssignments().containsKey(2));
        List<String> pendingSplitIds = enumerator.snapshotState(1L).pendingSplits().stream()
            .map(IcebergSourceSplitState::split)
            .map(IcebergSourceSplit::splitId)
            .collect(Collectors.toList());
        Assert.assertEquals(splits.size(), pendingSplitIds.size());
        Assert.assertEquals(splits.get(0).splitId(), pendingSplitIds.get(0));

        // Reader 2 comes back and asks again; the pending split must now be handed over.
        enumeratorContext.registerReader(2, "localhost");
        enumerator.addReader(2);
        enumerator.handleSourceEvent(2, new SplitRequestEvent());

        Assert.assertTrue(enumerator.snapshotState(2L).pendingSplits().isEmpty());
        Assertions.assertThat(enumeratorContext.getSplitAssignments().get(2).getAssignedSplits())
            .contains(splits.get(0));
    }

    /**
     * Checks that the number of pending splits stays bounded while a single reader works through
     * its assignments one split at a time.
     *
     * <p>Ten splits are created and fed to the planner one per discovery cycle, with
     * {@code maxPlanningSnapshotCount(1)} configured. The test expects the pending count to stay
     * at 3 while the assignments recorded for reader 2 grow by one split for each finished-split
     * request it sends.
     *
     * <p>NOTE(review): the limit of 3 is presumably the enumerator's discovery throttle; it is not
     * defined in this file -- confirm against {@link ContinuousIcebergEnumerator}.
     *
     * @throws Exception if split creation or enumerator interaction fails
     */
    @Test
    public void testThrottlingDiscovery() throws Exception {
        List<IcebergSourceSplit> splits =
            SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 10, 1);

        TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
            new TestingSplitEnumeratorContext<>(4);
        ScanContext scanContext = ScanContext.builder()
            .streaming(true)
            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
            .maxPlanningSnapshotCount(1)
            .build();
        ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
        ContinuousIcebergEnumerator enumerator =
            createEnumerator(enumeratorContext, scanContext, splitPlanner);

        // Reader 2 registers and requests its first split.
        enumeratorContext.registerReader(2, "localhost");
        enumerator.addReader(2);
        enumerator.handleSourceEvent(2, new SplitRequestEvent());

        // Offer splits[0]; it must be assigned to reader 2 right away, leaving nothing pending.
        splitPlanner.addSplits(Arrays.asList(splits.get(0)));
        enumeratorContext.triggerAllActions();
        Assert.assertEquals(0, enumerator.snapshotState(1L).pendingSplits().size());
        Assert.assertEquals(
            splits.subList(0, 1),
            enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());

        // Offer the remaining nine splits, one per cycle, without reader 2 requesting more.
        for (int i = 1; i < 10; ++i) {
            splitPlanner.addSplits(Arrays.asList(splits.get(i)));
            enumeratorContext.triggerAllActions();
        }

        // Only 3 splits are expected to be pending; reader 2 still holds just splits[0].
        Assert.assertEquals(3, enumerator.snapshotState(2L).pendingSplits().size());
        Assert.assertEquals(
            splits.subList(0, 1),
            enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());

        // Reader 2 sends a request that lists splits[0] and runs another cycle.
        enumerator.handleSourceEvent(
            2, new SplitRequestEvent(Arrays.asList(splits.get(0).splitId())));
        enumeratorContext.triggerAllActions();

        // splits[1] is now assigned, and the pending count is expected to be 3 again.
        Assert.assertEquals(3, enumerator.snapshotState(3L).pendingSplits().size());
        Assert.assertEquals(
            splits.subList(0, 2),
            enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());

        // Three more cycles without any request must not change pending or assigned splits.
        for (int cycle = 0; cycle < 3; ++cycle) {
            enumeratorContext.triggerAllActions();
        }
        Assert.assertEquals(3, enumerator.snapshotState(4L).pendingSplits().size());
        Assert.assertEquals(
            splits.subList(0, 2),
            enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());

        // Reader 2 sends a request that lists splits[1]; splits[2] must be assigned next.
        enumerator.handleSourceEvent(
            2, new SplitRequestEvent(Arrays.asList(splits.get(1).splitId())));
        enumeratorContext.triggerAllActions();
        Assert.assertEquals(3, enumerator.snapshotState(5L).pendingSplits().size());
        Assert.assertEquals(
            splits.subList(0, 3),
            enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
    }

    /**
     * Checks that a split is still discovered when planning fails once but the configured failure
     * allowance is not exceeded.
     *
     * <p>The scan context sets {@code maxAllowedPlanningFailures(2)} and the planner is built with
     * a second argument of 1. The test expects no pending split after the first discovery cycle
     * and exactly one {@code UNASSIGNED} pending split after the second.
     *
     * <p>NOTE(review): the planner's second argument is presumably the number of failures it
     * injects before succeeding -- confirm against {@link ManualContinuousSplitPlanner}.
     *
     * @throws Exception if split creation or enumerator interaction fails
     */
    @Test
    public void testTransientPlanningErrorsWithSuccessfulRetry() throws Exception {
        TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
            new TestingSplitEnumeratorContext<>(4);
        ScanContext scanContext = ScanContext.builder()
            .streaming(true)
            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
            .maxPlanningSnapshotCount(1)
            .maxAllowedPlanningFailures(2)
            .build();
        ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
        ContinuousIcebergEnumerator enumerator =
            createEnumerator(enumeratorContext, scanContext, splitPlanner);

        List<IcebergSourceSplit> splits =
            SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
        splitPlanner.addSplits(splits);

        // First cycle: the test expects that nothing was discovered.
        enumeratorContext.triggerAllActions();
        Assert.assertEquals(0, enumerator.snapshotState(2L).pendingSplits().size());

        // Second cycle: the split is expected to be discovered and left pending.
        enumeratorContext.triggerAllActions();
        Collection<IcebergSourceSplitState> pendingSplits =
            enumerator.snapshotState(3L).pendingSplits();
        Assert.assertEquals(1, pendingSplits.size());
        IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
        Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
        Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
    }

    /**
     * Checks that the scheduled discovery task completes exceptionally once planning fails more
     * often than the configured allowance.
     *
     * <p>The scan context sets {@code maxAllowedPlanningFailures(1)} and the planner is built with
     * a second argument of 2. After the first cycle the first scheduled task must not be done;
     * after the second it must be done, and {@code get()} must throw with a
     * {@link RuntimeException} cause and a message containing "Failed to discover new split".
     *
     * <p>NOTE(review): the planner's second argument is presumably the number of failures it
     * injects -- confirm against {@link ManualContinuousSplitPlanner}.
     *
     * @throws Exception if split creation or enumerator interaction fails
     */
    @Test
    public void testOverMaxAllowedPlanningErrors() throws Exception {
        TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
            new TestingSplitEnumeratorContext<>(4);
        ScanContext scanContext = ScanContext.builder()
            .streaming(true)
            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
            .maxPlanningSnapshotCount(1)
            .maxAllowedPlanningFailures(1)
            .build();
        ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 2);
        // The enumerator is created only for its side effect of starting discovery.
        createEnumerator(enumeratorContext, scanContext, splitPlanner);

        List<IcebergSourceSplit> splits =
            SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
        splitPlanner.addSplits(splits);

        // The task list is re-queried for every check rather than cached, so each assertion
        // observes the executor's current view of its scheduled tasks.
        enumeratorContext.triggerAllActions();
        Assert.assertFalse(
            enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());

        enumeratorContext.triggerAllActions();
        Assert.assertTrue(
            enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());

        Assertions.assertThatThrownBy(
                () -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
            .hasCauseInstanceOf(RuntimeException.class)
            .hasMessageContaining("Failed to discover new split");
    }

    /**
     * Checks that discovery keeps retrying when {@code maxAllowedPlanningFailures} is set to -1.
     *
     * <p>The planner is built with {@code expectedFailures} (3) as its second argument. For each
     * of the first three cycles the snapshot must hold no pending splits; after one further cycle
     * it must hold exactly one {@code UNASSIGNED} split.
     *
     * <p>NOTE(review): -1 presumably means "no limit on planning failures" -- confirm against
     * {@link ScanContext}.
     *
     * @throws Exception if split creation or enumerator interaction fails
     */
    @Test
    public void testPlanningIgnoringErrors() throws Exception {
        int expectedFailures = 3;
        TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
            new TestingSplitEnumeratorContext<>(4);
        ScanContext scanContext = ScanContext.builder()
            .streaming(true)
            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
            .maxPlanningSnapshotCount(1)
            .maxAllowedPlanningFailures(-1)
            .build();
        ManualContinuousSplitPlanner splitPlanner =
            new ManualContinuousSplitPlanner(scanContext, expectedFailures);
        ContinuousIcebergEnumerator enumerator =
            createEnumerator(enumeratorContext, scanContext, splitPlanner);

        List<IcebergSourceSplit> splits =
            SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
        splitPlanner.addSplits(splits);

        // During the expected failing cycles nothing may be discovered.
        for (int i = 0; i < expectedFailures; ++i) {
            enumeratorContext.triggerAllActions();
            Collection<IcebergSourceSplitState> pendingDuringFailure =
                enumerator.snapshotState(i).pendingSplits();
            Assert.assertEquals(0, pendingDuringFailure.size());
        }

        // One more cycle: the split is expected to be discovered and left pending.
        enumeratorContext.triggerAllActions();
        Collection<IcebergSourceSplitState> pendingSplits =
            enumerator.snapshotState(expectedFailures + 1).pendingSplits();
        Assert.assertEquals(1, pendingSplits.size());
        IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
        Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
        Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
    }

    /**
     * Creates a {@link ContinuousIcebergEnumerator} and starts it before returning it.
     *
     * <p>The enumerator is wired with a {@link DefaultSplitAssigner} built from a {@code null}
     * first argument and an empty list, and with {@code null} as its last constructor argument.
     *
     * @param context enumerator context handed to the enumerator
     * @param scanContext scan configuration handed to the enumerator
     * @param splitPlanner planner handed to the enumerator
     * @return the enumerator, after {@code start()} has been called on it
     */
    private static ContinuousIcebergEnumerator createEnumerator(SplitEnumeratorContext<IcebergSourceSplit> context, ScanContext scanContext, ContinuousSplitPlanner splitPlanner) {
        SplitAssigner assigner = new DefaultSplitAssigner(null, Collections.emptyList());
        ContinuousIcebergEnumerator started =
            new ContinuousIcebergEnumerator(context, assigner, scanContext, splitPlanner, null);
        started.start();
        return started;
    }
}

