/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.sink.IcebergStreamWriter;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializableSupplier;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class TestIcebergStreamWriter {
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();
    private Table table;
    private final FileFormat format;
    private final boolean partitioned;

    @Parameterized.Parameters(name="format = {0}, partitioned = {1}")
    public static Object[][] parameters() {
        return new Object[][]{{"avro", true}, {"avro", false}, {"orc", true}, {"orc", false}, {"parquet", true}, {"parquet", false}};
    }

    public TestIcebergStreamWriter(String format, boolean partitioned) {
        this.format = FileFormat.fromString((String)format);
        this.partitioned = partitioned;
    }

    @Before
    public void before() throws IOException {
        File folder = this.tempFolder.newFolder();
        ImmutableMap props = ImmutableMap.of((Object)"write.format.default", (Object)this.format.name());
        this.table = SimpleDataUtil.createTable(folder.getAbsolutePath(), (Map<String, String>)props, this.partitioned);
    }

    @Test
    public void testWritingTable() throws Exception {
        long checkpointId = 1L;
        try (OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness = this.createIcebergStreamWriter();){
            testHarness.processElement((Object)SimpleDataUtil.createRowData(1, "hello"), 1L);
            testHarness.processElement((Object)SimpleDataUtil.createRowData(2, "world"), 1L);
            testHarness.processElement((Object)SimpleDataUtil.createRowData(3, "hello"), 1L);
            testHarness.prepareSnapshotPreBarrier(checkpointId);
            long expectedDataFiles = this.partitioned ? 2L : 1L;
            WriteResult result = WriteResult.builder().addAll((Iterable)testHarness.extractOutputValues()).build();
            Assert.assertEquals((long)0L, (long)result.deleteFiles().length);
            Assert.assertEquals((long)expectedDataFiles, (long)result.dataFiles().length);
            testHarness.processElement((Object)SimpleDataUtil.createRowData(4, "foo"), 1L);
            testHarness.processElement((Object)SimpleDataUtil.createRowData(5, "bar"), 2L);
            testHarness.prepareSnapshotPreBarrier(++checkpointId);
            expectedDataFiles = this.partitioned ? 4L : 2L;
            result = WriteResult.builder().addAll((Iterable)testHarness.extractOutputValues()).build();
            Assert.assertEquals((long)0L, (long)result.deleteFiles().length);
            Assert.assertEquals((long)expectedDataFiles, (long)result.dataFiles().length);
            AppendFiles appendFiles = this.table.newAppend();
            Arrays.stream(result.dataFiles()).forEach(arg_0 -> ((AppendFiles)appendFiles).appendFile(arg_0));
            appendFiles.commit();
            SimpleDataUtil.assertTableRecords(this.table, (List<Record>)Lists.newArrayList((Object[])new Record[]{SimpleDataUtil.createRecord(1, "hello"), SimpleDataUtil.createRecord(2, "world"), SimpleDataUtil.createRecord(3, "hello"), SimpleDataUtil.createRecord(4, "foo"), SimpleDataUtil.createRecord(5, "bar")}));
        }
    }

    @Test
    public void testSnapshotTwice() throws Exception {
        long checkpointId = 1L;
        long timestamp = 1L;
        try (OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness = this.createIcebergStreamWriter();){
            testHarness.processElement((Object)SimpleDataUtil.createRowData(1, "hello"), timestamp++);
            testHarness.processElement((Object)SimpleDataUtil.createRowData(2, "world"), timestamp);
            testHarness.prepareSnapshotPreBarrier(checkpointId++);
            long expectedDataFiles = this.partitioned ? 2L : 1L;
            WriteResult result = WriteResult.builder().addAll((Iterable)testHarness.extractOutputValues()).build();
            Assert.assertEquals((long)0L, (long)result.deleteFiles().length);
            Assert.assertEquals((long)expectedDataFiles, (long)result.dataFiles().length);
            for (int i = 0; i < 5; ++i) {
                testHarness.prepareSnapshotPreBarrier(checkpointId++);
                result = WriteResult.builder().addAll((Iterable)testHarness.extractOutputValues()).build();
                Assert.assertEquals((long)0L, (long)result.deleteFiles().length);
                Assert.assertEquals((long)expectedDataFiles, (long)result.dataFiles().length);
            }
        }
    }

    @Test
    public void testTableWithoutSnapshot() throws Exception {
        try (OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness = this.createIcebergStreamWriter();){
            Assert.assertEquals((long)0L, (long)testHarness.extractOutputValues().size());
        }
        Assert.assertEquals((long)0L, (long)this.scanDataFiles().size());
        testHarness = this.createIcebergStreamWriter();
        var2_2 = null;
        try {
            testHarness.processElement((Object)SimpleDataUtil.createRowData(1, "hello"), 1L);
            Assert.assertEquals((long)0L, (long)testHarness.extractOutputValues().size());
        }
        catch (Throwable throwable) {
            var2_2 = throwable;
            throw throwable;
        }
        finally {
            if (testHarness != null) {
                TestIcebergStreamWriter.$closeResource(var2_2, testHarness);
            }
        }
        Assert.assertEquals((long)1L, (long)this.scanDataFiles().size());
    }

    private Set<String> scanDataFiles() throws IOException {
        Path dataDir = new Path(this.table.location(), "data");
        FileSystem fs = FileSystem.get((org.apache.hadoop.conf.Configuration)new org.apache.hadoop.conf.Configuration());
        if (!fs.exists(dataDir)) {
            return ImmutableSet.of();
        }
        HashSet paths = Sets.newHashSet();
        RemoteIterator iterators = fs.listFiles(dataDir, true);
        while (iterators.hasNext()) {
            Path path;
            LocatedFileStatus status = (LocatedFileStatus)iterators.next();
            if (!status.isFile() || !(path = status.getPath()).getName().endsWith("." + this.format.toString().toLowerCase())) continue;
            paths.add(path.toString());
        }
        return paths;
    }

    @Test
    public void testBoundedStreamCloseWithEmittingDataFiles() throws Exception {
        try (OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness = this.createIcebergStreamWriter();){
            testHarness.processElement((Object)SimpleDataUtil.createRowData(1, "hello"), 1L);
            testHarness.processElement((Object)SimpleDataUtil.createRowData(2, "world"), 2L);
            Assertions.assertThat((Object)testHarness.getOneInputOperator()).isInstanceOf(BoundedOneInput.class);
            ((BoundedOneInput)testHarness.getOneInputOperator()).endInput();
            long expectedDataFiles = this.partitioned ? 2L : 1L;
            WriteResult result = WriteResult.builder().addAll((Iterable)testHarness.extractOutputValues()).build();
            Assert.assertEquals((long)0L, (long)result.deleteFiles().length);
            Assert.assertEquals((long)expectedDataFiles, (long)result.dataFiles().length);
            ((BoundedOneInput)testHarness.getOneInputOperator()).endInput();
            result = WriteResult.builder().addAll((Iterable)testHarness.extractOutputValues()).build();
            Assert.assertEquals((long)0L, (long)result.deleteFiles().length);
            Assert.assertEquals((long)expectedDataFiles, (long)result.dataFiles().length);
        }
    }

    @Test
    public void testBoundedStreamTriggeredEndInputBeforeTriggeringCheckpoint() throws Exception {
        try (OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness = this.createIcebergStreamWriter();){
            testHarness.processElement((Object)SimpleDataUtil.createRowData(1, "hello"), 1L);
            testHarness.processElement((Object)SimpleDataUtil.createRowData(2, "world"), 2L);
            testHarness.endInput();
            long expectedDataFiles = this.partitioned ? 2L : 1L;
            WriteResult result = WriteResult.builder().addAll((Iterable)testHarness.extractOutputValues()).build();
            Assert.assertEquals((long)0L, (long)result.deleteFiles().length);
            Assert.assertEquals((long)expectedDataFiles, (long)result.dataFiles().length);
            testHarness.prepareSnapshotPreBarrier(1L);
            result = WriteResult.builder().addAll((Iterable)testHarness.extractOutputValues()).build();
            Assert.assertEquals((long)0L, (long)result.deleteFiles().length);
            Assert.assertEquals((long)expectedDataFiles, (long)result.dataFiles().length);
        }
    }

    @Test
    public void testTableWithTargetFileSize() throws Exception {
        this.table.updateProperties().set("write.target-file-size-bytes", "4").commit();
        ArrayList rows = Lists.newArrayListWithCapacity((int)8000);
        ArrayList records = Lists.newArrayListWithCapacity((int)8000);
        for (int i = 0; i < 2000; ++i) {
            for (String data : new String[]{"a", "b", "c", "d"}) {
                rows.add(SimpleDataUtil.createRowData(i, data));
                records.add(SimpleDataUtil.createRecord(i, data));
            }
        }
        OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness = this.createIcebergStreamWriter();
        String[] stringArray = null;
        try {
            for (RowData row : rows) {
                testHarness.processElement((Object)row, 1L);
            }
            testHarness.prepareSnapshotPreBarrier(1L);
            WriteResult result = WriteResult.builder().addAll((Iterable)testHarness.extractOutputValues()).build();
            Assert.assertEquals((long)0L, (long)result.deleteFiles().length);
            Assert.assertEquals((long)8L, (long)result.dataFiles().length);
            for (DataFile dataFile : result.dataFiles()) {
                Assert.assertEquals((long)1000L, (long)dataFile.recordCount());
            }
            AppendFiles appendFiles = this.table.newAppend();
            Arrays.stream(result.dataFiles()).forEach(arg_0 -> ((AppendFiles)appendFiles).appendFile(arg_0));
            appendFiles.commit();
        }
        catch (Throwable object) {
            stringArray = object;
            throw object;
        }
        finally {
            if (testHarness != null) {
                TestIcebergStreamWriter.$closeResource((Throwable)stringArray, testHarness);
            }
        }
        SimpleDataUtil.assertTableRecords(this.table, (List<Record>)records);
    }

    @Test
    public void testPromotedFlinkDataType() throws Exception {
        Schema iSchema = new Schema(new Types.NestedField[]{Types.NestedField.required((int)1, (String)"tinyint", (Type)Types.IntegerType.get()), Types.NestedField.required((int)2, (String)"smallint", (Type)Types.IntegerType.get()), Types.NestedField.optional((int)3, (String)"int", (Type)Types.IntegerType.get())});
        TableSchema flinkSchema = TableSchema.builder().field("tinyint", (DataType)DataTypes.TINYINT().notNull()).field("smallint", (DataType)DataTypes.SMALLINT().notNull()).field("int", (DataType)DataTypes.INT().nullable()).build();
        PartitionSpec spec = this.partitioned ? PartitionSpec.builderFor((Schema)iSchema).identity("smallint").identity("tinyint").identity("int").build() : PartitionSpec.unpartitioned();
        String location = this.tempFolder.newFolder().getAbsolutePath();
        ImmutableMap props = ImmutableMap.of((Object)"write.format.default", (Object)this.format.name());
        Table icebergTable = new HadoopTables().create(iSchema, spec, (Map)props, location);
        ArrayList rows = Lists.newArrayList((Object[])new RowData[]{GenericRowData.of((Object[])new Object[]{(byte)1, (short)Short.MIN_VALUE, 101}), GenericRowData.of((Object[])new Object[]{(byte)2, (short)0, 102}), GenericRowData.of((Object[])new Object[]{(byte)3, (short)Short.MAX_VALUE, 103})});
        GenericRecord record = GenericRecord.create((Schema)iSchema);
        ArrayList expected = Lists.newArrayList((Object[])new Record[]{record.copy((Map)ImmutableMap.of((Object)"tinyint", (Object)1, (Object)"smallint", (Object)Short.MIN_VALUE, (Object)"int", (Object)101)), record.copy((Map)ImmutableMap.of((Object)"tinyint", (Object)2, (Object)"smallint", (Object)0, (Object)"int", (Object)102)), record.copy((Map)ImmutableMap.of((Object)"tinyint", (Object)3, (Object)"smallint", (Object)Short.MAX_VALUE, (Object)"int", (Object)103))});
        try (OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness = this.createIcebergStreamWriter(icebergTable, flinkSchema);){
            for (RowData row : rows) {
                testHarness.processElement((Object)row, 1L);
            }
            testHarness.prepareSnapshotPreBarrier(1L);
            WriteResult result = WriteResult.builder().addAll((Iterable)testHarness.extractOutputValues()).build();
            Assert.assertEquals((long)0L, (long)result.deleteFiles().length);
            Assert.assertEquals((long)(this.partitioned ? 3L : 1L), (long)result.dataFiles().length);
            AppendFiles appendFiles = icebergTable.newAppend();
            Arrays.stream(result.dataFiles()).forEach(arg_0 -> ((AppendFiles)appendFiles).appendFile(arg_0));
            appendFiles.commit();
        }
        SimpleDataUtil.assertTableRecords(location, (List<Record>)expected);
    }

    private OneInputStreamOperatorTestHarness<RowData, WriteResult> createIcebergStreamWriter() throws Exception {
        return this.createIcebergStreamWriter(this.table, SimpleDataUtil.FLINK_SCHEMA);
    }

    private OneInputStreamOperatorTestHarness<RowData, WriteResult> createIcebergStreamWriter(Table icebergTable, TableSchema flinkSchema) throws Exception {
        RowType flinkRowType = FlinkSink.toFlinkRowType((Schema)icebergTable.schema(), (TableSchema)flinkSchema);
        FlinkWriteConf flinkWriteConfig = new FlinkWriteConf(icebergTable, (Map)Maps.newHashMap(), (ReadableConfig)new Configuration());
        IcebergStreamWriter streamWriter = FlinkSink.createStreamWriter((SerializableSupplier & Serializable)() -> icebergTable, (FlinkWriteConf)flinkWriteConfig, (RowType)flinkRowType, null);
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)streamWriter, 1, 1, 0);
        harness.setup();
        harness.open();
        return harness;
    }
}

