/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.clustering.update.strategy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
import org.apache.hudi.table.action.cluster.util.ConsistentHashingUpdateStrategyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Update strategy used by Flink writers for record batches that may target a file group
 * which is part of a pending clustering.
 *
 * <p>A batch whose file group is not in a pending clustering is passed through unchanged.
 * Otherwise the batch is written twice: a copy of the records is regrouped by the bucket
 * that the partition's {@link ConsistentBucketIdentifier} assigns to each record key and is
 * routed to those new file groups under the clustering instant, and the original batch is
 * still written to its original file group.
 *
 * <p>Usage: call {@link #initialize(HoodieFlinkWriteClient)} before
 * {@link #handleUpdate(List)}; call {@link #reset()} to force the next {@code initialize}
 * call to look for new pending clustering instants again.
 *
 * @param <T> payload type of the records handled by this strategy
 */
public class FlinkConsistentBucketUpdateStrategy<T extends HoodieRecordPayload>
        extends UpdateStrategy<T, List<Pair<List<HoodieRecord>, String>>> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkConsistentBucketUpdateStrategy.class);

    /** Initial value of {@link #lastRefreshInstant}; sorts before any 14-digit instant timestamp. */
    private static final String INIT_INSTANT_TS = "00000000000000";

    /** Whether {@link #initialize(HoodieFlinkWriteClient)} has run since construction or the last {@link #reset()}. */
    private boolean initialized = false;
    /** Key fields handed to {@link ConsistentBucketIdentifier#getBucket} when routing a record. */
    private List<String> indexKeyFields;
    /** Per-partition cache of (clustering instant, bucket identifier), filled lazily and cleared on refresh. */
    private Map<String, Pair<String, ConsistentBucketIdentifier>> partitionToIdentifier;
    /** Timestamp of the newest pending clustering instant that the cached state was built from. */
    private String lastRefreshInstant = INIT_INSTANT_TS;

    /**
     * Creates a strategy bound to the write client's engine context and table, starting with
     * an empty set of file groups in pending clustering.
     *
     * @param writeClient    client providing the engine context and the table
     * @param indexKeyFields key fields used to compute the bucket of a record
     */
    public FlinkConsistentBucketUpdateStrategy(HoodieFlinkWriteClient writeClient, List<String> indexKeyFields) {
        super(writeClient.getEngineContext(), (HoodieTable) writeClient.getHoodieTable(), Collections.emptySet());
        this.indexKeyFields = indexKeyFields;
        this.partitionToIdentifier = new HashMap<>();
    }

    /**
     * Refreshes the cached clustering state if a newer pending clustering instant exists.
     *
     * <p>Does nothing when the strategy is already initialized. Otherwise it looks at the last
     * pending clustering instant of the client's table; if its timestamp is greater than the
     * one seen at the previous refresh, the table reference and the set of file groups in
     * pending clustering are replaced and the per-partition identifier cache is cleared.
     * When no newer instant is found (including when nothing is pending), the previously
     * cached state is kept as-is. In every case the strategy is marked initialized.
     *
     * @param writeClient client used to obtain the current table
     */
    public void initialize(HoodieFlinkWriteClient writeClient) {
        if (this.initialized) {
            return;
        }
        HoodieFlinkTable table = writeClient.getHoodieTable();
        List<HoodieInstant> instants = ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());
        if (!instants.isEmpty()) {
            HoodieInstant latestPendingReplaceInstant = instants.get(instants.size() - 1);
            // Timestamps are compared as plain strings.
            if (latestPendingReplaceInstant.getTimestamp().compareTo(this.lastRefreshInstant) > 0) {
                LOG.info("Found new pending replacement commit. Last pending replacement commit is {}.", latestPendingReplaceInstant);
                this.table = table;
                this.fileGroupsInPendingClustering = table.getFileSystemView().getFileGroupsInPendingClustering()
                        .map(Pair::getKey)
                        .collect(Collectors.toSet());
                this.lastRefreshInstant = latestPendingReplaceInstant.getTimestamp();
                // Cached identifiers were built against the previous table state.
                this.partitionToIdentifier.clear();
            }
        }
        this.initialized = true;
    }

    /**
     * Marks the strategy as uninitialized so that the next
     * {@link #initialize(HoodieFlinkWriteClient)} call checks for new pending clustering
     * instants. Cached state is not discarded here.
     */
    public void reset() {
        this.initialized = false;
    }

    /**
     * Decides where a single batch of records has to be written.
     *
     * <p>The target file group is derived from the partition path and current location of the
     * first record of the batch.
     *
     * @param recordsList exactly one (records, instant) pair; the record list must not be empty
     * @return the batches to write, paired with the file groups they touch. If the target file
     *         group is not in a pending clustering, this is the input itself plus that file group.
     * @throws IllegalArgumentException if the strategy is not initialized, {@code recordsList}
     *         does not hold exactly one entry, or that entry has no records
     */
    @Override
    public Pair<List<Pair<List<HoodieRecord>, String>>, Set<HoodieFileGroupId>> handleUpdate(List<Pair<List<HoodieRecord>, String>> recordsList) {
        ValidationUtils.checkArgument(this.initialized, "Strategy has not been initialized");
        ValidationUtils.checkArgument(recordsList.size() == 1, "Expected exactly one (records, instant) pair");
        Pair<List<HoodieRecord>, String> recordsInstantPair = recordsList.get(0);
        List<HoodieRecord> records = recordsInstantPair.getLeft();
        // Fail with a clear message instead of an IndexOutOfBoundsException on get(0).
        ValidationUtils.checkArgument(!records.isEmpty(), "Expected a non-empty list of records");

        HoodieRecord sampleRecord = records.get(0);
        HoodieFileGroupId fileId = new HoodieFileGroupId(sampleRecord.getPartitionPath(), sampleRecord.getCurrentLocation().getFileId());
        if (this.fileGroupsInPendingClustering.isEmpty() || !this.fileGroupsInPendingClustering.contains(fileId)) {
            return Pair.of(recordsList, Collections.singleton(fileId));
        }
        return this.doHandleUpdate(fileId, recordsInstantPair);
    }

    /**
     * Builds the duplicated write for a batch whose file group is in a pending clustering.
     *
     * <p>Copies of the records are grouped by the file-id prefix of the bucket chosen by the
     * partition's identifier; each group is patched to point at a new file id and is paired
     * with the clustering instant. The original batch and file group are appended last.
     *
     * @param fileId             file group the batch originally targets
     * @param recordsInstantPair the original (records, instant) pair
     * @return all batches to write and all file groups they touch
     */
    private Pair<List<Pair<List<HoodieRecord>, String>>, Set<HoodieFileGroupId>> doHandleUpdate(HoodieFileGroupId fileId, Pair<List<HoodieRecord>, String> recordsInstantPair) {
        Pair<String, ConsistentBucketIdentifier> bucketIdentifierPair = this.getBucketIdentifierOfPartition(fileId.getPartitionPath());
        String clusteringInstant = bucketIdentifierPair.getLeft();
        ConsistentBucketIdentifier identifier = bucketIdentifierPair.getRight();

        // Group copies of the records, so the original batch is left untouched by the patching below.
        Map<String, List<HoodieRecord>> fileIdToRecords = recordsInstantPair.getLeft().stream()
                .map(HoodieRecord::newInstance)
                .collect(Collectors.groupingBy(r -> identifier.getBucket(r.getKey(), this.indexKeyFields).getFileIdPrefix()));

        List<Pair<List<HoodieRecord>, String>> recordsList = new ArrayList<>();
        Set<HoodieFileGroupId> fgs = new LinkedHashSet<>();
        for (Map.Entry<String, List<HoodieRecord>> e : fileIdToRecords.entrySet()) {
            String newFileId = FSUtils.createNewFileId(e.getKey(), 0);
            this.patchFileIdToRecords(e.getValue(), newFileId);
            recordsList.add(Pair.of(e.getValue(), clusteringInstant));
            fgs.add(new HoodieFileGroupId(fileId.getPartitionPath(), newFileId));
        }
        LOG.info("Apply duplicate update for FileGroup {}, routing records to: {}.", fileId, String.join(",", fileIdToRecords.keySet()));

        // The original batch still goes to its original file group.
        recordsList.add(recordsInstantPair);
        fgs.add(fileId);
        return Pair.of(recordsList, fgs);
    }

    /**
     * Returns the cached (clustering instant, bucket identifier) pair of a partition,
     * constructing it from the current table on first access.
     *
     * @param partition partition path
     * @return the pair for the partition, as produced by
     *         {@link ConsistentHashingUpdateStrategyUtils#constructPartitionToIdentifier}
     */
    private Pair<String, ConsistentBucketIdentifier> getBucketIdentifierOfPartition(String partition) {
        return this.partitionToIdentifier.computeIfAbsent(partition,
                p -> ConsistentHashingUpdateStrategyUtils.constructPartitionToIdentifier(Collections.singleton(p), (HoodieTable) this.table).get(p));
    }

    /**
     * Replaces the first record of the list with a copy whose current location points at the
     * given file id. The remaining records are left as they are.
     *
     * <p>NOTE(review): presumably the downstream writer reads the target file id from the
     * first record only - confirm against the write function.
     *
     * @param records non-empty list of records; its first element is replaced in place
     * @param fileId  file id to set as the current location of the first record
     */
    private void patchFileIdToRecords(List<HoodieRecord> records, String fileId) {
        HoodieRecord first = records.get(0);
        HoodieRecord record = new HoodieAvroRecord<>(first.getKey(), (HoodieRecordPayload) first.getData(), first.getOperation());
        HoodieRecordLocation newLoc = new HoodieRecordLocation("U", fileId);
        record.setCurrentLocation(newLoc);
        records.set(0, record);
    }
}

