/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.DataFormatTestUtil;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

public abstract class ActionITCaseBase
extends AbstractTestBase {
    protected String warehouse;
    protected String database;
    protected String tableName;
    protected String commitUser;
    protected StreamTableWrite write;
    protected StreamTableCommit commit;
    protected Catalog catalog;
    private long incrementalIdentifier;

    @BeforeEach
    public void before() throws IOException {
        this.warehouse = this.getTempDirPath();
        this.database = "default";
        this.tableName = "test_table_" + UUID.randomUUID();
        this.commitUser = UUID.randomUUID().toString();
        this.incrementalIdentifier = 0L;
        this.catalog = CatalogFactory.createCatalog((CatalogContext)CatalogContext.create((Path)new Path(this.warehouse)));
    }

    @AfterEach
    public void after() throws Exception {
        if (this.write != null) {
            this.write.close();
            this.write = null;
        }
        if (this.commit != null) {
            this.commit.close();
            this.commit = null;
        }
        this.catalog.close();
    }

    protected FileStoreTable createFileStoreTable(RowType rowType, List<String> partitionKeys, List<String> primaryKeys, Map<String, String> options) throws Exception {
        return this.createFileStoreTable(this.tableName, rowType, partitionKeys, primaryKeys, options);
    }

    protected FileStoreTable createFileStoreTable(String tableName, RowType rowType, List<String> partitionKeys, List<String> primaryKeys, Map<String, String> options) throws Exception {
        Identifier identifier = Identifier.create((String)this.database, (String)tableName);
        this.catalog.createDatabase(this.database, true);
        this.catalog.createTable(identifier, new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, ""), false);
        return (FileStoreTable)this.catalog.getTable(identifier);
    }

    protected FileStoreTable getFileStoreTable(String tableName) throws Exception {
        Identifier identifier = Identifier.create((String)this.database, (String)tableName);
        return (FileStoreTable)this.catalog.getTable(identifier);
    }

    protected GenericRow rowData(Object ... values) {
        return GenericRow.of((Object[])values);
    }

    protected void writeData(GenericRow ... data) throws Exception {
        for (GenericRow d : data) {
            this.write.write((InternalRow)d);
        }
        this.commit.commit(this.incrementalIdentifier, this.write.prepareCommit(true, this.incrementalIdentifier));
        ++this.incrementalIdentifier;
    }

    protected List<String> getResult(TableRead read, List<Split> splits, RowType rowType) throws Exception {
        try (RecordReader recordReader = read.createReader(splits);){
            ArrayList<String> result = new ArrayList<String>();
            recordReader.forEachRemaining(row -> result.add(DataFormatTestUtil.internalRowToString((InternalRow)row, (RowType)rowType)));
            ArrayList<String> arrayList = result;
            return arrayList;
        }
    }

    protected StreamExecutionEnvironment buildDefaultEnv(boolean isStreaming) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
        if (isStreaming) {
            env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().setCheckpointInterval(500L);
        } else {
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        }
        return env;
    }

    protected <T extends ActionBase> T createAction(Class<T> clazz, List<String> args) {
        return this.createAction(clazz, args.toArray(new String[0]));
    }

    protected <T extends ActionBase> T createAction(Class<T> clazz, String ... args) {
        if (ThreadLocalRandom.current().nextBoolean()) {
            this.confuseArgs(args, "_", "-");
        } else {
            this.confuseArgs(args, "-", "_");
        }
        return (T)ActionFactory.createAction((String[])args).filter(clazz::isInstance).map(clazz::cast).orElseThrow(() -> new RuntimeException("Failed to create action"));
    }

    private void confuseArgs(String[] args, String regex, String replacement) {
        args[0] = args[0].replaceAll(regex, replacement);
        for (int i = 1; i < args.length; i += 2) {
            String arg = args[i].substring(2);
            args[i] = "--" + arg.replaceAll(regex, replacement);
        }
    }

    protected void callProcedure(String procedureStatement) {
        this.callProcedure(procedureStatement, true, false);
    }

    protected void callProcedure(String procedureStatement, boolean isStreaming, boolean dmlSync) {
        StreamTableEnvironment tEnv;
        StreamExecutionEnvironment env = this.buildDefaultEnv(isStreaming);
        if (isStreaming) {
            tEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)env, (EnvironmentSettings)EnvironmentSettings.inStreamingMode());
            tEnv.getConfig().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, (Object)Duration.ofMillis(500L));
        } else {
            tEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)env, (EnvironmentSettings)EnvironmentSettings.inBatchMode());
        }
        tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, (Object)dmlSync);
        tEnv.executeSql(String.format("CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse'='%s');", this.warehouse));
        tEnv.useCatalog("PAIMON");
        tEnv.executeSql(procedureStatement);
    }
}

