/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.DoubleType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.VarCharType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class SchemaEvolutionTest
extends TableTestBase {
    private FileStoreTable table;
    private String tableName = "MyTable";

    private static List<CdcSchema> prepareData() {
        CdcSchema upSchema1 = CdcSchema.newBuilder().column("col_0", (DataType)new VarCharType(), "test description.").column("col_1", (DataType)new IntType(), "test description.").column("col_2", (DataType)new IntType(), "test description.").column("col_3", (DataType)new VarCharType(), "Someone's desc.").column("col_4", (DataType)new VarCharType(), "Someone's desc.").column("col_5", (DataType)new VarCharType(), "Someone's desc.").column("col_6", (DataType)new DecimalType(), "Someone's desc.").column("col_7", (DataType)new VarCharType(), "Someone's desc.").column("col_8", (DataType)new VarCharType(), "Someone's desc.").column("col_9", (DataType)new VarCharType(), "Someone's desc.").column("col_10", (DataType)new VarCharType(), "Someone's desc.").column("col_11", (DataType)new VarCharType(), "Someone's desc.").column("col_12", (DataType)new DoubleType(), "Someone's desc.").column("col_13", (DataType)new VarCharType(), "Someone's desc.").column("col_14", (DataType)new VarCharType(), "Someone's desc.").column("col_15", (DataType)new VarCharType(), "Someone's desc.").column("col_16", (DataType)new VarCharType(), "Someone's desc.").column("col_17", (DataType)new VarCharType(), "Someone's desc.").column("col_18", (DataType)new VarCharType(), "Someone's desc.").column("col_19", (DataType)new VarCharType(), "Someone's desc.").column("col_20", (DataType)new VarCharType(), "Someone's desc.").build();
        CdcSchema upSchema2 = CdcSchema.newBuilder().column("col_0", (DataType)new VarCharType(), "test description.").column("col_1", (DataType)new BigIntType(), "test description.").column("col_2", (DataType)new IntType(), "test description.").column("col_3", (DataType)new VarCharType(), "Someone's desc.").column("col_4", (DataType)new VarCharType(), "Someone's desc.").column("col_5", (DataType)new VarCharType(), "Someone's desc.").column("col_6", (DataType)new DecimalType(), "Someone's desc.").column("col_7", (DataType)new VarCharType(), "Someone's desc.").column("col_8", (DataType)new VarCharType(), "Someone's desc.").column("col_9", (DataType)new VarCharType(), "Someone's desc.").column("col_10", (DataType)new VarCharType(), "Someone's desc.").column("col_11", (DataType)new VarCharType(), "Someone's desc.").column("col_12", (DataType)new DoubleType(), "Someone's desc.").column("col_13", (DataType)new VarCharType(), "Someone's desc.").column("col_14", (DataType)new VarCharType(), "Someone's desc.").column("col_15", (DataType)new VarCharType(), "Someone's desc.").column("col_16", (DataType)new VarCharType(), "Someone's desc.").column("col_17", (DataType)new VarCharType(), "Someone's desc.").column("col_18", (DataType)new VarCharType(), "Someone's desc.").column("col_19", (DataType)new VarCharType(), "Someone's desc.").column("col_20", (DataType)new VarCharType(), "Someone's desc.").build();
        CdcSchema upSchema3 = CdcSchema.newBuilder().column("col_0", (DataType)new VarCharType(), "test description.").column("col_1", (DataType)new BigIntType(), "test description.").column("col_2", (DataType)new IntType(), "test description 2.").column("col_3", (DataType)new VarCharType(), "Someone's desc.").column("col_4", (DataType)new VarCharType(), "Someone's desc.").column("col_5", (DataType)new VarCharType(), "Someone's desc.").column("col_6", (DataType)new DecimalType(), "Someone's desc.").column("col_7", (DataType)new VarCharType(), "Someone's desc.").column("col_8", (DataType)new VarCharType(), "Someone's desc.").column("col_9", (DataType)new VarCharType(), "Someone's desc.").column("col_10", (DataType)new VarCharType(), "Someone's desc.").column("col_11", (DataType)new VarCharType(), "Someone's desc.").column("col_12", (DataType)new DoubleType(), "Someone's desc.").column("col_13", (DataType)new VarCharType(), "Someone's desc.").column("col_14", (DataType)new VarCharType(), "Someone's desc.").column("col_15", (DataType)new VarCharType(), "Someone's desc.").column("col_16", (DataType)new VarCharType(), "Someone's desc.").column("col_17", (DataType)new VarCharType(), "Someone's desc.").column("col_18", (DataType)new VarCharType(), "Someone's desc.").column("col_19", (DataType)new VarCharType(), "Someone's desc.").column("col_20", (DataType)new VarCharType(), "Someone's desc.").build();
        CdcSchema upSchema4 = CdcSchema.newBuilder().column("col_0", (DataType)new VarCharType(), "test description.").column("col_1", (DataType)new BigIntType(), "test description.").column("col_2", (DataType)new IntType(), "test description.").column("col_3_1", (DataType)new VarCharType(), "Someone's desc.").column("col_4", (DataType)new VarCharType(), "Someone's desc.").column("col_5", (DataType)new VarCharType(), "Someone's desc.").column("col_6", (DataType)new DecimalType(), "Someone's desc.").column("col_7", (DataType)new VarCharType(), "Someone's desc.").column("col_8", (DataType)new VarCharType(), "Someone's desc.").column("col_9", (DataType)new VarCharType(), "Someone's desc.").column("col_10", (DataType)new VarCharType(), "Someone's desc.").column("col_11", (DataType)new VarCharType(), "Someone's desc.").column("col_12", (DataType)new DoubleType(), "Someone's desc.").column("col_13", (DataType)new VarCharType(), "Someone's desc.").column("col_14", (DataType)new VarCharType(), "Someone's desc.").column("col_15", (DataType)new VarCharType(), "Someone's desc.").column("col_16", (DataType)new VarCharType(), "Someone's desc.").column("col_17", (DataType)new VarCharType(), "Someone's desc.").column("col_18", (DataType)new VarCharType(), "Someone's desc.").column("col_19", (DataType)new VarCharType(), "Someone's desc.").column("col_20", (DataType)new VarCharType(), "Someone's desc.").build();
        CdcSchema upSchema5 = CdcSchema.newBuilder().column("col_0", (DataType)new VarCharType(), "test description.").column("col_1", (DataType)new BigIntType(), "test description.").column("col_2_1", (DataType)new BigIntType(), "test description 2.").column("col_3", (DataType)new VarCharType(), "Someone's desc.").column("col_4", (DataType)new VarCharType(), "Someone's desc.").column("col_5", (DataType)new VarCharType(), "Someone's desc.").column("col_6", (DataType)new DecimalType(), "Someone's desc.").column("col_7", (DataType)new VarCharType(), "Someone's desc.").column("col_8", (DataType)new VarCharType(), "Someone's desc.").column("col_9", (DataType)new VarCharType(), "Someone's desc.").column("col_10", (DataType)new VarCharType(), "Someone's desc.").column("col_11", (DataType)new VarCharType(), "Someone's desc.").column("col_12", (DataType)new DoubleType(), "Someone's desc.").column("col_13", (DataType)new VarCharType(), "Someone's desc.").column("col_14", (DataType)new VarCharType(), "Someone's desc.").column("col_15", (DataType)new VarCharType(), "Someone's desc.").column("col_16", (DataType)new VarCharType(), "Someone's desc.").column("col_17", (DataType)new VarCharType(), "Someone's desc.").column("col_18", (DataType)new VarCharType(), "Someone's desc.").column("col_19", (DataType)new VarCharType(), "Someone's desc.").column("col_20", (DataType)new VarCharType(), "Someone's desc.").build();
        return Arrays.asList(upSchema1, upSchema2, upSchema3, upSchema4, upSchema5);
    }

    @BeforeEach
    public void before() throws Exception {
        LocalFileIO fileIO = LocalFileIO.create();
        Path tablePath = new Path(String.format("%s/%s.db/%s", this.warehouse, this.database, this.tableName));
        Schema schema = Schema.newBuilder().column("pk", (DataType)DataTypes.INT()).column("pt1", (DataType)DataTypes.INT()).column("pt2", (DataType)DataTypes.INT()).column("col1", (DataType)DataTypes.INT()).partitionKeys(new String[]{"pt1", "pt2"}).primaryKey(new String[]{"pk", "pt1", "pt2"}).option(CoreOptions.CHANGELOG_PRODUCER.key(), "input").option(CoreOptions.BUCKET.key(), "2").option(CoreOptions.SEQUENCE_FIELD.key(), "col1").build();
        TableSchema tableSchema = SchemaUtils.forceCommit((SchemaManager)new SchemaManager((FileIO)fileIO, tablePath), (Schema)schema);
        this.table = FileStoreTableFactory.create((FileIO)LocalFileIO.create(), (Path)tablePath, (TableSchema)tableSchema);
    }

    @Test
    public void testSchemaEvolution() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource upDataFieldStream = env.fromCollection(SchemaEvolutionTest.prepareData());
        Options options = new Options();
        options.set("warehouse", this.tempPath.toString());
        CatalogLoader & Serializable catalogLoader = (CatalogLoader & Serializable)() -> FlinkCatalogFactory.createPaimonCatalog((Options)options);
        Identifier identifier = Identifier.create((String)this.database, (String)this.tableName);
        SingleOutputStreamOperator schemaChangeProcessFunction = upDataFieldStream.process((ProcessFunction)new UpdatedDataFieldsProcessFunction(new SchemaManager(this.table.fileIO(), this.table.location()), identifier, (CatalogLoader)catalogLoader, TypeMapping.defaultMapping())).name("Schema Evolution");
        schemaChangeProcessFunction.getTransformation().setParallelism(1);
        schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
        env.execute();
    }
}

