/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.functional;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag(value="functional")
public class TestHiveTableSchemaEvolution {
    private SparkSession spark = null;
    @TempDir
    java.nio.file.Path basePath;

    @BeforeEach
    public void setUp() {
        this.initSparkContexts("HiveSchemaEvolution");
    }

    @AfterEach
    public void clean() {
        if (this.spark != null) {
            this.spark.close();
        }
    }

    private void initSparkContexts(String appName) {
        SparkConf sparkConf = HoodieClientTestUtils.getSparkConfForTest((String)appName);
        this.spark = SparkSession.builder().config("hoodie.support.write.lock", "false").config("spark.sql.session.timeZone", "CTT").config("spark.sql.hive.convertMetastoreParquet", "false").config(sparkConf).getOrCreate();
        this.spark.sparkContext().setLogLevel("ERROR");
    }

    @Test
    public void testHiveReadTimestampColumnAsTimestampWritable() throws Exception {
        String tableName = "hudi_test" + new Date().getTime();
        String path = new Path(this.basePath.toAbsolutePath().toString()).toUri().toString();
        this.spark.sql("set hoodie.schema.on.read.enable=true");
        this.spark.sql(String.format("create table %s (col0 int, col1 float, col2 string, col3 timestamp) using hudi tblproperties (type='mor', primaryKey='col0', preCombineField='col1', hoodie.compaction.payload.class='org.apache.hudi.common.model.OverwriteWithLatestAvroPayload') location '%s'", tableName, path));
        this.spark.sql(String.format("insert into %s values(1, 1.1, 'text', timestamp('2021-12-25 12:01:01'))", tableName));
        this.spark.sql(String.format("update %s set col2 = 'text2' where col0 = 1", tableName));
        this.spark.sql(String.format("alter table %s rename column col2 to col2_new", tableName));
        JobConf jobConf = new JobConf();
        jobConf.set(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), "true");
        jobConf.set("hive.io.file.read.all.columns", "false");
        jobConf.set("hive.io.file.readcolumn.names", "col1,col2_new,col3");
        jobConf.set("hive.io.file.readcolumn.ids", "6,7,8");
        jobConf.set("columns", "_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2_new,col3");
        jobConf.set("columns.types", "string,string,string,string,string,int,float,string,timestamp,string");
        FileInputFormat.setInputPaths((JobConf)jobConf, (String)path);
        HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat();
        inputFormat.setConf((Configuration)jobConf);
        InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
        int expectedSplits = 1;
        Assertions.assertEquals((int)expectedSplits, (int)splits.length);
        RecordReader recordReader = inputFormat.getRecordReader(splits[0], jobConf, null);
        List<List<Writable>> records = this.getWritableList((RecordReader<NullWritable, ArrayWritable>)recordReader);
        Assertions.assertEquals((int)1, (int)records.size());
        List<Writable> record1 = records.get(0);
        Assertions.assertEquals((int)6, (int)record1.size());
        Writable c3 = record1.get(5);
        Assertions.assertTrue((boolean)(c3 instanceof TimestampWritable));
        recordReader.close();
    }

    @ParameterizedTest
    @ValueSource(strings={"cow", "mor"})
    public void testHiveReadSchemaEvolutionTable(String tableType) throws Exception {
        String tableName = "hudi_test" + new Date().getTime();
        String path = new Path(this.basePath.toAbsolutePath().toString()).toUri().toString();
        this.spark.sql("set hoodie.schema.on.read.enable=true");
        this.spark.sql(String.format("create table %s (col0 int, col1 float, col2 string) using hudi tblproperties (type='%s', primaryKey='col0', preCombineField='col1', hoodie.compaction.payload.class='org.apache.hudi.common.model.OverwriteWithLatestAvroPayload') location '%s'", tableName, tableType, path));
        this.spark.sql(String.format("insert into %s values(1, 1.1, 'text')", tableName));
        this.spark.sql(String.format("update %s set col2 = 'text2' where col0 = 1", tableName));
        this.spark.sql(String.format("alter table %s alter column col1 type double", tableName));
        this.spark.sql(String.format("alter table %s rename column col2 to col2_new", tableName));
        JobConf jobConf = new JobConf();
        jobConf.set(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), "true");
        jobConf.set("hive.io.file.read.all.columns", "false");
        jobConf.set("hive.io.file.readcolumn.names", "col1,col2_new");
        jobConf.set("hive.io.file.readcolumn.ids", "6,7");
        jobConf.set("columns", "_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2_new");
        jobConf.set("columns.types", "string,string,string,string,string,int,double,string");
        FileInputFormat.setInputPaths((JobConf)jobConf, (String)path);
        HoodieParquetInputFormat inputFormat = "cow".equals(tableType) ? new HoodieParquetInputFormat() : new HoodieParquetRealtimeInputFormat();
        inputFormat.setConf((Configuration)jobConf);
        InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
        Assertions.assertEquals((int)1, (int)splits.length);
        RecordReader recordReader = inputFormat.getRecordReader(splits[0], jobConf, null);
        List<List<Writable>> records = this.getWritableList((RecordReader<NullWritable, ArrayWritable>)recordReader);
        Assertions.assertEquals((int)1, (int)records.size());
        List<Writable> record1 = records.get(0);
        if ("cow".equals(tableType)) {
            Assertions.assertEquals((int)2, (int)record1.size());
            Writable c1 = record1.get(0);
            Assertions.assertTrue((boolean)(c1 instanceof DoubleWritable));
            Assertions.assertEquals((Object)"1.1", (Object)c1.toString().substring(0, 3));
            Writable c2 = record1.get(1);
            Assertions.assertTrue((boolean)(c2 instanceof Text));
            Assertions.assertEquals((Object)"text2", (Object)c2.toString());
        } else {
            Assertions.assertEquals((int)5, (int)record1.size());
            Writable c1 = record1.get(3);
            Assertions.assertTrue((boolean)(c1 instanceof DoubleWritable));
            Assertions.assertEquals((Object)"1.1", (Object)c1.toString().substring(0, 3));
            Writable c2 = record1.get(4);
            Assertions.assertTrue((boolean)(c2 instanceof Text));
            Assertions.assertEquals((Object)"text2", (Object)c2.toString());
        }
        recordReader.close();
    }

    @ParameterizedTest
    @ValueSource(strings={"mor", "cow"})
    public void testHiveReadSchemaEvolutionWithAddingColumns(String tableType) throws Exception {
        String tableName = "hudi_test" + new Date().getTime();
        String path = new Path(this.basePath.toAbsolutePath().toString()).toUri().toString();
        this.spark.sql("set hoodie.schema.on.read.enable=true");
        this.spark.sql(String.format("create table %s (col0 int, col1 float, col2 string, col3 timestamp) using hudi tblproperties (type='%s', primaryKey='col0', preCombineField='col1', hoodie.compaction.payload.class='org.apache.hudi.common.model.OverwriteWithLatestAvroPayload') location '%s'", tableName, tableType, path));
        this.spark.sql(String.format("insert into %s values(1, 1.1, 'text', timestamp('2021-12-25 12:01:01'))", tableName));
        this.spark.sql(String.format("update %s set col2 = 'text2' where col0 = 1", tableName));
        this.spark.sql(String.format("alter table %s add columns (col4 string)", tableName));
        JobConf jobConf = new JobConf();
        jobConf.set(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), "true");
        jobConf.set("hive.io.file.read.all.columns", "false");
        jobConf.set("hive.io.file.readcolumn.names", "col1,col2,col3,col4");
        jobConf.set("hive.io.file.readcolumn.ids", "6,7,8,9");
        jobConf.set("columns", "_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2,col3,col4");
        jobConf.set("columns.types", "string,string,string,string,string,int,float,string,timestamp,string,string");
        FileInputFormat.setInputPaths((JobConf)jobConf, (String)path);
        HoodieParquetInputFormat inputFormat = tableType.equals("cow") ? new HoodieParquetInputFormat() : new HoodieParquetRealtimeInputFormat();
        inputFormat.setConf((Configuration)jobConf);
        InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
        int expectedSplits = 1;
        Assertions.assertEquals((int)expectedSplits, (int)splits.length);
        RecordReader recordReader = inputFormat.getRecordReader(splits[0], jobConf, null);
        List<List<Writable>> records = this.getWritableList((RecordReader<NullWritable, ArrayWritable>)recordReader, false);
        Assertions.assertEquals((int)1, (int)records.size());
        List<Writable> record1 = records.get(0);
        Assertions.assertEquals((int)10, (int)record1.size());
        Assertions.assertEquals((Object)new FloatWritable(1.1f), (Object)record1.get(6));
        Assertions.assertEquals((Object)new Text("text2"), (Object)record1.get(7));
        Assertions.assertInstanceOf(TimestampWritable.class, (Object)record1.get(8));
        Assertions.assertNull((Object)record1.get(9));
        recordReader.close();
    }

    private List<List<Writable>> getWritableList(RecordReader<NullWritable, ArrayWritable> recordReader) throws IOException {
        return this.getWritableList(recordReader, true);
    }

    private List<List<Writable>> getWritableList(RecordReader<NullWritable, ArrayWritable> recordReader, boolean filterNull) throws IOException {
        ArrayList<List<Writable>> records = new ArrayList<List<Writable>>();
        NullWritable key = (NullWritable)recordReader.createKey();
        ArrayWritable writable = (ArrayWritable)recordReader.createValue();
        while (writable != null && recordReader.next((Object)key, (Object)writable)) {
            records.add(Arrays.stream(writable.get()).filter(f -> !filterNull || Objects.nonNull(f)).collect(Collectors.toList()));
        }
        return records;
    }
}

