/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.clustering.update.strategy;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * Update strategy that duplicates records targeting file groups under pending clustering.
 *
 * <p>Every incoming record whose current location is a file group listed in
 * {@code fileGroupsInPendingClustering} is emitted twice: once unchanged (at its current
 * location) and once re-tagged to the file id derived from the child hashing node recorded in
 * the pending clustering plan of its partition.
 *
 * @param <T> payload type of the handled records
 */
public class SparkConsistentBucketDuplicateUpdateStrategy<T extends HoodieRecordPayload<T>>
    extends UpdateStrategy<T, HoodieData<HoodieRecord<T>>> {
    private static final Logger LOG = LogManager.getLogger(SparkConsistentBucketDuplicateUpdateStrategy.class);

    /**
     * Creates the strategy; all arguments are forwarded to {@link UpdateStrategy}.
     *
     * @param engineContext                 engine context
     * @param table                         table the updates are applied to
     * @param fileGroupsInPendingClustering file groups that are part of a pending clustering plan
     */
    public SparkConsistentBucketDuplicateUpdateStrategy(HoodieEngineContext engineContext, HoodieTable table, Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
        super(engineContext, table, fileGroupsInPendingClustering);
    }

    /**
     * Appends, for every record located in a file group under pending clustering, a copy that is
     * tagged to the file id of the child node chosen by the partition's
     * {@link ConsistentBucketIdentifier}.
     *
     * @param taggedRecordsRDD records that already carry their current location
     * @return the input unioned with the redirected copies (or the input itself when nothing has
     *         to be redirected), paired with an always-empty set of file groups
     * @throws IllegalStateException if a record has no current location, or if no pending
     *                               clustering plan covers the partition of a record that has
     *                               to be redirected
     */
    @Override
    public Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(HoodieData<HoodieRecord<T>> taggedRecordsRDD) {
        if (this.fileGroupsInPendingClustering.isEmpty()) {
            return Pair.of(taggedRecordsRDD, Collections.emptySet());
        }

        // Keep only the records whose current file group is part of a pending clustering.
        HoodieData<HoodieRecord<T>> filteredRecordsRDD = taggedRecordsRDD.filter(r -> {
            ValidationUtils.checkState(r.getCurrentLocation() != null);
            return this.fileGroupsInPendingClustering.contains(new HoodieFileGroupId(r.getPartitionPath(), r.getCurrentLocation().getFileId()));
        });
        if (filteredRecordsRDD.count() == 0L) {
            return Pair.of(taggedRecordsRDD, Collections.emptySet());
        }

        // Load the clustering plan of every inflight/requested replacecommit instant.
        List<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPairs = this.table.getMetaClient().getActiveTimeline()
            .filterInflightsAndRequested()
            .filter(instant -> instant.getAction().equals("replacecommit"))
            .getInstantsAsStream()
            .map(instant -> ClusteringUtils.getClusteringPlan(this.table.getMetaClient(), instant))
            .flatMap(o -> o.isPresent() ? Stream.of(o.get()) : Stream.empty())
            .collect(Collectors.toList());

        // Only partitions that actually receive affected records need hashing metadata.
        Set<String> partitions = new HashSet<>(filteredRecordsRDD.map(HoodieRecord::getPartitionPath).distinct().collectAsList());
        Map<String, HoodieConsistentHashingMetadata> partitionToHashingMeta = new HashMap<>();
        Map<String, String> partitionToInstant = new HashMap<>();
        for (Pair<HoodieInstant, HoodieClusteringPlan> pair : instantPlanPairs) {
            this.extractHashingMetadataFromClusteringPlan(pair.getLeft().getTimestamp(), pair.getRight(), partitions, partitionToHashingMeta, partitionToInstant);
        }

        Map<String, ConsistentBucketIdentifier> partitionToIdentifier = partitionToHashingMeta.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> new ConsistentBucketIdentifier(e.getValue())));
        List<String> indexKeyFields = Arrays.asList(this.table.getConfig().getBucketIndexHashField().split(","));

        // Build the redirected copy of each affected record.
        HoodieData<HoodieRecord<T>> redirectedRecordsRDD = filteredRecordsRDD.map(r -> {
            ConsistentBucketIdentifier identifier = partitionToIdentifier.get(r.getPartitionPath());
            if (identifier == null) {
                // Fail with a descriptive error instead of a bare NullPointerException.
                throw new IllegalStateException("No pending clustering plan found for partition: " + r.getPartitionPath());
            }
            ConsistentHashingNode node = identifier.getBucket(r.getKey(), indexKeyFields);
            return HoodieIndexUtils.getTaggedRecord(
                new HoodieAvroRecord<>(r.getKey(), r.getData(), r.getOperation()),
                Option.ofNullable(new HoodieRecordLocation(partitionToInstant.get(r.getPartitionPath()), FSUtils.createNewFileId(node.getFileIdPrefix(), 0))));
        });
        return Pair.of(taggedRecordsRDD.union(redirectedRecordsRDD), Collections.emptySet());
    }

    /**
     * Collects, from one clustering plan, the hashing metadata and the clustering instant of each
     * partition in {@code recordPartitions}, and adds the child nodes recorded in the plan's
     * groups to that metadata.
     *
     * @param instant                timestamp of the clustering instant owning {@code plan}
     * @param plan                   clustering plan to inspect
     * @param recordPartitions       partitions of interest; groups of other partitions are skipped
     * @param partitionToHashingMeta output map, partition to hashing metadata (loaded on first use)
     * @param partitionToInstant     output map, partition to clustering instant timestamp
     * @throws IllegalStateException if a group carries no partition info, a partition is covered
     *                               by two different instants, or hashing metadata cannot be loaded
     * @throws HoodieException       if the child nodes of a group cannot be parsed
     */
    private void extractHashingMetadataFromClusteringPlan(String instant, HoodieClusteringPlan plan, Set<String> recordPartitions, Map<String, HoodieConsistentHashingMetadata> partitionToHashingMeta, Map<String, String> partitionToInstant) {
        for (HoodieClusteringGroup group : plan.getInputGroups()) {
            Map<String, String> groupMeta = group.getExtraMetadata();
            String p = groupMeta.get("clustering.group.partition");
            ValidationUtils.checkState(p != null, "Clustering plan does not has partition info, plan: " + plan);
            // Skip only this unrelated group; later groups of the same plan may still belong to
            // a partition of interest, so the loop must keep going.
            if (!recordPartitions.contains(p)) {
                continue;
            }
            String preInstant = partitionToInstant.putIfAbsent(p, instant);
            ValidationUtils.checkState(preInstant == null || preInstant.equals(instant), "Find a partition: " + p + " with two clustering instants");
            if (!partitionToHashingMeta.containsKey(p)) {
                Option<HoodieConsistentHashingMetadata> metadataOption = HoodieSparkConsistentBucketIndex.loadMetadata(this.table, p);
                ValidationUtils.checkState(metadataOption.isPresent(), "Failed to load consistent hashing metadata for partition: " + p);
                partitionToHashingMeta.put(p, metadataOption.get());
            }
            try {
                String nodeJson = groupMeta.get("clustering.group.child.node");
                List<ConsistentHashingNode> nodes = ConsistentHashingNode.fromJsonString(nodeJson);
                partitionToHashingMeta.get(p).getChildrenNodes().addAll(nodes);
            } catch (Exception e) {
                LOG.error("Failed to parse child nodes in clustering plan", e);
                throw new HoodieException("Failed to parse child nodes in clustering plan, partition: " + p + ", cluster group: " + group, e);
            }
        }
    }
}

