/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.index;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.AvroSchemaCache;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.engine.ReaderContextFactory;
import org.apache.hudi.common.engine.RecordContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.table.read.BufferedRecordMerger;
import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
import org.apache.hudi.common.table.read.BufferedRecords;
import org.apache.hudi.common.table.read.DeleteContext;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieMetadataIndexException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.metadata.HoodieIndexVersion;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.HoodieDeleteHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoodieIndexUtils {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieIndexUtils.class);

    public static List<HoodieBaseFile> getLatestBaseFilesForPartition(String partition, HoodieTable hoodieTable) {
        Option latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline().filterCompletedInstants().lastInstant();
        if (latestCommitTime.isPresent()) {
            return hoodieTable.getBaseFileOnlyView().getLatestBaseFilesBeforeOrOn(partition, ((HoodieInstant)latestCommitTime.get()).requestedTime()).collect(Collectors.toList());
        }
        return Collections.emptyList();
    }

    static boolean validateDataTypeForSecondaryIndex(List<String> sourceFields, Schema tableSchema) {
        return sourceFields.stream().allMatch(fieldToIndex -> {
            Schema schema = HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema((Schema)tableSchema, (String)fieldToIndex);
            return HoodieIndexUtils.isSecondaryIndexSupportedType(schema);
        });
    }

    public static boolean validateDataTypeForSecondaryOrExpressionIndex(List<String> sourceFields, Schema tableSchema) {
        return sourceFields.stream().anyMatch(fieldToIndex -> {
            Schema schema = HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema((Schema)tableSchema, (String)fieldToIndex);
            return schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP;
        });
    }

    private static boolean isSecondaryIndexSupportedType(Schema schema) {
        if (schema.getType() == Schema.Type.UNION) {
            return schema.getTypes().stream().anyMatch(s -> s.getType() != Schema.Type.NULL && HoodieIndexUtils.isSecondaryIndexSupportedType(s));
        }
        switch (schema.getType()) {
            case STRING: {
                return schema.getLogicalType() == null;
            }
            case INT: {
                if (schema.getLogicalType() != null) {
                    return schema.getLogicalType() == LogicalTypes.date() || schema.getLogicalType() == LogicalTypes.timeMillis();
                }
                return true;
            }
            case LONG: {
                if (schema.getLogicalType() != null) {
                    return schema.getLogicalType() == LogicalTypes.timestampMillis() || schema.getLogicalType() == LogicalTypes.timestampMicros() || schema.getLogicalType() == LogicalTypes.timeMicros();
                }
                return true;
            }
            case DOUBLE: {
                return true;
            }
        }
        return false;
    }

    public static List<FileSlice> getLatestFileSlicesForPartition(String partition, HoodieTable hoodieTable) {
        Option latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline().filterCompletedInstants().lastInstant();
        if (latestCommitTime.isPresent()) {
            return hoodieTable.getHoodieView().getLatestFileSlicesBeforeOrOn(partition, ((HoodieInstant)latestCommitTime.get()).requestedTime(), true).collect(Collectors.toList());
        }
        return Collections.emptyList();
    }

    public static List<Pair<String, HoodieBaseFile>> getLatestBaseFilesForAllPartitions(List<String> partitions, HoodieEngineContext context, HoodieTable hoodieTable) {
        context.setJobStatus(HoodieIndexUtils.class.getSimpleName(), "Load latest base files from all partitions: " + hoodieTable.getConfig().getTableName());
        return context.flatMap(partitions, (SerializableFunction & Serializable)partitionPath -> {
            List filteredFiles = HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, hoodieTable).stream().map(baseFile -> Pair.of((Object)partitionPath, (Object)baseFile)).collect(Collectors.toList());
            return filteredFiles.stream();
        }, Math.max(partitions.size(), 1));
    }

    public static <R> HoodieRecord<R> tagAsNewRecordIfNeeded(HoodieRecord<R> record, Option<HoodieRecordLocation> location) {
        if (location.isPresent()) {
            HoodieRecord newRecord = record.newInstance();
            newRecord.unseal();
            newRecord.setCurrentLocation((HoodieRecordLocation)location.get());
            newRecord.seal();
            return newRecord;
        }
        return record;
    }

    public static <R> HoodieRecord<R> tagRecord(HoodieRecord<R> record, HoodieRecordLocation location) {
        record.unseal();
        record.setCurrentLocation(location);
        record.seal();
        return record;
    }

    public static List<Pair<String, Long>> filterKeysFromFile(StoragePath filePath, List<String> candidateRecordKeys, HoodieStorage storage) throws HoodieIndexException {
        ValidationUtils.checkArgument((boolean)FSUtils.isBaseFile((StoragePath)filePath));
        ArrayList<Pair<String, Long>> foundRecordKeys = new ArrayList<Pair<String, Long>>();
        LOG.info(String.format("Going to filter %d keys from file %s", candidateRecordKeys.size(), filePath));
        try (HoodieFileReader fileReader = HoodieIOFactory.getIOFactory((HoodieStorage)storage).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER, filePath);){
            if (!candidateRecordKeys.isEmpty()) {
                HoodieTimer timer = HoodieTimer.start();
                Set fileRowKeys = fileReader.filterRowKeys(candidateRecordKeys.stream().collect(Collectors.toSet()));
                foundRecordKeys.addAll(fileRowKeys);
                LOG.info("Checked keys against file {}, in {} ms. #candidates ({}) #found ({})", new Object[]{filePath, timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size()});
                LOG.debug("Keys matching for file {} => {}", (Object)filePath, foundRecordKeys);
            }
        }
        catch (Exception e) {
            throw new HoodieIndexException("Error checking candidate keys against file.", (Throwable)e);
        }
        return foundRecordKeys;
    }

    public static boolean checkIfValidCommit(HoodieTimeline commitTimeline, String commitTs) {
        return !commitTimeline.empty() && commitTimeline.containsOrBeforeTimelineStarts(commitTs);
    }

    public static HoodieIndex createUserDefinedIndex(HoodieWriteConfig config) {
        Object instance = ReflectionUtils.loadClass((String)config.getIndexClass(), (Object[])new Object[]{config});
        if (!(instance instanceof HoodieIndex)) {
            throw new HoodieIndexException(config.getIndexClass() + " is not a subclass of HoodieIndex");
        }
        return (HoodieIndex)instance;
    }

    private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(HoodieData<Pair<String, String>> partitionLocations, HoodieWriteConfig config, HoodieTable hoodieTable, ReaderContextFactory<R> readerContextFactory, Schema dataSchema) {
        HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
        Option instantTime = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime);
        if (instantTime.isEmpty()) {
            return hoodieTable.getContext().emptyHoodieData();
        }
        return partitionLocations.flatMap((SerializableFunction & Serializable)p -> {
            Option fileSliceOption = Option.fromJavaOptional(hoodieTable.getHoodieView().getLatestMergedFileSlicesBeforeOrOn((String)p.getLeft(), (String)instantTime.get()).filter(fileSlice -> fileSlice.getFileId().equals(p.getRight())).findFirst());
            if (fileSliceOption.isEmpty()) {
                return Collections.emptyIterator();
            }
            Option internalSchemaOption = SerDeHelper.fromJson((String)config.getInternalSchema());
            FileSlice fileSlice2 = (FileSlice)fileSliceOption.get();
            HoodieReaderContext readerContext = readerContextFactory.getContext();
            HoodieFileGroupReader fileGroupReader = HoodieFileGroupReader.newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(metaClient).withLatestCommitTime((String)instantTime.get()).withFileSlice(fileSlice2).withDataSchema(dataSchema).withRequestedSchema(dataSchema).withInternalSchema(internalSchemaOption).withProps(metaClient.getTableConfig().getProps()).withEnableOptimizedLogBlockScan(config.enableOptimizedLogBlocksScan()).build();
            try {
                HoodieRecordLocation currentLocation = new HoodieRecordLocation(fileSlice2.getBaseInstantTime(), fileSlice2.getFileId());
                return new CloseableMappingIterator(fileGroupReader.getClosableHoodieRecordIterator(), hoodieRecord -> {
                    hoodieRecord.unseal();
                    hoodieRecord.setCurrentLocation(currentLocation);
                    hoodieRecord.seal();
                    return hoodieRecord;
                });
            }
            catch (IOException ex) {
                throw new HoodieIOException("Unable to read file slice " + fileSlice2, ex);
            }
        });
    }

    private static Pair<HoodieWriteConfig, BaseKeyGenerator> getKeygenAndUpdatedWriteConfig(HoodieWriteConfig config, HoodieTableConfig tableConfig, boolean isExpressionPayload) {
        HoodieWriteConfig writeConfig = config;
        if (isExpressionPayload) {
            TypedProperties typedProperties = TypedProperties.copy((Properties)config.getProps());
            typedProperties.setProperty(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), tableConfig.getPayloadClass());
            typedProperties.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), tableConfig.getPayloadClass());
            writeConfig = HoodieWriteConfig.newBuilder().withProperties((Properties)typedProperties).build();
        }
        try {
            return Pair.of((Object)((Object)writeConfig), (Object)((BaseKeyGenerator)HoodieAvroKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps())));
        }
        catch (IOException e) {
            throw new RuntimeException("KeyGenerator must inherit from BaseKeyGenerator to update a records partition path using spark sql merge into", e);
        }
    }

    private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecordWithExpressionPayload(HoodieRecord<R> incoming, HoodieRecord<R> existing, Schema writeSchema, Schema writeSchemaWithMetaFields, HoodieWriteConfig config, BufferedRecordMerger<R> recordMerger, BaseKeyGenerator keyGenerator, RecordContext<R> incomingRecordContext, RecordContext<R> existingRecordContext, String[] orderingFieldNames, TypedProperties properties, DeleteContext deleteContext) throws IOException {
        BufferedRecord incomingBufferedRecord = BufferedRecords.fromHoodieRecord(incoming, (Schema)writeSchemaWithMetaFields, incomingRecordContext, (Properties)properties, (String[])orderingFieldNames, (DeleteContext)deleteContext);
        BufferedRecord existingBufferedRecord = BufferedRecords.fromHoodieRecord(existing, (Schema)writeSchemaWithMetaFields, existingRecordContext, (Properties)properties, (String[])orderingFieldNames, (boolean)false);
        BufferedRecord mergeResult = recordMerger.finalMerge(existingBufferedRecord, incomingBufferedRecord);
        if (mergeResult.isDelete()) {
            return Option.empty();
        }
        if (mergeResult.getRecord() == null || mergeResult == existingBufferedRecord) {
            return Option.of((Object)new HoodieAvroIndexedRecord((IndexedRecord)HoodieRecord.SENTINEL));
        }
        String partitionPath = HoodieIndexUtils.inferPartitionPath(incoming, existing, writeSchemaWithMetaFields, keyGenerator, existingRecordContext, mergeResult);
        HoodieRecord result = existingRecordContext.constructHoodieRecord(mergeResult, partitionPath);
        HoodieRecord withMeta = result.prependMetaFields(writeSchema, writeSchemaWithMetaFields, new MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(partitionPath), (Properties)properties);
        return Option.of((Object)withMeta.wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields, (Properties)properties, Option.empty(), Boolean.valueOf(config.allowOperationMetadataField()), Option.empty(), Boolean.valueOf(false), Option.of((Object)writeSchema)));
    }

    private static <R> String inferPartitionPath(HoodieRecord<R> incoming, HoodieRecord<R> existing, Schema recordSchema, BaseKeyGenerator keyGenerator, RecordContext<R> recordContext, BufferedRecord<R> resultingBufferedRecord) {
        Object record = resultingBufferedRecord.getRecord();
        if (record == incoming.getData()) {
            return incoming.getPartitionPath();
        }
        if (record == existing.getData()) {
            return existing.getPartitionPath();
        }
        return keyGenerator.getPartitionPath(recordContext.convertToAvroRecord(record, recordSchema));
    }

    private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecord(HoodieRecord<R> incoming, HoodieRecord<R> existing, Schema writeSchema, Schema writeSchemaWithMetaFields, HoodieWriteConfig config, BufferedRecordMerger<R> recordMerger, BaseKeyGenerator keyGenerator, RecordContext<R> incomingRecordContext, RecordContext<R> existingRecordContext, String[] orderingFieldNames, TypedProperties properties, boolean isExpressionPayload, DeleteContext deleteContext) throws IOException {
        if (isExpressionPayload) {
            return HoodieIndexUtils.mergeIncomingWithExistingRecordWithExpressionPayload(incoming, existing, writeSchema, writeSchemaWithMetaFields, config, recordMerger, keyGenerator, incomingRecordContext, existingRecordContext, orderingFieldNames, properties, deleteContext);
        }
        BufferedRecord incomingBufferedRecord = BufferedRecords.fromHoodieRecord(incoming, (Schema)writeSchema, incomingRecordContext, (Properties)properties, (String[])orderingFieldNames, (DeleteContext)deleteContext);
        BufferedRecord existingBufferedRecord = BufferedRecords.fromHoodieRecord(existing, (Schema)writeSchemaWithMetaFields, existingRecordContext, (Properties)properties, (String[])orderingFieldNames, (boolean)false);
        existingBufferedRecord.project(existingRecordContext.projectRecord(writeSchemaWithMetaFields, writeSchema));
        BufferedRecord mergeResult = recordMerger.finalMerge(existingBufferedRecord, incomingBufferedRecord);
        if (mergeResult.isDelete()) {
            return Option.empty();
        }
        String partitionPath = HoodieIndexUtils.inferPartitionPath(incoming, existing, writeSchemaWithMetaFields, keyGenerator, existingRecordContext, mergeResult);
        if (config.isFileGroupReaderBasedMergeHandle() && HoodieRecordUtils.isPayloadClassDeprecated((String)ConfigUtils.getPayloadClass((Properties)properties))) {
            return Option.of((Object)existingRecordContext.constructHoodieRecord(mergeResult, partitionPath));
        }
        HoodieRecord result = existingRecordContext.constructHoodieRecord(mergeResult, partitionPath);
        HoodieRecord resultWithMetaFields = result.prependMetaFields(writeSchema, writeSchemaWithMetaFields, new MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(partitionPath), (Properties)properties);
        return Option.of((Object)resultWithMetaFields.wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields, (Properties)properties, Option.empty(), Boolean.valueOf(config.allowOperationMetadataField()), Option.empty(), Boolean.valueOf(false), Option.of((Object)writeSchema)));
    }

    public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdatesAndDeletionsIfNeeded(HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> incomingRecordsAndLocations, boolean shouldUpdatePartitionPath, HoodieWriteConfig config, HoodieTable hoodieTable, HoodieReaderContext<R> readerContext, SerializableSchema writerSchema) {
        boolean isExpressionPayload = config.getPayloadClass().equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload");
        Pair<HoodieWriteConfig, BaseKeyGenerator> keyGeneratorWriteConfigOpt = HoodieIndexUtils.getKeygenAndUpdatedWriteConfig(config, hoodieTable.getMetaClient().getTableConfig(), isExpressionPayload);
        HoodieWriteConfig updatedConfig = (HoodieWriteConfig)((Object)keyGeneratorWriteConfigOpt.getLeft());
        BaseKeyGenerator keyGenerator = (BaseKeyGenerator)keyGeneratorWriteConfigOpt.getRight();
        HoodieData taggedNewRecords = incomingRecordsAndLocations.filter((SerializableFunction & Serializable)p -> !((Option)p.getRight()).isPresent()).map(Pair::getLeft);
        HoodieData untaggedUpdatingRecords = incomingRecordsAndLocations.filter((SerializableFunction & Serializable)p -> ((Option)p.getRight()).isPresent()).map(Pair::getLeft).distinctWithKey(HoodieRecord::getRecordKey, updatedConfig.getGlobalIndexReconcileParallelism());
        HoodieData globalLocations = incomingRecordsAndLocations.filter((SerializableFunction & Serializable)p -> ((Option)p.getRight()).isPresent()).map((SerializableFunction & Serializable)p -> Pair.of((Object)((HoodieRecordGlobalLocation)((Option)p.getRight()).get()).getPartitionPath(), (Object)((HoodieRecordGlobalLocation)((Option)p.getRight()).get()).getFileId())).distinct(updatedConfig.getGlobalIndexReconcileParallelism());
        TypedProperties properties = readerContext.getMergeProps(updatedConfig.getProps());
        RecordContext incomingRecordContext = readerContext.getRecordContext();
        ReaderContextFactory readerContextFactoryForExistingRecords = hoodieTable.getContext().getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), config.getRecordMerger().getRecordType(), hoodieTable.getMetaClient().getTableConfig().getProps());
        RecordContext existingRecordContext = readerContextFactoryForExistingRecords.getContext().getRecordContext();
        SerializableSchema writerSchemaWithMetaFields = new SerializableSchema(HoodieAvroUtils.addMetadataFields((Schema)writerSchema.get(), (boolean)updatedConfig.allowOperationMetadataField()));
        AvroSchemaCache.intern((Schema)writerSchema.get());
        AvroSchemaCache.intern((Schema)writerSchemaWithMetaFields.get());
        HoodieData<HoodieRecord<R>> existingRecords = HoodieIndexUtils.getExistingRecords((HoodieData<Pair<String, String>>)globalLocations, (HoodieWriteConfig)((Object)keyGeneratorWriteConfigOpt.getLeft()), hoodieTable, readerContextFactoryForExistingRecords, writerSchemaWithMetaFields.get());
        List orderingFieldNames = HoodieRecordUtils.getOrderingFieldNames((RecordMergeMode)readerContext.getMergeMode(), (HoodieTableMetaClient)hoodieTable.getMetaClient());
        BufferedRecordMerger recordMerger = BufferedRecordMergerFactory.create(readerContext, (RecordMergeMode)readerContext.getMergeMode(), (boolean)false, (Option)readerContext.getRecordMerger(), (Schema)writerSchema.get(), (Option)Option.ofNullable((Object)Pair.of((Object)hoodieTable.getMetaClient().getTableConfig().getPayloadClass(), (Object)hoodieTable.getConfig().getPayloadClass())), (TypedProperties)properties, (Option)hoodieTable.getMetaClient().getTableConfig().getPartialUpdateMode());
        String[] orderingFieldsArray = orderingFieldNames.toArray(new String[0]);
        DeleteContext deleteContext = DeleteContext.fromRecordSchema((Properties)properties, (Schema)writerSchema.get());
        HoodieData taggedUpdatingRecords = untaggedUpdatingRecords.mapToPair((SerializablePairFunction & Serializable)r -> Pair.of((Object)r.getRecordKey(), (Object)r)).leftOuterJoin(existingRecords.mapToPair((SerializablePairFunction & Serializable)r -> Pair.of((Object)r.getRecordKey(), (Object)r))).values().flatMap((SerializableFunction & Serializable)entry -> {
            Schema writeSchema;
            HoodieRecord incoming = (HoodieRecord)entry.getLeft();
            Option existingOpt = (Option)entry.getRight();
            if (!existingOpt.isPresent()) {
                return Collections.singletonList(incoming).iterator();
            }
            HoodieRecord existing = (HoodieRecord)existingOpt.get();
            Option mergedOpt = HoodieIndexUtils.mergeIncomingWithExistingRecord(incoming, existing, writeSchema = writerSchema.get(), writerSchemaWithMetaFields.get(), updatedConfig, recordMerger, keyGenerator, incomingRecordContext, existingRecordContext, orderingFieldsArray, properties, isExpressionPayload, deleteContext);
            if (!mergedOpt.isPresent()) {
                return Collections.singletonList(HoodieIndexUtils.tagRecord(incoming.newInstance(existing.getKey()), existing.getCurrentLocation())).iterator();
            }
            HoodieRecord merged = (HoodieRecord)mergedOpt.get();
            if (merged.getData().equals(HoodieRecord.SENTINEL)) {
                return Collections.emptyIterator();
            }
            if (Objects.equals(merged.getPartitionPath(), existing.getPartitionPath())) {
                return Collections.singletonList(HoodieIndexUtils.tagRecord(merged, existing.getCurrentLocation())).iterator();
            }
            if (shouldUpdatePartitionPath) {
                HoodieRecord deleteRecord = HoodieDeleteHelper.createDeleteRecord(updatedConfig, existing.getKey());
                deleteRecord.setIgnoreIndexUpdate(true);
                return Arrays.asList(HoodieIndexUtils.tagRecord(deleteRecord, existing.getCurrentLocation()), merged).iterator();
            }
            return Collections.singletonList(merged.newInstance(existing.getKey())).iterator();
        });
        return taggedUpdatingRecords.union(taggedNewRecords);
    }

    public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(HoodieData<HoodieRecord<R>> incomingRecords, HoodiePairData<String, HoodieRecordGlobalLocation> keyAndExistingLocations, boolean mayContainDuplicateLookup, boolean shouldUpdatePartitionPath, HoodieWriteConfig config, HoodieTable table) {
        HoodiePairData keyAndIncomingRecords = incomingRecords.mapToPair((SerializablePairFunction & Serializable)record -> Pair.of((Object)record.getRecordKey(), (Object)record));
        ReaderContextFactory readerContextFactory = table.getContext().getReaderContextFactoryForWrite(table.getMetaClient(), config.getRecordMerger().getRecordType(), config.getProps());
        HoodieReaderContext readerContext = readerContextFactory.getContext();
        readerContext.initRecordMergerForIngestion(config.getProps());
        TypedProperties properties = readerContext.getMergeProps(config.getProps());
        SerializableSchema writerSchema = new SerializableSchema(config.getWriteSchema());
        boolean isCommitTimeOrdered = readerContext.getMergeMode() == RecordMergeMode.COMMIT_TIME_ORDERING;
        boolean requiresMergingWithOlderRecordVersion = shouldUpdatePartitionPath || table.getMetaClient().getTableConfig().getTableType() == HoodieTableType.MERGE_ON_READ;
        DeleteContext deleteContext = DeleteContext.fromRecordSchema((Properties)properties, (Schema)writerSchema.get());
        HoodieData incomingRecordsAndLocations = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values().map((SerializableFunction & Serializable)v -> {
            HoodieRecord incomingRecord = (HoodieRecord)v.getLeft();
            Option currentLocOpt = Option.ofNullable((Object)((Option)v.getRight()).orElse(null));
            if (currentLocOpt.isPresent()) {
                boolean shouldDoMergedLookUpThenTag;
                HoodieRecordGlobalLocation currentLoc = (HoodieRecordGlobalLocation)currentLocOpt.get();
                boolean bl = shouldDoMergedLookUpThenTag = mayContainDuplicateLookup || shouldUpdatePartitionPath && !Objects.equals(incomingRecord.getPartitionPath(), currentLoc.getPartitionPath()) || !isCommitTimeOrdered && incomingRecord.isDelete(deleteContext, (Properties)properties);
                if (requiresMergingWithOlderRecordVersion && shouldDoMergedLookUpThenTag) {
                    return Pair.of((Object)incomingRecord, (Object)currentLocOpt);
                }
                return Pair.of(HoodieIndexUtils.createNewTaggedHoodieRecord(incomingRecord, currentLoc), (Object)Option.empty());
            }
            return Pair.of((Object)incomingRecord, (Object)Option.empty());
        });
        return requiresMergingWithOlderRecordVersion ? HoodieIndexUtils.mergeForPartitionUpdatesAndDeletionsIfNeeded(incomingRecordsAndLocations, shouldUpdatePartitionPath, config, table, readerContext, writerSchema) : incomingRecordsAndLocations.map(Pair::getLeft);
    }

    public static <R> HoodieRecord<R> createNewTaggedHoodieRecord(HoodieRecord<R> oldRecord, HoodieRecordGlobalLocation location) {
        HoodieKey recordKey = new HoodieKey(oldRecord.getRecordKey(), location.getPartitionPath());
        return HoodieIndexUtils.tagRecord(oldRecord.newInstance(recordKey), (HoodieRecordLocation)location);
    }

    public static void register(HoodieTableMetaClient metaClient, HoodieIndexDefinition indexDefinition) {
        LOG.info("Registering index {} of using {}", (Object)indexDefinition.getIndexName(), (Object)indexDefinition.getIndexType());
        metaClient.buildIndexDefinition(indexDefinition);
    }

    static HoodieIndexDefinition getSecondaryOrExpressionIndexDefinition(HoodieTableMetaClient metaClient, String userIndexName, String indexType, Map<String, Map<String, String>> columns, Map<String, String> options, Map<String, String> tableProperties) throws Exception {
        HoodieIndexVersion indexVersion;
        String fullIndexName = indexType.equals("secondary_index") ? "secondary_index_" + userIndexName : "expr_index_" + userIndexName;
        HoodieTableVersion tableVersion = metaClient.getTableConfig().getTableVersion();
        HoodieIndexVersion hoodieIndexVersion = indexVersion = indexType.equals("secondary_index") ? HoodieIndexVersion.getCurrentVersion((HoodieTableVersion)tableVersion, (MetadataPartitionType)MetadataPartitionType.SECONDARY_INDEX) : HoodieIndexVersion.getCurrentVersion((HoodieTableVersion)tableVersion, (MetadataPartitionType)MetadataPartitionType.EXPRESSION_INDEX);
        if (HoodieIndexUtils.indexExists(metaClient, fullIndexName)) {
            throw new HoodieMetadataIndexException("Index already exists: " + userIndexName);
        }
        ValidationUtils.checkArgument((columns.size() == 1 ? 1 : 0) != 0, (String)"Only one column can be indexed for functional or secondary index.");
        HoodieIndexUtils.validateEligibilityForSecondaryOrExpressionIndex(metaClient, indexType, tableProperties, columns, userIndexName);
        return HoodieIndexDefinition.newBuilder().withIndexName(fullIndexName).withIndexType(indexType).withIndexFunction(options.getOrDefault("expr", "identity")).withSourceFields(new ArrayList<String>(columns.keySet())).withIndexOptions(options).withVersion(indexVersion).build();
    }

    static boolean indexExists(HoodieTableMetaClient metaClient, String indexName) {
        return metaClient.getTableConfig().getMetadataPartitions().stream().anyMatch(partition -> partition.equals(indexName));
    }

    static void validateEligibilityForSecondaryOrExpressionIndex(HoodieTableMetaClient metaClient, String indexType, Map<String, String> options, Map<String, Map<String, String>> columns, String userIndexName) throws Exception {
        Schema tableSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
        ArrayList<String> sourceFields = new ArrayList<String>(columns.keySet());
        String columnName = (String)sourceFields.get(0);
        try {
            HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema((Schema)tableSchema, (String)columnName);
        }
        catch (Exception e) {
            throw new HoodieMetadataIndexException(String.format("Cannot create %s index '%s': Column '%s' does not exist in the table schema. Please verify the column name and ensure it exists in the table.", indexType.equals("secondary_index") ? "secondary" : "expression", userIndexName, columnName));
        }
        if (!HoodieIndexUtils.validateDataTypeForSecondaryOrExpressionIndex(sourceFields, tableSchema)) {
            Schema fieldSchema = HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema((Schema)tableSchema, (String)columnName);
            throw new HoodieMetadataIndexException(String.format("Cannot create %s index '%s': Column '%s' has unsupported data type '%s'. Complex types (RECORD, ARRAY, MAP) are not supported for indexing. Please choose a column with a primitive data type.", indexType.equals("secondary_index") ? "secondary" : "expression", userIndexName, columnName, fieldSchema.getType()));
        }
        if (indexType.equals("secondary_index")) {
            if (!HoodieIndexUtils.validateDataTypeForSecondaryIndex(sourceFields, tableSchema)) {
                Schema fieldSchema = HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema((Schema)tableSchema, (String)columnName);
                String actualType = fieldSchema.getType().toString();
                if (fieldSchema.getLogicalType() != null) {
                    actualType = actualType + " with logical type " + fieldSchema.getLogicalType();
                }
                throw new HoodieMetadataIndexException(String.format("Cannot create secondary index '%s': Column '%s' has unsupported data type '%s'. Secondary indexes only support: STRING, CHAR, INT, BIGINT/LONG, SMALLINT, TINYINT, FLOAT, DOUBLE, TIMESTAMP (including logical types timestampMillis, timestampMicros), and DATE types. Please choose a column with one of these supported types.", userIndexName, columnName, actualType));
            }
            boolean hasRecordIndex = metaClient.getTableConfig().getMetadataPartitions().stream().anyMatch(partition -> partition.equals(MetadataPartitionType.RECORD_INDEX.getPartitionPath()));
            boolean recordIndexEnabled = Boolean.parseBoolean(options.getOrDefault(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(), options.getOrDefault("hoodie.metadata.record.index.enable", ((Boolean)HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.defaultValue()).toString())));
            if (!hasRecordIndex && !recordIndexEnabled) {
                throw new HoodieMetadataIndexException(String.format("Cannot create secondary index '%s': Record index is required for secondary indexes but is not enabled. Please enable the record index by setting '%s' to 'true' in the index creation options, or create a record index first using: CREATE INDEX record_index ON %s USING record_index", userIndexName, HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(), metaClient.getTableConfig().getTableName()));
            }
        }
    }
}

