/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.action.commit.InsertBucket;
import org.apache.hudi.table.action.commit.InsertBucketCumulativeWeightPair;
import org.apache.hudi.table.action.commit.SmallFile;
import org.apache.hudi.table.action.commit.SparkHoodiePartitioner;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class UpsertPartitioner<T extends HoodieRecordPayload<T>>
extends SparkHoodiePartitioner<T> {
    private static final Logger LOG = LogManager.getLogger(UpsertPartitioner.class);
    protected List<SmallFile> smallFiles = new ArrayList<SmallFile>();
    private int totalBuckets = 0;
    private HashMap<String, Integer> updateLocationToBucket = new HashMap();
    private HashMap<String, List<InsertBucketCumulativeWeightPair>> partitionPathToInsertBucketInfos = new HashMap();
    private HashMap<Integer, BucketInfo> bucketInfoMap = new HashMap();
    protected final HoodieWriteConfig config;

    public UpsertPartitioner(WorkloadProfile profile, HoodieEngineContext context, HoodieTable table, HoodieWriteConfig config) {
        super(profile, table);
        this.config = config;
        this.assignUpdates(profile);
        this.assignInserts(profile, context);
        LOG.info((Object)("Total Buckets :" + this.totalBuckets + ", buckets info => " + this.bucketInfoMap + ", \nPartition to insert buckets => " + this.partitionPathToInsertBucketInfos + ", \nUpdateLocations mapped to buckets =>" + this.updateLocationToBucket));
    }

    private void assignUpdates(WorkloadProfile profile) {
        Set<Map.Entry<String, WorkloadStat>> partitionStatEntries = profile.getInputPartitionPathStatMap().entrySet();
        for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
            WorkloadStat outputWorkloadStats = profile.getOutputPartitionPathStatMap().getOrDefault(partitionStat.getKey(), new WorkloadStat());
            for (Map.Entry<String, Pair<String, Long>> updateLocEntry : partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
                this.addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey());
                if (!profile.hasOutputWorkLoadStats()) continue;
                HoodieRecordLocation hoodieRecordLocation = new HoodieRecordLocation(updateLocEntry.getValue().getKey(), updateLocEntry.getKey());
                outputWorkloadStats.addUpdates(hoodieRecordLocation, updateLocEntry.getValue().getValue());
            }
            if (!profile.hasOutputWorkLoadStats()) continue;
            profile.updateOutputPartitionPathStatMap(partitionStat.getKey(), outputWorkloadStats);
        }
    }

    private int addUpdateBucket(String partitionPath, String fileIdHint) {
        int bucket = this.totalBuckets;
        this.updateLocationToBucket.put(fileIdHint, bucket);
        BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, partitionPath);
        this.bucketInfoMap.put(this.totalBuckets, bucketInfo);
        ++this.totalBuckets;
        return bucket;
    }

    private Map<String, Set<String>> getPartitionPathToPendingClusteringFileGroupsId() {
        Map<String, Set<String>> partitionPathToInPendingClusteringFileId = this.table.getFileSystemView().getFileGroupsInPendingClustering().map(fileGroupIdAndInstantPair -> Pair.of(((HoodieFileGroupId)fileGroupIdAndInstantPair.getKey()).getPartitionPath(), ((HoodieFileGroupId)fileGroupIdAndInstantPair.getKey()).getFileId())).collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toSet())));
        return partitionPathToInPendingClusteringFileId;
    }

    private List<SmallFile> filterSmallFilesInClustering(Set<String> pendingClusteringFileGroupsId, List<SmallFile> smallFiles) {
        if (!pendingClusteringFileGroupsId.isEmpty()) {
            return smallFiles.stream().filter(smallFile -> !pendingClusteringFileGroupsId.contains(smallFile.location.getFileId())).collect(Collectors.toList());
        }
        return smallFiles;
    }

    private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) {
        Set<String> partitionPaths = profile.getPartitionPaths();
        long averageRecordSize = UpsertPartitioner.averageBytesPerRecord(this.table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(), this.config);
        LOG.info((Object)("AvgRecordSize => " + averageRecordSize));
        Map<String, List<SmallFile>> partitionSmallFilesMap = this.getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), context);
        Map<String, Set<String>> partitionPathToPendingClusteringFileGroupsId = this.getPartitionPathToPendingClusteringFileGroupsId();
        for (String partitionPath : partitionPaths) {
            WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
            WorkloadStat outputWorkloadStats = profile.getOutputPartitionPathStatMap().getOrDefault(partitionPath, new WorkloadStat());
            if (pStat.getNumInserts() > 0L) {
                List<SmallFile> smallFiles = this.filterSmallFilesInClustering(partitionPathToPendingClusteringFileGroupsId.getOrDefault(partitionPath, Collections.emptySet()), partitionSmallFilesMap.getOrDefault(partitionPath, new ArrayList()));
                this.smallFiles.addAll(smallFiles);
                LOG.info((Object)("For partitionPath : " + partitionPath + " Small Files => " + smallFiles));
                long totalUnassignedInserts = pStat.getNumInserts();
                ArrayList<Integer> bucketNumbers = new ArrayList<Integer>();
                ArrayList<Long> recordsPerBucket = new ArrayList<Long>();
                for (SmallFile smallFile : smallFiles) {
                    int bucket;
                    long recordsToAppend = Math.min((this.config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize, totalUnassignedInserts);
                    if (recordsToAppend <= 0L) continue;
                    if (this.updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
                        bucket = this.updateLocationToBucket.get(smallFile.location.getFileId());
                        LOG.info((Object)("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket));
                    } else {
                        bucket = this.addUpdateBucket(partitionPath, smallFile.location.getFileId());
                        LOG.info((Object)("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket));
                    }
                    if (profile.hasOutputWorkLoadStats()) {
                        outputWorkloadStats.addInserts(smallFile.location, recordsToAppend);
                    }
                    bucketNumbers.add(bucket);
                    recordsPerBucket.add(recordsToAppend);
                    if ((totalUnassignedInserts -= recordsToAppend) > 0L) continue;
                    break;
                }
                if (totalUnassignedInserts > 0L) {
                    long insertRecordsPerBucket = this.config.getCopyOnWriteInsertSplitSize();
                    if (this.config.shouldAutoTuneInsertSplits()) {
                        insertRecordsPerBucket = this.config.getParquetMaxFileSize() / averageRecordSize;
                    }
                    int insertBuckets = (int)Math.ceil(1.0 * (double)totalUnassignedInserts / (double)insertRecordsPerBucket);
                    LOG.info((Object)("After small file assignment: unassignedInserts => " + totalUnassignedInserts + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket));
                    for (int b = 0; b < insertBuckets; ++b) {
                        bucketNumbers.add(this.totalBuckets);
                        if (b < insertBuckets - 1) {
                            recordsPerBucket.add(insertRecordsPerBucket);
                        } else {
                            recordsPerBucket.add(totalUnassignedInserts - (long)(insertBuckets - 1) * insertRecordsPerBucket);
                        }
                        BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath);
                        this.bucketInfoMap.put(this.totalBuckets, bucketInfo);
                        if (profile.hasOutputWorkLoadStats()) {
                            outputWorkloadStats.addInserts(new HoodieRecordLocation("null", bucketInfo.getFileIdPrefix()), (Long)recordsPerBucket.get(recordsPerBucket.size() - 1));
                        }
                        ++this.totalBuckets;
                    }
                }
                ArrayList<InsertBucketCumulativeWeightPair> insertBuckets = new ArrayList<InsertBucketCumulativeWeightPair>();
                double currentCumulativeWeight = 0.0;
                for (int i = 0; i < bucketNumbers.size(); ++i) {
                    InsertBucket bkt = new InsertBucket();
                    bkt.bucketNumber = (Integer)bucketNumbers.get(i);
                    bkt.weight = 1.0 * (double)((Long)recordsPerBucket.get(i)).longValue() / (double)pStat.getNumInserts();
                    insertBuckets.add(new InsertBucketCumulativeWeightPair(bkt, currentCumulativeWeight += bkt.weight));
                }
                LOG.info((Object)("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets));
                this.partitionPathToInsertBucketInfos.put(partitionPath, insertBuckets);
            }
            if (!profile.hasOutputWorkLoadStats()) continue;
            profile.updateOutputPartitionPathStatMap(partitionPath, outputWorkloadStats);
        }
    }

    private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> partitionPaths, HoodieEngineContext context) {
        JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
        Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<String, List<SmallFile>>();
        if (this.config.getParquetSmallFileLimit() <= 0) {
            return partitionSmallFilesMap;
        }
        if (partitionPaths != null && partitionPaths.size() > 0) {
            context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions: " + this.config.getTableName());
            JavaRDD partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size());
            partitionSmallFilesMap = partitionPathRdds.mapToPair((PairFunction & Serializable)partitionPath -> new Tuple2(partitionPath, this.getSmallFiles((String)partitionPath))).collectAsMap();
        }
        return partitionSmallFilesMap;
    }

    protected List<SmallFile> getSmallFiles(String partitionPath) {
        ArrayList<SmallFile> smallFileLocations = new ArrayList<SmallFile>();
        HoodieTimeline commitTimeline = this.table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
        if (!commitTimeline.empty()) {
            HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
            List allFiles = this.table.getBaseFileOnlyView().getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
            for (HoodieBaseFile file : allFiles) {
                if (file.getFileSize() >= (long)this.config.getParquetSmallFileLimit()) continue;
                String filename = file.getFileName();
                SmallFile sf = new SmallFile();
                sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
                sf.sizeBytes = file.getFileSize();
                smallFileLocations.add(sf);
            }
        }
        return smallFileLocations;
    }

    public List<BucketInfo> getBucketInfos() {
        return Collections.unmodifiableList(new ArrayList<BucketInfo>(this.bucketInfoMap.values()));
    }

    @Override
    public BucketInfo getBucketInfo(int bucketNumber) {
        return this.bucketInfoMap.get(bucketNumber);
    }

    public List<InsertBucketCumulativeWeightPair> getInsertBuckets(String partitionPath) {
        return this.partitionPathToInsertBucketInfos.get(partitionPath);
    }

    public int numPartitions() {
        return this.totalBuckets;
    }

    @Override
    public int getNumPartitions() {
        return this.totalBuckets;
    }

    @Override
    public int getPartition(Object key) {
        Tuple2 keyLocation = (Tuple2)key;
        if (((Option)keyLocation._2()).isPresent()) {
            HoodieRecordLocation location = (HoodieRecordLocation)((Option)keyLocation._2()).get();
            return this.updateLocationToBucket.get(location.getFileId());
        }
        String partitionPath = ((HoodieKey)keyLocation._1()).getPartitionPath();
        List<InsertBucketCumulativeWeightPair> targetBuckets = this.partitionPathToInsertBucketInfos.get(partitionPath);
        long totalInserts = Math.max(1L, this.profile.getWorkloadStat(partitionPath).getNumInserts());
        long hashOfKey = NumericUtils.getMessageDigestHash("MD5", ((HoodieKey)keyLocation._1()).getRecordKey());
        double r = 1.0 * (double)Math.floorMod(hashOfKey, totalInserts) / (double)totalInserts;
        int index = Collections.binarySearch(targetBuckets, new InsertBucketCumulativeWeightPair(new InsertBucket(), r));
        if (index >= 0) {
            return ((InsertBucket)targetBuckets.get((int)index).getKey()).bucketNumber;
        }
        if (-1 * index - 1 < targetBuckets.size()) {
            return ((InsertBucket)targetBuckets.get((int)(-1 * index - 1)).getKey()).bucketNumber;
        }
        return ((InsertBucket)targetBuckets.get((int)0).getKey()).bucketNumber;
    }

    protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) {
        long avgSize;
        block3: {
            avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
            long fileSizeThreshold = (long)(hoodieWriteConfig.getRecordSizeEstimationThreshold() * (double)hoodieWriteConfig.getParquetSmallFileLimit());
            try {
                if (commitTimeline.empty()) break block3;
                Iterator instants = commitTimeline.getReverseOrderedInstants().iterator();
                while (instants.hasNext()) {
                    HoodieInstant instant = (HoodieInstant)instants.next();
                    HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
                    long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
                    long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
                    if (totalBytesWritten <= fileSizeThreshold || totalRecordsWritten <= 0L) continue;
                    avgSize = (long)Math.ceil(1.0 * (double)totalBytesWritten / (double)totalRecordsWritten);
                    break;
                }
            }
            catch (Throwable t) {
                LOG.error((Object)"Error trying to compute average bytes/record ", t);
            }
        }
        return avgSize;
    }
}

