/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.CatalogTestBase;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.source.BoundedTableFactory;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;

/**
 * Tests that write into Iceberg tables through Flink SQL {@code INSERT} statements.
 *
 * <p>Every test template runs once per entry produced by {@link #parameters()}: each catalog
 * configuration returned by {@link CatalogTestBase#parameters()}, combined with each file format
 * (ORC, Avro, Parquet) and with both streaming and batch execution.
 */
public class TestFlinkTableSink extends CatalogTestBase {
    private static final String SOURCE_TABLE = "default_catalog.default_database.bounded_source";
    private static final String TABLE_NAME = "test_table";

    // volatile so the lazily created environment is safely published by getTableEnv().
    private volatile TableEnvironment tEnv;
    private Table icebergTable;

    // Parameter indexes 0 and 1 (catalog name, base namespace) are presumably bound by the
    // parent class -- NOTE(review): confirm against CatalogTestBase.
    @Parameter(index=2)
    private FileFormat format;
    @Parameter(index=3)
    private boolean isStreamingJob;

    /**
     * Builds the parameter matrix for this class.
     *
     * @return one {@code {catalogName, baseNamespace, format, isStreaming}} array for every
     *     combination of file format, execution mode and base-class catalog parameters
     */
    @Parameters(name="catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
    public static List<Object[]> parameters() {
        List<Object[]> parameters = Lists.newArrayList();
        for (FileFormat format : new FileFormat[]{FileFormat.ORC, FileFormat.AVRO, FileFormat.PARQUET}) {
            for (Boolean isStreaming : new Boolean[]{true, false}) {
                for (Object[] catalogParams : CatalogTestBase.parameters()) {
                    String catalogName = (String) catalogParams[0];
                    Namespace baseNamespace = (Namespace) catalogParams[1];
                    parameters.add(new Object[]{catalogName, baseNamespace, format, isStreaming});
                }
            }
        }
        return parameters;
    }

    /**
     * Lazily creates the table environment matching the {@code isStreamingJob} parameter.
     *
     * <p>Streaming mode uses a checkpointing stream environment with parallelism 2; batch mode
     * uses a plain batch {@link TableEnvironment}.
     *
     * @return the table environment shared by all statements of the current test instance
     */
    @Override
    protected TableEnvironment getTableEnv() {
        if (tEnv == null) {
            synchronized (this) {
                // Re-check under the lock so that racing callers cannot each build an environment.
                if (tEnv == null) {
                    EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings.newInstance();
                    if (isStreamingJob) {
                        settingsBuilder.inStreamingMode();
                        StreamExecutionEnvironment env =
                            StreamExecutionEnvironment.getExecutionEnvironment(
                                MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
                        env.enableCheckpointing(400L);
                        env.setMaxParallelism(2);
                        env.setParallelism(2);
                        tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
                    } else {
                        settingsBuilder.inBatchMode();
                        tEnv = TableEnvironment.create(settingsBuilder.build());
                    }
                }
            }
        }
        return tEnv;
    }

    /** Creates the test database and the unpartitioned {@code test_table} in the current format. */
    @Override
    @BeforeEach
    public void before() {
        super.before();
        sql("CREATE DATABASE %s", flinkDatabase);
        sql("USE CATALOG %s", catalogName);
        // NOTE(review): "db" is presumably the database part of flinkDatabase -- confirm.
        sql("USE %s", "db");
        sql("CREATE TABLE %s (id int, data varchar) with ('write.format.default'='%s')", TABLE_NAME, format.name());
        icebergTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
    }

    /** Drops the test table and database and clears the registered bounded-source data sets. */
    @Override
    @AfterEach
    public void clean() {
        dropTable(TABLE_NAME);
        sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
        BoundedTableFactory.clearDataSets();
        super.clean();
    }

    /** Inserts rows, including null values in either column, from a temporary view. */
    @TestTemplate
    public void testInsertFromSourceTable() throws Exception {
        getTableEnv().createTemporaryView(
            "sourceTable",
            getTableEnv().fromValues(
                SimpleDataUtil.FLINK_SCHEMA.toRowDataType(),
                Expressions.row(1, "hello"),
                Expressions.row(2, "world"),
                // The cast keeps null as a single column value instead of a null varargs array.
                Expressions.row(3, (String) null),
                Expressions.row(null, "bar")));

        sql("INSERT INTO %s SELECT id,data from sourceTable", TABLE_NAME);

        SimpleDataUtil.assertTableRecords(
            icebergTable,
            Lists.newArrayList(
                SimpleDataUtil.createRecord(1, "hello"),
                SimpleDataUtil.createRecord(2, "world"),
                SimpleDataUtil.createRecord(3, null),
                SimpleDataUtil.createRecord(null, "bar")));
    }

    /** Checks that {@code INSERT OVERWRITE} replaces the content of an unpartitioned table. */
    @TestTemplate
    public void testOverwriteTable() throws Exception {
        Assumptions.assumeThat(isStreamingJob)
            .as("Flink unbounded streaming does not support overwrite operation")
            .isFalse();

        sql("INSERT INTO %s SELECT 1, 'a'", TABLE_NAME);
        SimpleDataUtil.assertTableRecords(icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(1, "a")));

        sql("INSERT OVERWRITE %s SELECT 2, 'b'", TABLE_NAME);
        SimpleDataUtil.assertTableRecords(icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
    }

    /**
     * Checks that the {@code write-parallelism} option hint sets the writer parallelism without
     * changing the parallelism of the writer's inputs.
     */
    @TestTemplate
    public void testWriteParallelism() throws Exception {
        String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(sampleRows()));
        sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL) WITH ('connector'='BoundedSource', 'data-id'='%s')", SOURCE_TABLE, dataId);

        // Translate the INSERT into transformations without executing it, so that the planned
        // operator chain can be inspected.
        PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner();
        String insertSQL = String.format("INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s", TABLE_NAME, SOURCE_TABLE);
        ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
        Transformation<?> dummySink = planner.translate(Collections.singletonList(operation)).get(0);
        Transformation<?> committer = dummySink.getInputs().get(0);
        Transformation<?> writer = committer.getInputs().get(0);

        Assertions.assertThat(writer.getParallelism())
            .as("Should have the expected 1 parallelism.")
            .isEqualTo(1);

        // Streaming jobs run with the parallelism of 2 configured in getTableEnv(); the batch
        // value of 4 presumably comes from the test cluster default -- NOTE(review): confirm.
        int expectedInputParallelism = isStreamingJob ? 2 : 4;
        writer.getInputs().forEach(input ->
            Assertions.assertThat(input.getParallelism())
                .as("Should have the expected parallelism.")
                .isEqualTo(expectedInputParallelism));
    }

    /** Checks dynamic and static partition overwrite on a table partitioned by {@code data}. */
    @TestTemplate
    public void testReplacePartitions() throws Exception {
        Assumptions.assumeThat(isStreamingJob)
            .as("Flink unbounded streaming does not support overwrite operation")
            .isFalse();

        String tableName = "test_partition";
        sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH ('write.format.default'='%s')", tableName, format.name());
        try {
            Table partitionedTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));

            sql("INSERT INTO %s SELECT 1, 'a'", tableName);
            sql("INSERT INTO %s SELECT 2, 'b'", tableName);
            sql("INSERT INTO %s SELECT 3, 'c'", tableName);
            SimpleDataUtil.assertTableRecords(
                partitionedTable,
                Lists.newArrayList(
                    SimpleDataUtil.createRecord(1, "a"),
                    SimpleDataUtil.createRecord(2, "b"),
                    SimpleDataUtil.createRecord(3, "c")));

            // Each overwrite only replaces the partition that the selected row belongs to.
            sql("INSERT OVERWRITE %s SELECT 4, 'b'", tableName);
            sql("INSERT OVERWRITE %s SELECT 5, 'a'", tableName);
            SimpleDataUtil.assertTableRecords(
                partitionedTable,
                Lists.newArrayList(
                    SimpleDataUtil.createRecord(5, "a"),
                    SimpleDataUtil.createRecord(4, "b"),
                    SimpleDataUtil.createRecord(3, "c")));

            sql("INSERT OVERWRITE %s PARTITION (data='a') SELECT 6", tableName);
            SimpleDataUtil.assertTableRecords(
                partitionedTable,
                Lists.newArrayList(
                    SimpleDataUtil.createRecord(6, "a"),
                    SimpleDataUtil.createRecord(4, "b"),
                    SimpleDataUtil.createRecord(3, "c")));
        } finally {
            dropTable(tableName);
        }
    }

    /** Checks inserts with a static {@code PARTITION} clause followed by dynamic-partition inserts. */
    @TestTemplate
    public void testInsertIntoPartition() throws Exception {
        String tableName = "test_insert_into_partition";
        sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH ('write.format.default'='%s')", tableName, format.name());
        try {
            Table partitionedTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));

            sql("INSERT INTO %s PARTITION (data='a') SELECT 1", tableName);
            sql("INSERT INTO %s PARTITION (data='a') SELECT 2", tableName);
            sql("INSERT INTO %s PARTITION (data='b') SELECT 3", tableName);
            SimpleDataUtil.assertTableRecords(
                partitionedTable,
                Lists.newArrayList(
                    SimpleDataUtil.createRecord(1, "a"),
                    SimpleDataUtil.createRecord(2, "a"),
                    SimpleDataUtil.createRecord(3, "b")));

            sql("INSERT INTO %s SELECT 4, 'c'", tableName);
            sql("INSERT INTO %s SELECT 5, 'd'", tableName);
            SimpleDataUtil.assertTableRecords(
                partitionedTable,
                Lists.newArrayList(
                    SimpleDataUtil.createRecord(1, "a"),
                    SimpleDataUtil.createRecord(2, "a"),
                    SimpleDataUtil.createRecord(3, "b"),
                    SimpleDataUtil.createRecord(4, "c"),
                    SimpleDataUtil.createRecord(5, "d")));
        } finally {
            dropTable(tableName);
        }
    }

    /**
     * Checks that with {@code write.distribution-mode=hash} every non-empty snapshot holds exactly
     * one matching data file for each of the partitions {@code aaa}, {@code bbb} and {@code ccc}.
     */
    @TestTemplate
    public void testHashDistributeMode() throws Exception {
        String tableName = "test_hash_distribution_mode";
        Map<String, String> tableProps =
            ImmutableMap.of(
                "write.format.default", format.name(),
                "write.distribution-mode", DistributionMode.HASH.modeName());

        List<Row> dataSet = sampleRows();
        String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
        sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL) WITH ('connector'='BoundedSource', 'data-id'='%s')", SOURCE_TABLE, dataId);
        Assertions.assertThat(sql("SELECT * FROM %s", SOURCE_TABLE))
            .as("Should have the expected rows in source table.")
            .containsExactlyInAnyOrderElementsOf(dataSet);

        sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s", tableName, toWithClause(tableProps));
        try {
            sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE);
            Assertions.assertThat(sql("SELECT * FROM %s", tableName))
                .as("Should have the expected rows in sink table.")
                .containsExactlyInAnyOrderElementsOf(dataSet);

            Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
            Map<Long, List<DataFile>> snapshotToDataFiles = SimpleDataUtil.snapshotToDataFiles(table);
            for (List<DataFile> dataFiles : snapshotToDataFiles.values()) {
                // Snapshots without data files have nothing to verify.
                if (dataFiles.isEmpty()) {
                    continue;
                }
                Assertions.assertThat(SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "aaa"))).hasSize(1);
                Assertions.assertThat(SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "bbb"))).hasSize(1);
                Assertions.assertThat(SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "ccc"))).hasSize(1);
            }
        } finally {
            dropTable(tableName);
        }
    }

    /** Builds the rows {@code (i, "aaa")}, {@code (i, "bbb")}, {@code (i, "ccc")} for i in [1, 1000). */
    private static List<Row> sampleRows() {
        return IntStream.range(1, 1000)
            .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
            .flatMap(Collection::stream)
            .collect(Collectors.toList());
    }

    /** Drops the named table from the test database if it exists. */
    private void dropTable(String tableName) {
        sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
    }
}

