/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink;

import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.flink.FlinkCatalogTestBase;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Parameterized tests that write to tables through Flink SQL with upsert turned on and then
 * verify the rows read back.
 *
 * <p>Every test is run once per combination of file format (PARQUET, AVRO, ORC) and execution
 * mode (streaming, batch); see {@link #parameters()}.
 */
@RunWith(Parameterized.class)
public class TestFlinkUpsert
extends FlinkCatalogTestBase {
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = MiniClusterResource.createWithClassloaderCheckDisabled();
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    /** Whether the table environment is created in streaming mode (otherwise batch mode). */
    private final boolean isStreamingJob;
    /** Table properties placed in the WITH clause of the CREATE TABLE statements. */
    private final Map<String, String> tableUpsertProps = Maps.newHashMap();
    /** Lazily created by {@link #getTableEnv()}; volatile so the unsynchronized null check is safe. */
    private volatile TableEnvironment tEnv;

    /**
     * Builds one parameterized test instance.
     *
     * @param catalogName name of the catalog, forwarded to the base class
     * @param baseNamespace base namespace, forwarded to the base class
     * @param format file format stored under the {@code write.format.default} table property
     * @param isStreamingJob {@code true} to run in streaming mode, {@code false} for batch mode
     */
    public TestFlinkUpsert(String catalogName, Namespace baseNamespace, FileFormat format, Boolean isStreamingJob) {
        super(catalogName, baseNamespace);
        this.isStreamingJob = isStreamingJob;
        this.tableUpsertProps.put("format-version", "2");
        this.tableUpsertProps.put("write.upsert.enabled", "true");
        this.tableUpsertProps.put("write.format.default", format.name());
    }

    /**
     * Produces the constructor arguments for every file format / execution mode combination.
     *
     * @return one {@code {catalogName, baseNamespace, format, isStreaming}} array per combination
     */
    @Parameterized.Parameters(name="catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
    public static Iterable<Object[]> parameters() {
        List<Object[]> parameters = Lists.newArrayList();
        for (FileFormat format : new FileFormat[]{FileFormat.PARQUET, FileFormat.AVRO, FileFormat.ORC}) {
            for (Boolean isStreaming : new Boolean[]{true, false}) {
                String catalogName = "testhadoop";
                Namespace baseNamespace = Namespace.of("default");
                parameters.add(new Object[]{catalogName, baseNamespace, format, isStreaming});
            }
        }
        return parameters;
    }

    /**
     * Returns the table environment for this test instance, creating it on first use.
     *
     * <p>In streaming mode the environment is backed by a {@link StreamExecutionEnvironment}
     * with checkpointing enabled and parallelism 2; in batch mode a plain batch
     * {@link TableEnvironment} is created.
     *
     * @return the lazily created table environment
     */
    @Override
    protected TableEnvironment getTableEnv() {
        if (this.tEnv == null) {
            synchronized (this) {
                // Re-check under the lock so that racing callers do not each build an environment.
                if (this.tEnv == null) {
                    EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings.newInstance();
                    if (this.isStreamingJob) {
                        settingsBuilder.inStreamingMode();
                        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
                        env.enableCheckpointing(400L);
                        env.setMaxParallelism(2);
                        env.setParallelism(2);
                        this.tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
                    } else {
                        settingsBuilder.inBatchMode();
                        this.tEnv = TableEnvironment.create(settingsBuilder.build());
                    }
                }
            }
        }
        return this.tEnv;
    }

    /** Creates the test database if needed and makes the catalog and database current. */
    @Override
    @Before
    public void before() {
        super.before();
        this.sql("CREATE DATABASE IF NOT EXISTS %s", this.flinkDatabase);
        this.sql("USE CATALOG %s", this.catalogName);
        // NOTE(review): "db" is presumably the database part of flinkDatabase - confirm against the base class.
        this.sql("USE %s", "db");
    }

    /** Drops the test database and then runs the base-class cleanup. */
    @Override
    @After
    public void clean() {
        this.sql("DROP DATABASE IF EXISTS %s", this.flinkDatabase);
        super.clean();
    }

    /**
     * Inserts two batches into a table keyed on (id, dt) and partitioned by dt, then checks the
     * rows returned with a date filter on each partition and with no filter.
     */
    @Test
    public void testUpsertAndQuery() {
        String tableName = "test_upsert_query";
        LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
        LocalDate dt20220302 = LocalDate.of(2022, 3, 2);
        this.sql("CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, dt DATE, PRIMARY KEY(id,dt) NOT ENFORCED) PARTITIONED BY (dt) WITH %s", tableName, toWithClause(this.tableUpsertProps));
        try {
            this.sql("INSERT INTO %s VALUES (1, 'Bill', DATE '2022-03-01'),(1, 'Jane', DATE '2022-03-01'),(2, 'Jane', DATE '2022-03-01')", tableName);
            this.sql("INSERT INTO %s VALUES (2, 'Bill', DATE '2022-03-01'),(1, 'Jane', DATE '2022-03-02'),(2, 'Jane', DATE '2022-03-02')", tableName);
            List<Row> rowsOn20220301 = Lists.newArrayList(Row.of(1, "Jane", dt20220301), Row.of(2, "Bill", dt20220301));
            TestHelpers.assertRows(this.sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName), rowsOn20220301);
            List<Row> rowsOn20220302 = Lists.newArrayList(Row.of(1, "Jane", dt20220302), Row.of(2, "Jane", dt20220302));
            TestHelpers.assertRows(this.sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName), rowsOn20220302);
            TestHelpers.assertRows(this.sql("SELECT * FROM %s", tableName), Lists.newArrayList(Iterables.concat(rowsOn20220301, rowsOn20220302)));
        } finally {
            this.dropTable(tableName);
        }
    }

    /**
     * Same scenario as {@link #testUpsertAndQuery()}, but the {@code write.upsert.enabled} table
     * property is removed and upsert is requested per statement through the
     * {@code 'upsert-enabled'='true'} SQL hint instead.
     */
    @Test
    public void testUpsertOptions() {
        String tableName = "test_upsert_options";
        LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
        LocalDate dt20220302 = LocalDate.of(2022, 3, 2);
        Map<String, String> optionsUpsertProps = Maps.newHashMap(this.tableUpsertProps);
        optionsUpsertProps.remove("write.upsert.enabled");
        this.sql("CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, dt DATE, PRIMARY KEY(id,dt) NOT ENFORCED) PARTITIONED BY (dt) WITH %s", tableName, toWithClause(optionsUpsertProps));
        try {
            this.sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/  VALUES (1, 'Bill', DATE '2022-03-01'),(1, 'Jane', DATE '2022-03-01'),(2, 'Jane', DATE '2022-03-01')", tableName);
            this.sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/  VALUES (2, 'Bill', DATE '2022-03-01'),(1, 'Jane', DATE '2022-03-02'),(2, 'Jane', DATE '2022-03-02')", tableName);
            List<Row> rowsOn20220301 = Lists.newArrayList(Row.of(1, "Jane", dt20220301), Row.of(2, "Bill", dt20220301));
            TestHelpers.assertRows(this.sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName), rowsOn20220301);
            List<Row> rowsOn20220302 = Lists.newArrayList(Row.of(1, "Jane", dt20220302), Row.of(2, "Jane", dt20220302));
            TestHelpers.assertRows(this.sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName), rowsOn20220302);
            TestHelpers.assertRows(this.sql("SELECT * FROM %s", tableName), Lists.newArrayList(Iterables.concat(rowsOn20220301, rowsOn20220302)));
        } finally {
            this.dropTable(tableName);
        }
    }

    /**
     * Uses a table whose primary key and partition key are the same single column (id) and
     * checks the table contents after each of three inserts.
     */
    @Test
    public void testPrimaryKeyEqualToPartitionKey() {
        String tableName = "upsert_on_id_key";
        try {
            this.sql("CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, PRIMARY KEY(id) NOT ENFORCED) PARTITIONED BY (id) WITH %s", tableName, toWithClause(this.tableUpsertProps));
            this.sql("INSERT INTO %s VALUES (1, 'Bill'),(1, 'Jane'),(2, 'Bill')", tableName);
            TestHelpers.assertRows(this.sql("SELECT * FROM %s", tableName), Lists.newArrayList(Row.of(1, "Jane"), Row.of(2, "Bill")));
            this.sql("INSERT INTO %s VALUES (1, 'Bill'),(2, 'Jane')", tableName);
            TestHelpers.assertRows(this.sql("SELECT * FROM %s", tableName), Lists.newArrayList(Row.of(1, "Bill"), Row.of(2, "Jane")));
            this.sql("INSERT INTO %s VALUES (3, 'Bill'),(4, 'Jane')", tableName);
            TestHelpers.assertRows(this.sql("SELECT * FROM %s", tableName), Lists.newArrayList(Row.of(1, "Bill"), Row.of(2, "Jane"), Row.of(3, "Bill"), Row.of(4, "Jane")));
        } finally {
            this.dropTable(tableName);
        }
    }

    /**
     * Declares the primary key columns (id, dt) first in the schema and checks the table
     * contents after each of three inserts.
     */
    @Test
    public void testPrimaryKeyFieldsAtBeginningOfSchema() {
        String tableName = "upsert_on_pk_at_schema_start";
        LocalDate dt = LocalDate.of(2022, 3, 1);
        try {
            this.sql("CREATE TABLE %s(id INT, dt DATE NOT NULL, name STRING NOT NULL, PRIMARY KEY(id,dt) NOT ENFORCED) PARTITIONED BY (dt) WITH %s", tableName, toWithClause(this.tableUpsertProps));
            this.sql("INSERT INTO %s VALUES (1, DATE '2022-03-01', 'Andy'),(1, DATE '2022-03-01', 'Bill'),(2, DATE '2022-03-01', 'Jane')", tableName);
            TestHelpers.assertRows(this.sql("SELECT * FROM %s", tableName), Lists.newArrayList(Row.of(1, dt, "Bill"), Row.of(2, dt, "Jane")));
            this.sql("INSERT INTO %s VALUES (1, DATE '2022-03-01', 'Jane'),(2, DATE '2022-03-01', 'Bill')", tableName);
            TestHelpers.assertRows(this.sql("SELECT * FROM %s", tableName), Lists.newArrayList(Row.of(1, dt, "Jane"), Row.of(2, dt, "Bill")));
            this.sql("INSERT INTO %s VALUES (3, DATE '2022-03-01', 'Duke'),(4, DATE '2022-03-01', 'Leon')", tableName);
            TestHelpers.assertRows(this.sql("SELECT * FROM %s", tableName), Lists.newArrayList(Row.of(1, dt, "Jane"), Row.of(2, dt, "Bill"), Row.of(3, dt, "Duke"), Row.of(4, dt, "Leon")));
        } finally {
            this.dropTable(tableName);
        }
    }

    /**
     * Declares the primary key columns (id, dt) last in the schema and checks the table
     * contents after each of three inserts.
     */
    @Test
    public void testPrimaryKeyFieldsAtEndOfTableSchema() {
        String tableName = "upsert_on_pk_at_schema_end";
        LocalDate dt = LocalDate.of(2022, 3, 1);
        try {
            this.sql("CREATE TABLE %s(name STRING NOT NULL, id INT, dt DATE NOT NULL, PRIMARY KEY(id,dt) NOT ENFORCED) PARTITIONED BY (dt) WITH %s", tableName, toWithClause(this.tableUpsertProps));
            this.sql("INSERT INTO %s VALUES ('Andy', 1, DATE '2022-03-01'),('Bill', 1, DATE '2022-03-01'),('Jane', 2, DATE '2022-03-01')", tableName);
            TestHelpers.assertRows(this.sql("SELECT * FROM %s", tableName), Lists.newArrayList(Row.of("Bill", 1, dt), Row.of("Jane", 2, dt)));
            this.sql("INSERT INTO %s VALUES ('Jane', 1, DATE '2022-03-01'),('Bill', 2, DATE '2022-03-01')", tableName);
            TestHelpers.assertRows(this.sql("SELECT * FROM %s", tableName), Lists.newArrayList(Row.of("Jane", 1, dt), Row.of("Bill", 2, dt)));
            this.sql("INSERT INTO %s VALUES ('Duke', 3, DATE '2022-03-01'),('Leon', 4, DATE '2022-03-01')", tableName);
            TestHelpers.assertRows(this.sql("SELECT * FROM %s", tableName), Lists.newArrayList(Row.of("Jane", 1, dt), Row.of("Bill", 2, dt), Row.of("Duke", 3, dt), Row.of("Leon", 4, dt)));
        } finally {
            this.dropTable(tableName);
        }
    }

    /** Drops {@code tableName} from the test database if it exists; used as per-test cleanup. */
    private void dropTable(String tableName) {
        this.sql("DROP TABLE IF EXISTS %s.%s", this.flinkDatabase, tableName);
    }
}

