/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.table;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.InvalidTableException;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.org.apache.avro.SchemaCompatibility;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

public class TableSchemaResolver {
    private static final Logger LOG = LogManager.getLogger(TableSchemaResolver.class);
    private HoodieTableMetaClient metaClient;

    public TableSchemaResolver(HoodieTableMetaClient metaClient) {
        this.metaClient = metaClient;
    }

    private MessageType getTableParquetSchemaFromDataFile() throws Exception {
        HoodieActiveTimeline activeTimeline = this.metaClient.getActiveTimeline();
        try {
            switch (this.metaClient.getTableType()) {
                case COPY_ON_WRITE: {
                    HoodieInstant lastCommit = activeTimeline.getCommitsTimeline().filterCompletedInstants().lastInstant().orElseThrow(() -> new InvalidTableException(this.metaClient.getBasePath()));
                    HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastCommit).get(), HoodieCommitMetadata.class);
                    String filePath = commitMetadata.getFileIdAndFullPaths(this.metaClient.getBasePath()).values().stream().findAny().orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for commit " + lastCommit + ", could not get schema for table " + this.metaClient.getBasePath() + ", Metadata :" + commitMetadata));
                    return this.readSchemaFromBaseFile(new Path(filePath));
                }
                case MERGE_ON_READ: {
                    Option<HoodieInstant> lastCompactionCommit = this.metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
                    LOG.info((Object)("Found the last compaction commit as " + lastCompactionCommit));
                    Option<HoodieInstant> lastDeltaCommit = lastCompactionCommit.isPresent() ? this.metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(lastCompactionCommit.get().getTimestamp(), Integer.MAX_VALUE).lastInstant() : this.metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
                    LOG.info((Object)("Found the last delta commit " + lastDeltaCommit));
                    if (lastDeltaCommit.isPresent()) {
                        HoodieInstant lastDeltaInstant = lastDeltaCommit.get();
                        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastDeltaInstant).get(), HoodieCommitMetadata.class);
                        Pair filePathWithFormat = commitMetadata.getFileIdAndFullPaths(this.metaClient.getBasePath()).values().stream().filter(s -> s.contains(".log")).findAny().map(f -> Pair.of(f, HoodieFileFormat.HOODIE_LOG)).orElseGet(() -> commitMetadata.getFileIdAndFullPaths(this.metaClient.getBasePath()).values().stream().filter(s -> s.contains(this.metaClient.getTableConfig().getBaseFileFormat().getFileExtension())).findAny().map(f -> Pair.of(f, HoodieFileFormat.PARQUET)).orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for commit " + lastDeltaInstant + ", could not get schema for table " + this.metaClient.getBasePath() + ", CommitMetadata :" + commitMetadata)));
                        switch ((HoodieFileFormat)((Object)filePathWithFormat.getRight())) {
                            case HOODIE_LOG: {
                                return this.readSchemaFromLogFile(lastCompactionCommit, new Path((String)filePathWithFormat.getLeft()));
                            }
                            case PARQUET: {
                                return this.readSchemaFromBaseFile(new Path((String)filePathWithFormat.getLeft()));
                            }
                        }
                        throw new IllegalArgumentException("Unknown file format :" + filePathWithFormat.getRight() + " for file " + (String)filePathWithFormat.getLeft());
                    }
                    return this.readSchemaFromLastCompaction(lastCompactionCommit);
                }
            }
            LOG.error((Object)("Unknown table type " + (Object)((Object)this.metaClient.getTableType())));
            throw new InvalidTableException(this.metaClient.getBasePath());
        }
        catch (IOException e) {
            throw new HoodieException("Failed to read data schema", e);
        }
    }

    private Schema getTableAvroSchemaFromDataFile() throws Exception {
        return this.convertParquetSchemaToAvro(this.getTableParquetSchemaFromDataFile());
    }

    public Schema getTableAvroSchema() throws Exception {
        return this.getTableAvroSchema(true);
    }

    public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception {
        Option<Schema> schemaFromCommitMetadata = this.getTableSchemaFromCommitMetadata(includeMetadataFields);
        return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() : this.getTableAvroSchemaFromDataFile();
    }

    public MessageType getTableParquetSchema() throws Exception {
        Option<Schema> schemaFromCommitMetadata = this.getTableSchemaFromCommitMetadata(true);
        return schemaFromCommitMetadata.isPresent() ? this.convertAvroSchemaToParquet(schemaFromCommitMetadata.get()) : this.getTableParquetSchemaFromDataFile();
    }

    public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception {
        HoodieTimeline timeline = this.metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
        Option<Schema> schemaFromCommitMetadata = this.getTableSchemaFromCommitMetadata(timeline.lastInstant().get(), false);
        return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() : HoodieAvroUtils.removeMetadataFields(this.getTableAvroSchemaFromDataFile());
    }

    public Schema getTableAvroSchemaWithoutMetadataFields(HoodieInstant instant) throws Exception {
        Option<Schema> schemaFromCommitMetadata = this.getTableSchemaFromCommitMetadata(instant, false);
        return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() : HoodieAvroUtils.removeMetadataFields(this.getTableAvroSchemaFromDataFile());
    }

    private Option<Schema> getTableSchemaFromCommitMetadata(boolean includeMetadataFields) {
        HoodieTimeline timeline = this.metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
        return this.getTableSchemaFromCommitMetadata(timeline.lastInstant().get(), includeMetadataFields);
    }

    private Option<Schema> getTableSchemaFromCommitMetadata(HoodieInstant instant, boolean includeMetadataFields) {
        try {
            HoodieTimeline timeline = this.metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
            byte[] data = timeline.getInstantDetails(instant).get();
            HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
            String existingSchemaStr = metadata.getMetadata("schema");
            if (StringUtils.isNullOrEmpty(existingSchemaStr)) {
                return Option.empty();
            }
            Schema schema = new Schema.Parser().parse(existingSchemaStr);
            if (includeMetadataFields) {
                schema = HoodieAvroUtils.addMetadataFields(schema);
            }
            return Option.of(schema);
        }
        catch (Exception e) {
            throw new HoodieException("Failed to read schema from commit metadata", e);
        }
    }

    public Schema convertParquetSchemaToAvro(MessageType parquetSchema) {
        AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(this.metaClient.getHadoopConf());
        return avroSchemaConverter.convert(parquetSchema);
    }

    public MessageType convertAvroSchemaToParquet(Schema schema) {
        AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(this.metaClient.getHadoopConf());
        return avroSchemaConverter.convert(schema);
    }

    public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) {
        if (oldSchema.getType() == newSchema.getType() && newSchema.getType() == Schema.Type.RECORD) {
            if (!SchemaCompatibility.schemaNameEquals(newSchema, oldSchema)) {
                return false;
            }
            for (Schema.Field oldSchemaField : oldSchema.getFields()) {
                Schema.Field newSchemaField = SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField);
                if (newSchemaField == null) {
                    return false;
                }
                if (TableSchemaResolver.isSchemaCompatible(oldSchemaField.schema(), newSchemaField.schema())) continue;
                return false;
            }
            for (Schema.Field newSchemaField : newSchema.getFields()) {
                Schema.Field oldSchemaField = SchemaCompatibility.lookupWriterField(oldSchema, newSchemaField);
                if (oldSchemaField != null || newSchemaField.defaultValue() != null) continue;
                return false;
            }
            return true;
        }
        SchemaCompatibility.SchemaPairCompatibility compatResult = SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema);
        return compatResult.getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
    }

    public static boolean isSchemaCompatible(String oldSchema, String newSchema) {
        return TableSchemaResolver.isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema));
    }

    public MessageType readSchemaFromBaseFile(Path parquetFilePath) throws IOException {
        LOG.info((Object)("Reading schema from " + parquetFilePath));
        FileSystem fs = this.metaClient.getRawFs();
        if (!fs.exists(parquetFilePath)) {
            throw new IllegalArgumentException("Failed to read schema from data file " + parquetFilePath + ". File does not exist.");
        }
        ParquetMetadata fileFooter = ParquetFileReader.readFooter((Configuration)fs.getConf(), (Path)parquetFilePath, (ParquetMetadataConverter.MetadataFilter)ParquetMetadataConverter.NO_FILTER);
        return fileFooter.getFileMetaData().getSchema();
    }

    public MessageType readSchemaFromLastCompaction(Option<HoodieInstant> lastCompactionCommitOpt) throws Exception {
        HoodieActiveTimeline activeTimeline = this.metaClient.getActiveTimeline();
        HoodieInstant lastCompactionCommit = lastCompactionCommitOpt.orElseThrow(() -> new Exception("Could not read schema from last compaction, no compaction commits found on path " + this.metaClient));
        HoodieCommitMetadata compactionMetadata = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastCompactionCommit).get(), HoodieCommitMetadata.class);
        String filePath = compactionMetadata.getFileIdAndFullPaths(this.metaClient.getBasePath()).values().stream().findAny().orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction " + lastCompactionCommit + ", could not get schema for table " + this.metaClient.getBasePath()));
        return this.readSchemaFromBaseFile(new Path(filePath));
    }

    public MessageType readSchemaFromLogFile(Path path) throws IOException {
        return TableSchemaResolver.readSchemaFromLogFile(this.metaClient.getRawFs(), path);
    }

    public MessageType readSchemaFromLogFile(Option<HoodieInstant> lastCompactionCommitOpt, Path path) throws Exception {
        MessageType messageType = this.readSchemaFromLogFile(path);
        if (messageType == null) {
            LOG.info((Object)("Falling back to read the schema from last compaction " + lastCompactionCommitOpt));
            return this.readSchemaFromLastCompaction(lastCompactionCommitOpt);
        }
        return messageType;
    }

    public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) throws IOException {
        HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
        HoodieDataBlock lastBlock = null;
        while (reader.hasNext()) {
            HoodieLogBlock block = (HoodieLogBlock)reader.next();
            if (!(block instanceof HoodieDataBlock)) continue;
            lastBlock = (HoodieDataBlock)block;
        }
        reader.close();
        if (lastBlock != null) {
            return new AvroSchemaConverter().convert(lastBlock.getSchema());
        }
        return null;
    }
}

