/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.FailingFileIO;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith(value={ParameterizedTestExtension.class})
public class ParallelismConfiguredITCase {
    private static final Logger LOG = LoggerFactory.getLogger(ParallelismConfiguredITCase.class);
    private static final RowType TABLE_TYPE = new RowType(Arrays.asList(new RowType.RowField("_k", (LogicalType)new IntType()), new RowType.RowField("p", (LogicalType)new VarCharType(10)), new RowType.RowField("v", (LogicalType)new IntType())));
    private static final DataType INPUT_TYPE = DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"_k", (DataType)DataTypes.INT()), DataTypes.FIELD((String)"p", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"v", (DataType)DataTypes.INT())});
    private static final int NUM_KEYS = 100;
    @TempDir
    private static java.nio.file.Path temporaryFolder;
    private final boolean isBatch;
    private final boolean hasPrimaryKey;
    private final int numBucket;

    public ParallelismConfiguredITCase(boolean isBatch, boolean hasPrimaryKey, int numBucket) {
        this.isBatch = isBatch;
        this.hasPrimaryKey = hasPrimaryKey;
        this.numBucket = numBucket;
    }

    @Parameters(name="isBatch={0}, hasPrimaryKey={1}, numBucket={2}")
    public static List<Object[]> getVarSeg() {
        List<Boolean> isBatchList = Arrays.asList(true, false);
        List<Boolean> hasPrimaryKeyList = Arrays.asList(true, false);
        List<Integer> numBucketList = Arrays.asList(-1, 1, 8);
        ArrayList<Object[]> result = new ArrayList<Object[]>();
        for (Boolean isBatch : isBatchList) {
            for (Boolean hasPrimaryKey : hasPrimaryKeyList) {
                for (Integer numBucket : numBucketList) {
                    result.add(new Object[]{isBatch, hasPrimaryKey, numBucket});
                }
            }
        }
        return result;
    }

    @TestTemplate
    public void testParallelismConfigurable() throws Exception {
        int[] primaryKey;
        int[] nArray;
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(CoreOptions.BUCKET.key(), Integer.toString(this.numBucket));
        if (this.hasPrimaryKey) {
            int[] nArray2 = new int[2];
            nArray2[0] = 0;
            nArray = nArray2;
            nArray2[1] = 1;
        } else {
            nArray = primaryKey = new int[]{};
        }
        if (primaryKey.length == 0 && this.numBucket > 0) {
            options.put(CoreOptions.BUCKET_KEY.key(), "_k");
        }
        options.put(FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING.key(), "false");
        String tempDirPath = new File(temporaryFolder.toFile(), UUID.randomUUID() + "/").toString();
        FileStoreTable table = ParallelismConfiguredITCase.buildFileStoreTable(tempDirPath, new int[]{1}, primaryKey, options);
        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.RUNTIME_MODE, (Object)(this.isBatch ? RuntimeExecutionMode.BATCH : RuntimeExecutionMode.STREAMING));
        configuration.set(org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM, (Object)1);
        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MIN_PARALLELISM, (Object)1);
        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM, (Object)1);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)configuration);
        this.buildJob(env, table, 1);
        this.verifyJobGraph(env, table);
        env.execute();
        this.verifyResult(table, 1);
        LOG.info("restart job with parallelism 3");
        configuration.set(org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM, (Object)3);
        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MIN_PARALLELISM, (Object)3);
        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM, (Object)3);
        env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)configuration);
        this.buildJob(env, table, 2);
        this.verifyJobGraph(env, table);
        env.execute();
        this.verifyResult(table, 2);
        LOG.info("restart job with parallelism 5");
        configuration.set(org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM, (Object)5);
        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MIN_PARALLELISM, (Object)5);
        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM, (Object)5);
        env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)configuration);
        this.buildJob(env, table, 3);
        this.verifyJobGraph(env, table);
        env.execute();
        this.verifyResult(table, 3);
    }

    private static FileStoreTable buildFileStoreTable(String temporaryPath, int[] partitions, int[] primaryKey, Map<String, String> optionsMap) throws Exception {
        Options options = new Options();
        options.set(CoreOptions.PATH, (Object)temporaryPath);
        options.set(CoreOptions.FILE_FORMAT, (Object)"avro");
        for (Map.Entry<String, String> entry : optionsMap.entrySet()) {
            options.set(entry.getKey(), entry.getValue());
        }
        Path tablePath = new CoreOptions(options.toMap()).path();
        Schema schema = new Schema(LogicalTypeConversion.toDataType((RowType)TABLE_TYPE).getFields(), Arrays.stream(partitions).mapToObj(i -> (String)TABLE_TYPE.getFieldNames().get(i)).collect(Collectors.toList()), Arrays.stream(primaryKey).mapToObj(i -> (String)TABLE_TYPE.getFieldNames().get(i)).collect(Collectors.toList()), options.toMap(), "");
        return (FileStoreTable)FailingFileIO.retryArtificialException(() -> {
            new SchemaManager((FileIO)LocalFileIO.create(), tablePath).createTable(schema);
            return FileStoreTableFactory.create((FileIO)LocalFileIO.create(), (Options)options);
        });
    }

    private void buildJob(StreamExecutionEnvironment env, FileStoreTable table, int round) {
        SingleOutputStreamOperator source = env.fromSequence((long)(round - 1) * 100L, (long)round * 100L - 1L).map((MapFunction & Serializable)x -> Row.of((Object[])new Object[]{x.intValue() % 100, String.valueOf(x % 100L), x.intValue()}));
        source.getTransformation().setParallelism(source.getParallelism(), false);
        new FlinkSinkBuilder((Table)table).forRow((DataStream)source, INPUT_TYPE).build();
    }

    private void verifyJobGraph(StreamExecutionEnvironment env, FileStoreTable table) {
        for (JobVertex jobVertex : env.getStreamGraph(false).getJobGraph().getVertices()) {
            if (jobVertex.getName().startsWith("Global Committer") || jobVertex.getName().startsWith("end: Writer") || jobVertex.getName().startsWith("Compact Coordinator")) {
                ((AbstractBooleanAssert)Assertions.assertThat((boolean)jobVertex.isParallelismConfigured()).withFailMessage("Vertex %s should have parallelism configured", new Object[]{jobVertex})).isTrue();
                ((AbstractIntegerAssert)Assertions.assertThat((int)jobVertex.getParallelism()).withFailMessage("Vertex %s should have parallelism 1", new Object[]{jobVertex})).isOne();
                continue;
            }
            if (BucketMode.HASH_DYNAMIC.equals((Object)table.bucketMode()) && this.isBatch && (jobVertex.getName().contains("Writer") || jobVertex.getName().contains("dynamic-bucket-assigner"))) {
                ((AbstractBooleanAssert)Assertions.assertThat((boolean)jobVertex.isParallelismConfigured()).withFailMessage("Vertex %s should have parallelism configured", new Object[]{jobVertex})).isTrue();
                continue;
            }
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)jobVertex.isParallelismConfigured()).withFailMessage("Vertex %s should not have parallelism configured", new Object[]{jobVertex})).isFalse();
        }
    }

    private void verifyResult(FileStoreTable table, int round) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream source = new FlinkSourceBuilder((Table)table).env(env).sourceBounded(true).buildForRow();
        ArrayList<Object> results = new ArrayList<Object>();
        try (CloseableIterator iterator = source.executeAndCollect();){
            while (iterator.hasNext()) {
                results.add(iterator.next());
            }
        }
        if (this.hasPrimaryKey) {
            Assertions.assertThat(results).hasSize(100);
            results.sort(Comparator.comparingInt(x -> (Integer)x.getFieldAs(0)));
            for (int i = 0; i < 100; ++i) {
                Row result = (Row)results.get(i);
                Assertions.assertThat((Object)result.getField(0)).isEqualTo((Object)i);
                Assertions.assertThat((Object)result.getField(1)).isEqualTo((Object)Integer.toString(i));
                Assertions.assertThat((int)((Integer)result.getFieldAs(2))).isEqualTo((round - 1) * 100 + i);
            }
        } else {
            Assertions.assertThat(results).hasSize(100 * round);
            results.sort(Comparator.comparingInt(x -> (Integer)x.getFieldAs(2)));
            for (int i = 0; i < 100 * round; ++i) {
                Assertions.assertThat(results.get(i)).isEqualTo((Object)Row.of((Object[])new Object[]{i % 100, String.valueOf(i % 100), i}));
            }
        }
    }
}

