/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BaseMergeHelper;
import org.apache.hudi.util.ExecutorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoodieMergeHelper<T>
extends BaseMergeHelper {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeHelper.class);

    private HoodieMergeHelper() {
    }

    public static HoodieMergeHelper newInstance() {
        return MergeHelperHolder.HOODIE_MERGE_HELPER;
    }

    @Override
    public void runMerge(HoodieTable<?, ?, ?, ?> table, HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws IOException {
        HoodieWriteConfig writeConfig = table.getConfig();
        HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
        HoodieRecord.HoodieRecordType recordType = table.getConfig().getRecordMerger().getRecordType();
        HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory((HoodieStorage)table.getStorage().newInstance(mergeHandle.getOldFilePath(), table.getStorageConf().newInstance())).getReaderFactory(recordType).getFileReader((HoodieConfig)writeConfig, mergeHandle.getOldFilePath());
        HoodieFileReader bootstrapFileReader = null;
        Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
        Schema readerSchema = baseFileReader.getSchema();
        Option<Function<HoodieRecord, HoodieRecord>> schemaEvolutionTransformerOpt = this.composeSchemaEvolutionTransformer(readerSchema, writerSchema, baseFile, writeConfig, table.getMetaClient());
        boolean isPureProjection = schemaEvolutionTransformerOpt.isEmpty() && AvroSchemaUtils.isStrictProjectionOf((Schema)readerSchema, (Schema)writerSchema);
        boolean shouldRewriteInWriterSchema = !isPureProjection || baseFile.getBootstrapBaseFile().isPresent() || writeConfig.shouldUseExternalSchemaTransformation();
        HoodieExecutor<Void> executor = null;
        try {
            ClosableIterator recordIterator;
            Schema recordSchema;
            if (baseFile.getBootstrapBaseFile().isPresent()) {
                StoragePath bootstrapFilePath = ((BaseFile)baseFile.getBootstrapBaseFile().get()).getStoragePath();
                HoodieStorage storage = table.getStorage().newInstance(bootstrapFilePath, table.getStorageConf().newInstance());
                bootstrapFileReader = HoodieIOFactory.getIOFactory((HoodieStorage)storage).getReaderFactory(recordType).newBootstrapFileReader(baseFileReader, HoodieIOFactory.getIOFactory((HoodieStorage)storage).getReaderFactory(recordType).getFileReader((HoodieConfig)writeConfig, bootstrapFilePath), mergeHandle.getPartitionFields(), mergeHandle.getPartitionValues());
                recordSchema = mergeHandle.getWriterSchemaWithMetaFields();
                recordIterator = bootstrapFileReader.getRecordIterator(recordSchema);
            } else {
                recordSchema = isPureProjection ? writerSchema : readerSchema;
                recordIterator = baseFileReader.getRecordIterator(recordSchema);
            }
            boolean isBufferingRecords = ExecutorFactory.isBufferingRecords(writeConfig);
            executor = ExecutorFactory.create(writeConfig, recordIterator, new BaseMergeHelper.UpdateHandler(mergeHandle), record -> {
                HoodieRecord newRecord = schemaEvolutionTransformerOpt.isPresent() ? (HoodieRecord)((Function)schemaEvolutionTransformerOpt.get()).apply(record) : (shouldRewriteInWriterSchema ? record.rewriteRecordWithNewSchema(recordSchema, (Properties)writeConfig.getProps(), writerSchema) : record);
                return isBufferingRecords ? newRecord.copy() : newRecord;
            }, table.getPreExecuteRunnable());
            executor.execute();
        }
        catch (Exception e) {
            throw new HoodieException((Throwable)e);
        }
        finally {
            if (executor != null) {
                executor.shutdownNow();
                executor.awaitTermination();
            } else {
                baseFileReader.close();
                if (bootstrapFileReader != null) {
                    bootstrapFileReader.close();
                }
                mergeHandle.close();
            }
        }
    }

    private Option<Function<HoodieRecord, HoodieRecord>> composeSchemaEvolutionTransformer(Schema recordSchema, Schema writerSchema, HoodieBaseFile baseFile, HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) {
        Option querySchemaOpt = SerDeHelper.fromJson((String)writeConfig.getInternalSchema());
        if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
            boolean needToReWriteRecord;
            InternalSchema querySchema = AvroSchemaEvolutionUtils.reconcileSchema((Schema)writerSchema, (InternalSchema)((InternalSchema)querySchemaOpt.get()), (boolean)writeConfig.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
            long commitInstantTime = Long.parseLong(baseFile.getCommitTime());
            InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId((long)commitInstantTime, (HoodieTableMetaClient)metaClient);
            if (fileSchema.isEmptySchema() && writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA).booleanValue()) {
                TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
                try {
                    fileSchema = AvroInternalSchemaConverter.convert((Schema)tableSchemaResolver.getTableAvroSchema(true));
                }
                catch (Exception e) {
                    throw new HoodieException(String.format("Failed to get InternalSchema for given versionId: %s", commitInstantTime), (Throwable)e);
                }
            }
            InternalSchema writeInternalSchema = fileSchema;
            List colNamesFromQuerySchema = querySchema.getAllColsFullName();
            List colNamesFromWriteSchema = writeInternalSchema.getAllColsFullName();
            List sameCols = colNamesFromWriteSchema.stream().filter(f -> {
                int writerSchemaFieldId = writeInternalSchema.findIdByName(f);
                int querySchemaFieldId = querySchema.findIdByName(f);
                return colNamesFromQuerySchema.contains(f) && writerSchemaFieldId == querySchemaFieldId && writerSchemaFieldId != -1 && Objects.equals(writeInternalSchema.findType(writerSchemaFieldId), querySchema.findType(querySchemaFieldId));
            }).collect(Collectors.toList());
            InternalSchema mergedSchema = new InternalSchemaMerger(writeInternalSchema, querySchema, true, false, false).mergeSchema();
            Schema newWriterSchema = AvroInternalSchemaConverter.convert((InternalSchema)mergedSchema, (String)writerSchema.getFullName());
            Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert((InternalSchema)writeInternalSchema, (String)newWriterSchema.getFullName());
            boolean bl = needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size() || SchemaCompatibility.checkReaderWriterCompatibility((Schema)newWriterSchema, (Schema)writeSchemaFromFile).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
            if (needToReWriteRecord) {
                Map renameCols = InternalSchemaUtils.collectRenameCols((InternalSchema)writeInternalSchema, (InternalSchema)querySchema);
                return Option.of(record -> record.rewriteRecordWithNewSchema(recordSchema, (Properties)writeConfig.getProps(), newWriterSchema, renameCols));
            }
            return Option.empty();
        }
        return Option.empty();
    }

    private static class MergeHelperHolder {
        private static final HoodieMergeHelper HOODIE_MERGE_HELPER = new HoodieMergeHelper();

        private MergeHelperHolder() {
        }
    }
}

