/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.HadoopCatalogExtension;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.MapAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Parameterized tests that write rows through {@link FlinkSink} into an Iceberg table and verify
 * the table contents, the number of data files per partition, snapshot summary properties and
 * the validation of write options.
 *
 * <p>Every test template runs once for each combination returned by {@link #parameters()}:
 * file format, write parallelism and whether the table is partitioned by the {@code data} column.
 */
@ExtendWith(ParameterizedTestExtension.class)
public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
    /** Table property key used to select the default write file format. */
    private static final String WRITE_FORMAT_PROPERTY = "write.format.default";

    /** Table property key used to select the table-level write distribution mode. */
    private static final String WRITE_DISTRIBUTION_MODE_PROPERTY = "write.distribution-mode";

    /** Partition values probed by the file-count assertions. */
    // NOTE(review): presumably these match the "data" values produced by createRows("") in the
    // base class -- confirm against TestFlinkIcebergSinkBase.
    private static final String[] PARTITION_VALUES = {"aaa", "bbb", "ccc"};

    @RegisterExtension
    public static MiniClusterExtension miniClusterResource = MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();

    @RegisterExtension
    private static final HadoopCatalogExtension CATALOG_EXTENSION = new HadoopCatalogExtension("default", "t");

    private TableLoader tableLoader;

    @Parameter(index=0)
    private FileFormat format;

    @Parameter(index=1)
    private int parallelism;

    @Parameter(index=2)
    private boolean partitioned;

    /**
     * Builds the test matrix: each of AVRO, ORC and PARQUET combined with parallelism 1 and 2,
     * each of those with a partitioned and an unpartitioned table.
     *
     * @return one {@code {format, parallelism, partitioned}} triple per combination
     */
    @Parameters(name="format={0}, parallelism = {1}, partitioned = {2}")
    public static Object[][] parameters() {
        List<Object[]> combinations = new ArrayList<>();
        for (FileFormat fileFormat : new FileFormat[] {FileFormat.AVRO, FileFormat.ORC, FileFormat.PARQUET}) {
            for (int writers : new int[] {1, 2}) {
                for (boolean isPartitioned : new boolean[] {true, false}) {
                    combinations.add(new Object[] {fileFormat, writers, isPartitioned});
                }
            }
        }
        return combinations.toArray(new Object[0][]);
    }

    /**
     * Creates the table under test with the parameterized format and partitioning, a fresh
     * checkpointing execution environment, and the table loader for the created table.
     *
     * @throws IOException declared by the original signature
     */
    @BeforeEach
    public void before() throws IOException {
        PartitionSpec spec = this.partitionSpec();
        Map<String, String> tableProps = ImmutableMap.of(WRITE_FORMAT_PROPERTY, this.format.name());
        this.table = CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SimpleDataUtil.SCHEMA, spec, tableProps);
        this.env = this.newEnvironment();
        this.tableLoader = CATALOG_EXTENSION.tableLoader();
    }

    /** Writes three rows converted to the internal row format and checks the table contents. */
    @TestTemplate
    public void testWriteRowData() throws Exception {
        List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));

        FlinkSink.forRowData(
                this.env
                    .addSource(this.createBoundedSource(rows), ROW_TYPE_INFO)
                    .map(row -> CONVERTER.toInternal(row), FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)))
            .table(this.table)
            .tableLoader(this.tableLoader)
            .writeParallelism(this.parallelism)
            .append();

        this.env.execute("Test Iceberg DataStream");
        SimpleDataUtil.assertTableRows(this.table, this.convertToRowData(rows));
    }

    /**
     * Writes the rows from {@code createRows("")} through a {@link Row}-based sink and asserts
     * that the table holds exactly those rows.
     *
     * @param tableSchema schema handed to the sink builder; callers in this class pass either
     *     {@code null} or {@link SimpleDataUtil#FLINK_SCHEMA}
     * @param distributionMode distribution mode handed to the sink builder; may be {@code null}
     * @throws Exception if the job fails or the table contents do not match
     */
    private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode) throws Exception {
        List<Row> rows = this.createRows("");
        DataStream<Row> source = this.env.addSource(this.createBoundedSource(rows), ROW_TYPE_INFO);

        FlinkSink.forRow(source, SimpleDataUtil.FLINK_SCHEMA)
            .table(this.table)
            .tableLoader(this.tableLoader)
            .tableSchema(tableSchema)
            .writeParallelism(this.parallelism)
            .distributionMode(distributionMode)
            .append();

        this.env.execute("Test Iceberg DataStream.");
        SimpleDataUtil.assertTableRows(this.table, this.convertToRowData(rows));
    }

    /**
     * Counts the data files of the table under test whose {@code data} partition value equals
     * the given one.
     *
     * @param partition value of the {@code data} partition field
     * @return number of entries returned by {@link SimpleDataUtil#partitionDataFiles}
     * @throws IOException if listing the data files fails
     */
    private int partitionFiles(String partition) throws IOException {
        Map<String, Object> partitionValues = ImmutableMap.of("data", partition);
        return SimpleDataUtil.partitionDataFiles(this.table, partitionValues).size();
    }

    /** Writes rows without a table schema using the NONE distribution mode. */
    @TestTemplate
    public void testWriteRow() throws Exception {
        this.testWriteRow(null, DistributionMode.NONE);
    }

    /** Writes rows with an explicit table schema using the NONE distribution mode. */
    @TestTemplate
    public void testWriteRowWithTableSchema() throws Exception {
        this.testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
    }

    /**
     * Sets the table property to HASH, writes with the sink-level mode NONE and, for a
     * partitioned table with parallelism above one, asserts that the three probed partitions
     * hold more than three data files in total.
     */
    @TestTemplate
    public void testJobNoneDistributeMode() throws Exception {
        this.setTableDistributionModeToHash();
        this.testWriteRow(null, DistributionMode.NONE);

        if (this.parallelism <= 1 || !this.partitioned) {
            return;
        }

        int totalFiles = 0;
        for (String partition : PARTITION_VALUES) {
            totalFiles += this.partitionFiles(partition);
        }
        Assertions.assertThat(totalFiles).isGreaterThan(3);
    }

    /**
     * Sets the table property to HASH and asserts that writing with the sink-level mode RANGE
     * is rejected with an {@link IllegalArgumentException}.
     */
    @TestTemplate
    public void testJobHashDistributionMode() {
        this.setTableDistributionModeToHash();

        Assertions.assertThatThrownBy(() -> this.testWriteRow(null, DistributionMode.RANGE))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("Flink does not support 'range' write distribution mode now.");
    }

    /**
     * Sets the table property to HASH, writes with a {@code null} sink-level mode and, for a
     * partitioned table, asserts exactly one data file per probed partition.
     */
    @TestTemplate
    public void testJobNullDistributionMode() throws Exception {
        this.setTableDistributionModeToHash();
        this.testWriteRow(null, null);
        this.assertOneFilePerPartitionWhenPartitioned();
    }

    /**
     * Writes with the sink-level mode HASH and, for a partitioned table, asserts exactly one
     * data file per probed partition.
     */
    @TestTemplate
    public void testPartitionWriteMode() throws Exception {
        this.testWriteRow(null, DistributionMode.HASH);
        this.assertOneFilePerPartitionWhenPartitioned();
    }

    /**
     * Same as {@link #testPartitionWriteMode()} but with an explicit table schema passed to the
     * sink builder.
     */
    @TestTemplate
    public void testShuffleByPartitionWithSchema() throws Exception {
        this.testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.HASH);
        this.assertOneFilePerPartitionWhenPartitioned();
    }

    /**
     * Runs one job that contains two independent source-to-sink pipelines writing to the tables
     * {@code left} and {@code right}, then checks both tables' rows and that the custom snapshot
     * properties appear only in the right table's current snapshot summary.
     */
    @TestTemplate
    public void testTwoSinksInDisjointedDAG() throws Exception {
        Map<String, String> props = ImmutableMap.of(WRITE_FORMAT_PROPERTY, this.format.name());
        TableIdentifier leftId = TableIdentifier.of("left");
        TableIdentifier rightId = TableIdentifier.of("right");

        Table leftTable = CATALOG_EXTENSION.catalog().createTable(leftId, SimpleDataUtil.SCHEMA, this.partitionSpec(), props);
        TableLoader leftTableLoader = TableLoader.fromCatalog(CATALOG_EXTENSION.catalogLoader(), TableIdentifier.of("left"));

        Table rightTable = CATALOG_EXTENSION.catalog().createTable(rightId, SimpleDataUtil.SCHEMA, this.partitionSpec(), props);
        TableLoader rightTableLoader = TableLoader.fromCatalog(CATALOG_EXTENSION.catalogLoader(), TableIdentifier.of("right"));

        // Replace the environment created in before() and turn off auto-generated UIDs, so the
        // operators below carry only the explicitly assigned uid / uidPrefix values.
        this.env = this.newEnvironment();
        this.env.getConfig().disableAutoGeneratedUIDs();

        List<Row> leftRows = this.createRows("left-");
        DataStream<Row> leftStream = this.env.fromCollection(leftRows, ROW_TYPE_INFO)
            .name("leftCustomSource")
            .uid("leftCustomSource");
        FlinkSink.forRow(leftStream, SimpleDataUtil.FLINK_SCHEMA)
            .table(leftTable)
            .tableLoader(leftTableLoader)
            .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
            .distributionMode(DistributionMode.NONE)
            .uidPrefix("leftIcebergSink")
            .append();

        List<Row> rightRows = this.createRows("right-");
        DataStream<Row> rightStream = this.env.fromCollection(rightRows, ROW_TYPE_INFO)
            .name("rightCustomSource")
            .uid("rightCustomSource");
        FlinkSink.forRow(rightStream, SimpleDataUtil.FLINK_SCHEMA)
            .table(rightTable)
            .tableLoader(rightTableLoader)
            .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
            .writeParallelism(this.parallelism)
            .distributionMode(DistributionMode.HASH)
            .uidPrefix("rightIcebergSink")
            .setSnapshotProperty("flink.test", TestFlinkIcebergSink.class.getName())
            .setSnapshotProperties(Collections.singletonMap("direction", "rightTable"))
            .append();

        this.env.execute("Test Iceberg DataStream.");

        SimpleDataUtil.assertTableRows(leftTable, this.convertToRowData(leftRows));
        SimpleDataUtil.assertTableRows(rightTable, this.convertToRowData(rightRows));

        leftTable.refresh();
        Assertions.assertThat(leftTable.currentSnapshot().summary())
            .doesNotContainKeys("flink.test", "direction");

        rightTable.refresh();
        Assertions.assertThat(rightTable.currentSnapshot().summary())
            .containsEntry("flink.test", TestFlinkIcebergSink.class.getName())
            .containsEntry("direction", "rightTable");
    }

    /**
     * Asserts that appending a sink configured with an unrecognized distribution mode option
     * fails with an {@link IllegalArgumentException}.
     */
    @TestTemplate
    public void testOverrideWriteConfigWithUnknownDistributionMode() {
        Map<String, String> overrides = new HashMap<>();
        overrides.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), "UNRECOGNIZED");

        List<Row> rows = this.createRows("");
        DataStream<Row> source = this.env.addSource(this.createBoundedSource(rows), ROW_TYPE_INFO);
        FlinkSink.Builder builder = FlinkSink.forRow(source, SimpleDataUtil.FLINK_SCHEMA)
            .table(this.table)
            .tableLoader(this.tableLoader)
            .writeParallelism(this.parallelism)
            .setAll(overrides);

        Assertions.assertThatThrownBy(builder::append)
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("Invalid distribution mode: UNRECOGNIZED");
    }

    /**
     * Asserts that appending a sink configured with an unrecognized write format option fails
     * with an {@link IllegalArgumentException}.
     */
    @TestTemplate
    public void testOverrideWriteConfigWithUnknownFileFormat() {
        Map<String, String> overrides = new HashMap<>();
        overrides.put(FlinkWriteOptions.WRITE_FORMAT.key(), "UNRECOGNIZED");

        List<Row> rows = this.createRows("");
        DataStream<Row> source = this.env.addSource(this.createBoundedSource(rows), ROW_TYPE_INFO);
        FlinkSink.Builder builder = FlinkSink.forRow(source, SimpleDataUtil.FLINK_SCHEMA)
            .table(this.table)
            .tableLoader(this.tableLoader)
            .writeParallelism(this.parallelism)
            .setAll(overrides);

        Assertions.assertThatThrownBy(builder::append)
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("Invalid file format: UNRECOGNIZED");
    }

    /**
     * Writes three rows with the table refresh interval option set to {@code 100ms} in the Flink
     * configuration passed to the sink, and checks the table contents.
     */
    @TestTemplate
    public void testWriteRowWithTableRefreshInterval() throws Exception {
        List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));

        Configuration flinkConf = new Configuration();
        flinkConf.setString(FlinkWriteOptions.TABLE_REFRESH_INTERVAL.key(), "100ms");

        FlinkSink.forRowData(
                this.env
                    .addSource(this.createBoundedSource(rows), ROW_TYPE_INFO)
                    .map(row -> CONVERTER.toInternal(row), FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)))
            .table(this.table)
            .tableLoader(this.tableLoader)
            .flinkConf(flinkConf)
            .writeParallelism(this.parallelism)
            .append();

        this.env.execute("Test Iceberg DataStream");
        SimpleDataUtil.assertTableRows(this.table, this.convertToRowData(rows));
    }

    /**
     * Returns an identity partition spec on the {@code data} column when the current parameter
     * set is partitioned, otherwise the unpartitioned spec.
     */
    private PartitionSpec partitionSpec() {
        if (this.partitioned) {
            return PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build();
        }
        return PartitionSpec.unpartitioned();
    }

    /**
     * Creates an execution environment from the classloader-check-disabled configuration with
     * checkpointing every 100 ms and both parallelism and max parallelism set to the
     * parameterized value.
     */
    private StreamExecutionEnvironment newEnvironment() {
        return StreamExecutionEnvironment.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
            .enableCheckpointing(100L)
            .setParallelism(this.parallelism)
            .setMaxParallelism(this.parallelism);
    }

    /** Commits the HASH mode name as the table's write distribution mode property. */
    private void setTableDistributionModeToHash() {
        this.table.updateProperties()
            .set(WRITE_DISTRIBUTION_MODE_PROPERTY, DistributionMode.HASH.modeName())
            .commit();
    }

    /**
     * For a partitioned table, asserts that each probed partition holds exactly one data file;
     * does nothing for an unpartitioned table.
     */
    private void assertOneFilePerPartitionWhenPartitioned() throws IOException {
        if (!this.partitioned) {
            return;
        }
        for (String partition : PARTITION_VALUES) {
            Assertions.assertThat(this.partitionFiles(partition)).isEqualTo(1);
        }
    }
}

