/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestBase;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.flink.TestTableLoader;
import org.apache.iceberg.flink.source.FlinkInputFormat;
import org.apache.iceberg.flink.source.FlinkInputSplit;
import org.apache.iceberg.flink.source.FlinkSource;
import org.apache.iceberg.flink.source.FlinkSplitPlanner;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.StreamingMonitorFunction;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.UnmodifiableIterator;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
public class TestStreamingMonitorFunction
extends TestBase {
    /** Table schema: required integer column {@code id} (field 1) and required string column {@code data} (field 2). */
    private static final Schema SCHEMA =
        new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.required(2, "data", Types.StringType.get()));

    /** File format handed to the appender helper when test records are written. */
    private static final FileFormat DEFAULT_FORMAT = FileFormat.PARQUET;

    /** Wait budget in milliseconds (ten seconds). */
    private static final long WAIT_TIME_MILLIS = 10000L;

    /**
     * Supplies the values bound to the {@code formatVersion = {0}} placeholder of the
     * parameterized test extension.
     *
     * @return a fixed-size list holding the integers 1 and 2
     */
    @Parameters(name = "formatVersion = {0}")
    protected static List<Object> parameters() {
        return Arrays.asList(new Object[] {1, 2});
    }

    /**
     * Prepares a fresh unpartitioned table before each test.
     *
     * <p>A uniquely named directory is created under {@code temp} and then deleted again, so only
     * its path is kept in {@code tableDir}; the table is created afterwards.
     * NOTE(review): presumably table creation expects the location not to exist yet - confirm.
     *
     * @throws IOException if the temporary directory cannot be created
     */
    @BeforeEach
    public void setupTable() throws IOException {
        File freshDir = Files.createTempDirectory(temp, "junit").toFile();
        tableDir = freshDir;
        metadataDir = new File(freshDir, "metadata");
        Assertions.assertThat(freshDir.delete()).isTrue();
        table = create(SCHEMA, PartitionSpec.unpartitioned());
    }

    /**
     * Starts {@code function.run(sourceContext)} on a newly created thread and returns immediately.
     *
     * <p>The thread is neither joined nor tracked here. Any exception thrown by {@code run} is
     * rethrown on that thread wrapped in a {@link RuntimeException}.
     *
     * @param sourceContext context that receives the splits emitted by the function
     * @param function the monitor function to run
     */
    private void runSourceFunctionInTask(TestSourceContext sourceContext, StreamingMonitorFunction function) {
        Runnable sourceLoop = () -> {
            try {
                function.run(sourceContext);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
        new Thread(sourceLoop).start();
    }

    /**
     * Commits ten batches, runs the monitor function without any explicit start position, and
     * checks that the rows read back from the emitted splits match all committed records.
     */
    @TestTemplate
    public void testConsumeWithoutStartSnapshotId() throws Exception {
        List<List<Record>> committedBatches = generateRecordsAndCommitTxn(10);
        List<Record> expectedRecords = Lists.newArrayList(Iterables.concat(committedBatches));

        ScanContext context = ScanContext.builder()
            .monitorInterval(Duration.ofMillis(100L))
            .build();
        StreamingMonitorFunction monitor = createFunction(context);

        try (AbstractStreamOperatorTestHarness<FlinkInputSplit> testHarness = createHarness(monitor)) {
            testHarness.setup();
            testHarness.open();

            TestSourceContext collected = new TestSourceContext(new CountDownLatch(1));
            runSourceFunctionInTask(collected, monitor);
            awaitExpectedSplits(collected);

            // Close the function before reading the collected splits back.
            monitor.close();

            TestHelpers.assertRecords(collected.toRows(), expectedRecords, SCHEMA);
        }
    }

    /**
     * Commits five batches, remembers the current snapshot id, commits five more batches, and
     * checks that a monitor function configured with that start snapshot id yields exactly the
     * records of the second group of commits.
     */
    @TestTemplate
    public void testConsumeFromStartSnapshotId() throws Exception {
        // Commits made up to and including the start snapshot are not part of the expectation.
        generateRecordsAndCommitTxn(5);
        long startId = table.currentSnapshot().snapshotId();

        List<List<Record>> laterBatches = generateRecordsAndCommitTxn(5);
        List<Record> expectedRecords = Lists.newArrayList(Iterables.concat(laterBatches));

        ScanContext context = ScanContext.builder()
            .monitorInterval(Duration.ofMillis(100L))
            .startSnapshotId(startId)
            .build();
        StreamingMonitorFunction monitor = createFunction(context);

        try (AbstractStreamOperatorTestHarness<FlinkInputSplit> testHarness = createHarness(monitor)) {
            testHarness.setup();
            testHarness.open();

            TestSourceContext collected = new TestSourceContext(new CountDownLatch(1));
            runSourceFunctionInTask(collected, monitor);
            awaitExpectedSplits(collected);

            // Close the function before reading the collected splits back.
            monitor.close();

            TestHelpers.assertRecords(collected.toRows(), expectedRecords, SCHEMA);
        }
    }

    /**
     * Same scenario as starting from a snapshot id, but the start position is given as a tag:
     * the tag {@code t1} is created on the snapshot current after the first five commits, and the
     * expected rows are the records of the five commits made afterwards.
     */
    @TestTemplate
    public void testConsumeFromStartTag() throws Exception {
        generateRecordsAndCommitTxn(5);
        long taggedSnapshotId = table.currentSnapshot().snapshotId();

        String startTag = "t1";
        table.manageSnapshots().createTag(startTag, taggedSnapshotId).commit();

        List<List<Record>> laterBatches = generateRecordsAndCommitTxn(5);
        List<Record> expectedRecords = Lists.newArrayList(Iterables.concat(laterBatches));

        ScanContext context = ScanContext.builder()
            .monitorInterval(Duration.ofMillis(100L))
            .startTag(startTag)
            .build();
        StreamingMonitorFunction monitor = createFunction(context);

        try (AbstractStreamOperatorTestHarness<FlinkInputSplit> testHarness = createHarness(monitor)) {
            testHarness.setup();
            testHarness.open();

            TestSourceContext collected = new TestSourceContext(new CountDownLatch(1));
            runSourceFunctionInTask(collected, monitor);
            awaitExpectedSplits(collected);

            // Close the function before reading the collected splits back.
            monitor.close();

            TestHelpers.assertRecords(collected.toRows(), expectedRecords, SCHEMA);
        }
    }

    /**
     * Runs the monitor function twice around an operator state snapshot.
     *
     * <p>Phase one commits ten batches, consumes them, and captures the operator state with
     * {@code harness.snapshot(1L, 1L)}. Phase two commits ten more batches, initializes a new
     * function instance from the captured state, and checks that only the records of the second
     * group of commits are read.
     */
    @TestTemplate
    public void testCheckpointRestore() throws Exception {
        ScanContext context = ScanContext.builder()
            .monitorInterval(Duration.ofMillis(100L))
            .build();

        // Phase one: consume the initial commits and capture the operator state.
        List<List<Record>> initialBatches = generateRecordsAndCommitTxn(10);
        StreamingMonitorFunction firstRun = createFunction(context);
        OperatorSubtaskState checkpoint;
        try (AbstractStreamOperatorTestHarness<FlinkInputSplit> testHarness = createHarness(firstRun)) {
            testHarness.setup();
            testHarness.open();

            TestSourceContext collected = new TestSourceContext(new CountDownLatch(1));
            runSourceFunctionInTask(collected, firstRun);
            awaitExpectedSplits(collected);

            // The snapshot is taken while the function is still open.
            checkpoint = testHarness.snapshot(1L, 1L);
            firstRun.close();

            List<Record> expectedInitial = Lists.newArrayList(Iterables.concat(initialBatches));
            TestHelpers.assertRecords(collected.toRows(), expectedInitial, SCHEMA);
        }

        // Phase two: restore from the captured state and consume only the new commits.
        List<List<Record>> followUpBatches = generateRecordsAndCommitTxn(10);
        StreamingMonitorFunction restoredRun = createFunction(context);
        try (AbstractStreamOperatorTestHarness<FlinkInputSplit> testHarness = createHarness(restoredRun)) {
            testHarness.setup();
            // State is injected after setup() and before open().
            testHarness.initializeState(checkpoint);
            testHarness.open();

            TestSourceContext collected = new TestSourceContext(new CountDownLatch(1));
            runSourceFunctionInTask(collected, restoredRun);
            awaitExpectedSplits(collected);

            restoredRun.close();

            List<Record> expectedFollowUp = Lists.newArrayList(Iterables.concat(followUpBatches));
            TestHelpers.assertRecords(collected.toRows(), expectedFollowUp, SCHEMA);
        }
    }

    /**
     * Polls until the source context's latch has reached zero and exactly one split has been
     * collected, failing if that does not happen within {@link #WAIT_TIME_MILLIS} milliseconds.
     *
     * @param sourceContext the context whose latch and collected splits are inspected
     */
    private void awaitExpectedSplits(TestSourceContext sourceContext) {
        // Use the shared timeout constant instead of repeating its value as a magic number.
        Awaitility.await("expected splits should be produced")
            .atMost(Duration.ofMillis(WAIT_TIME_MILLIS))
            .untilAsserted(() -> {
                Assertions.assertThat(sourceContext.latch.getCount()).isEqualTo(0L);
                Assertions.assertThat(sourceContext.splits)
                    .as("Should produce the expected splits")
                    .hasSize(1);
            });
    }

    /**
     * Checks that creating the monitor function with a max planning snapshot count of 0 or -10
     * fails with an {@link IllegalArgumentException} carrying the expected message.
     */
    @TestTemplate
    public void testInvalidMaxPlanningSnapshotCount() {
        for (int invalidCount : new int[] {0, -10}) {
            // The context is built outside the asserted lambda: only function creation is expected to throw.
            ScanContext invalidContext = ScanContext.builder()
                .monitorInterval(Duration.ofMillis(100L))
                .maxPlanningSnapshotCount(invalidCount)
                .build();

            Assertions.assertThatThrownBy(() -> createFunction(invalidContext))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("The max-planning-snapshot-count must be greater than zero");
        }
    }

    /**
     * Checks how many splits a single monitoring pass forwards for different
     * max-planning-snapshot-count settings.
     *
     * <p>Ten commits are made and planning starts from the oldest ancestor snapshot. With an
     * effectively unbounded count, nine splits are expected from the planner. For each count in
     * {1, 9, 15}, one call to {@code monitorAndForwardSplits()} is made directly on the test thread;
     * for counts below ten, the number of forwarded splits must equal the configured count. No
     * size assertion is made for the count 15.
     */
    @TestTemplate
    public void testConsumeWithMaxPlanningSnapshotCount() throws Exception {
        generateRecordsAndCommitTxn(10);
        long oldestSnapshotId = SnapshotUtil.oldestAncestor(table).snapshotId();

        // Baseline: plan everything at once to confirm the total number of available splits.
        ScanContext unboundedContext = ScanContext.builder()
            .monitorInterval(Duration.ofMillis(100L))
            .splitSize(1000L)
            .startSnapshotId(oldestSnapshotId)
            .maxPlanningSnapshotCount(Integer.MAX_VALUE)
            .build();
        FlinkInputSplit[] expectedSplits =
            FlinkSplitPlanner.planInputSplits(table, unboundedContext, ThreadPools.getWorkerPool());
        Assertions.assertThat(expectedSplits).hasSize(9);

        for (int maxPlanningSnapshotCount : ImmutableList.of(1, 9, 15)) {
            ScanContext boundedContext = ScanContext.builder()
                .monitorInterval(Duration.ofMillis(500L))
                .startSnapshotId(oldestSnapshotId)
                .splitSize(1000L)
                .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
                .build();
            StreamingMonitorFunction function = createFunction(boundedContext);

            // try-with-resources closes the harness on every path and lets failures propagate.
            try (AbstractStreamOperatorTestHarness<FlinkInputSplit> harness = createHarness(function)) {
                harness.setup();
                harness.open();

                TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
                function.sourceContext(sourceContext);
                function.monitorAndForwardSplits();

                if (maxPlanningSnapshotCount < 10) {
                    Assertions.assertThat(sourceContext.splits).hasSize(maxPlanningSnapshotCount);
                }
            }
        }
    }

    /**
     * Generates and writes one batch of records per requested commit.
     *
     * <p>Each batch comes from {@code RandomGenericData.generate(SCHEMA, 100, 0L)} and is written
     * through {@link #writeRecords(List)}. NOTE(review): the last argument is presumably a random
     * seed, which would make every batch identical - confirm.
     *
     * @param commitTimes number of batches to generate and write
     * @return the generated batches, one inner list per write, in write order
     * @throws IOException if writing a batch fails
     */
    private List<List<Record>> generateRecordsAndCommitTxn(int commitTimes) throws IOException {
        List<List<Record>> batches = Lists.newArrayList();
        int remaining = commitTimes;
        while (remaining > 0) {
            List<Record> batch = RandomGenericData.generate(SCHEMA, 100, 0L);
            batches.add(batch);
            writeRecords(batch);
            remaining--;
        }
        return batches;
    }

    /**
     * Hands the records to a new {@link GenericAppenderHelper} built for the test table, using
     * {@code DEFAULT_FORMAT} and the {@code temp} location, and calls its {@code appendToTable}.
     *
     * @param records the records to append
     * @throws IOException if the helper fails to write
     */
    private void writeRecords(List<Record> records) throws IOException {
        new GenericAppenderHelper(table, DEFAULT_FORMAT, temp).appendToTable(records);
    }

    /**
     * Builds a monitor function over the test table.
     *
     * @param scanContext scan configuration passed to the function's constructor
     * @return a new {@link StreamingMonitorFunction} whose table loader points at {@code tableDir}
     */
    private StreamingMonitorFunction createFunction(ScanContext scanContext) {
        String tableLocation = tableDir.getAbsolutePath();
        return new StreamingMonitorFunction(TestTableLoader.of(tableLocation), scanContext);
    }

    /**
     * Wraps the function in a {@link StreamSource} operator and a test harness around it.
     *
     * <p>The harness is constructed with the numeric arguments {@code 1, 1, 0}.
     * NOTE(review): presumably max parallelism, parallelism and subtask index - confirm against
     * the harness constructor.
     *
     * @param function the monitor function to host
     * @return a new, not yet set-up harness
     * @throws Exception if the harness cannot be constructed
     */
    private AbstractStreamOperatorTestHarness<FlinkInputSplit> createHarness(StreamingMonitorFunction function) throws Exception {
        StreamSource<FlinkInputSplit, StreamingMonitorFunction> sourceOperator = new StreamSource<>(function);
        return new AbstractStreamOperatorTestHarness<>(sourceOperator, 1, 1, 0);
    }

    private class TestSourceContext
    implements SourceFunction.SourceContext<FlinkInputSplit> {
        private final List<FlinkInputSplit> splits = Lists.newArrayList();
        private final Object checkpointLock = new Object();
        private final CountDownLatch latch;

        TestSourceContext(CountDownLatch latch) {
            this.latch = latch;
        }

        public void collect(FlinkInputSplit element) {
            this.splits.add(element);
            this.latch.countDown();
        }

        public void collectWithTimestamp(FlinkInputSplit element, long timestamp) {
            this.collect(element);
        }

        public void emitWatermark(Watermark mark) {
        }

        public void markAsTemporarilyIdle() {
        }

        public Object getCheckpointLock() {
            return this.checkpointLock;
        }

        public void close() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private List<Row> toRows() throws IOException {
            FlinkInputFormat format = FlinkSource.forRowData().tableLoader(TestTableLoader.of(TestStreamingMonitorFunction.this.tableDir.getAbsolutePath())).buildFormat();
            ArrayList rows = Lists.newArrayList();
            for (FlinkInputSplit split : this.splits) {
                format.open(split);
                RowData element = null;
                try {
                    while (!format.reachedEnd()) {
                        element = format.nextRecord(element);
                        rows.add(Row.of((Object[])new Object[]{element.getInt(0), element.getString(1).toString()}));
                    }
                }
                finally {
                    format.close();
                }
            }
            return rows;
        }
    }
}

