/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.client.utils.MergingIterator;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.org.apache.avro.SchemaCompatibility;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BaseMergeHelper;
import org.apache.hudi.util.ExecutorFactory;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class HoodieMergeHelper<T>
extends BaseMergeHelper {
    private static final Logger LOG = LogManager.getLogger(HoodieMergeHelper.class);

    private HoodieMergeHelper() {
    }

    public static HoodieMergeHelper newInstance() {
        return MergeHelperHolder.HOODIE_MERGE_HELPER;
    }

    @Override
    public void runMerge(HoodieTable<?, ?, ?, ?> table, HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws IOException {
        HoodieWriteConfig writeConfig = table.getConfig();
        HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
        Configuration hadoopConf = new Configuration(table.getHadoopConf());
        HoodieRecord.HoodieRecordType recordType = table.getConfig().getRecordMerger().getRecordType();
        HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(hadoopConf, mergeHandle.getOldFilePath());
        HoodieFileReader bootstrapFileReader = null;
        Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
        Schema readerSchema = baseFileReader.getSchema();
        Option<Function<HoodieRecord, HoodieRecord>> schemaEvolutionTransformerOpt = this.composeSchemaEvolutionTransformer(readerSchema, writerSchema, baseFile, writeConfig, table.getMetaClient());
        boolean isPureProjection = AvroSchemaUtils.isStrictProjectionOf(readerSchema, writerSchema) && !schemaEvolutionTransformerOpt.isPresent();
        boolean shouldRewriteInWriterSchema = writeConfig.shouldUseExternalSchemaTransformation() || !isPureProjection || baseFile.getBootstrapBaseFile().isPresent();
        HoodieExecutor<Void> wrapper = null;
        try {
            Schema recordSchema;
            Iterator recordIterator;
            ClosableIterator baseFileRecordIterator = baseFileReader.getRecordIterator(isPureProjection ? writerSchema : readerSchema);
            if (baseFile.getBootstrapBaseFile().isPresent()) {
                Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
                Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
                bootstrapFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(bootstrapFileConfig, bootstrapFilePath);
                recordIterator = new MergingIterator<HoodieRecord>(baseFileRecordIterator, bootstrapFileReader.getRecordIterator(), (left, right) -> left.joinWith((HoodieRecord)right, mergeHandle.getWriterSchemaWithMetaFields()));
                recordSchema = mergeHandle.getWriterSchemaWithMetaFields();
            } else {
                recordIterator = baseFileRecordIterator;
                recordSchema = isPureProjection ? writerSchema : readerSchema;
            }
            boolean isBufferingRecords = ExecutorFactory.isBufferingRecords(writeConfig);
            wrapper = ExecutorFactory.create(writeConfig, recordIterator, new BaseMergeHelper.UpdateHandler(mergeHandle), record -> {
                HoodieRecord newRecord = schemaEvolutionTransformerOpt.isPresent() ? (HoodieRecord)((Function)schemaEvolutionTransformerOpt.get()).apply(record) : (shouldRewriteInWriterSchema ? record.rewriteRecordWithNewSchema(recordSchema, writeConfig.getProps(), writerSchema) : record);
                return isBufferingRecords ? newRecord.copy() : newRecord;
            }, table.getPreExecuteRunnable());
            wrapper.execute();
        }
        catch (Exception e) {
            throw new HoodieException(e);
        }
        finally {
            baseFileReader.close();
            if (bootstrapFileReader != null) {
                bootstrapFileReader.close();
            }
            if (null != wrapper) {
                wrapper.shutdownNow();
                wrapper.awaitTermination();
            }
            mergeHandle.close();
        }
    }

    private Option<Function<HoodieRecord, HoodieRecord>> composeSchemaEvolutionTransformer(Schema recordSchema, Schema writerSchema, HoodieBaseFile baseFile, HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) {
        Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(writeConfig.getInternalSchema());
        if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
            boolean needToReWriteRecord;
            InternalSchema querySchema = AvroSchemaEvolutionUtils.reconcileSchema(writerSchema, querySchemaOpt.get());
            long commitInstantTime = Long.parseLong(baseFile.getCommitTime());
            InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, metaClient);
            if (fileSchema.isEmptySchema() && writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA).booleanValue()) {
                TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
                try {
                    fileSchema = AvroInternalSchemaConverter.convert(tableSchemaResolver.getTableAvroSchema(true));
                }
                catch (Exception e) {
                    throw new HoodieException(String.format("Failed to get InternalSchema for given versionId: %s", commitInstantTime), e);
                }
            }
            InternalSchema writeInternalSchema = fileSchema;
            List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
            List<String> colNamesFromWriteSchema = writeInternalSchema.getAllColsFullName();
            List sameCols = colNamesFromWriteSchema.stream().filter(f -> {
                int writerSchemaFieldId = writeInternalSchema.findIdByName((String)f);
                int querySchemaFieldId = querySchema.findIdByName((String)f);
                return colNamesFromQuerySchema.contains(f) && writerSchemaFieldId == querySchemaFieldId && writerSchemaFieldId != -1 && Objects.equals(writeInternalSchema.findType(writerSchemaFieldId), querySchema.findType(querySchemaFieldId));
            }).collect(Collectors.toList());
            InternalSchema mergedSchema = new InternalSchemaMerger(writeInternalSchema, querySchema, true, false, false).mergeSchema();
            Schema newWriterSchema = AvroInternalSchemaConverter.convert(mergedSchema, writerSchema.getFullName());
            Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, newWriterSchema.getFullName());
            boolean bl = needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size() || SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
            if (needToReWriteRecord) {
                Map<String, String> renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
                return Option.of(record -> record.rewriteRecordWithNewSchema(recordSchema, writeConfig.getProps(), newWriterSchema, renameCols));
            }
            return Option.empty();
        }
        return Option.empty();
    }

    private static class MergeHelperHolder {
        private static final HoodieMergeHelper HOODIE_MERGE_HELPER = new HoodieMergeHelper();

        private MergeHelperHolder() {
        }
    }
}

