/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.HadoopTableResource;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.flink.data.RowDataToRowMapper;
import org.apache.iceberg.flink.source.FlinkSplitPlanner;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests for {@link IcebergSource} configured with {@code streaming(true)}.
 *
 * <p>Each test appends randomly generated record batches to the table provided by
 * {@link #tableResource}, starts a collecting job on the shared mini cluster with a given
 * {@link StreamingStartingStrategy}, and checks that the rows read back from the job match the
 * appended batches, batch by batch.
 */
public class TestIcebergSourceContinuous {
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = MiniClusterResource.createWithClassloaderCheckDisabled();
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    @Rule
    public final HadoopTableResource tableResource = new HadoopTableResource(TEMPORARY_FOLDER, "default", "t", TestFixtures.SCHEMA);

    /** Snapshot monitor interval used by every scan context and by the source itself. */
    private static final Duration MONITOR_INTERVAL = Duration.ofMillis(10L);
    /** Number of records in every generated batch. */
    private static final int BATCH_SIZE = 2;

    /** Incremented for every generated batch so that each batch uses a different seed. */
    private final AtomicLong randomSeed = new AtomicLong(0L);

    /**
     * With {@code TABLE_SCAN_THEN_INCREMENTAL} the rows already in the table are read first and
     * every later append is then delivered as its own batch.
     */
    @Test
    public void testTableScanThenIncremental() throws Exception {
        GenericAppenderHelper dataAppender = newDataAppender();

        // snapshot1: present before the job starts
        List<Record> batch1 = generateBatch();
        dataAppender.appendToTable(batch1);

        ScanContext scanContext = ScanContext.builder()
                .streaming(true)
                .monitorInterval(MONITOR_INTERVAL)
                .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
                .build();

        try (CloseableIterator<Row> iter = executeAndCollect(scanContext)) {
            assertNextRecords(iter, batch1);

            // snapshot2: appended while the job is running
            List<Record> batch2 = generateBatch();
            dataAppender.appendToTable(batch2);
            assertNextRecords(iter, batch2);

            // snapshot3
            List<Record> batch3 = generateBatch();
            dataAppender.appendToTable(batch3);
            assertNextRecords(iter, batch3);
        }
    }

    /**
     * Same as {@link #testTableScanThenIncremental()}, but the first snapshot is expired before
     * the job starts. The initial read must still return the rows of both pre-existing batches.
     */
    @Test
    public void testTableScanThenIncrementalAfterExpiration() throws Exception {
        GenericAppenderHelper dataAppender = newDataAppender();

        // snapshot1
        List<Record> batch1 = generateBatch();
        dataAppender.appendToTable(batch1);
        long snapshotId = tableResource.table().currentSnapshot().snapshotId();

        // snapshot2
        List<Record> batch2 = generateBatch();
        dataAppender.appendToTable(batch2);

        // Drop snapshot1 from the history; only snapshot2 should remain.
        tableResource.table().expireSnapshots().expireSnapshotId(snapshotId).commit();
        Assert.assertEquals(1, tableResource.table().history().size());

        ScanContext scanContext = ScanContext.builder()
                .streaming(true)
                .monitorInterval(MONITOR_INTERVAL)
                .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
                .build();
        Assert.assertEquals(FlinkSplitPlanner.ScanMode.BATCH, FlinkSplitPlanner.checkScanMode(scanContext));

        try (CloseableIterator<Row> iter = executeAndCollect(scanContext)) {
            List<Record> initialRecords = Lists.newArrayList();
            initialRecords.addAll(batch1);
            initialRecords.addAll(batch2);
            assertNextRecords(iter, initialRecords);

            // snapshot3: appended while the job is running
            List<Record> batch3 = generateBatch();
            dataAppender.appendToTable(batch3);
            assertNextRecords(iter, batch3);
        }
    }

    /**
     * With {@code INCREMENTAL_FROM_EARLIEST_SNAPSHOT} both snapshots that exist before the job
     * starts are read, followed by each later append.
     */
    @Test
    public void testEarliestSnapshot() throws Exception {
        GenericAppenderHelper dataAppender = newDataAppender();

        // snapshot0
        List<Record> batch0 = generateBatch();
        dataAppender.appendToTable(batch0);

        // snapshot1
        List<Record> batch1 = generateBatch();
        dataAppender.appendToTable(batch1);

        ScanContext scanContext = ScanContext.builder()
                .streaming(true)
                .monitorInterval(MONITOR_INTERVAL)
                .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
                .build();

        try (CloseableIterator<Row> iter = executeAndCollect(scanContext)) {
            List<Record> combinedBatch0AndBatch1 = Lists.newArrayList(batch0);
            combinedBatch0AndBatch1.addAll(batch1);
            assertNextRecords(iter, combinedBatch0AndBatch1);

            // snapshot2
            List<Record> batch2 = generateBatch();
            dataAppender.appendToTable(batch2);
            assertNextRecords(iter, batch2);

            // snapshot3
            List<Record> batch3 = generateBatch();
            dataAppender.appendToTable(batch3);
            assertNextRecords(iter, batch3);
        }
    }

    /**
     * With {@code INCREMENTAL_FROM_LATEST_SNAPSHOT} the rows of the newest pre-existing snapshot
     * (batch1) are the first ones read; the older batch0 is never expected.
     */
    @Test
    public void testLatestSnapshot() throws Exception {
        GenericAppenderHelper dataAppender = newDataAppender();

        // snapshot0
        List<Record> batch0 = generateBatch();
        dataAppender.appendToTable(batch0);

        // snapshot1
        List<Record> batch1 = generateBatch();
        dataAppender.appendToTable(batch1);

        ScanContext scanContext = ScanContext.builder()
                .streaming(true)
                .monitorInterval(MONITOR_INTERVAL)
                .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
                .build();

        try (CloseableIterator<Row> iter = executeAndCollect(scanContext)) {
            // Wait for the job to be RUNNING before appending more data; presumably this lets the
            // source resolve "latest" to snapshot1 before batch2 is committed below.
            waitUntilJobIsRunning(MINI_CLUSTER_RESOURCE.getClusterClient());

            // The starting snapshot is included in the output.
            assertNextRecords(iter, batch1);

            // snapshot2
            List<Record> batch2 = generateBatch();
            dataAppender.appendToTable(batch2);
            assertNextRecords(iter, batch2);

            // snapshot3
            List<Record> batch3 = generateBatch();
            dataAppender.appendToTable(batch3);
            assertNextRecords(iter, batch3);
        }
    }

    /**
     * With {@code INCREMENTAL_FROM_SNAPSHOT_ID} reading starts at the given snapshot (inclusive):
     * batch1 is read, batch0 from the earlier snapshot is not expected.
     */
    @Test
    public void testSpecificSnapshotId() throws Exception {
        GenericAppenderHelper dataAppender = newDataAppender();

        // snapshot0
        List<Record> batch0 = generateBatch();
        dataAppender.appendToTable(batch0);

        // snapshot1: the starting point of the scan
        List<Record> batch1 = generateBatch();
        dataAppender.appendToTable(batch1);
        long snapshot1 = tableResource.table().currentSnapshot().snapshotId();

        ScanContext scanContext = ScanContext.builder()
                .streaming(true)
                .monitorInterval(MONITOR_INTERVAL)
                .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
                .startSnapshotId(snapshot1)
                .build();

        try (CloseableIterator<Row> iter = executeAndCollect(scanContext)) {
            assertNextRecords(iter, batch1);

            // snapshot2
            List<Record> batch2 = generateBatch();
            dataAppender.appendToTable(batch2);
            assertNextRecords(iter, batch2);

            // snapshot3
            List<Record> batch3 = generateBatch();
            dataAppender.appendToTable(batch3);
            assertNextRecords(iter, batch3);
        }
    }

    /**
     * With {@code INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP} reading starts at the snapshot whose
     * timestamp is passed in: batch1 is read, batch0 from the earlier snapshot is not expected.
     */
    @Test
    public void testSpecificSnapshotTimestamp() throws Exception {
        GenericAppenderHelper dataAppender = newDataAppender();

        // snapshot0
        List<Record> batch0 = generateBatch();
        dataAppender.appendToTable(batch0);

        // Short pause between the two commits; presumably so that snapshot0 and snapshot1 do not
        // end up with the same millisecond timestamp.
        Thread.sleep(2L);

        // snapshot1: the starting point of the scan
        List<Record> batch1 = generateBatch();
        dataAppender.appendToTable(batch1);
        long snapshot1Timestamp = tableResource.table().currentSnapshot().timestampMillis();

        ScanContext scanContext = ScanContext.builder()
                .streaming(true)
                .monitorInterval(MONITOR_INTERVAL)
                .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
                .startSnapshotTimestamp(snapshot1Timestamp)
                .build();

        try (CloseableIterator<Row> iter = executeAndCollect(scanContext)) {
            assertNextRecords(iter, batch1);

            // snapshot2
            List<Record> batch2 = generateBatch();
            dataAppender.appendToTable(batch2);
            assertNextRecords(iter, batch2);

            // snapshot3
            List<Record> batch3 = generateBatch();
            dataAppender.appendToTable(batch3);
            assertNextRecords(iter, batch3);
        }
    }

    /**
     * Reads from branch {@code b1} and then from the default branch of the same table, checking
     * that each job only sees the rows appended to the branch it reads.
     */
    @Test
    public void testReadingFromBranch() throws Exception {
        String branch = "b1";
        GenericAppenderHelper dataAppender = newDataAppender();

        // Base snapshot, shared by the default branch and by b1 (b1 is created on top of it).
        List<Record> batchBase = generateBatch();
        dataAppender.appendToTable(batchBase);
        tableResource.table()
                .manageSnapshots()
                .createBranch(branch, tableResource.table().currentSnapshot().snapshotId())
                .commit();

        // Two snapshots appended to b1 only.
        List<Record> batch1 = generateBatch();
        dataAppender.appendToTable(branch, batch1);
        List<Record> batch2 = generateBatch();
        dataAppender.appendToTable(branch, batch2);

        List<Record> branchExpectedRecords = Lists.newArrayList();
        branchExpectedRecords.addAll(batchBase);
        branchExpectedRecords.addAll(batch1);
        branchExpectedRecords.addAll(batch2);

        // First job: read branch b1.
        ScanContext branchScanContext = ScanContext.builder()
                .streaming(true)
                .monitorInterval(MONITOR_INTERVAL)
                .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
                .useBranch(branch)
                .build();

        try (CloseableIterator<Row> iter = executeAndCollect(branchScanContext)) {
            assertNextRecords(iter, branchExpectedRecords);

            List<Record> batch3 = generateBatch();
            dataAppender.appendToTable(branch, batch3);
            assertNextRecords(iter, batch3);

            List<Record> batch4 = generateBatch();
            dataAppender.appendToTable(branch, batch4);
            assertNextRecords(iter, batch4);
        }

        // Second job: no branch configured; only the base batch and later non-branch appends
        // are expected.
        ScanContext mainScanContext = ScanContext.builder()
                .streaming(true)
                .monitorInterval(MONITOR_INTERVAL)
                .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
                .build();

        try (CloseableIterator<Row> iter = executeAndCollect(mainScanContext)) {
            assertNextRecords(iter, batchBase);

            List<Record> batchMain2 = generateBatch();
            dataAppender.appendToTable(batchMain2);
            assertNextRecords(iter, batchMain2);
        }
    }

    /** Creates an appender that writes Parquet files into the table under test. */
    private GenericAppenderHelper newDataAppender() {
        return new GenericAppenderHelper(tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
    }

    /** Generates {@link #BATCH_SIZE} random records for the table schema using the next seed. */
    private List<Record> generateBatch() {
        return RandomGenericData.generate(tableResource.table().schema(), BATCH_SIZE, randomSeed.incrementAndGet());
    }

    /**
     * Builds the streaming pipeline for {@code scanContext}, submits it under this class's simple
     * name and returns the iterator over the collected rows. The caller must close the iterator.
     */
    private CloseableIterator<Row> executeAndCollect(ScanContext scanContext) throws Exception {
        return createStream(scanContext).executeAndCollect(getClass().getSimpleName());
    }

    /**
     * Reads {@code expected.size()} rows from {@code iter} and asserts that they match
     * {@code expected} under the table schema.
     */
    private void assertNextRecords(CloseableIterator<Row> iter, List<Record> expected) {
        List<Row> actual = waitForResult(iter, expected.size());
        TestHelpers.assertRecords(actual, expected, tableResource.table().schema());
    }

    /**
     * Builds a parallelism-1 stream that reads the table under test through {@link IcebergSource}
     * and maps each {@link RowData} to a {@link Row}.
     *
     * <p>Streaming flag, starting strategy, start snapshot id/timestamp and branch are taken from
     * {@code scanContext}; the monitor interval is always {@link #MONITOR_INTERVAL}.
     *
     * @param scanContext scan settings to copy onto the source builder
     * @return the mapped row stream (not yet executed)
     */
    private DataStream<Row> createStream(ScanContext scanContext) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        IcebergSource<RowData> source = IcebergSource.forRowData()
                .tableLoader(tableResource.tableLoader())
                .assignerFactory(new SimpleSplitAssignerFactory())
                .streaming(scanContext.isStreaming())
                .streamingStartingStrategy(scanContext.streamingStartingStrategy())
                .startSnapshotTimestamp(scanContext.startSnapshotTimestamp())
                .startSnapshotId(scanContext.startSnapshotId())
                .monitorInterval(MONITOR_INTERVAL)
                .branch(scanContext.branch())
                .build();

        DataStream<Row> stream = env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "icebergSource", TypeInformation.of(RowData.class))
                .map(new RowDataToRowMapper(FlinkSchemaUtil.convert(tableResource.table().schema())));
        return stream;
    }

    /**
     * Pulls rows from {@code iter} until {@code limit} rows have been collected or the iterator
     * reports no more elements.
     *
     * @param iter iterator over the job output
     * @param limit maximum number of rows to take
     * @return the collected rows, at most {@code limit} of them
     */
    public static List<Row> waitForResult(CloseableIterator<Row> iter, int limit) {
        List<Row> results = Lists.newArrayListWithCapacity(limit);
        while (results.size() < limit && iter.hasNext()) {
            results.add(iter.next());
        }
        return results;
    }

    /**
     * Polls the cluster every 10 ms, for at most 30 seconds, until at least one job is reported
     * in {@link JobStatus#RUNNING} state.
     *
     * @param client client of the cluster to query
     */
    public static void waitUntilJobIsRunning(ClusterClient<?> client) {
        Awaitility.await("job should be running")
                .atMost(Duration.ofSeconds(30L))
                .pollInterval(Duration.ofMillis(10L))
                .untilAsserted(() -> Assertions.assertThat(getRunningJobs(client)).isNotEmpty());
    }

    /**
     * Lists the ids of all jobs the cluster currently reports as {@link JobStatus#RUNNING}.
     *
     * @param client client of the cluster to query
     * @return ids of the running jobs, possibly empty
     * @throws Exception if the job listing cannot be obtained
     */
    public static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
        Collection<JobStatusMessage> statusMessages = client.listJobs().get();
        return statusMessages.stream()
                .filter(status -> status.getJobState() == JobStatus.RUNNING)
                .map(JobStatusMessage::getJobId)
                .collect(Collectors.toList());
    }
}

