/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.CompiledPlanUtils;
import org.apache.flink.util.TimeUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests which other operators share a job vertex with the write-only Paimon writer for a range
 * of table definitions.
 *
 * <p>Each test creates a table in a temporary Paimon catalog, compiles an {@code INSERT}
 * statement into a job graph and then inspects the string form of the job vertices. Whether the
 * writer is "chained" with a neighbour is judged purely by whether the writer vertex's
 * {@code toString()} mentions that neighbour's keyword ({@code "Source"} or
 * {@code "Committer"}).
 */
public class WriterChainingStrategyTest {
    private static final String TABLE_NAME = "paimon_table";
    private static final String WRITER_KEYWORD = "Writer(write-only)";
    private static final String SOURCE_KEYWORD = "Source";
    private static final String COMMITTER_KEYWORD = "Committer";

    @TempDir
    Path tempDir;
    private StreamTableEnvironment tEnv;

    /**
     * Creates a fresh table environment with a 500 ms checkpoint interval and registers a Paimon
     * catalog whose warehouse is the per-test temporary directory.
     */
    @BeforeEach
    public void beforeEach() {
        Configuration config = new Configuration();
        config.setString(
                "execution.checkpointing.interval",
                TimeUtils.formatWithHighestUnit(Duration.ofMillis(500L)));
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
        this.tEnv = StreamTableEnvironment.create(env);

        String catalog = "PAIMON";
        HashMap<String, String> options = new HashMap<>();
        options.put("type", "paimon");
        options.put("warehouse", this.tempDir.toString());
        // Render the options as 'key'='value' pairs for the WITH clause.
        String renderedOptions =
                options.entrySet().stream()
                        .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
                        .collect(Collectors.joining(","));
        this.tEnv.executeSql(
                String.format("CREATE CATALOG %s WITH ( %s )", catalog, renderedOptions));
        this.tEnv.useCatalog(catalog);
    }

    /** Fixed-bucket table without primary key: writer is expected with Committer, not Source. */
    @Test
    public void testAppendTable() throws Exception {
        this.tEnv
                .executeSql(
                        String.format(
                                "CREATE TABLE %s (id INT, data STRING, dt STRING) WITH ('bucket' = '1', 'bucket-key'='id', 'write-only' = 'true')",
                                TABLE_NAME))
                .await();
        this.verifyChaining(false, true);
    }

    /** Table with {@code 'bucket' = '-1'} and no primary key: expected with Source and Committer. */
    @Test
    public void testAppendTableWithUnawareBucket() throws Exception {
        this.tEnv
                .executeSql(
                        String.format(
                                "CREATE TABLE %s (id INT, data STRING, dt STRING) WITH ('bucket' = '-1', 'write-only' = 'true')",
                                TABLE_NAME))
                .await();
        this.verifyChaining(true, true);
    }

    /** Fixed-bucket primary-key table: writer is expected with Committer, not Source. */
    @Test
    public void testPrimaryKeyTable() throws Exception {
        this.tEnv
                .executeSql(
                        String.format(
                                "CREATE TABLE %s (id INT, data STRING, dt STRING, PRIMARY KEY (id) NOT ENFORCED) WITH ('bucket' = '1', 'bucket-key'='id', 'write-only' = 'true')",
                                TABLE_NAME))
                .await();
        this.verifyChaining(false, true);
    }

    /** Primary-key table with {@code 'bucket' = '-1'}: expected with Committer, not Source. */
    @Test
    public void testPrimaryKeyTableWithDynamicBucket() throws Exception {
        this.tEnv
                .executeSql(
                        String.format(
                                "CREATE TABLE %s (id INT, data STRING, dt STRING, PRIMARY KEY (id) NOT ENFORCED) WITH ('bucket' = '-1', 'write-only' = 'true')",
                                TABLE_NAME))
                .await();
        this.verifyChaining(false, true);
    }

    /** With {@code 'sink.parallelism' = '2'} the writer is expected with neither neighbour. */
    @Test
    public void testPrimaryKeyTableWithMultipleWriter() throws Exception {
        this.tEnv
                .executeSql(
                        String.format(
                                "CREATE TABLE %s (id INT, data STRING, dt STRING, PRIMARY KEY (id) NOT ENFORCED) WITH ('bucket' = '1', 'bucket-key'='id', 'write-only' = 'true', 'sink.parallelism' = '2')",
                                TABLE_NAME))
                .await();
        this.verifyChaining(false, false);
    }

    /**
     * Partitioned primary-key table whose key does not include the partition column: besides the
     * writer checks, the vertex mentioning {@code INDEX_BOOTSTRAP} must also mention Source.
     */
    @Test
    public void testPrimaryKeyTableWithCrossPartitionUpdate() throws Exception {
        this.tEnv
                .executeSql(
                        String.format(
                                "CREATE TABLE %s (id INT, data STRING, dt STRING, PRIMARY KEY (id) NOT ENFORCED) PARTITIONED BY ( dt ) WITH ('bucket' = '-1', 'write-only' = 'true')",
                                TABLE_NAME))
                .await();
        List<JobVertex> vertices = this.verifyChaining(false, true);
        JobVertex bootstrapVertex = this.findVertex(vertices, "INDEX_BOOTSTRAP");
        assertMentions(bootstrapVertex, SOURCE_KEYWORD, true);
    }

    /**
     * Primary-key table with {@code 'local-merge-buffer-size'} set: besides the writer checks,
     * the vertex mentioning {@code local merge} must also mention Source.
     */
    @Test
    public void testPrimaryKeyTableWithLocalMerge() throws Exception {
        this.tEnv
                .executeSql(
                        String.format(
                                "CREATE TABLE %s (id INT, data STRING, dt STRING, PRIMARY KEY (id) NOT ENFORCED) WITH ('bucket' = '-1', 'write-only' = 'true', 'local-merge-buffer-size' = '1MB')",
                                TABLE_NAME))
                .await();
        List<JobVertex> vertices = this.verifyChaining(false, true);
        JobVertex localMergeVertex = this.findVertex(vertices, "local merge");
        assertMentions(localMergeVertex, SOURCE_KEYWORD, true);
    }

    /**
     * Compiles an {@code INSERT} into the test table, builds the job graph and checks the vertex
     * that contains the write-only writer.
     *
     * @param isWriterChainedWithUpstream whether the writer vertex must mention {@code "Source"}
     * @param isWriterChainedWithDownStream whether the writer vertex must mention
     *     {@code "Committer"}
     * @return all vertices of the generated job graph, for further inspection by the caller
     * @throws IllegalStateException if no vertex mentions the writer keyword
     */
    private List<JobVertex> verifyChaining(
            boolean isWriterChainedWithUpstream, boolean isWriterChainedWithDownStream) {
        CompiledPlan plan =
                this.tEnv.compilePlanSql(
                        String.format(
                                "INSERT INTO %s VALUES (1, 'AAA', ''), (2, 'BBB', '')",
                                TABLE_NAME));

        // Register the plan's transformations on a separate environment so that a stream graph
        // can be generated without executing the job. Chaining the call keeps the generic
        // element type, so no raw List (and no unchecked lambda argument) is involved.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        CompiledPlanUtils.toTransformations(this.tEnv, plan).forEach(env::addOperator);

        List<JobVertex> vertices = new ArrayList<>();
        env.getStreamGraph().getJobGraph().getVertices().forEach(vertices::add);

        JobVertex writerVertex = this.findVertex(vertices, WRITER_KEYWORD);
        assertMentions(writerVertex, SOURCE_KEYWORD, isWriterChainedWithUpstream);
        assertMentions(writerVertex, COMMITTER_KEYWORD, isWriterChainedWithDownStream);
        return vertices;
    }

    /**
     * Asserts that the string form of {@code vertex} does, or does not, contain {@code keyword}.
     *
     * @param vertex the job vertex to inspect
     * @param keyword the substring to look for
     * @param expected {@code true} to require the keyword, {@code false} to forbid it
     */
    private static void assertMentions(JobVertex vertex, String keyword, boolean expected) {
        if (expected) {
            Assertions.assertThat(vertex.toString()).contains(keyword);
        } else {
            Assertions.assertThat(vertex.toString()).doesNotContain(keyword);
        }
    }

    /**
     * Returns the first vertex whose string form contains {@code key}.
     *
     * @param vertices the job vertices to search, in iteration order
     * @param key the substring identifying the wanted vertex
     * @return the first matching vertex
     * @throws IllegalStateException if no vertex matches
     */
    private JobVertex findVertex(List<JobVertex> vertices, String key) {
        for (JobVertex vertex : vertices) {
            if (vertex.toString().contains(key)) {
                return vertex;
            }
        }
        throw new IllegalStateException(
                String.format(
                        "Cannot find vertex with keyword %s among job vertices %s", key, vertices));
    }
}

