/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.enumerator;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.SplitHelpers;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;
import org.apache.iceberg.flink.source.assigner.DefaultSplitAssigner;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator;
import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlanner;
import org.apache.iceberg.flink.source.enumerator.ManualContinuousSplitPlanner;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
import org.apache.iceberg.flink.source.split.SplitRequestEvent;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link ContinuousIcebergEnumerator}.
 *
 * <p>Every test wires the enumerator to a {@link TestingSplitEnumeratorContext} and a
 * {@link ManualContinuousSplitPlanner}: splits are handed to the planner by hand and
 * {@code triggerAllActions()} is called on the context before the enumerator state is inspected.
 * Assertions look at two things: the pending splits reported by
 * {@code enumerator.snapshotState(...)} and the split assignments recorded by the context.
 */
public class TestContinuousIcebergEnumerator {
    /** Parallelism passed to every {@link TestingSplitEnumeratorContext} created by the tests. */
    private static final int PARALLELISM = 4;

    /** Subtask id of the only reader these tests ever register. */
    private static final int READER_ID = 2;

    /** Location string used when registering the reader with the testing context. */
    private static final String READER_HOST = "localhost";

    @TempDir
    protected Path temporaryFolder;

    /**
     * With no reader registered, a split added to the planner must show up as a single pending
     * split in {@code UNASSIGNED} status after the context actions are triggered.
     */
    @Test
    public void testDiscoverSplitWhenNoReaderRegistered() throws Exception {
        TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext = newEnumeratorContext();
        ScanContext scanContext =
            ScanContext.builder()
                .streaming(true)
                .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
                .build();
        ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
        ContinuousIcebergEnumerator enumerator =
            createEnumerator(enumeratorContext, scanContext, splitPlanner);

        // Nothing has been handed to the planner yet, so nothing can be pending.
        Collection<IcebergSourceSplitState> pendingSplitsEmpty =
            enumerator.snapshotState(1L).pendingSplits();
        Assertions.assertThat(pendingSplitsEmpty).isEmpty();

        List<IcebergSourceSplit> splits =
            SplitHelpers.createSplitsFromTransientHadoopTable(this.temporaryFolder, 1, 1);
        splitPlanner.addSplits(splits);
        enumeratorContext.triggerAllActions();

        assertSingleUnassignedPendingSplit(
            enumerator.snapshotState(2L).pendingSplits(), splits.get(0));
    }

    /**
     * With a reader registered and already requesting a split, a newly added split must be
     * assigned to that reader and must not remain pending.
     */
    @Test
    public void testDiscoverWhenReaderRegistered() throws Exception {
        TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext = newEnumeratorContext();
        ScanContext scanContext =
            ScanContext.builder()
                .streaming(true)
                .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
                .build();
        ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
        ContinuousIcebergEnumerator enumerator =
            createEnumerator(enumeratorContext, scanContext, splitPlanner);

        registerReaderAndRequestSplit(enumeratorContext, enumerator);

        List<IcebergSourceSplit> splits =
            SplitHelpers.createSplitsFromTransientHadoopTable(this.temporaryFolder, 1, 1);
        splitPlanner.addSplits(splits);
        enumeratorContext.triggerAllActions();

        Assertions.assertThat(enumerator.snapshotState(1L).pendingSplits()).isEmpty();
        Assertions.assertThat(assignedSplits(enumeratorContext)).contains(splits.get(0));
    }

    /**
     * The reader requests a split and is then removed from the context's registered readers
     * before the split is added. The split must stay pending with no assignment recorded for the
     * reader; once the reader registers and requests again, the split must be assigned to it.
     */
    @Test
    public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exception {
        TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext = newEnumeratorContext();
        ScanContext scanContext =
            ScanContext.builder()
                .streaming(true)
                .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
                .build();
        ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
        ContinuousIcebergEnumerator enumerator =
            createEnumerator(enumeratorContext, scanContext, splitPlanner);

        registerReaderAndRequestSplit(enumeratorContext, enumerator);

        // Drop the reader from the context while its split request is still outstanding.
        enumeratorContext.registeredReaders().remove(READER_ID);

        List<IcebergSourceSplit> splits =
            SplitHelpers.createSplitsFromTransientHadoopTable(this.temporaryFolder, 1, 1);
        Assertions.assertThat(splits).hasSize(1);
        splitPlanner.addSplits(splits);
        enumeratorContext.triggerAllActions();

        Assertions.assertThat(enumeratorContext.getSplitAssignments()).doesNotContainKey(READER_ID);
        List<String> pendingSplitIds =
            enumerator.snapshotState(1L).pendingSplits().stream()
                .map(IcebergSourceSplitState::split)
                .map(IcebergSourceSplit::splitId)
                .collect(Collectors.toList());
        Assertions.assertThat(pendingSplitIds)
            .hasSameSizeAs(splits)
            .first()
            .isEqualTo(splits.get(0).splitId());

        // The reader comes back and asks again; no triggerAllActions() is needed for the hand-over.
        registerReaderAndRequestSplit(enumeratorContext, enumerator);

        Assertions.assertThat(enumerator.snapshotState(2L).pendingSplits()).isEmpty();
        Assertions.assertThat(assignedSplits(enumeratorContext)).contains(splits.get(0));
    }

    /**
     * Feeds ten splits to the planner one at a time while the single reader only requests a new
     * split occasionally. The test expects the number of pending splits to stay at 3 after the
     * first split has been assigned, and the reader's assignment list to grow by exactly one
     * split per request.
     */
    @Test
    public void testThrottlingDiscovery() throws Exception {
        List<IcebergSourceSplit> splits =
            SplitHelpers.createSplitsFromTransientHadoopTable(this.temporaryFolder, 10, 1);

        TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext = newEnumeratorContext();
        ScanContext scanContext =
            ScanContext.builder()
                .streaming(true)
                .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
                .maxPlanningSnapshotCount(1)
                .build();
        ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
        ContinuousIcebergEnumerator enumerator =
            createEnumerator(enumeratorContext, scanContext, splitPlanner);

        registerReaderAndRequestSplit(enumeratorContext, enumerator);

        // First split: goes straight to the waiting reader, nothing is left pending.
        splitPlanner.addSplits(Arrays.asList(splits.get(0)));
        enumeratorContext.triggerAllActions();
        Assertions.assertThat(enumerator.snapshotState(1L).pendingSplits()).isEmpty();
        Assertions.assertThat(assignedSplits(enumeratorContext))
            .containsExactlyElementsOf(splits.subList(0, 1));

        // Remaining nine splits are offered one per cycle while the reader requests nothing.
        for (int i = 1; i < 10; ++i) {
            splitPlanner.addSplits(Arrays.asList(splits.get(i)));
            enumeratorContext.triggerAllActions();
        }
        Assertions.assertThat(enumerator.snapshotState(2L).pendingSplits()).hasSize(3);
        Assertions.assertThat(assignedSplits(enumeratorContext))
            .containsExactlyElementsOf(splits.subList(0, 1));

        // Reader reports splits[0] as finished and asks for the next one.
        enumerator.handleSourceEvent(
            READER_ID, new SplitRequestEvent(Arrays.asList(splits.get(0).splitId())));
        enumeratorContext.triggerAllActions();
        Assertions.assertThat(enumerator.snapshotState(3L).pendingSplits()).hasSize(3);
        Assertions.assertThat(assignedSplits(enumeratorContext))
            .containsExactlyElementsOf(splits.subList(0, 2));

        // Extra cycles without any reader request must not change pending or assigned splits.
        for (int i = 0; i < 3; ++i) {
            enumeratorContext.triggerAllActions();
        }
        Assertions.assertThat(enumerator.snapshotState(4L).pendingSplits()).hasSize(3);
        Assertions.assertThat(assignedSplits(enumeratorContext))
            .containsExactlyElementsOf(splits.subList(0, 2));

        // Reader reports splits[1] as finished and asks for the next one.
        enumerator.handleSourceEvent(
            READER_ID, new SplitRequestEvent(Arrays.asList(splits.get(1).splitId())));
        enumeratorContext.triggerAllActions();
        Assertions.assertThat(enumerator.snapshotState(5L).pendingSplits()).hasSize(3);
        Assertions.assertThat(assignedSplits(enumeratorContext))
            .containsExactlyElementsOf(splits.subList(0, 3));
    }

    /**
     * Planner is created with {@code 1} as its second constructor argument and the scan context
     * allows 2 planning failures. The first cycle must leave nothing pending; the second cycle
     * must surface the split as pending and unassigned.
     */
    @Test
    public void testTransientPlanningErrorsWithSuccessfulRetry() throws Exception {
        TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext = newEnumeratorContext();
        ScanContext scanContext =
            ScanContext.builder()
                .streaming(true)
                .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
                .maxPlanningSnapshotCount(1)
                .maxAllowedPlanningFailures(2)
                .build();
        // NOTE(review): the second argument is presumably the number of planning attempts that
        // fail before planning succeeds - confirm against ManualContinuousSplitPlanner.
        ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
        ContinuousIcebergEnumerator enumerator =
            createEnumerator(enumeratorContext, scanContext, splitPlanner);

        List<IcebergSourceSplit> splits =
            SplitHelpers.createSplitsFromTransientHadoopTable(this.temporaryFolder, 1, 1);
        splitPlanner.addSplits(splits);

        // First cycle: nothing is expected to be discovered.
        enumeratorContext.triggerAllActions();
        Assertions.assertThat(enumerator.snapshotState(2L).pendingSplits()).isEmpty();

        // Second cycle: the split is expected to be discovered.
        enumeratorContext.triggerAllActions();
        assertSingleUnassignedPendingSplit(
            enumerator.snapshotState(3L).pendingSplits(), splits.get(0));
    }

    /**
     * Planner is created with {@code 2} as its second constructor argument while the scan context
     * allows only 1 planning failure. The first scheduled task must still be running after one
     * cycle and must be done, failing with "Failed to discover new split", after the second.
     */
    @Test
    public void testOverMaxAllowedPlanningErrors() throws Exception {
        TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext = newEnumeratorContext();
        ScanContext scanContext =
            ScanContext.builder()
                .streaming(true)
                .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
                .maxPlanningSnapshotCount(1)
                .maxAllowedPlanningFailures(1)
                .build();
        ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 2);
        // The enumerator only needs to be started; the test inspects the context's scheduled task.
        createEnumerator(enumeratorContext, scanContext, splitPlanner);

        List<IcebergSourceSplit> splits =
            SplitHelpers.createSplitsFromTransientHadoopTable(this.temporaryFolder, 1, 1);
        splitPlanner.addSplits(splits);

        enumeratorContext.triggerAllActions();
        Assertions.assertThat(firstScheduledTask(enumeratorContext).isDone()).isFalse();

        enumeratorContext.triggerAllActions();
        Assertions.assertThat(firstScheduledTask(enumeratorContext).isDone()).isTrue();
        Assertions.assertThatThrownBy(() -> firstScheduledTask(enumeratorContext).get())
            .hasCauseInstanceOf(RuntimeException.class)
            .hasMessageContaining("Failed to discover new split");
    }

    /**
     * With {@code maxAllowedPlanningFailures(-1)} and a planner created with 3 as its second
     * constructor argument, the first three cycles must leave nothing pending and the fourth
     * cycle must surface the split as pending and unassigned.
     */
    @Test
    public void testPlanningIgnoringErrors() throws Exception {
        int expectedFailures = 3;
        TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext = newEnumeratorContext();
        ScanContext scanContext =
            ScanContext.builder()
                .streaming(true)
                .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
                .maxPlanningSnapshotCount(1)
                .maxAllowedPlanningFailures(-1)
                .build();
        ManualContinuousSplitPlanner splitPlanner =
            new ManualContinuousSplitPlanner(scanContext, expectedFailures);
        ContinuousIcebergEnumerator enumerator =
            createEnumerator(enumeratorContext, scanContext, splitPlanner);

        List<IcebergSourceSplit> splits =
            SplitHelpers.createSplitsFromTransientHadoopTable(this.temporaryFolder, 1, 1);
        splitPlanner.addSplits(splits);

        for (int i = 0; i < expectedFailures; ++i) {
            enumeratorContext.triggerAllActions();
            Assertions.assertThat(enumerator.snapshotState(i).pendingSplits()).isEmpty();
        }

        enumeratorContext.triggerAllActions();
        assertSingleUnassignedPendingSplit(
            enumerator.snapshotState(expectedFailures + 1).pendingSplits(), splits.get(0));
    }

    /** Creates a fresh testing enumerator context with the parallelism shared by all tests. */
    private static TestingSplitEnumeratorContext<IcebergSourceSplit> newEnumeratorContext() {
        return new TestingSplitEnumeratorContext<>(PARALLELISM);
    }

    /**
     * Registers reader {@link #READER_ID} with the context, announces it to the enumerator and
     * sends an initial {@link SplitRequestEvent} on its behalf.
     */
    private static void registerReaderAndRequestSplit(
            TestingSplitEnumeratorContext<IcebergSourceSplit> context,
            ContinuousIcebergEnumerator enumerator) {
        context.registerReader(READER_ID, READER_HOST);
        enumerator.addReader(READER_ID);
        enumerator.handleSourceEvent(READER_ID, new SplitRequestEvent());
    }

    /** Returns the splits the context has recorded as assigned to reader {@link #READER_ID}. */
    private static List<IcebergSourceSplit> assignedSplits(
            TestingSplitEnumeratorContext<IcebergSourceSplit> context) {
        return context.getSplitAssignments().get(READER_ID).getAssignedSplits();
    }

    /**
     * Returns the first entry of the context executor's scheduled-task list. The list is
     * re-read on every call rather than cached.
     */
    private static ScheduledFuture<?> firstScheduledTask(
            TestingSplitEnumeratorContext<IcebergSourceSplit> context) {
        return context.getExecutorService().getAllScheduledTasks().get(0);
    }

    /**
     * Asserts that {@code pendingSplits} holds exactly one entry, that it wraps the split with
     * the same id as {@code expectedSplit}, and that its status is {@code UNASSIGNED}.
     */
    private static void assertSingleUnassignedPendingSplit(
            Collection<IcebergSourceSplitState> pendingSplits, IcebergSourceSplit expectedSplit) {
        Assertions.assertThat(pendingSplits).hasSize(1);
        IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
        Assertions.assertThat(pendingSplit.split().splitId()).isEqualTo(expectedSplit.splitId());
        Assertions.assertThat(pendingSplit.status()).isEqualTo(IcebergSourceSplitStatus.UNASSIGNED);
    }

    /**
     * Builds a {@link ContinuousIcebergEnumerator} on top of a {@link DefaultSplitAssigner} with
     * no initial splits and starts it before returning.
     *
     * @param context enumerator context the enumerator runs against
     * @param scanContext scan configuration handed to the enumerator
     * @param splitPlanner planner the enumerator pulls newly discovered splits from
     * @return the started enumerator
     */
    private static ContinuousIcebergEnumerator createEnumerator(SplitEnumeratorContext<IcebergSourceSplit> context, ScanContext scanContext, ContinuousSplitPlanner splitPlanner) {
        // Both nulls are passed through exactly as before: the first is the assigner's first
        // constructor argument, the second is the enumerator's last constructor argument.
        SplitAssigner assigner = new DefaultSplitAssigner(null, Collections.emptyList());
        ContinuousIcebergEnumerator enumerator =
            new ContinuousIcebergEnumerator(context, assigner, scanContext, splitPlanner, null);
        enumerator.start();
        return enumerator;
    }
}

