/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.util;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;

public class ReadWriteTableTestUtil {
    private static final Duration TIME_OUT = Duration.ofSeconds(10L);
    public static final int DEFAULT_PARALLELISM = 2;
    public static final Map<String, String> SCAN_LATEST = new HashMap<String, String>(){
        {
            this.put(CoreOptions.SCAN_MODE.key(), CoreOptions.StartupMode.LATEST.toString());
        }
    };
    public static TableEnvironment sEnv;
    public static StreamExecutionEnvironment bExeEnv;
    public static TableEnvironment bEnv;
    public static String warehouse;

    public static void init(String warehouse) {
        ReadWriteTableTestUtil.init(warehouse, 2);
    }

    public static void init(String warehouse, int parallelism) {
        StreamExecutionEnvironment sExeEnv = ReadWriteTableTestUtil.buildStreamEnv(parallelism, "none");
        sEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)sExeEnv);
        bExeEnv = ReadWriteTableTestUtil.buildBatchEnv(parallelism, "none");
        bEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)bExeEnv, (EnvironmentSettings)EnvironmentSettings.inBatchMode());
        ReadWriteTableTestUtil.warehouse = warehouse;
        String catalog = "PAIMON";
        sEnv.executeSql(String.format("CREATE CATALOG %s WITH ('type'='paimon', 'warehouse'='%s');", catalog, warehouse));
        sEnv.useCatalog(catalog);
        bEnv.registerCatalog(catalog, (Catalog)sEnv.getCatalog(catalog).get());
        bEnv.useCatalog(catalog);
    }

    public static StreamExecutionEnvironment buildStreamEnv(int parallelism, String restartStrategy) {
        Configuration configuration = new Configuration();
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, (Object)restartStrategy);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)configuration);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.enableCheckpointing(100L);
        env.setParallelism(parallelism);
        return env;
    }

    public static StreamExecutionEnvironment buildBatchEnv(int parallelism, String restartStrategy) {
        Configuration configuration = new Configuration();
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, (Object)restartStrategy);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)configuration);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.setParallelism(parallelism);
        return env;
    }

    public static String createTable(List<String> fieldsSpec, List<String> primaryKeys, List<String> bucketKeys, List<String> partitionKeys) {
        return ReadWriteTableTestUtil.createTable(fieldsSpec, primaryKeys, bucketKeys, partitionKeys, new HashMap<String, String>());
    }

    public static String createTable(List<String> fieldsSpec, List<String> primaryKeys, List<String> bucketKeys, List<String> partitionKeys, Map<String, String> options) {
        String table = ("MyTable_" + UUID.randomUUID()).replace("-", "_");
        HashMap<String, String> newOptions = new HashMap<String, String>(options);
        if (!newOptions.containsKey("bucket")) {
            newOptions.put("bucket", "1");
        }
        if (!bucketKeys.isEmpty()) {
            newOptions.put("bucket-key", String.join((CharSequence)",", bucketKeys));
        }
        sEnv.executeSql(ReadWriteTableTestUtil.buildDdl(table, fieldsSpec, primaryKeys, partitionKeys, newOptions));
        return table;
    }

    public static String createTemporaryTable(List<String> fieldsSpec, List<String> primaryKeys, List<String> partitionKeys, List<Row> initialRecords, @Nullable String partitionList, boolean bounded, String changelogMode) {
        String temporaryTableDdlFormat = "CREATE TEMPORARY TABLE `%s`( %s %s) %s WITH (\n'connector' = 'values',\n'disable-lookup' = 'true',\n'data-id' = '%s',\n%s'bounded' = '%s',\n'changelog-mode' = '%s'\n);";
        String temporaryTable = "temp_" + UUID.randomUUID();
        sEnv.executeSql(String.format(temporaryTableDdlFormat, temporaryTable, String.join((CharSequence)",", fieldsSpec), ReadWriteTableTestUtil.buildPkConstraint(primaryKeys), ReadWriteTableTestUtil.buildPartitionSpec(partitionKeys), TestValuesTableFactory.registerData(initialRecords), partitionList == null ? "" : String.format("'partition-list' = '%s',\n", partitionList), bounded, changelogMode));
        return temporaryTable;
    }

    public static void insertInto(String table, String ... records) throws Exception {
        ReadWriteTableTestUtil.insertIntoPartition(table, "", records);
    }

    public static void insertIntoPartition(String table, String partitionSpec, String ... records) throws Exception {
        sEnv.executeSql(String.format("INSERT INTO `%s` %s VALUES %s;", table, partitionSpec, String.join((CharSequence)",", records))).await();
    }

    public static void insertIntoFromTable(String source, String sink) throws Exception {
        sEnv.executeSql(String.format("INSERT INTO `%s` SELECT * FROM `%s`;", sink, source)).await();
    }

    public static void insertOverwrite(String table, String ... records) throws Exception {
        ReadWriteTableTestUtil.insertOverwritePartition(table, "", records);
    }

    public static void insertOverwritePartition(String table, String partitionSpe, String ... records) throws Exception {
        String insert = String.format("INSERT OVERWRITE `%s` %s VALUES %s;", table, partitionSpe, String.join((CharSequence)",", records));
        bEnv.executeSql(insert).await();
    }

    public static String buildSimpleQuery(String table) {
        return ReadWriteTableTestUtil.buildQuery(table, "*", "");
    }

    public static String buildQuery(String table, String projection, String filter) {
        return ReadWriteTableTestUtil.buildQueryWithTableOptions(table, projection, filter, new HashMap<String, String>());
    }

    public static String buildQueryWithTableOptions(String table, String projection, String filter, Long limit, Map<String, String> options) {
        ArrayList<Object> params = new ArrayList<Object>();
        params.add(projection);
        params.add(table);
        params.add(ReadWriteTableTestUtil.buildTableOptionsSpec(options));
        params.add(filter);
        StringBuilder queryFormat = new StringBuilder("SELECT %s FROM `%s` %s %s");
        if (null != limit) {
            queryFormat.append(" limit %s");
            params.add(limit);
        }
        return String.format(queryFormat.toString(), params.toArray());
    }

    public static String buildQueryWithTableOptions(String table, String projection, String filter, Map<String, String> options) {
        return ReadWriteTableTestUtil.buildQueryWithTableOptions(table, projection, filter, null, options);
    }

    public static void checkFileStorePath(String table, List<String> partitionSpec) {
        String relativeFilePath = String.format("/%s.db/%s", sEnv.getCurrentDatabase(), table);
        Assertions.assertThat((Path)Paths.get(warehouse, relativeFilePath, "snapshot")).exists();
        Assertions.assertThat((Path)Paths.get(warehouse, relativeFilePath, "manifest")).exists();
        if (partitionSpec.isEmpty()) {
            partitionSpec = Collections.singletonList("");
        }
        partitionSpec.stream().map(str -> str.replaceAll(",", "/")).map(str -> str.replaceAll("null", "__DEFAULT_PARTITION__")).forEach(partition -> {
            Assertions.assertThat((Path)Paths.get(warehouse, relativeFilePath, partition)).exists();
            Assertions.assertThat((Path)Paths.get(warehouse, relativeFilePath, partition, "bucket-0")).exists();
        });
    }

    public static void testBatchRead(String query, List<Row> expected) throws Exception {
        CloseableIterator resultItr = bEnv.executeSql(query).collect();
        try (BlockingIterator iterator = BlockingIterator.of((Iterator)resultItr);){
            if (!expected.isEmpty()) {
                List result = iterator.collect(expected.size(), TIME_OUT.getSeconds(), TimeUnit.SECONDS);
                Assertions.assertThat(ReadWriteTableTestUtil.toInsertOnlyRows(result)).containsExactlyInAnyOrderElementsOf(ReadWriteTableTestUtil.toInsertOnlyRows(expected));
            }
            Assertions.assertThat((boolean)resultItr.hasNext()).isFalse();
        }
    }

    private static List<Row> toInsertOnlyRows(List<Row> rows) {
        ArrayList<Row> result = new ArrayList<Row>();
        for (Row row : rows) {
            Assertions.assertThat((Comparable)row.getKind()).isIn(new Object[]{RowKind.INSERT, RowKind.UPDATE_AFTER});
            Row newRow = new Row(row.getArity());
            for (int i = 0; i < row.getArity(); ++i) {
                newRow.setField(i, row.getField(i));
            }
            result.add(newRow);
        }
        return result;
    }

    public static BlockingIterator<Row, Row> testStreamingRead(String query, List<Row> expected) throws Exception {
        BlockingIterator iterator = BlockingIterator.of((Iterator)sEnv.executeSql(query).collect());
        ReadWriteTableTestUtil.validateStreamingReadResult((BlockingIterator<Row, Row>)iterator, expected);
        return iterator;
    }

    public static BlockingIterator<Row, Row> testStreamingReadWithReadFirst(String source, String sink, String query, List<Row> expected) throws Exception {
        BlockingIterator iterator = BlockingIterator.of((Iterator)sEnv.executeSql(query).collect());
        ReadWriteTableTestUtil.insertIntoFromTable(source, sink);
        ReadWriteTableTestUtil.validateStreamingReadResult((BlockingIterator<Row, Row>)iterator, expected);
        return iterator;
    }

    public static void validateStreamingReadResult(BlockingIterator<Row, Row> streamingItr, List<Row> expected) throws Exception {
        if (expected.isEmpty()) {
            ReadWriteTableTestUtil.assertNoMoreRecords(streamingItr);
        } else {
            Assertions.assertThat((List)streamingItr.collect(expected.size())).containsExactlyInAnyOrderElementsOf(expected);
        }
    }

    public static void assertNoMoreRecords(BlockingIterator<Row, Row> iterator) {
        List expectedRecords = Collections.emptyList();
        try {
            expectedRecords = iterator.collect(1, 5L, TimeUnit.SECONDS);
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        Assertions.assertThat(expectedRecords).isEmpty();
    }

    public static String buildDdl(String table, List<String> fieldsSpec, List<String> primaryKeys, List<String> partitionKeys, Map<String, String> options) {
        return String.format("CREATE TABLE `%s`(%s %s) %s %s;", table, String.join((CharSequence)",", fieldsSpec), ReadWriteTableTestUtil.buildPkConstraint(primaryKeys), ReadWriteTableTestUtil.buildPartitionSpec(partitionKeys), ReadWriteTableTestUtil.buildOptionsSpec(options));
    }

    private static String buildPkConstraint(List<String> primaryKeys) {
        if (!primaryKeys.isEmpty()) {
            return String.format(",PRIMARY KEY (%s) NOT ENFORCED", String.join((CharSequence)",", primaryKeys));
        }
        return "";
    }

    private static String buildPartitionSpec(List<String> partitionKeys) {
        if (!partitionKeys.isEmpty()) {
            return String.format("PARTITIONED BY (%s)", String.join((CharSequence)",", partitionKeys));
        }
        return "";
    }

    private static String buildOptionsSpec(Map<String, String> options) {
        if (!options.isEmpty()) {
            return String.format("WITH ( %s )", ReadWriteTableTestUtil.optionsToString(options));
        }
        return "";
    }

    private static String buildTableOptionsSpec(Map<String, String> hints) {
        if (!hints.isEmpty()) {
            return String.format("/*+ OPTIONS ( %s ) */", ReadWriteTableTestUtil.optionsToString(hints));
        }
        return "";
    }

    private static String optionsToString(Map<String, String> options) {
        ArrayList pairs = new ArrayList();
        options.forEach((k, v) -> pairs.add(String.format("'%s' = '%s'", k, v)));
        return String.join((CharSequence)",", pairs);
    }
}

