/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.deltastreamer;

import java.io.IOException;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hudi.TestHoodieSparkUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.MissingSchemaFieldException;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamerSchemaEvolutionBase;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class TestHoodieDeltaStreamerSchemaEvolutionQuick
extends TestHoodieDeltaStreamerSchemaEvolutionBase {
    /**
     * Runs the inherited teardown first and afterwards invokes
     * {@code TestSchemaProvider.resetTargetSchema()}, so the target schema that some tests
     * install through {@code TestSchemaProvider.setTargetSchema(...)} is reset after every test.
     *
     * @throws Exception if the inherited teardown fails
     */
    @Override
    @AfterEach
    public void teardown() throws Exception {
        super.teardown();
        // TestSchemaProvider is the nested class inherited from the schema-evolution base class.
        TestSchemaProvider.resetTargetSchema();
    }

    /**
     * Supplies the argument tuples consumed by {@code testBase}.
     *
     * <p>Every tuple is ordered as: tableType, shouldCluster, shouldCompact, rowWriterEnable,
     * addFilegroups, multiLogFiles, useKafkaSource, allowNullForDeletedCols, useParquetLogBlock.
     *
     * <p>With the local {@code fullTest} switch off (the committed state) only a hand-picked list
     * of combinations is returned. With it on, the full cross product is generated, except that
     * multiLogFiles / useParquetLogBlock are only combined with MERGE_ON_READ, and compaction is
     * only exercised for MERGE_ON_READ without clustering.
     *
     * @return stream of argument tuples
     */
    protected static Stream<Arguments> testArgs() {
        boolean fullTest = false;
        Stream.Builder<Arguments> cases = Stream.builder();

        if (!fullTest) {
            Object[][] quickCases = {
                {"COPY_ON_WRITE", true, false, true, false, false, true, false, false},
                {"COPY_ON_WRITE", true, false, true, false, false, true, true, false},
                {"COPY_ON_WRITE", true, false, false, false, false, true, true, false},
                {"MERGE_ON_READ", true, false, false, true, true, true, true, false},
                {"MERGE_ON_READ", false, true, true, true, true, true, true, false},
                {"MERGE_ON_READ", false, true, true, true, true, true, true, false},
                {"MERGE_ON_READ", false, false, true, true, true, false, true, false},
                {"MERGE_ON_READ", false, false, false, true, true, false, true, true},
            };
            for (Object[] quickCase : quickCases) {
                cases.add(Arguments.of(quickCase));
            }
            return cases.build();
        }

        boolean[] flags = {false, true};
        String[] tableTypes = {"COPY_ON_WRITE", "MERGE_ON_READ"};
        for (boolean rowWriterEnable : flags) {
            for (boolean nullForDeletedCols : flags) {
                for (boolean useKafkaSource : flags) {
                    for (boolean addFilegroups : flags) {
                        for (boolean multiLogFiles : flags) {
                            for (boolean useParquetLogFile : flags) {
                                // Log-file specific options only make sense for MERGE_ON_READ tables.
                                boolean needsLogFiles = multiLogFiles || useParquetLogFile;
                                for (boolean shouldCluster : flags) {
                                    for (String tableType : tableTypes) {
                                        if (!needsLogFiles || tableType.equals("MERGE_ON_READ")) {
                                            cases.add(Arguments.of(tableType, shouldCluster, false, rowWriterEnable, addFilegroups,
                                                multiLogFiles, useKafkaSource, nullForDeletedCols, useParquetLogFile));
                                        }
                                    }
                                }
                                // One extra MERGE_ON_READ case per combination with compaction on and clustering off.
                                cases.add(Arguments.of("MERGE_ON_READ", false, true, rowWriterEnable, addFilegroups,
                                    multiLogFiles, useKafkaSource, nullForDeletedCols, useParquetLogFile));
                            }
                        }
                    }
                }
            }
        }
        return cases.build();
    }

    /**
     * Supplies the argument tuples consumed by {@code testReorderingColumn}: the full cross
     * product of both table types with the three boolean switches.
     *
     * <p>Every tuple is ordered as: tableType, rowWriterEnable, useKafkaSource,
     * allowNullForDeletedCols.
     *
     * @return stream of argument tuples
     */
    protected static Stream<Arguments> testReorderedColumn() {
        boolean[] flags = {false, true};
        String[] tableTypes = {"COPY_ON_WRITE", "MERGE_ON_READ"};
        Stream.Builder<Arguments> cases = Stream.builder();
        for (boolean rowWriterEnable : flags) {
            for (boolean nullForDeletedCols : flags) {
                for (boolean useKafkaSource : flags) {
                    for (String tableType : tableTypes) {
                        cases.add(Arguments.of(tableType, rowWriterEnable, useKafkaSource, nullForDeletedCols));
                    }
                }
            }
        }
        return cases.build();
    }

    /**
     * Supplies the argument tuples shared by the dropped-column, non-nullable-column-drop,
     * type-promotion and type-demotion tests.
     *
     * <p>Every tuple is ordered as: tableType, rowWriterEnable, useKafkaSource,
     * allowNullForDeletedCols, useTransformer, targetSchemaSameAsTableSchema.
     *
     * <p>With the local {@code fullTest} switch off (the committed state) only a hand-picked list
     * of combinations is returned; with it on, the full cross product is generated.
     *
     * @return stream of argument tuples
     */
    protected static Stream<Arguments> testParamsWithSchemaTransformer() {
        boolean fullTest = false;
        Stream.Builder<Arguments> cases = Stream.builder();

        if (!fullTest) {
            Object[][] quickCases = {
                {"COPY_ON_WRITE", true, true, true, true, true},
                {"COPY_ON_WRITE", true, false, false, false, true},
                {"COPY_ON_WRITE", false, false, false, false, true},
                {"MERGE_ON_READ", true, true, true, false, false},
                {"MERGE_ON_READ", true, true, false, false, false},
                {"MERGE_ON_READ", true, false, true, true, false},
                {"MERGE_ON_READ", false, false, true, true, false},
            };
            for (Object[] quickCase : quickCases) {
                cases.add(Arguments.of(quickCase));
            }
            return cases.build();
        }

        boolean[] flags = {false, true};
        String[] tableTypes = {"COPY_ON_WRITE", "MERGE_ON_READ"};
        for (boolean useTransformer : flags) {
            for (boolean setSchema : flags) {
                for (boolean rowWriterEnable : flags) {
                    for (boolean nullForDeletedCols : flags) {
                        for (boolean useKafkaSource : flags) {
                            for (String tableType : tableTypes) {
                                cases.add(Arguments.of(tableType, rowWriterEnable, useKafkaSource, nullForDeletedCols,
                                    useTransformer, setSchema));
                            }
                        }
                    }
                }
            }
        }
        return cases.build();
    }

    /**
     * End-to-end schema evolution scenario driven by the tuples from {@code testArgs}.
     *
     * <p>Flow: an initial write of {@code startTestEverything.json}; optionally a second write
     * that targets the same records ({@code multiLogFiles}); optionally a write that adds three
     * records ({@code addFilegroups}); and finally a write of {@code endTestEverything.json} with
     * several columns cast to different types. If the final sync throws
     * {@link MissingSchemaFieldException} the test only checks that
     * {@code allowNullForDeletedCols} was false and stops; otherwise it verifies file counts,
     * record counts, resulting column types and a handful of row-level conditions.
     *
     * @throws Exception if any write, sync or read fails unexpectedly
     */
    @ParameterizedTest
    @MethodSource("testArgs")
    public void testBase(String tableType, Boolean shouldCluster, Boolean shouldCompact, Boolean rowWriterEnable, Boolean addFilegroups, Boolean multiLogFiles, Boolean useKafkaSource, Boolean allowNullForDeletedCols, Boolean useParquetLogBlock) throws Exception {
        // Copy the scenario switches onto the test instance.
        this.tableType = tableType;
        this.shouldCluster = shouldCluster;
        this.shouldCompact = shouldCompact;
        this.rowWriterEnable = rowWriterEnable;
        this.addFilegroups = addFilegroups;
        this.multiLogFiles = multiLogFiles;
        this.useParquetLogBlock = useParquetLogBlock;
        this.useKafkaSource = useKafkaSource;
        if (useKafkaSource) {
            this.useSchemaProvider = true;
        }
        this.useTransformer = true;
        final boolean copyOnWrite = tableType.equals("COPY_ON_WRITE");

        // Give this run its own source directory and table.
        testNum++;
        PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + testNum;
        tableName = "test_parquet_table" + testNum;
        tableBasePath = basePath + tableName;
        deltaStreamer = new HoodieDeltaStreamer(getDeltaStreamerConfig(allowNullForDeletedCols), jsc);

        // Initial write.
        String sourcePath = String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
        Dataset<Row> batch = sparkSession.read().json(sourcePath);
        addData(batch, true);
        deltaStreamer.sync();
        int expectedRecords = 6;
        int expectedFiles = 3;
        assertRecordCount(expectedRecords);
        assertFileNumber(expectedFiles, copyOnWrite);

        // Optional extra write; record and file expectations stay unchanged.
        if (multiLogFiles) {
            sourcePath = String.class.getResource("/data/schema-evolution/extraLogFilesTestEverything.json").getPath();
            batch = sparkSession.read().json(sourcePath);
            addData(batch, false);
            deltaStreamer.sync();
            assertRecordCount(expectedRecords);
            assertFileNumber(expectedFiles, false);
        }

        // Optional write that is expected to add three records and three files.
        if (addFilegroups) {
            sourcePath = String.class.getResource("/data/schema-evolution/newFileGroupsTestEverything.json").getPath();
            batch = sparkSession.read().json(sourcePath);
            addData(batch, false);
            deltaStreamer.sync();
            expectedRecords += 3;
            expectedFiles += 3;
            assertRecordCount(expectedRecords);
            assertFileNumber(expectedFiles, copyOnWrite);
        }

        // Final batch with evolved column types.
        sourcePath = String.class.getResource("/data/schema-evolution/endTestEverything.json").getPath();
        batch = sparkSession.read().json(sourcePath);
        batch = batch.withColumn("tip_history",
            batch.col("tip_history").cast(DataTypes.createArrayType(DataTypes.LongType)));
        StructField[] incomingFareFields = {
            new StructField("amount", DataTypes.StringType, true, Metadata.empty()),
            new StructField("currency", DataTypes.StringType, true, Metadata.empty()),
            new StructField("zextra_col_nested", DataTypes.StringType, true, Metadata.empty()),
        };
        batch = batch.withColumn("fare", batch.col("fare").cast(DataTypes.createStructType(incomingFareFields)));
        batch = batch.withColumn("begin_lat", batch.col("begin_lat").cast(DataTypes.DoubleType));
        batch = batch.withColumn("end_lat", batch.col("end_lat").cast(DataTypes.StringType));
        batch = batch.withColumn("distance_in_meters", batch.col("distance_in_meters").cast(DataTypes.FloatType));
        batch = batch.withColumn("seconds_since_epoch", batch.col("seconds_since_epoch").cast(DataTypes.StringType));

        try {
            addData(batch, false);
            deltaStreamer.sync();
            // A successful sync is only expected when deleted columns may be nulled out.
            Assertions.assertTrue(allowNullForDeletedCols);
        } catch (MissingSchemaFieldException e) {
            Assertions.assertFalse(allowNullForDeletedCols);
            return;
        }

        if (shouldCluster) {
            assertBaseFileOnlyNumber(3);
        } else if (shouldCompact || copyOnWrite) {
            assertBaseFileOnlyNumber(expectedFiles);
        } else {
            expectedFiles += 2;
            assertFileNumber(expectedFiles, false);
        }
        assertRecordCount(expectedRecords);

        // Read the table back and verify the evolved schema and contents.
        Dataset<Row> table = sparkSession.read().format("hudi").load(tableBasePath);
        table.show(100, false);
        table.cache();
        StructField[] expectedFareFields = {
            new StructField("amount", DataTypes.StringType, true, Metadata.empty()),
            new StructField("currency", DataTypes.StringType, true, Metadata.empty()),
            new StructField("extra_col_struct", DataTypes.LongType, true, Metadata.empty()),
            new StructField("zextra_col_nested", DataTypes.StringType, true, Metadata.empty()),
        };
        assertDataType(table, "tip_history", DataTypes.createArrayType(DataTypes.LongType));
        assertDataType(table, "fare", DataTypes.createStructType(expectedFareFields));
        assertDataType(table, "begin_lat", DataTypes.DoubleType);
        assertDataType(table, "end_lat", DataTypes.StringType);
        assertDataType(table, "distance_in_meters", DataTypes.FloatType);
        assertDataType(table, "seconds_since_epoch", DataTypes.StringType);
        assertCondition(table, "zextra_col = 'yes'", 2);
        assertCondition(table, "_extra_col = 'yes'", 2);
        assertCondition(table, "fare.zextra_col_nested = 'yes'", 2);
        assertCondition(table, "size(zcomplex_array) > 0", 2);
        assertCondition(table, "extra_col_regular is NULL", 2);
        assertCondition(table, "fare.extra_col_struct is NULL", 2);
    }

    /**
     * Writes the starting data set, then writes it again with the {@code rider} column dropped
     * and re-appended (as the literal {@code "rider-003"}), which moves it to the end of the
     * column list. Verifies that the latest table schema still exposes {@code rider} with a
     * STRING branch and that a newer instant appeared on the timeline.
     *
     * @throws Exception if any write, sync or read fails
     */
    @ParameterizedTest
    @MethodSource("testReorderedColumn")
    public void testReorderingColumn(String tableType, Boolean rowWriterEnable, Boolean useKafkaSource, Boolean allowNullForDeletedCols) throws Exception {
        // Copy the scenario switches onto the test instance.
        this.tableType = tableType;
        this.rowWriterEnable = rowWriterEnable;
        this.useKafkaSource = useKafkaSource;
        this.shouldCluster = false;
        this.shouldCompact = false;
        this.addFilegroups = false;
        this.multiLogFiles = false;
        this.useTransformer = true;
        if (useKafkaSource) {
            this.useSchemaProvider = true;
        }
        final boolean copyOnWrite = tableType.equals("COPY_ON_WRITE");

        // Give this run its own source directory and table.
        testNum++;
        PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + testNum;
        tableName = "test_parquet_table" + testNum;
        tableBasePath = basePath + tableName;

        // Initial write.
        String sourcePath = String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
        Dataset<Row> batch = sparkSession.read().json(sourcePath);
        resetTopicAndDeltaStreamer(allowNullForDeletedCols);
        addData(batch, true);
        deltaStreamer.sync();
        final int expectedRecords = 6;
        final int expectedFiles = 3;
        assertRecordCount(expectedRecords);
        assertFileNumber(expectedFiles, copyOnWrite);

        // MERGE_ON_READ tables get one more write before the column is moved.
        if (tableType.equals("MERGE_ON_READ")) {
            sourcePath = String.class.getResource("/data/schema-evolution/extraLogFilesTestEverything.json").getPath();
            batch = sparkSession.read().json(sourcePath);
            addData(batch, false);
            deltaStreamer.sync();
            assertRecordCount(expectedRecords);
            assertFileNumber(expectedFiles, false);
        }
        assertRecordCount(expectedRecords);

        // Fresh streamer, and remember the newest instant so progress can be checked afterwards.
        resetTopicAndDeltaStreamer(allowNullForDeletedCols);
        HoodieStreamer.Config dsConfig = deltaStreamer.getConfig();
        HoodieTableMetaClient metaClient = getMetaClient(dsConfig);
        HoodieInstant lastInstant = metaClient.getActiveTimeline().lastInstant().get();

        // Same data with "rider" moved to the last column position.
        sourcePath = String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
        batch = sparkSession.read().json(sourcePath);
        batch = batch.drop("rider").withColumn("rider", functions.lit("rider-003"));
        addData(batch, false);
        deltaStreamer.sync();

        metaClient.reloadActiveTimeline();
        Option<Schema> latestTableSchemaOpt =
            UtilHelpers.getLatestTableSchema(jsc, storage, dsConfig.targetBasePath, metaClient);
        boolean riderHasStringBranch = latestTableSchemaOpt.get().getField("rider").schema().getTypes().stream()
            .anyMatch(t -> t.getType().equals(Schema.Type.STRING));
        Assertions.assertTrue(riderHasStringBranch);
        HoodieInstant newestInstant = metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertTrue(newestInstant.compareTo(lastInstant) > 0);
    }

    /**
     * Writes the starting data set and then writes it again without the {@code rider} column.
     *
     * <p>When the sync succeeds, the test requires that either {@code allowNullForDeletedCols}
     * or {@code targetSchemaSameAsTableSchema} was set, that the latest table schema still
     * exposes {@code rider} with a STRING branch, and that a newer instant appeared on the
     * timeline. When the sync throws {@link MissingSchemaFieldException}, the test requires that
     * neither flag was set.
     *
     * @throws Exception if any write, sync or read fails unexpectedly
     */
    @ParameterizedTest
    @MethodSource("testParamsWithSchemaTransformer")
    public void testDroppedColumn(String tableType, Boolean rowWriterEnable, Boolean useKafkaSource, Boolean allowNullForDeletedCols, Boolean useTransformer, Boolean targetSchemaSameAsTableSchema) throws Exception {
        // Copy the scenario switches onto the test instance.
        this.tableType = tableType;
        this.rowWriterEnable = rowWriterEnable;
        this.useKafkaSource = useKafkaSource;
        this.shouldCluster = false;
        this.shouldCompact = false;
        this.addFilegroups = false;
        this.multiLogFiles = false;
        this.useTransformer = useTransformer;
        if (useKafkaSource || targetSchemaSameAsTableSchema) {
            this.useSchemaProvider = true;
        }
        final boolean copyOnWrite = tableType.equals("COPY_ON_WRITE");

        // Give this run its own source directory and table.
        testNum++;
        PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + testNum;
        tableName = "test_parquet_table" + testNum;
        tableBasePath = basePath + tableName;

        // Initial write.
        String sourcePath = String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
        Dataset<Row> batch = sparkSession.read().json(sourcePath);
        resetTopicAndDeltaStreamer(allowNullForDeletedCols);
        addData(batch, true);
        deltaStreamer.sync();
        final int expectedRecords = 6;
        final int expectedFiles = 3;
        assertRecordCount(expectedRecords);
        assertFileNumber(expectedFiles, copyOnWrite);

        // MERGE_ON_READ tables get one more write before the column is dropped.
        if (tableType.equals("MERGE_ON_READ")) {
            sourcePath = String.class.getResource("/data/schema-evolution/extraLogFilesTestEverything.json").getPath();
            batch = sparkSession.read().json(sourcePath);
            addData(batch, false);
            deltaStreamer.sync();
            assertRecordCount(expectedRecords);
            assertFileNumber(expectedFiles, false);
        }

        if (targetSchemaSameAsTableSchema) {
            TestSchemaProvider.setTargetSchema(TestSchemaProvider.sourceSchema);
        }

        // Fresh streamer, and remember the newest instant so progress can be checked afterwards.
        resetTopicAndDeltaStreamer(allowNullForDeletedCols);
        HoodieStreamer.Config dsConfig = deltaStreamer.getConfig();
        HoodieTableMetaClient metaClient = getMetaClient(dsConfig);
        HoodieInstant lastInstant = metaClient.getActiveTimeline().lastInstant().get();

        // Same data without the "rider" column.
        sourcePath = String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
        batch = sparkSession.read().json(sourcePath);
        Dataset<Row> droppedColumnDf = batch.drop("rider");
        try {
            addData(droppedColumnDf, true);
            deltaStreamer.sync();
            Assertions.assertTrue(allowNullForDeletedCols || targetSchemaSameAsTableSchema);

            metaClient.reloadActiveTimeline();
            Option<Schema> latestTableSchemaOpt =
                UtilHelpers.getLatestTableSchema(jsc, storage, dsConfig.targetBasePath, metaClient);
            boolean riderHasStringBranch = latestTableSchemaOpt.get().getField("rider").schema().getTypes().stream()
                .anyMatch(t -> t.getType().equals(Schema.Type.STRING));
            Assertions.assertTrue(riderHasStringBranch);
            HoodieInstant newestInstant = metaClient.reloadActiveTimeline().lastInstant().get();
            Assertions.assertTrue(newestInstant.compareTo(lastInstant) > 0);
        } catch (MissingSchemaFieldException e) {
            Assertions.assertFalse(allowNullForDeletedCols || targetSchemaSameAsTableSchema);
        }
    }

    /**
     * Like the dropped-column scenario, but the {@code rider} column of the initial data (and of
     * the extra MERGE_ON_READ write) is first marked non-nullable through
     * {@code TestHoodieSparkUtils.setColumnNotNullable}.
     *
     * <p>When the final sync succeeds, the test requires that either
     * {@code allowNullForDeletedCols} or {@code targetSchemaSameAsTableSchema} was set, that the
     * latest table schema still exposes {@code rider} with a STRING branch, and that a newer
     * instant appeared on the timeline. Any {@link Exception} raised in that section is accepted
     * only if its cause chain carries one of the two expected error messages.
     *
     * @throws Exception if any earlier write, sync or read fails
     */
    @ParameterizedTest
    @MethodSource("testParamsWithSchemaTransformer")
    public void testNonNullableColumnDrop(String tableType, Boolean rowWriterEnable, Boolean useKafkaSource, Boolean allowNullForDeletedCols, Boolean useTransformer, Boolean targetSchemaSameAsTableSchema) throws Exception {
        // Copy the scenario switches onto the test instance.
        this.tableType = tableType;
        this.rowWriterEnable = rowWriterEnable;
        this.useKafkaSource = useKafkaSource;
        this.shouldCluster = false;
        this.shouldCompact = false;
        this.addFilegroups = false;
        this.multiLogFiles = false;
        this.useTransformer = useTransformer;
        if (useKafkaSource || targetSchemaSameAsTableSchema) {
            this.useSchemaProvider = true;
        }
        final boolean copyOnWrite = tableType.equals("COPY_ON_WRITE");

        // Give this run its own source directory and table.
        testNum++;
        PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + testNum;
        tableName = "test_parquet_table" + testNum;
        tableBasePath = basePath + tableName;

        // Initial write with a non-nullable "rider" column.
        String sourcePath = String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
        Dataset<Row> batch = sparkSession.read().json(sourcePath);
        batch = TestHoodieSparkUtils.setColumnNotNullable(batch, "rider");
        resetTopicAndDeltaStreamer(allowNullForDeletedCols);
        addData(batch, true);
        deltaStreamer.sync();
        final int expectedRecords = 6;
        final int expectedFiles = 3;
        assertRecordCount(expectedRecords);
        assertFileNumber(expectedFiles, copyOnWrite);

        // MERGE_ON_READ tables get one more write, again with a non-nullable "rider".
        if (tableType.equals("MERGE_ON_READ")) {
            sourcePath = String.class.getResource("/data/schema-evolution/extraLogFilesTestEverything.json").getPath();
            batch = sparkSession.read().json(sourcePath);
            batch = TestHoodieSparkUtils.setColumnNotNullable(batch, "rider");
            addData(batch, false);
            deltaStreamer.sync();
            assertRecordCount(expectedRecords);
            assertFileNumber(expectedFiles, false);
        }

        if (targetSchemaSameAsTableSchema) {
            TestSchemaProvider.setTargetSchema(TestSchemaProvider.sourceSchema);
        }

        // Fresh streamer, and remember the newest instant so progress can be checked afterwards.
        resetTopicAndDeltaStreamer(allowNullForDeletedCols);
        HoodieStreamer.Config dsConfig = deltaStreamer.getConfig();
        HoodieTableMetaClient metaClient = getMetaClient(dsConfig);
        HoodieInstant lastInstant = metaClient.getActiveTimeline().lastInstant().get();

        // Same data without the "rider" column.
        sourcePath = String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
        batch = sparkSession.read().json(sourcePath);
        Dataset<Row> droppedColumnDf = batch.drop("rider");
        try {
            addData(droppedColumnDf, true);
            deltaStreamer.sync();
            Assertions.assertTrue(allowNullForDeletedCols || targetSchemaSameAsTableSchema);

            metaClient.reloadActiveTimeline();
            Option<Schema> latestTableSchemaOpt =
                UtilHelpers.getLatestTableSchema(jsc, storage, dsConfig.targetBasePath, metaClient);
            boolean riderHasStringBranch = latestTableSchemaOpt.get().getField("rider").schema().getTypes().stream()
                .anyMatch(t -> t.getType().equals(Schema.Type.STRING));
            Assertions.assertTrue(riderHasStringBranch);
            HoodieInstant newestInstant = metaClient.reloadActiveTimeline().lastInstant().get();
            Assertions.assertTrue(newestInstant.compareTo(lastInstant) > 0);
        } catch (Exception e) {
            boolean expectedFailure = containsErrorMessage(e,
                "has no default value and is non-nullable",
                "Schema validation failed due to missing field.");
            Assertions.assertTrue(expectedFailure);
        }
    }

    /**
     * Writes the starting data set and then writes it again with {@code distance_in_meters} cast
     * to DOUBLE.
     *
     * <p>When the sync succeeds, the test requires that {@code targetSchemaSameAsTableSchema} was
     * not set, that the latest table schema exposes {@code distance_in_meters} with a DOUBLE
     * branch, and that a newer instant appeared on the timeline. When the sync throws, the test
     * requires that {@code targetSchemaSameAsTableSchema} was set and that the cause chain
     * carries one of the expected error messages. The non-Kafka source accepts one additional
     * message (the {@code MutableDouble}/{@code MutableLong} cast failure) that the Kafka source
     * does not.
     *
     * @throws Exception if any earlier write, sync or read fails
     */
    @ParameterizedTest
    @MethodSource(value={"testParamsWithSchemaTransformer"})
    public void testTypePromotion(String tableType, Boolean rowWriterEnable, Boolean useKafkaSource, Boolean allowNullForDeletedCols, Boolean useTransformer, Boolean targetSchemaSameAsTableSchema) throws Exception {
        // Copy the scenario switches onto the test instance.
        this.tableType = tableType;
        this.rowWriterEnable = rowWriterEnable;
        this.useKafkaSource = useKafkaSource;
        this.shouldCluster = false;
        this.shouldCompact = false;
        this.addFilegroups = false;
        this.multiLogFiles = false;
        this.useTransformer = useTransformer;
        if (useKafkaSource || targetSchemaSameAsTableSchema) {
            this.useSchemaProvider = true;
        }
        boolean isCow = tableType.equals("COPY_ON_WRITE");

        // Give this run its own source directory and table.
        PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + ++testNum;
        this.tableName = "test_parquet_table" + testNum;
        this.tableBasePath = basePath + this.tableName;

        // Initial write.
        String datapath = String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
        Dataset<Row> df = sparkSession.read().json(datapath);
        this.resetTopicAndDeltaStreamer(allowNullForDeletedCols);
        this.addData(df, true);
        this.deltaStreamer.sync();
        int numRecords = 6;
        int numFiles = 3;
        this.assertRecordCount(numRecords);
        this.assertFileNumber(numFiles, isCow);

        // MERGE_ON_READ tables get one more write before the type change.
        if (tableType.equals("MERGE_ON_READ")) {
            datapath = String.class.getResource("/data/schema-evolution/extraLogFilesTestEverything.json").getPath();
            df = sparkSession.read().json(datapath);
            this.addData(df, false);
            this.deltaStreamer.sync();
            this.assertRecordCount(numRecords);
            this.assertFileNumber(numFiles, false);
        }

        if (targetSchemaSameAsTableSchema) {
            TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.setTargetSchema(TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.sourceSchema);
        }

        // Fresh streamer, and remember the newest instant so progress can be checked afterwards.
        this.resetTopicAndDeltaStreamer(allowNullForDeletedCols);
        HoodieStreamer.Config dsConfig = this.deltaStreamer.getConfig();
        HoodieTableMetaClient metaClient = TestHoodieDeltaStreamerSchemaEvolutionQuick.getMetaClient(dsConfig);
        HoodieInstant lastInstant = metaClient.getActiveTimeline().lastInstant().get();

        // Same data with distance_in_meters cast to DOUBLE.
        datapath = String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
        df = sparkSession.read().json(datapath);
        Column col = df.col("distance_in_meters");
        Dataset<Row> typePromotionDf = df.withColumn("distance_in_meters", col.cast(DataTypes.DoubleType));
        try {
            this.addData(typePromotionDf, true);
            this.deltaStreamer.sync();
            Assertions.assertFalse(targetSchemaSameAsTableSchema);
            metaClient.reloadActiveTimeline();
            Option<Schema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(jsc, storage, dsConfig.targetBasePath, metaClient);
            Assertions.assertTrue(latestTableSchemaOpt.get().getField("distance_in_meters").schema().getTypes().stream().anyMatch(t -> t.getType().equals(Schema.Type.DOUBLE)), latestTableSchemaOpt.get().getField("distance_in_meters").schema().toString());
            Assertions.assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant) > 0);
        }
        catch (Exception e) {
            Assertions.assertTrue(targetSchemaSameAsTableSchema);
            if (!useKafkaSource) {
                // Non-Kafka sources may additionally fail with the MutableDouble -> MutableLong cast error.
                Assertions.assertTrue(this.containsErrorMessage(e, "Incoming batch schema is not compatible with the table's one", "org.apache.spark.sql.catalyst.expressions.MutableDouble cannot be cast to org.apache.spark.sql.catalyst.expressions.MutableLong", "cannot support rewrite value for schema type: \"long\" since the old schema type is: \"double\""), e.getMessage());
            } else {
                // The stricter two-message check applies to the Kafka source only; running it
                // unconditionally would make the extra message accepted above unreachable.
                Assertions.assertTrue(this.containsErrorMessage(e, "Incoming batch schema is not compatible with the table's one", "cannot support rewrite value for schema type: \"long\" since the old schema type is: \"double\""), e.getMessage());
            }
        }
    }

    /**
     * Writes the starting data set and then writes it again with {@code current_ts} cast to
     * INTEGER. Verifies that the latest table schema still exposes {@code current_ts} with a LONG
     * branch and that a newer instant appeared on the timeline.
     *
     * @throws Exception if any write, sync or read fails
     */
    @ParameterizedTest
    @MethodSource("testParamsWithSchemaTransformer")
    public void testTypeDemotion(String tableType, Boolean rowWriterEnable, Boolean useKafkaSource, Boolean allowNullForDeletedCols, Boolean useTransformer, Boolean targetSchemaSameAsTableSchema) throws Exception {
        // Copy the scenario switches onto the test instance.
        this.tableType = tableType;
        this.rowWriterEnable = rowWriterEnable;
        this.useKafkaSource = useKafkaSource;
        this.shouldCluster = false;
        this.shouldCompact = false;
        this.addFilegroups = false;
        this.multiLogFiles = false;
        this.useTransformer = useTransformer;
        if (useKafkaSource || targetSchemaSameAsTableSchema) {
            this.useSchemaProvider = true;
        }
        final boolean copyOnWrite = tableType.equals("COPY_ON_WRITE");

        // Give this run its own source directory and table.
        testNum++;
        PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + testNum;
        tableName = "test_parquet_table" + testNum;
        tableBasePath = basePath + tableName;

        // Initial write.
        String sourcePath = String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
        Dataset<Row> batch = sparkSession.read().json(sourcePath);
        resetTopicAndDeltaStreamer(allowNullForDeletedCols);
        addData(batch, true);
        deltaStreamer.sync();
        final int expectedRecords = 6;
        final int expectedFiles = 3;
        assertRecordCount(expectedRecords);
        assertFileNumber(expectedFiles, copyOnWrite);

        // MERGE_ON_READ tables get one more write before the type change.
        if (tableType.equals("MERGE_ON_READ")) {
            sourcePath = String.class.getResource("/data/schema-evolution/extraLogFilesTestEverything.json").getPath();
            batch = sparkSession.read().json(sourcePath);
            addData(batch, false);
            deltaStreamer.sync();
            assertRecordCount(expectedRecords);
            assertFileNumber(expectedFiles, false);
        }

        if (targetSchemaSameAsTableSchema) {
            TestSchemaProvider.setTargetSchema(TestSchemaProvider.sourceSchema);
        }

        // Fresh streamer, and remember the newest instant so progress can be checked afterwards.
        resetTopicAndDeltaStreamer(allowNullForDeletedCols);
        HoodieStreamer.Config dsConfig = deltaStreamer.getConfig();
        HoodieTableMetaClient metaClient = getMetaClient(dsConfig);
        HoodieInstant lastInstant = metaClient.getActiveTimeline().lastInstant().get();

        // Same data with current_ts narrowed to INTEGER.
        sourcePath = String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
        batch = sparkSession.read().json(sourcePath);
        Column currentTs = batch.col("current_ts");
        Dataset<Row> typeDemotionDf = batch.withColumn("current_ts", currentTs.cast(DataTypes.IntegerType));
        addData(typeDemotionDf, true);
        deltaStreamer.sync();

        metaClient.reloadActiveTimeline();
        Option<Schema> latestTableSchemaOpt =
            UtilHelpers.getLatestTableSchema(jsc, storage, dsConfig.targetBasePath, metaClient);
        boolean currentTsHasLongBranch = latestTableSchemaOpt.get().getField("current_ts").schema().getTypes().stream()
            .anyMatch(t -> t.getType().equals(Schema.Type.LONG));
        Assertions.assertTrue(currentTsHasLongBranch);
        HoodieInstant newestInstant = metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertTrue(newestInstant.compareTo(lastInstant) > 0);
    }

    /**
     * Builds a meta client for the table the given streamer config writes to.
     *
     * @param dsConfig streamer config supplying {@code targetBasePath} and {@code payloadClassName}
     * @return meta client built on a new instance of the shared storage configuration
     */
    private static HoodieTableMetaClient getMetaClient(HoodieStreamer.Config dsConfig) {
        return HoodieTableMetaClient.builder()
            .setConf(storage.getConf().newInstance())
            .setBasePath(dsConfig.targetBasePath)
            .setPayloadClassName(dsConfig.payloadClassName)
            .build();
    }

    /**
     * Switches to a new topic name, shuts down the current streamer (if any) and replaces it
     * with a new one.
     *
     * <p>The new streamer is configured with the {@code TripsWithDistanceTransformer} when
     * {@code useTransformer} is set (no transformer otherwise), with
     * {@code hoodie.streamer.checkpoint.force.skip=true}, and with its checkpoint set to
     * {@code "0"}.
     *
     * @param allowNullForDeletedCols forwarded to {@code getDeltaStreamerConfig}
     * @throws IOException declared for the config/streamer construction calls
     */
    private void resetTopicAndDeltaStreamer(Boolean allowNullForDeletedCols) throws IOException {
        testNum++;
        topicName = "topic" + testNum;

        if (deltaStreamer != null) {
            deltaStreamer.shutdownGracefully();
        }

        String[] transformerClassNames = useTransformer
            ? new String[]{TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()}
            : new String[]{};

        TypedProperties extraProps = new TypedProperties();
        extraProps.setProperty("hoodie.streamer.checkpoint.force.skip", "true");

        HoodieDeltaStreamer.Config freshConfig =
            getDeltaStreamerConfig(transformerClassNames, allowNullForDeletedCols, extraProps);
        freshConfig.checkpoint = "0";
        deltaStreamer = new HoodieDeltaStreamer(freshConfig, jsc);
    }

    /**
     * Walks the cause chain of {@code e} and reports whether any throwable in it has a message
     * containing at least one of the given fragments.
     *
     * <p>Throwables whose message is {@code null} are skipped rather than dereferenced, so a
     * message-less wrapper around the real failure no longer turns the check into a
     * {@link NullPointerException}.
     *
     * @param e        outermost throwable; {@code null} yields {@code false}
     * @param messages fragments to look for; a match on any one of them is sufficient
     * @return {@code true} if some throwable in the chain has a message containing a fragment
     */
    private boolean containsErrorMessage(Throwable e, String ... messages) {
        for (Throwable current = e; current != null; current = current.getCause()) {
            String actual = current.getMessage();
            if (actual == null) {
                // Nothing to match at this level; keep walking towards the root cause.
                continue;
            }
            for (String expected : messages) {
                if (actual.contains(expected)) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Asserts that the named column of {@code df} has exactly the expected Spark data type.
     *
     * @param df           dataset whose schema is inspected
     * @param colName      column to select
     * @param expectedType data type the selected column must have
     */
    protected void assertDataType(Dataset<Row> df, String colName, DataType expectedType) {
        // Selecting a single column leaves exactly one field in the resulting schema.
        StructField selected = df.select(colName, new String[0]).schema().fields()[0];
        DataType actualType = selected.dataType();
        Assertions.assertEquals((Object) expectedType, (Object) actualType);
    }

    /**
     * Asserts that exactly {@code count} rows of {@code df} satisfy the given filter expression.
     *
     * @param df        dataset to filter
     * @param condition filter expression passed to {@code Dataset.filter(String)}
     * @param count     expected number of matching rows
     */
    protected void assertCondition(Dataset<Row> df, String condition, int count) {
        long expectedRows = count;
        long matchingRows = df.filter(condition).count();
        Assertions.assertEquals(expectedRows, matchingRows);
    }
}

