/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.index.zorder;

import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
import java.sql.Date;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.optimize.ZOrderingUtil;
import org.apache.hudi.util.DataTypeUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.io.api.Binary;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.hudi.execution.RangeSampleSort$;
import org.apache.spark.sql.hudi.execution.ZorderingBinarySort;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.BooleanType;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DoubleType;
import org.apache.spark.sql.types.FloatType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.util.SerializableConfiguration;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.mutable.Buffer;

/**
 * Helpers for laying out data in Z-order and for maintaining the Z-index.
 *
 * <p>The Z-index is a table with one row per base file. It holds a {@code file} column followed, for every
 * Z-ordered column {@code c}, by {@code c_minValue}, {@code c_maxValue} and {@code c_num_nulls}. These
 * statistics are read from the Parquet footers of the base files (see {@link #composeIndexSchema(List)}).
 */
public class ZOrderingIndexHelper {
    private static final Logger LOG = LogManager.getLogger(ZOrderingIndexHelper.class);
    private static final String SPARK_JOB_DESCRIPTION = "spark.job.description";
    private static final String Z_INDEX_FILE_COLUMN_NAME = "file";
    private static final String Z_INDEX_MIN_VALUE_STAT_NAME = "minValue";
    private static final String Z_INDEX_MAX_VALUE_STAT_NAME = "maxValue";
    private static final String Z_INDEX_NUM_NULLS_STAT_NAME = "num_nulls";
    // Transient column carrying the interleaved Z-value while rows are being sorted; dropped afterwards.
    private static final String Z_VALUE_SORT_COLUMN_NAME = "zIndex";

    /** Returns the Z-index column name holding the minimum value of {@code colName}. */
    public static String getMinColumnNameFor(String colName) {
        return composeZIndexColName(colName, Z_INDEX_MIN_VALUE_STAT_NAME);
    }

    /** Returns the Z-index column name holding the maximum value of {@code colName}. */
    public static String getMaxColumnNameFor(String colName) {
        return composeZIndexColName(colName, Z_INDEX_MAX_VALUE_STAT_NAME);
    }

    /** Returns the Z-index column name holding the null count of {@code colName}. */
    public static String getNumNullsColumnNameFor(String colName) {
        return composeZIndexColName(colName, Z_INDEX_NUM_NULLS_STAT_NAME);
    }

    /**
     * Sorts {@code df} by the Z-order (bit-interleaved) value of the given columns.
     *
     * <p>Each supported column value is mapped to 8 bytes via {@link ZOrderingUtil}. The per-column bytes
     * are interleaved into one Z-value, and rows are sorted by it into {@code fileNum} partitions. Nulls are
     * replaced by a fixed per-type sentinel, and columns of unsupported types are skipped. A single column
     * is simply range-partitioned, since a one-dimensional Z-order is the plain column order.
     *
     * @param df      the data frame to lay out
     * @param zCols   names of the columns to Z-order by
     * @param fileNum number of output partitions
     * @return the re-laid-out data frame with {@code df}'s schema, or {@code df} itself when {@code zCols}
     *         is empty or names a column that {@code df} does not have
     */
    public static Dataset<Row> createZIndexedDataFrameByMapValue(Dataset<Row> df, List<String> zCols, int fileNum) {
        StructField[] fields = df.schema().fields();
        List<StructField> schemaFields = Arrays.asList(fields);
        Map<String, StructField> columnsMap = schemaFields.stream().collect(Collectors.toMap(StructField::name, f -> f));
        int fieldNum = fields.length;
        if (zCols.isEmpty() || !zCols.stream().allMatch(columnsMap::containsKey)) {
            return df;
        }
        if (zCols.size() == 1) {
            // Previously partitioned into `fieldNum` (schema width) ranges; the caller asked for `fileNum`.
            return df.repartitionByRange(fileNum, new Column[]{functions.col(zCols.get(0))});
        }
        // Keyed by the column's position in the row. Collectors.toMap builds a HashMap, and its iteration
        // order decides the interleaving order of the columns; it is kept so existing layouts do not change.
        Map<Integer, StructField> fieldMap = zCols.stream()
                .collect(Collectors.toMap(c -> schemaFields.indexOf(columnsMap.get(c)), columnsMap::get));
        JavaRDD<Row> sortedRdd = df.toJavaRDD()
                .map(row -> appendZValue(row, fieldMap))
                // The Z-value is appended as the last field, i.e. at position `fieldNum`
                .sortBy(r -> new ZorderingBinarySort((byte[]) r.get(fieldNum)), true, fileNum);
        List<StructField> newFields = new ArrayList<>(schemaFields);
        newFields.add(new StructField(Z_VALUE_SORT_COLUMN_NAME, BinaryType$.MODULE$, true, Metadata.empty()));
        return df.sparkSession()
                .createDataFrame(sortedRdd, StructType$.MODULE$.apply(newFields))
                .drop(Z_VALUE_SORT_COLUMN_NAME);
    }

    /**
     * Returns a copy of {@code row} with the interleaved Z-value of the columns in {@code zFields} appended
     * as an extra trailing {@code byte[]} field.
     */
    private static Row appendZValue(Row row, Map<Integer, StructField> zFields) {
        byte[][] zBytes = zFields.entrySet().stream()
                .map(e -> toZOrderBytes(row, e.getKey(), e.getValue().dataType()))
                .filter(Objects::nonNull)
                .toArray(byte[][]::new);
        List<Object> values = new ArrayList<>(row.length() + 1);
        for (int i = 0; i < row.length(); ++i) {
            values.add(row.get(i));
        }
        // Every encoder below produces 8 bytes per column
        values.add(ZOrderingUtil.interleaving(zBytes, 8));
        return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(values));
    }

    /**
     * Encodes the value at {@code index} of {@code row} into its 8-byte Z-order form.
     *
     * <p>Nulls are first replaced by a per-type sentinel (mostly the type's maximum value; {@code ""} for
     * strings, {@code false} for booleans, a single zero byte for binaries).
     *
     * @return the encoded bytes, or {@code null} if {@code dataType} is not supported
     */
    @Nullable
    private static byte[] toZOrderBytes(Row row, int index, DataType dataType) {
        boolean isNull = row.isNullAt(index);
        if (dataType instanceof LongType) {
            return ZOrderingUtil.longTo8Byte(isNull ? Long.MAX_VALUE : row.getLong(index));
        }
        if (dataType instanceof DoubleType) {
            return ZOrderingUtil.doubleTo8Byte(isNull ? Double.MAX_VALUE : row.getDouble(index));
        }
        if (dataType instanceof IntegerType) {
            return ZOrderingUtil.intTo8Byte(isNull ? Integer.MAX_VALUE : row.getInt(index));
        }
        if (dataType instanceof FloatType) {
            return ZOrderingUtil.doubleTo8Byte(isNull ? (double) Float.MAX_VALUE : (double) row.getFloat(index));
        }
        if (dataType instanceof StringType) {
            return ZOrderingUtil.utf8To8Byte(isNull ? "" : row.getString(index));
        }
        if (dataType instanceof DateType) {
            return ZOrderingUtil.longTo8Byte(isNull ? Long.MAX_VALUE : row.getDate(index).getTime());
        }
        if (dataType instanceof TimestampType) {
            return ZOrderingUtil.longTo8Byte(isNull ? Long.MAX_VALUE : row.getTimestamp(index).getTime());
        }
        if (dataType instanceof ByteType) {
            return ZOrderingUtil.byteTo8Byte(isNull ? Byte.MAX_VALUE : row.getByte(index));
        }
        if (dataType instanceof ShortType) {
            return ZOrderingUtil.intTo8Byte(isNull ? Short.MAX_VALUE : row.getShort(index));
        }
        if (dataType instanceof DecimalType) {
            // NOTE(review): longValue() drops the fractional part, so decimals differing only there tie.
            return ZOrderingUtil.longTo8Byte(isNull ? Long.MAX_VALUE : row.getDecimal(index).longValue());
        }
        if (dataType instanceof BooleanType) {
            return ZOrderingUtil.intTo8Byte(!isNull && row.getBoolean(index) ? 1 : 0);
        }
        if (dataType instanceof BinaryType) {
            return ZOrderingUtil.paddingTo8Byte(isNull ? new byte[]{0} : (byte[]) row.get(index));
        }
        return null;
    }

    /**
     * Comma-separated variant of {@link #createZIndexedDataFrameByMapValue(Dataset, List, int)}; column names
     * are trimmed.
     *
     * @return {@code df} unchanged if {@code zCols} is null/empty or {@code fileNum <= 0}
     */
    public static Dataset<Row> createZIndexedDataFrameByMapValue(Dataset<Row> df, String zCols, int fileNum) {
        if (zCols == null || zCols.isEmpty() || fileNum <= 0) {
            return df;
        }
        return createZIndexedDataFrameByMapValue(df, Arrays.stream(zCols.split(",")).map(String::trim).collect(Collectors.toList()), fileNum);
    }

    /**
     * Lays out {@code df} in Z-order by delegating to {@code RangeSampleSort} with the ZORDER layout
     * optimization strategy.
     */
    public static Dataset<Row> createZIndexedDataFrameBySample(Dataset<Row> df, List<String> zCols, int fileNum) {
        return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, (Seq<String>) JavaConversions.asScalaBuffer(zCols), fileNum, HoodieClusteringConfig.BuildLayoutOptimizationStrategy.ZORDER.toCustomString());
    }

    /**
     * Comma-separated variant of {@link #createZIndexedDataFrameBySample(Dataset, List, int)}; column names
     * are trimmed.
     *
     * @return {@code df} unchanged if {@code zCols} is null/empty or {@code fileNum <= 0}
     */
    public static Dataset<Row> createZIndexedDataFrameBySample(Dataset<Row> df, String zCols, int fileNum) {
        if (zCols == null || zCols.isEmpty() || fileNum <= 0) {
            return df;
        }
        return createZIndexedDataFrameBySample(df, Arrays.stream(zCols.split(",")).map(String::trim).collect(Collectors.toList()), fileNum);
    }

    /**
     * Builds a Z-index table for the given Parquet base files.
     *
     * <p>Column statistics are read from the Parquet footers in a Spark job (about three files per task).
     * The statistics are then grouped per file into one row following {@link #composeIndexSchema(List)}.
     *
     * @param sparkSession          session used to run the job and create the result
     * @param baseFilesPaths        paths of the Parquet base files to index
     * @param zorderedColumnSchemas schemas of the columns to collect statistics for
     * @return the Z-index data frame, one row per file that produced statistics
     * @throws HoodieException (from the executors) if statistics are missing for a requested column
     */
    @Nonnull
    public static Dataset<Row> buildZIndexTableFor(@Nonnull SparkSession sparkSession, @Nonnull List<String> baseFilesPaths, @Nonnull List<StructField> zorderedColumnSchemas) {
        SparkContext sc = sparkSession.sparkContext();
        JavaSparkContext jsc = new JavaSparkContext(sc);
        SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration());
        int numParallelism = baseFilesPaths.size() / 3 + 1;
        // Computed once on the driver rather than once per file inside the task
        List<String> columnNames = zorderedColumnSchemas.stream().map(StructField::name).collect(Collectors.toList());
        String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION);
        List<HoodieColumnRangeMetadata<Comparable>> colMinMaxInfos;
        try {
            jsc.setJobDescription("Listing parquet column statistics");
            colMinMaxInfos = jsc.parallelize(baseFilesPaths, numParallelism)
                    .mapPartitions(paths -> {
                        ParquetUtils utils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
                        Iterable<String> pathIterable = () -> paths;
                        return StreamSupport.stream(pathIterable.spliterator(), false)
                                .flatMap(path -> utils.readRangeFromParquetMetadata(serializableConfiguration.value(), new Path(path), columnNames).stream())
                                .iterator();
                    })
                    .collect();
        } finally {
            // Restore whatever description the caller had set
            jsc.setJobDescription(previousJobDescription);
        }
        Map<String, List<HoodieColumnRangeMetadata<Comparable>>> fileToColumnsMetadata = colMinMaxInfos.stream()
                .collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getFilePath));
        List<List<HoodieColumnRangeMetadata<Comparable>>> perFileMetadata = new ArrayList<>(fileToColumnsMetadata.values());
        JavaRDD<Row> allMetaDataRDD = jsc.parallelize(perFileMetadata, 1)
                .map(fileColumnsMetadata -> composeIndexRow(fileColumnsMetadata, zorderedColumnSchemas))
                .filter(Objects::nonNull);
        return sparkSession.createDataFrame(allMetaDataRDD, composeIndexSchema(zorderedColumnSchemas));
    }

    /**
     * Builds one Z-index row from the column statistics of a single file.
     *
     * <p>The row contains the file path followed by (min, max, null count) for each Z-ordered column. The
     * values are kept typed so that they match {@link #composeIndexSchema(List)}.
     *
     * @return the row, or {@code null} when {@code fileColumnsMetadata} is empty
     * @throws HoodieException if no statistics exist for one of the Z-ordered columns
     */
    @Nullable
    private static Row composeIndexRow(List<HoodieColumnRangeMetadata<Comparable>> fileColumnsMetadata, List<StructField> zorderedColumnSchemas) {
        if (fileColumnsMetadata.isEmpty()) {
            return null;
        }
        List<Object> indexRow = new ArrayList<>(1 + 3 * zorderedColumnSchemas.size());
        indexRow.add(fileColumnsMetadata.get(0).getFilePath());
        for (StructField colSchema : zorderedColumnSchemas) {
            String colName = colSchema.name();
            HoodieColumnRangeMetadata<Comparable> colMetadata = fileColumnsMetadata.stream()
                    .filter(s -> s.getColumnName().trim().equalsIgnoreCase(colName))
                    .findFirst()
                    .orElse(null);
            DataType colType = colSchema.dataType();
            if (colMetadata == null || colType == null) {
                throw new HoodieException(String.format("Cannot collect min/max statistics for column (%s)", colSchema));
            }
            Pair<Object, Object> minMaxValue = fetchMinMaxValues(colType, colMetadata);
            // Added as typed objects; casting these to String (as before) fails for non-string columns
            indexRow.add(minMaxValue.getLeft());
            indexRow.add(minMaxValue.getRight());
            indexRow.add(colMetadata.getNumNulls());
        }
        return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(indexRow));
    }

    /**
     * Builds the Z-index for {@code sourceBaseFiles} and persists it under
     * {@code zindexFolderPath/commitTime}.
     *
     * <p>If the folder does not exist yet, the new index is written directly. Otherwise, the most recent
     * index table belonging to a completed commit (greatest name in sorted order) is merged into the new
     * one, provided the schemas are compatible. Afterwards every pre-existing index table is deleted on a
     * best-effort basis: those of completed commits, because they were superseded, and all others, as
     * residue. Failed deletions are logged.
     *
     * @throws HoodieException wrapping any {@link IOException} raised while listing, reading or writing
     */
    public static void updateZIndexFor(@Nonnull SparkSession sparkSession, @Nonnull StructType sourceTableSchema, @Nonnull List<String> sourceBaseFiles, @Nonnull List<String> zorderedCols, @Nonnull String zindexFolderPath, @Nonnull String commitTime, @Nonnull List<String> completedCommits) {
        FileSystem fs = FSUtils.getFs(zindexFolderPath, sparkSession.sparkContext().hadoopConfiguration());
        List<StructField> zorderedColumnSchemas = zorderedCols.stream()
                .map(col -> sourceTableSchema.fields()[sourceTableSchema.fieldIndex(col)])
                .collect(Collectors.toList());
        Dataset<Row> newZIndexDf = buildZIndexTableFor(sparkSession, sourceBaseFiles, zorderedColumnSchemas);
        try {
            Path zindexFolder = new Path(zindexFolderPath);
            Path newIndexTablePath = new Path(zindexFolderPath, commitTime);
            if (!fs.exists(zindexFolder)) {
                newZIndexDf.repartition(1).write().format("parquet").mode("overwrite").save(newIndexTablePath.toString());
                return;
            }
            List<String> allIndexTables = Arrays.stream(fs.listStatus(zindexFolder))
                    .filter(FileStatus::isDirectory)
                    .map(status -> status.getPath().getName())
                    .collect(Collectors.toList());
            List<String> validIndexTables = allIndexTables.stream()
                    .filter(completedCommits::contains)
                    .sorted()
                    .collect(Collectors.toList());
            // Explicitly mutable: valid tables are appended below once they have been merged
            List<String> tablesToCleanup = allIndexTables.stream()
                    .filter(table -> !completedCommits.contains(table))
                    .collect(Collectors.toCollection(ArrayList::new));
            Dataset<Row> finalZIndexDf;
            if (validIndexTables.isEmpty()) {
                finalZIndexDf = newZIndexDf;
            } else {
                String mostRecentIndexTable = validIndexTables.get(validIndexTables.size() - 1);
                // Read with the same format the index tables are written with, independent of session defaults
                Dataset<Row> existingIndexTableDf = sparkSession.read()
                        .format("parquet")
                        .load(new Path(zindexFolderPath, mostRecentIndexTable).toString());
                finalZIndexDf = tryMergeMostRecentIndexTableInto(sparkSession, newZIndexDf, existingIndexTableDf);
                tablesToCleanup.addAll(validIndexTables);
            }
            finalZIndexDf.repartition(1).write().format("parquet").save(newIndexTablePath.toString());
            for (String table : tablesToCleanup) {
                try {
                    fs.delete(new Path(zindexFolderPath, table), true);
                } catch (IOException ie) {
                    // Best effort: a leftover table is only residue, not a correctness problem for this commit
                    LOG.warn(String.format("Failed to cleanup residual Z-index table: %s", table), ie);
                }
            }
        } catch (IOException e) {
            LOG.error("Failed to build new Z-index table", e);
            throw new HoodieException("Failed to build new Z-index table", e);
        }
    }

    /**
     * Merges {@code existingIndexTableDf} into {@code newIndexTableDf} (new values win) when their schemas
     * are compatible; otherwise returns {@code newIndexTableDf} as is. Both frames are registered as
     * uniquely-named temporary views to run the merge SQL.
     */
    @Nonnull
    private static Dataset<Row> tryMergeMostRecentIndexTableInto(@Nonnull SparkSession sparkSession, @Nonnull Dataset<Row> newIndexTableDf, @Nonnull Dataset<Row> existingIndexTableDf) {
        if (!DataTypeUtils.areCompatible((DataType) existingIndexTableDf.schema(), (DataType) newIndexTableDf.schema())) {
            return newIndexTableDf;
        }
        String randomSuffix = UUID.randomUUID().toString().replace("-", "");
        String existingIndexTempTableName = "existingIndexTable_" + randomSuffix;
        String newIndexTempTableName = "newIndexTable_" + randomSuffix;
        // createOrReplaceTempView is the non-deprecated equivalent of registerTempTable
        existingIndexTableDf.createOrReplaceTempView(existingIndexTempTableName);
        newIndexTableDf.createOrReplaceTempView(newIndexTempTableName);
        List<String> newTableColumns = Arrays.asList(newIndexTableDf.schema().fieldNames());
        return sparkSession.sql(createIndexMergeSql(existingIndexTempTableName, newIndexTempTableName, newTableColumns));
    }

    /**
     * Returns the Z-index schema. It starts with a nullable string {@code file} column, followed for each
     * column by {@code <col>_minValue} and {@code <col>_maxValue} (the column's own type) and
     * {@code <col>_num_nulls} (long).
     */
    @Nonnull
    public static StructType composeIndexSchema(@Nonnull List<StructField> zorderedColumnsSchemas) {
        List<StructField> schema = new ArrayList<>(1 + 3 * zorderedColumnsSchemas.size());
        schema.add(new StructField(Z_INDEX_FILE_COLUMN_NAME, StringType$.MODULE$, true, Metadata.empty()));
        for (StructField colSchema : zorderedColumnsSchemas) {
            schema.add(composeColumnStatStructType(colSchema.name(), Z_INDEX_MIN_VALUE_STAT_NAME, colSchema.dataType()));
            schema.add(composeColumnStatStructType(colSchema.name(), Z_INDEX_MAX_VALUE_STAT_NAME, colSchema.dataType()));
            schema.add(composeColumnStatStructType(colSchema.name(), Z_INDEX_NUM_NULLS_STAT_NAME, LongType$.MODULE$));
        }
        return StructType$.MODULE$.apply(schema);
    }

    /** Builds the nullable stat field {@code <col>_<statName>} of the given type. */
    private static StructField composeColumnStatStructType(String col, String statName, DataType dataType) {
        return new StructField(composeZIndexColName(col, statName), dataType, true, Metadata.empty());
    }

    /**
     * Recovers the source column name from a Z-index stat field by stripping its last
     * {@code _maxValue}, {@code _minValue} or {@code _num_nulls} suffix (checked in that order).
     *
     * @return the source column name, or {@code null} if the field is not a stat column
     */
    @Nullable
    private static String mapToSourceTableColumnName(StructField fieldStruct) {
        String name = fieldStruct.name();
        for (String statName : Arrays.asList(Z_INDEX_MAX_VALUE_STAT_NAME, Z_INDEX_MIN_VALUE_STAT_NAME, Z_INDEX_NUM_NULLS_STAT_NAME)) {
            int suffixIdx = name.lastIndexOf(String.format("_%s", statName));
            if (suffixIdx != -1) {
                return name.substring(0, suffixIdx);
            }
        }
        return null;
    }

    /** Composes a Z-index column name as {@code <col>_<statName>}. */
    private static String composeZIndexColName(String col, String statName) {
        return String.format("%s_%s", col, statName);
    }

    /**
     * Converts the Parquet min/max statistics of a column into Java values matching {@code colType}.
     *
     * @throws HoodieException if {@code colType} is not supported
     */
    private static Pair<Object, Object> fetchMinMaxValues(@Nonnull DataType colType, @Nonnull HoodieColumnRangeMetadata<Comparable> colMetadata) {
        Comparable min = colMetadata.getMinValue();
        Comparable max = colMetadata.getMaxValue();
        if (colType instanceof IntegerType) {
            return Pair.of(Integer.valueOf(min.toString()), Integer.valueOf(max.toString()));
        }
        if (colType instanceof DoubleType) {
            return Pair.of(Double.valueOf(min.toString()), Double.valueOf(max.toString()));
        }
        if (colType instanceof StringType) {
            // Parquet stores strings as UTF-8 binaries; decoding with the platform charset could corrupt them
            return Pair.of(((Binary) min).toStringUsingUTF8(), ((Binary) max).toStringUsingUTF8());
        }
        if (colType instanceof DecimalType) {
            return Pair.of(new BigDecimal(min.toString()), new BigDecimal(max.toString()));
        }
        if (colType instanceof DateType) {
            return Pair.of(Date.valueOf(min.toString()), Date.valueOf(max.toString()));
        }
        if (colType instanceof LongType) {
            return Pair.of(Long.valueOf(min.toString()), Long.valueOf(max.toString()));
        }
        if (colType instanceof ShortType) {
            return Pair.of(Short.valueOf(min.toString()), Short.valueOf(max.toString()));
        }
        if (colType instanceof FloatType) {
            return Pair.of(Float.valueOf(min.toString()), Float.valueOf(max.toString()));
        }
        if (colType instanceof BinaryType) {
            return Pair.of(((Binary) min).getBytes(), ((Binary) max).getBytes());
        }
        if (colType instanceof BooleanType) {
            return Pair.of(Boolean.valueOf(min.toString()), Boolean.valueOf(max.toString()));
        }
        if (colType instanceof ByteType) {
            return Pair.of(Byte.valueOf(min.toString()), Byte.valueOf(max.toString()));
        }
        throw new HoodieException(String.format("Not support type:  %s", colType));
    }

    /**
     * Builds SQL that FULL JOINs {@code originalIndexTable} with {@code newIndexTable} on the first column.
     * For every column it takes the new table's value, or the original's when the new one is null.
     *
     * @param columns column names to select; the first one is the join key
     * @throws IllegalArgumentException if {@code columns} is empty
     */
    @Nonnull
    public static String createIndexMergeSql(@Nonnull String originalIndexTable, @Nonnull String newIndexTable, @Nonnull List<String> columns) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("At least one column (the join key) is required to merge index tables");
        }
        String selectBody = columns.stream()
                .map(col -> String.format("if (%s.%s is null, %s.%s, %s.%s) AS %s", newIndexTable, col, originalIndexTable, col, newIndexTable, col, col))
                .collect(Collectors.joining(", "));
        String joinKey = columns.get(0);
        return String.format("SELECT %s FROM %s FULL JOIN %s ON %s.%s = %s.%s", selectBody, originalIndexTable, newIndexTable, originalIndexTable, joinKey, newIndexTable, joinKey);
    }
}

