/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.hadoop.realtime;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.HoodieAvroUtils;
import org.apache.hudi.common.util.LogReaderUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.org.apache.avro.generic.GenericArray;
import org.apache.hudi.org.apache.avro.generic.GenericFixed;
import org.apache.hudi.org.apache.avro.generic.GenericRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.schema.MessageType;

public abstract class AbstractRealtimeRecordReader {
    /** Job-conf key read by {@link #getMaxCompactionMemoryInBytes()} as the fraction of map-task memory to use. */
    public static final String COMPACTION_MEMORY_FRACTION_PROP = "compaction.memory.fraction";
    /** Fraction used when {@link #COMPACTION_MEMORY_FRACTION_PROP} is not set. */
    public static final String DEFAULT_COMPACTION_MEMORY_FRACTION = "0.75";
    // NOTE(review): the four constants below are not referenced inside this class;
    // presumably they are consumed by subclasses or callers - confirm before changing.
    public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = "compaction.lazy.block.read.enabled";
    public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = "true";
    public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.dfs.buffer.max.size";
    // 0x100000 == 1,048,576; the unit is presumably bytes (1 MiB) - TODO confirm.
    public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 0x100000;
    public static final String SPILLABLE_MAP_BASE_PATH_PROP = "hoodie.memory.spillable.map.path";
    public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/";
    private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class);
    /** The split handed to the constructor; supplies the base path, base file path and delta log paths. */
    protected final HoodieRealtimeFileSplit split;
    /** The job configuration handed to the constructor. */
    protected final JobConf jobConf;
    /** Parquet schema read from the footer of the split's base file. */
    private final MessageType baseFileSchema;
    /** True when the table's payload class name contains neither HoodieAvroPayload nor OverwriteWithLatestAvroPayload. */
    protected final boolean usesCustomPayload;
    /** Projection of {@link #writerSchema} onto the columns requested by the job; set in init(). */
    private Schema readerSchema;
    /** Schema from the latest log file, or converted from the base file, plus missing partition fields; set in init(). */
    private Schema writerSchema;
    /** Writer-schema fields arranged in the order of the job's "columns" property; set in init(). */
    private Schema hiveSchema;

    public AbstractRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job) {
        this.split = split;
        this.jobConf = job;
        LOG.info((Object)("cfg ==> " + job.get("hive.io.file.readcolumn.names")));
        LOG.info((Object)("columnIds ==> " + job.get("hive.io.file.readcolumn.ids")));
        LOG.info((Object)("partitioningColumns ==> " + job.get("partition_columns", "")));
        try {
            this.usesCustomPayload = this.usesCustomPayload();
            LOG.info((Object)("usesCustomPayload ==> " + this.usesCustomPayload));
            this.baseFileSchema = AbstractRealtimeRecordReader.readSchema((Configuration)this.jobConf, split.getPath());
            this.init();
        }
        catch (IOException e) {
            throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e);
        }
    }

    /**
     * Reports whether the table at the split's base path is configured with a
     * payload class other than the two stock Avro payloads.
     *
     * @return {@code false} if the configured payload class name contains
     *         {@code HoodieAvroPayload} or {@code OverwriteWithLatestAvroPayload};
     *         {@code true} otherwise
     */
    private boolean usesCustomPayload() {
        HoodieTableMetaClient tableMetaClient = new HoodieTableMetaClient(this.jobConf, this.split.getBasePath());
        String avroPayloadName = HoodieAvroPayload.class.getName();
        if (tableMetaClient.getTableConfig().getPayloadClass().contains(avroPayloadName)) {
            return false;
        }
        boolean isOverwritePayload = tableMetaClient.getTableConfig().getPayloadClass()
                .contains("org.apache.hudi.OverwriteWithLatestAvroPayload");
        return !isOverwritePayload;
    }

    /**
     * Reads the Parquet schema stored in the footer of the given file.
     *
     * @param conf            Hadoop configuration used to open the file
     * @param parquetFilePath path of the Parquet file
     * @return the schema recorded in the file's metadata
     * @throws HoodieIOException if reading the footer fails with an {@link IOException}
     */
    private static MessageType readSchema(Configuration conf, Path parquetFilePath) {
        final MessageType footerSchema;
        try {
            footerSchema = ParquetFileReader.readFooter(conf, parquetFilePath).getFileMetaData().getSchema();
        } catch (IOException ioe) {
            throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath, ioe);
        }
        return footerSchema;
    }

    /**
     * Renders an {@link ArrayWritable} as a JSON-like debug string, recursing
     * into nested {@link ArrayWritable} elements. Each non-array element is
     * emitted as a {@code "valueN"} / {@code "typeN"} pair.
     *
     * <p>The header embeds {@link Math#random()}, so the output is not stable
     * across calls; it is meant for log output only.
     *
     * @param writable the value to render; may be {@code null}
     * @return the literal {@code "null"} for a {@code null} argument, otherwise
     *         a string of the form {@code "values_<rand>_<n>": {...}}
     */
    protected static String arrayWritableToString(ArrayWritable writable) {
        if (writable == null) {
            return "null";
        }
        Writable[] values = writable.get();
        // An ArrayWritable built without values returns null here; treat it as empty.
        int count = values == null ? 0 : values.length;

        StringBuilder builder = new StringBuilder();
        builder.append("\"values_").append(Math.random()).append('_').append(count).append("\": {");
        for (int i = 0; i < count; i++) {
            Writable w = values[i];
            if (w instanceof ArrayWritable) {
                builder.append(arrayWritableToString((ArrayWritable) w)).append(',');
            } else {
                builder.append("\"value").append(i).append("\":\"").append(w).append("\",");
                String typeName = (w == null) ? "unknown" : w.getClass().getSimpleName();
                builder.append("\"type").append(i).append("\":\"").append(typeName).append("\",");
            }
        }
        // Every element leaves a trailing comma. Only strip it when something was
        // written; otherwise the last character is the opening brace, which the
        // unconditional delete used to remove.
        if (count > 0) {
            builder.setLength(builder.length() - 1);
        }
        builder.append('}');
        return builder.toString();
    }

    /**
     * Pairs projected column names with their positions and returns the names
     * sorted by ascending position.
     *
     * <p>Positions are de-duplicated keeping first occurrence. Names that
     * appear in {@code partitioningFields} are dropped and the remainder is
     * de-duplicated the same way. The i-th remaining name is then matched with
     * the i-th remaining position.
     *
     * @param fieldNameCsv       comma-separated column names
     * @param fieldOrderCsv      comma-separated integer column positions
     * @param partitioningFields names to exclude from {@code fieldNameCsv}
     * @return the remaining names ordered by their parsed position
     * @throws HoodieException       if the number of distinct names differs from the number of distinct positions
     * @throws NumberFormatException if a position is not a valid integer
     */
    private static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv, List<String> partitioningFields) {
        Set<String> fieldOrdersSet = new LinkedHashSet<>(Arrays.asList(fieldOrderCsv.split(",")));
        String[] fieldOrders = fieldOrdersSet.toArray(new String[0]);

        Set<String> fieldNamesSet = new LinkedHashSet<>();
        for (String fieldName : fieldNameCsv.split(",")) {
            if (!partitioningFields.contains(fieldName)) {
                fieldNamesSet.add(fieldName);
            }
        }

        if (fieldNamesSet.size() != fieldOrders.length) {
            // Report the de-duplicated count, i.e. the number actually compared above.
            throw new HoodieException(String.format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d", fieldNamesSet.size(), fieldOrders.length));
        }

        String[] fieldNamesArray = fieldNamesSet.toArray(new String[0]);
        // TreeMap keeps entries sorted by position, giving the final column order.
        TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
        for (int ox = 0; ox < fieldOrders.length; ++ox) {
            orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNamesArray[ox]);
        }
        return new ArrayList<>(orderedFieldMap.values());
    }

    /**
     * Builds a record schema containing only the requested fields, in the
     * order given, copying each field definition from {@code schemaFieldsMap}.
     * The record's name, doc, namespace and error flag are taken from
     * {@code writeSchema}.
     *
     * @param writeSchema     schema supplying the record-level attributes
     * @param schemaFieldsMap fields keyed by lower-cased name
     * @param fieldNames      names to project; looked up lower-cased
     * @return the projected record schema
     * @throws HoodieException if a requested name has no entry in {@code schemaFieldsMap}
     */
    public static Schema generateProjectionSchema(Schema writeSchema, Map<String, Schema.Field> schemaFieldsMap, List<String> fieldNames) {
        List<Schema.Field> projection = fieldNames.stream().map(requested -> {
            Schema.Field source = schemaFieldsMap.get(requested.toLowerCase());
            if (source == null) {
                throw new HoodieException("Field " + requested + " not found in log schema. Query cannot proceed! Derived Schema Fields: " + new ArrayList<String>(schemaFieldsMap.keySet()));
            }
            // Copy the field: the lookup result belongs to another schema and is not reused here.
            return new Schema.Field(source.name(), source.schema(), source.doc(), source.defaultValue());
        }).collect(Collectors.toList());

        Schema result = Schema.createRecord(writeSchema.getName(), writeSchema.getDoc(), writeSchema.getNamespace(), writeSchema.isError());
        result.setFields(projection);
        return result;
    }

    /**
     * Indexes the top-level fields of a schema by lower-cased field name.
     *
     * @param schema the record schema whose fields are indexed
     * @return a map from lower-cased field name to field
     * @throws IllegalStateException if two fields share the same lower-cased name
     */
    public static Map<String, Schema.Field> getNameToFieldMap(Schema schema) {
        return schema.getFields().stream()
                .collect(Collectors.toMap(field -> field.name().toLowerCase(), field -> field));
    }

    /**
     * Converts an Avro value into the matching Hadoop {@link Writable},
     * recursing through records, arrays, maps and nullable unions.
     *
     * <ul>
     *   <li>Records become an {@link ArrayWritable} with one element per schema field.</li>
     *   <li>Arrays become an {@link ArrayWritable} of converted elements.</li>
     *   <li>Maps become an {@link ArrayWritable} of two-element
     *       {@link ArrayWritable}s holding the key as {@link Text} and the converted value.</li>
     *   <li>Unions are supported only as a two-branch union where one branch is null.</li>
     * </ul>
     *
     * @param value  the Avro value; may be {@code null}
     * @param schema the Avro schema describing {@code value}
     * @return the converted writable, or {@code null} for a {@code null} value,
     *         a NULL schema, or a schema type not handled here
     * @throws IllegalArgumentException if a union has other than two branches or no null branch
     */
    public static Writable avroToArrayWritable(Object value, Schema schema) {
        if (value == null) {
            return null;
        }
        switch (schema.getType()) {
            case STRING:
            case ENUM:
                return new Text(value.toString());
            case BYTES:
                return bytesToWritable(value);
            case INT:
                return new IntWritable((Integer) value);
            case LONG:
                return new LongWritable((Long) value);
            case FLOAT:
                return new FloatWritable((Float) value);
            case DOUBLE:
                return new DoubleWritable((Double) value);
            case BOOLEAN:
                return new BooleanWritable((Boolean) value);
            case NULL:
                return null;
            case RECORD: {
                GenericRecord record = (GenericRecord) value;
                List<Schema.Field> fields = schema.getFields();
                Writable[] recordValues = new Writable[fields.size()];
                int recordValueIndex = 0;
                for (Schema.Field field : fields) {
                    recordValues[recordValueIndex++] = avroToArrayWritable(record.get(field.name()), field.schema());
                }
                return new ArrayWritable(Writable.class, recordValues);
            }
            case ARRAY: {
                // GenericArray is a List, so casting to List also accepts plain Java lists.
                List<?> arrayValue = (List<?>) value;
                Writable[] arrayValues = new Writable[arrayValue.size()];
                int arrayValueIndex = 0;
                for (Object element : arrayValue) {
                    arrayValues[arrayValueIndex++] = avroToArrayWritable(element, schema.getElementType());
                }
                return new ArrayWritable(Writable.class, arrayValues);
            }
            case MAP: {
                Map<?, ?> mapValue = (Map<?, ?>) value;
                Writable[] mapValues = new Writable[mapValue.size()];
                int mapValueIndex = 0;
                for (Map.Entry<?, ?> mapEntry : mapValue.entrySet()) {
                    Writable[] nestedMapValues = new Writable[]{
                        new Text(mapEntry.getKey().toString()),
                        avroToArrayWritable(mapEntry.getValue(), schema.getValueType())
                    };
                    mapValues[mapValueIndex++] = new ArrayWritable(Writable.class, nestedMapValues);
                }
                return new ArrayWritable(Writable.class, mapValues);
            }
            case UNION: {
                List<Schema> types = schema.getTypes();
                if (types.size() != 2) {
                    throw new IllegalArgumentException("Only support union with 2 fields");
                }
                Schema s1 = types.get(0);
                Schema s2 = types.get(1);
                // Unwrap the nullable union and convert against the non-null branch.
                if (s1.getType() == Schema.Type.NULL) {
                    return avroToArrayWritable(value, s2);
                }
                if (s2.getType() == Schema.Type.NULL) {
                    return avroToArrayWritable(value, s1);
                }
                throw new IllegalArgumentException("Only support union with null");
            }
            case FIXED:
                return new BytesWritable(((GenericFixed) value).bytes());
            default:
                return null;
        }
    }

    /**
     * Wraps an Avro BYTES value in a {@link BytesWritable}. Accepts a
     * {@link ByteBuffer} (its remaining bytes are copied without moving the
     * caller's position) as well as a raw {@code byte[]}.
     */
    private static Writable bytesToWritable(Object value) {
        if (value instanceof ByteBuffer) {
            // duplicate() shares content but has an independent position/limit.
            ByteBuffer view = ((ByteBuffer) value).duplicate();
            byte[] copy = new byte[view.remaining()];
            view.get(copy);
            return new BytesWritable(copy);
        }
        return new BytesWritable((byte[]) value);
    }

    /**
     * Appends to {@code schema} every partition field whose lower-cased name is
     * not already a top-level field name (compared lower-cased). The appending
     * itself is delegated to {@code HoodieAvroUtils.appendNullSchemaFields}.
     *
     * @param schema             the schema to extend
     * @param partitioningFields partition column names
     * @return the schema returned by {@code appendNullSchemaFields}
     */
    private static Schema addPartitionFields(Schema schema, List<String> partitioningFields) {
        Set<String> existingNames = schema.getFields().stream()
                .map(field -> field.name().toLowerCase())
                .collect(Collectors.toSet());

        List<String> missingPartitionFields = new ArrayList<String>();
        for (String partitionField : partitioningFields) {
            String lowered = partitionField.toLowerCase();
            if (!existingNames.contains(lowered)) {
                missingPartitionFields.add(lowered);
            }
        }
        return HoodieAvroUtils.appendNullSchemaFields(schema, missingPartitionFields);
    }

    /**
     * Derives the writer, Hive-ordered and reader (projection) schemas for this split.
     *
     * <p>The writer schema is the latest schema found in the split's delta log
     * files, or, when none is found, the base file's Parquet schema converted
     * to Avro. Partition columns listed in the {@code partition_columns}
     * property ('/'-separated) are appended when absent. The projection is
     * taken from {@code hive.io.file.readcolumn.names} and
     * {@code hive.io.file.readcolumn.ids}.
     *
     * @throws IOException declared for failures while reading the log-file schema
     */
    private void init() throws IOException {
        Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(this.split.getBasePath(), this.split.getDeltaLogPaths(), this.jobConf);
        if (schemaFromLogFile == null) {
            this.writerSchema = new AvroSchemaConverter().convert(this.baseFileSchema);
            // Guarded so the field list is only stringified when debug logging is on.
            if (LOG.isDebugEnabled()) {
                LOG.debug("Writer Schema From Parquet => " + this.writerSchema.getFields());
            }
        } else {
            this.writerSchema = schemaFromLogFile;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Writer Schema From Log => " + this.writerSchema.getFields());
            }
        }

        String partitionFields = this.jobConf.get("partition_columns", "");
        // Declared as List: Collectors.toList() does not promise an ArrayList.
        List<String> partitioningFields;
        if (partitionFields.length() > 0) {
            partitioningFields = Arrays.stream(partitionFields.split("/")).collect(Collectors.toList());
        } else {
            partitioningFields = new ArrayList<String>();
        }

        this.writerSchema = AbstractRealtimeRecordReader.addPartitionFields(this.writerSchema, partitioningFields);
        List<String> projectionFields = AbstractRealtimeRecordReader.orderFields(this.jobConf.get("hive.io.file.readcolumn.names"), this.jobConf.get("hive.io.file.readcolumn.ids"), partitioningFields);
        Map<String, Schema.Field> schemaFieldsMap = AbstractRealtimeRecordReader.getNameToFieldMap(this.writerSchema);
        this.hiveSchema = this.constructHiveOrderedSchema(this.writerSchema, schemaFieldsMap);
        this.readerSchema = AbstractRealtimeRecordReader.generateProjectionSchema(this.writerSchema, schemaFieldsMap, projectionFields);
        LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s", this.split.getDeltaLogPaths(), this.split.getPath(), projectionFields));
    }

    /**
     * Builds a record schema whose fields follow the order of the job's
     * comma-separated {@code columns} property. Columns with no matching entry
     * in {@code schemaFieldsMap} (looked up lower-cased) are skipped with a
     * debug log line.
     *
     * @param writerSchema    schema supplying the record name, doc, namespace and error flag
     * @param schemaFieldsMap writer-schema fields keyed by lower-cased name
     * @return the Hive-ordered record schema
     */
    private Schema constructHiveOrderedSchema(Schema writerSchema, Map<String, Schema.Field> schemaFieldsMap) {
        String[] hiveColumns = this.jobConf.get("columns").split(",");
        List<Schema.Field> orderedFields = new ArrayList<Schema.Field>();
        for (String columnName : hiveColumns) {
            Schema.Field match = schemaFieldsMap.get(columnName.toLowerCase());
            if (match == null) {
                LOG.debug("Skipping Hive Column => " + columnName);
                continue;
            }
            orderedFields.add(new Schema.Field(match.name(), match.schema(), match.doc(), match.defaultValue()));
        }
        Schema ordered = Schema.createRecord(writerSchema.getName(), writerSchema.getDoc(), writerSchema.getNamespace(), writerSchema.isError());
        ordered.setFields(orderedFields);
        return ordered;
    }

    /**
     * @return the projection schema built during initialisation from the
     *         writer schema and the job's projected columns
     */
    public Schema getReaderSchema() {
        return readerSchema;
    }

    /**
     * @return the writer schema resolved during initialisation, including any
     *         appended partition fields
     */
    public Schema getWriterSchema() {
        return writerSchema;
    }

    /**
     * @return the writer-schema fields arranged in the order of the job's
     *         {@code columns} property
     */
    public Schema getHiveSchema() {
        return hiveSchema;
    }

    /**
     * Computes the memory budget as the configured compaction fraction
     * ({@code compaction.memory.fraction}, default {@code 0.75}) of the job's
     * map-task memory, scaled by 1024 * 1024 and rounded up.
     *
     * @return the budget as a {@code long}; the scaling suggests map-task
     *         memory is in megabytes and the result in bytes - TODO confirm
     * @throws NumberFormatException if the configured fraction is not a valid double
     */
    public long getMaxCompactionMemoryInBytes() {
        String fractionText = this.jobConf.get(COMPACTION_MEMORY_FRACTION_PROP, DEFAULT_COMPACTION_MEMORY_FRACTION);
        double fraction = Double.parseDouble(fractionText);
        double mapTaskMemory = (double) this.jobConf.getMemoryForMapTask();
        double budget = fraction * mapTaskMemory * 1024.0 * 1024.0;
        return (long) Math.ceil(budget);
    }
}

