/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.runtime.tasks.mailbox.SteppingMailboxProcessor;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestBase;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.flink.TestTableLoader;
import org.apache.iceberg.flink.source.FlinkInputFormat;
import org.apache.iceberg.flink.source.FlinkInputSplit;
import org.apache.iceberg.flink.source.FlinkSource;
import org.apache.iceberg.flink.source.FlinkSplitPlanner;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.StreamingReaderOperator;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
public class TestStreamingReaderOperator
extends TestBase {
    private static final Schema SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.required((int)1, (String)"id", (Type)Types.IntegerType.get()), Types.NestedField.required((int)2, (String)"data", (Type)Types.StringType.get())});
    private static final FileFormat DEFAULT_FORMAT = FileFormat.PARQUET;

    @Parameters(name="formatVersion = {0}")
    protected static List<Object> parameters() {
        return Arrays.asList(1, 2);
    }

    @BeforeEach
    public void setupTable() throws IOException {
        this.tableDir = Files.createTempDirectory(this.temp, "junit", new FileAttribute[0]).toFile();
        this.metadataDir = new File(this.tableDir, "metadata");
        Assertions.assertThat((boolean)this.tableDir.delete()).isTrue();
        this.table = this.create(SCHEMA, PartitionSpec.unpartitioned());
    }

    @TestTemplate
    public void testProcessAllRecords() throws Exception {
        List<List<Record>> expectedRecords = this.generateRecordsAndCommitTxn(10);
        List<FlinkInputSplit> splits = this.generateSplits();
        Assertions.assertThat(splits).hasSize(10);
        try (OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness = this.createReader();){
            harness.setup();
            harness.open();
            SteppingMailboxProcessor processor = this.createLocalMailbox(harness);
            ArrayList expected = Lists.newArrayList();
            for (int i = 0; i < splits.size(); ++i) {
                harness.processElement((Object)splits.get(i), -1L);
                ((AbstractBooleanAssert)Assertions.assertThat((boolean)processor.runMailboxStep()).as("Should processed 1 split", new Object[0])).isTrue();
                expected.addAll((Collection)expectedRecords.get(i));
                TestHelpers.assertRecords(this.readOutputValues(harness), expected, SCHEMA);
            }
        }
    }

    @TestTemplate
    public void testTriggerCheckpoint() throws Exception {
        List<List<Record>> expectedRecords = this.generateRecordsAndCommitTxn(3);
        List<FlinkInputSplit> splits = this.generateSplits();
        Assertions.assertThat(splits).hasSize(3);
        long timestamp = 0L;
        try (OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness = this.createReader();){
            harness.setup();
            harness.open();
            SteppingMailboxProcessor processor = this.createLocalMailbox(harness);
            harness.processElement((Object)splits.get(0), ++timestamp);
            harness.processElement((Object)splits.get(1), ++timestamp);
            harness.processElement((Object)splits.get(2), ++timestamp);
            processor.getMainMailboxExecutor().execute(() -> harness.snapshot(1L, 3L), "Trigger snapshot");
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)processor.runMailboxStep()).as("Should have processed the split0", new Object[0])).isTrue();
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)processor.runMailboxStep()).as("Should have processed the snapshot state action", new Object[0])).isTrue();
            TestHelpers.assertRecords(this.readOutputValues(harness), expectedRecords.get(0), SCHEMA);
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)processor.runMailboxStep()).as("Should have processed the split1", new Object[0])).isTrue();
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)processor.runMailboxStep()).as("Should have processed the split2", new Object[0])).isTrue();
            TestHelpers.assertRecords(this.readOutputValues(harness), Lists.newArrayList((Iterable)Iterables.concat(expectedRecords)), SCHEMA);
        }
    }

    @TestTemplate
    public void testCheckpointRestore() throws Exception {
        OperatorSubtaskState state;
        int i;
        SteppingMailboxProcessor localMailbox;
        List<List<Record>> expectedRecords = this.generateRecordsAndCommitTxn(15);
        List<FlinkInputSplit> splits = this.generateSplits();
        Assertions.assertThat(splits).hasSize(15);
        ArrayList expected = Lists.newArrayList();
        try (OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness = this.createReader();){
            harness.setup();
            harness.open();
            for (FlinkInputSplit split : splits) {
                harness.processElement((Object)split, -1L);
            }
            localMailbox = this.createLocalMailbox(harness);
            for (i = 0; i < 5; ++i) {
                expected.addAll((Collection)expectedRecords.get(i));
                ((AbstractBooleanAssert)Assertions.assertThat((boolean)localMailbox.runMailboxStep()).as("Should have processed the split#" + i, new Object[0])).isTrue();
                TestHelpers.assertRecords(this.readOutputValues(harness), expected, SCHEMA);
            }
            state = harness.snapshot(1L, 1L);
        }
        expected.clear();
        harness = this.createReader();
        var6_5 = null;
        try {
            harness.setup();
            harness.initializeState(state);
            harness.open();
            localMailbox = this.createLocalMailbox(harness);
            for (i = 5; i < 10; ++i) {
                expected.addAll((Collection)expectedRecords.get(i));
                ((AbstractBooleanAssert)Assertions.assertThat((boolean)localMailbox.runMailboxStep()).as("Should have processed the split#" + i, new Object[0])).isTrue();
                TestHelpers.assertRecords(this.readOutputValues(harness), expected, SCHEMA);
            }
            for (i = 10; i < 15; ++i) {
                expected.addAll((Collection)expectedRecords.get(i));
                harness.processElement((Object)splits.get(i), 1L);
                ((AbstractBooleanAssert)Assertions.assertThat((boolean)localMailbox.runMailboxStep()).as("Should have processed the split#" + i, new Object[0])).isTrue();
                TestHelpers.assertRecords(this.readOutputValues(harness), expected, SCHEMA);
            }
        }
        catch (Throwable throwable) {
            var6_5 = throwable;
            throw throwable;
        }
        finally {
            if (harness != null) {
                TestStreamingReaderOperator.$closeResource(var6_5, harness);
            }
        }
    }

    private List<Row> readOutputValues(OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness) {
        ArrayList results = Lists.newArrayList();
        for (RowData rowData : harness.extractOutputValues()) {
            results.add(Row.of((Object[])new Object[]{rowData.getInt(0), rowData.getString(1).toString()}));
        }
        return results;
    }

    private List<List<Record>> generateRecordsAndCommitTxn(int commitTimes) throws IOException {
        ArrayList expectedRecords = Lists.newArrayList();
        for (int i = 0; i < commitTimes; ++i) {
            List records = RandomGenericData.generate((Schema)SCHEMA, (int)100, (long)0L);
            expectedRecords.add(records);
            this.writeRecords(records);
        }
        return expectedRecords;
    }

    private void writeRecords(List<Record> records) throws IOException {
        GenericAppenderHelper appender = new GenericAppenderHelper((Table)this.table, DEFAULT_FORMAT, this.temp);
        appender.appendToTable(records);
    }

    private List<FlinkInputSplit> generateSplits() {
        ArrayList inputSplits = Lists.newArrayList();
        List snapshotIds = SnapshotUtil.currentAncestorIds((Table)this.table);
        for (int i = snapshotIds.size() - 1; i >= 0; --i) {
            ScanContext scanContext = i == snapshotIds.size() - 1 ? ScanContext.builder().useSnapshotId((Long)snapshotIds.get(i)).build() : ScanContext.builder().startSnapshotId((Long)snapshotIds.get(i + 1)).endSnapshotId((Long)snapshotIds.get(i)).build();
            Collections.addAll(inputSplits, FlinkSplitPlanner.planInputSplits((Table)this.table, (ScanContext)scanContext, (ExecutorService)ThreadPools.getWorkerPool()));
        }
        return inputSplits;
    }

    private OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> createReader() throws Exception {
        FlinkInputFormat inputFormat = FlinkSource.forRowData().tableLoader(TestTableLoader.of(this.tableDir.getAbsolutePath())).buildFormat();
        OneInputStreamOperatorFactory factory = StreamingReaderOperator.factory((FlinkInputFormat)inputFormat);
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness(factory, 1, 1, 0);
        harness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        return harness;
    }

    private SteppingMailboxProcessor createLocalMailbox(OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness) {
        return new SteppingMailboxProcessor(MailboxDefaultAction.Controller::suspendDefaultAction, harness.getTaskMailbox(), StreamTaskActionExecutor.IMMEDIATE);
    }
}

