/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.table;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Stream;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.HoodieSchemaNotFoundException;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.InvalidTableException;
import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.Lazy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class TableSchemaResolver {
    private static final Logger LOG = LoggerFactory.getLogger(TableSchemaResolver.class);
    protected final HoodieTableMetaClient metaClient;
    private final Lazy<Boolean> hasOperationField;
    private final Lazy<ConcurrentHashMap<HoodieInstant, HoodieCommitMetadata>> commitMetadataCache;
    private volatile HoodieInstant latestCommitWithValidSchema = null;
    private volatile HoodieInstant latestCommitWithInsertOrUpdate = null;

    public TableSchemaResolver(HoodieTableMetaClient metaClient) {
        this.metaClient = metaClient;
        this.commitMetadataCache = Lazy.lazily(() -> new ConcurrentHashMap(2));
        this.hasOperationField = Lazy.lazily(this::hasOperationField);
    }

    public Schema getTableAvroSchemaFromDataFile() throws Exception {
        return (Schema)this.getTableAvroSchemaFromDataFileInternal().orElseThrow(this.schemaNotFoundError());
    }

    private Option<Schema> getTableAvroSchemaFromDataFileInternal() {
        return this.getTableParquetSchemaFromDataFile();
    }

    public Schema getTableAvroSchema() throws Exception {
        return this.getTableAvroSchema(this.metaClient.getTableConfig().populateMetaFields());
    }

    public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception {
        return (Schema)this.getTableAvroSchemaInternal(includeMetadataFields, (Option<HoodieInstant>)Option.empty()).orElseThrow(this.schemaNotFoundError());
    }

    public Schema getTableAvroSchema(String timestamp) throws Exception {
        Option<HoodieInstant> instant = this.metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(timestamp).lastInstant();
        return (Schema)this.getTableAvroSchemaInternal(this.metaClient.getTableConfig().populateMetaFields(), instant).orElseThrow(this.schemaNotFoundError());
    }

    public Schema getTableAvroSchema(HoodieInstant instant, boolean includeMetadataFields) throws Exception {
        return (Schema)this.getTableAvroSchemaInternal(includeMetadataFields, (Option<HoodieInstant>)Option.of((Object)instant)).orElseThrow(this.schemaNotFoundError());
    }

    @Deprecated
    public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception {
        return (Schema)this.getTableAvroSchemaInternal(false, (Option<HoodieInstant>)Option.empty()).orElseThrow(this.schemaNotFoundError());
    }

    public Option<Schema> getTableAvroSchemaIfPresent(boolean includeMetadataFields) {
        return this.getTableAvroSchemaInternal(includeMetadataFields, (Option<HoodieInstant>)Option.empty());
    }

    private Option<Schema> getTableAvroSchemaInternal(boolean includeMetadataFields, Option<HoodieInstant> instantOpt) {
        Option schema = (instantOpt.isPresent() ? this.getTableSchemaFromCommitMetadata((HoodieInstant)instantOpt.get(), includeMetadataFields) : this.getTableSchemaFromLatestCommitMetadata(includeMetadataFields)).or(() -> this.metaClient.getTableConfig().getTableCreateSchema().map(tableSchema -> includeMetadataFields ? HoodieAvroUtils.addMetadataFields(tableSchema, this.hasOperationField.get()) : tableSchema)).or(() -> {
            Option schemaFromDataFile = this.getTableAvroSchemaFromDataFileInternal();
            return includeMetadataFields ? schemaFromDataFile : schemaFromDataFile.map(HoodieAvroUtils::removeMetadataFields);
        });
        if (this.metaClient.getTableConfig().shouldDropPartitionColumns().booleanValue() && schema.isPresent()) {
            return this.metaClient.getTableConfig().getPartitionFields().map(partitionFields -> TableSchemaResolver.appendPartitionColumns((Schema)schema.get(), (Option<String[]>)Option.ofNullable((Object)partitionFields))).or(() -> schema);
        }
        return schema;
    }

    private Option<Schema> getTableSchemaFromLatestCommitMetadata(boolean includeMetadataFields) {
        Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata = this.getLatestCommitMetadataWithValidSchema();
        if (instantAndCommitMetadata.isPresent()) {
            HoodieCommitMetadata commitMetadata = (HoodieCommitMetadata)((Pair)instantAndCommitMetadata.get()).getRight();
            String schemaStr = commitMetadata.getMetadata("schema");
            Schema schema = new Schema.Parser().parse(schemaStr);
            schema = includeMetadataFields ? HoodieAvroUtils.addMetadataFields(schema, this.hasOperationField.get()) : HoodieAvroUtils.removeMetadataFields(schema);
            return Option.of((Object)schema);
        }
        return Option.empty();
    }

    private Option<Schema> getTableSchemaFromCommitMetadata(HoodieInstant instant, boolean includeMetadataFields) {
        try {
            HoodieCommitMetadata metadata = this.getCachedCommitMetadata(instant);
            String existingSchemaStr = metadata.getMetadata("schema");
            if (StringUtils.isNullOrEmpty((String)existingSchemaStr)) {
                return Option.empty();
            }
            Schema schema = new Schema.Parser().parse(existingSchemaStr);
            schema = includeMetadataFields ? HoodieAvroUtils.addMetadataFields(schema, this.hasOperationField.get()) : HoodieAvroUtils.removeMetadataFields(schema);
            return Option.of((Object)schema);
        }
        catch (Exception e) {
            throw new HoodieException("Failed to read schema from commit metadata", (Throwable)e);
        }
    }

    private Option<Schema> getTableParquetSchemaFromDataFile() {
        Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata = this.getLatestCommitMetadataWithInsertOrUpdate();
        switch (this.metaClient.getTableType()) {
            case COPY_ON_WRITE: 
            case MERGE_ON_READ: {
                if (instantAndCommitMetadata.isPresent()) {
                    HoodieCommitMetadata commitMetadata = (HoodieCommitMetadata)((Pair)instantAndCommitMetadata.get()).getRight();
                    Stream<StoragePath> filePaths = commitMetadata.getPartitionToWriteStats().values().stream().flatMap(Collection::stream).filter(writeStat -> writeStat.getNumInserts() > 0L || writeStat.getNumUpdateWrites() > 0L).map(writeStat -> new StoragePath(this.metaClient.getBasePath(), writeStat.getPath()));
                    return Option.of((Object)this.fetchSchemaFromFiles(filePaths));
                }
                LOG.debug("Could not find any data file written for commit, so could not get schema for table {}", (Object)this.metaClient.getBasePath());
                return Option.empty();
            }
        }
        LOG.error("Unknown table type {}", (Object)this.metaClient.getTableType());
        throw new InvalidTableException(this.metaClient.getBasePath().toString());
    }

    public Option<Schema> getTableAvroSchemaFromLatestCommit(boolean includeMetadataFields) throws Exception {
        if (this.metaClient.isTimelineNonEmpty()) {
            return this.getTableAvroSchemaInternal(includeMetadataFields, (Option<HoodieInstant>)Option.empty());
        }
        return Option.empty();
    }

    public Schema readSchemaFromLastCompaction(Option<HoodieInstant> lastCompactionCommitOpt) throws Exception {
        HoodieActiveTimeline activeTimeline = this.metaClient.getActiveTimeline();
        HoodieInstant lastCompactionCommit = (HoodieInstant)lastCompactionCommitOpt.orElseThrow(() -> new Exception("Could not read schema from last compaction, no compaction commits found on path " + this.metaClient));
        HoodieCommitMetadata compactionMetadata = activeTimeline.readCommitMetadata(lastCompactionCommit);
        String filePath = compactionMetadata.getFileIdAndFullPaths(this.metaClient.getBasePath()).values().stream().findAny().orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction " + lastCompactionCommit + ", could not get schema for table " + this.metaClient.getBasePath()));
        StoragePath path = new StoragePath(filePath);
        return HoodieIOFactory.getIOFactory(this.metaClient.getStorage()).getFileFormatUtils(path).readAvroSchema(this.metaClient.getStorage(), path);
    }

    private Schema readSchemaFromLogFile(StoragePath path) throws IOException {
        return TableSchemaResolver.readSchemaFromLogFile(this.metaClient.getRawStorage(), path);
    }

    public static Schema readSchemaFromLogFile(HoodieStorage storage, StoragePath path) throws IOException {
        try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(path), null, false);){
            HoodieDataBlock lastBlock = null;
            while (reader.hasNext()) {
                HoodieLogBlock block = (HoodieLogBlock)reader.next();
                if (!(block instanceof HoodieDataBlock)) continue;
                lastBlock = (HoodieDataBlock)block;
            }
            Schema schema = lastBlock != null ? lastBlock.getSchema() : null;
            return schema;
        }
    }

    public Option<InternalSchema> getTableInternalSchemaFromCommitMetadata() {
        HoodieTimeline completedInstants = this.metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
        HoodieTimeline timeline = completedInstants.filter(instant -> WriteOperationType.canUpdateSchema(this.getCachedCommitMetadata((HoodieInstant)instant).getOperationType()));
        return timeline.lastInstant().flatMap(this::getTableInternalSchemaFromCommitMetadata);
    }

    public Option<InternalSchema> getTableInternalSchemaFromCommitMetadata(String timestamp) {
        HoodieTimeline timeline = this.metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(timestamp);
        return timeline.lastInstant().flatMap(this::getTableInternalSchemaFromCommitMetadata);
    }

    private Option<InternalSchema> getTableInternalSchemaFromCommitMetadata(HoodieInstant instant) {
        try {
            HoodieCommitMetadata metadata = this.getCachedCommitMetadata(instant);
            String latestInternalSchemaStr = metadata.getMetadata("latest_schema");
            if (latestInternalSchemaStr != null) {
                return SerDeHelper.fromJson(latestInternalSchemaStr);
            }
            return Option.empty();
        }
        catch (Exception e) {
            throw new HoodieException("Failed to read schema from commit metadata", (Throwable)e);
        }
    }

    public Option<String> getTableHistorySchemaStrFromCommitMetadata() {
        FileBasedInternalSchemaStorageManager manager = new FileBasedInternalSchemaStorageManager(this.metaClient);
        String result = manager.getHistorySchemaStr();
        return result.isEmpty() ? Option.empty() : Option.of((Object)result);
    }

    public boolean hasOperationField() {
        try {
            Schema tableAvroSchema = this.getTableAvroSchemaFromDataFile();
            return tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null;
        }
        catch (Exception e) {
            LOG.info("Failed to read operation field from avro schema ({})", (Object)e.getMessage());
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Option<Pair<HoodieInstant, HoodieCommitMetadata>> getLatestCommitMetadataWithValidSchema() {
        Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata;
        if (this.latestCommitWithValidSchema == null && (instantAndCommitMetadata = this.metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema()).isPresent()) {
            HoodieInstant instant2 = (HoodieInstant)((Pair)instantAndCommitMetadata.get()).getLeft();
            HoodieCommitMetadata metadata = (HoodieCommitMetadata)((Pair)instantAndCommitMetadata.get()).getRight();
            TableSchemaResolver tableSchemaResolver = this;
            synchronized (tableSchemaResolver) {
                if (this.latestCommitWithValidSchema == null) {
                    this.latestCommitWithValidSchema = instant2;
                }
                this.commitMetadataCache.get().putIfAbsent(instant2, metadata);
            }
        }
        return Option.ofNullable((Object)this.latestCommitWithValidSchema).map(instant -> Pair.of(instant, this.commitMetadataCache.get().get(instant)));
    }

    private Option<Pair<HoodieInstant, HoodieCommitMetadata>> getLatestCommitMetadataWithInsertOrUpdate() {
        HoodieCommitMetadata commitMetadata;
        if (this.latestCommitWithValidSchema != null && this.commitMetadataCache.get().containsKey(this.latestCommitWithValidSchema) && this.commitHasInsertOrUpdate(commitMetadata = this.commitMetadataCache.get().get(this.latestCommitWithValidSchema))) {
            this.latestCommitWithInsertOrUpdate = this.latestCommitWithValidSchema;
        }
        if (this.latestCommitWithInsertOrUpdate == null) {
            this.getLatestCommitWithInsertOrUpdate().ifPresent(instantAndCommitMetadata -> {
                HoodieInstant instant = (HoodieInstant)instantAndCommitMetadata.getLeft();
                HoodieCommitMetadata metadata = (HoodieCommitMetadata)instantAndCommitMetadata.getRight();
                TableSchemaResolver tableSchemaResolver = this;
                synchronized (tableSchemaResolver) {
                    if (this.latestCommitWithInsertOrUpdate == null) {
                        this.latestCommitWithInsertOrUpdate = instant;
                    }
                    this.commitMetadataCache.get().putIfAbsent(instant, metadata);
                }
            });
        }
        return Option.ofNullable((Object)this.latestCommitWithInsertOrUpdate).map(instant -> Pair.of(instant, this.commitMetadataCache.get().get(instant)));
    }

    private Option<Pair<HoodieInstant, HoodieCommitMetadata>> getLatestCommitWithInsertOrUpdate() {
        HoodieTimeline commitsTimeline = this.metaClient.getCommitsTimeline().filterCompletedInstants();
        return Option.fromJavaOptional(commitsTimeline.getReverseOrderedInstants().map(instant -> {
            try {
                HoodieCommitMetadata commitMetadata = commitsTimeline.readCommitMetadata((HoodieInstant)instant);
                return Pair.of(instant, commitMetadata);
            }
            catch (IOException e) {
                throw new HoodieIOException(String.format("Failed to fetch HoodieCommitMetadata for instant (%s)", instant), e);
            }
        }).filter(pair -> this.commitHasInsertOrUpdate((HoodieCommitMetadata)pair.getRight())).findFirst());
    }

    private boolean commitHasInsertOrUpdate(HoodieCommitMetadata commitMetadata) {
        return commitMetadata.getPartitionToWriteStats().values().stream().flatMap(Collection::stream).anyMatch(writeStat -> writeStat.getNumInserts() > 0L || writeStat.getNumUpdateWrites() > 0L);
    }

    private HoodieCommitMetadata getCachedCommitMetadata(HoodieInstant instant) {
        return this.commitMetadataCache.get().computeIfAbsent(instant, missingInstant -> {
            HoodieTimeline timeline = this.metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
            try {
                return timeline.readCommitMetadata((HoodieInstant)missingInstant);
            }
            catch (IOException e) {
                throw new HoodieIOException(String.format("Failed to fetch HoodieCommitMetadata for instant (%s)", missingInstant), e);
            }
        });
    }

    private Schema fetchSchemaFromFiles(Stream<StoragePath> filePaths) {
        return filePaths.map(filePath -> {
            try {
                if (FSUtils.isLogFile(filePath)) {
                    return this.readSchemaFromLogFile((StoragePath)filePath);
                }
                return HoodieIOFactory.getIOFactory(this.metaClient.getStorage()).getFileFormatUtils((StoragePath)filePath).readAvroSchema(this.metaClient.getStorage(), (StoragePath)filePath);
            }
            catch (IOException e) {
                throw new HoodieIOException("Failed to read schema from file: " + filePath, e);
            }
        }).filter(Objects::nonNull).findFirst().orElse(null);
    }

    public static Schema appendPartitionColumns(Schema dataSchema, Option<String[]> partitionFields) {
        if (!partitionFields.isPresent() || ((String[])partitionFields.get()).length == 0) {
            return dataSchema;
        }
        boolean hasPartitionColNotInSchema = Arrays.stream((Object[])partitionFields.get()).anyMatch(pf -> !AvroSchemaUtils.containsFieldInSchema(dataSchema, pf));
        boolean hasPartitionColInSchema = Arrays.stream((Object[])partitionFields.get()).anyMatch(pf -> AvroSchemaUtils.containsFieldInSchema(dataSchema, pf));
        if (hasPartitionColNotInSchema && hasPartitionColInSchema) {
            throw new HoodieSchemaException("Partition columns could not be partially contained w/in the data schema");
        }
        if (hasPartitionColNotInSchema) {
            ArrayList<Schema.Field> newFields = new ArrayList<Schema.Field>();
            for (String partitionField : (String[])partitionFields.get()) {
                newFields.add(new Schema.Field(partitionField, AvroSchemaUtils.createNullableSchema(Schema.Type.STRING), "", (Object)JsonProperties.NULL_VALUE));
            }
            return AvroSchemaUtils.appendFieldsToSchema(dataSchema, newFields);
        }
        return dataSchema;
    }

    private Supplier<Exception> schemaNotFoundError() {
        return () -> new HoodieSchemaNotFoundException("No schema found for table at " + this.metaClient.getBasePath());
    }
}

