/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table;

import com.google.common.hash.Hashing;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.io.storage.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.func.CopyOnWriteLazyInsertIterable;
import org.apache.hudi.func.ParquetReaderIterator;
import org.apache.hudi.func.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieCleanHelper;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.RollbackExecutor;
import org.apache.hudi.table.RollbackRequest;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload>
extends HoodieTable<T> {
    /** Class-wide log4j logger used by every operation in this table implementation. */
    private static final Logger LOG = LogManager.getLogger(HoodieCopyOnWriteTable.class);

    /**
     * Creates a copy-on-write table bound to the given write configuration and Spark context.
     * All state set-up is delegated to {@link HoodieTable}'s constructor.
     *
     * @param config write configuration for this table
     * @param jsc    Spark context used by the table's distributed operations
     */
    public HoodieCopyOnWriteTable(HoodieWriteConfig config, JavaSparkContext jsc) {
        super(config, jsc);
    }

    /**
     * Builds the Spark function that performs the actual file deletions of a clean.
     *
     * <p>Each input element is a {@code (partitionPath, fileName)} pair. The file is resolved
     * against the table base path and deleted non-recursively. Outcomes are aggregated per
     * partition path into a {@link PartitionCleanStat}, so each Spark partition emits at most one
     * stat per table partition.
     *
     * @param table table whose meta client supplies the file system and base path
     * @return a function from an iterator of (partition path, file name) pairs to
     *         (partition path, aggregated clean stat) pairs
     */
    private static PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat> deleteFilesFunc(HoodieTable table) {
        // Spark's PairFlatMapFunction already extends Serializable, so no intersection cast is needed.
        return iter -> {
            Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
            HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
            Path basePath = new Path(table.getMetaClient().getBasePath());
            while (iter.hasNext()) {
                Tuple2<String, String> partitionDelFileTuple = iter.next();
                String partitionPath = partitionDelFileTuple._1();
                String delFileName = partitionDelFileTuple._2();
                Path deletePath = FSUtils.getPartitionPath(FSUtils.getPartitionPath(basePath, partitionPath), delFileName);
                Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePath.toString());
                PartitionCleanStat partitionCleanStat = partitionCleanStatMap.computeIfAbsent(partitionPath, PartitionCleanStat::new);
                partitionCleanStat.addDeleteFilePatterns(deletePath.getName());
                partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult);
            }
            return partitionCleanStatMap.entrySet().stream()
                    .map(e -> new Tuple2<String, PartitionCleanStat>(e.getKey(), e.getValue()))
                    .collect(Collectors.toList())
                    .iterator();
        };
    }

    /**
     * Deletes a single file (non-recursively) and reports whether the delete succeeded.
     *
     * @param fs            file system to delete from
     * @param deletePathStr fully qualified path of the file to remove
     * @return {@code true} if {@code fs.delete} reported success; {@code false} if it reported
     *         failure or the file did not exist
     * @throws IOException for any other file-system error
     */
    private static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
        final Path target = new Path(deletePathStr);
        LOG.debug("Working on delete path :" + target);
        final boolean removed;
        try {
            removed = fs.delete(target, false);
        } catch (FileNotFoundException missing) {
            // A missing file is recorded as an unsuccessful delete rather than failing the clean.
            return false;
        }
        if (removed) {
            LOG.debug("Cleaned file at path :" + target);
        }
        return removed;
    }

    /**
     * Creates the partitioner that assigns upserted records to write buckets.
     *
     * @param profile workload profile of the incoming batch; required
     * @return a new {@link UpsertPartitioner} for the profile
     * @throws HoodieUpsertException if {@code profile} is {@code null}
     */
    @Override
    public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
        if (profile != null) {
            return new UpsertPartitioner(profile);
        }
        throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
    }

    /**
     * Creates the partitioner for inserts; copy-on-write tables reuse the upsert partitioner.
     *
     * @param profile workload profile of the incoming batch
     * @return the same partitioner {@link #getUpsertPartitioner(WorkloadProfile)} would build
     */
    @Override
    public Partitioner getInsertPartitioner(WorkloadProfile profile) {
        return getUpsertPartitioner(profile);
    }

    /**
     * Copy-on-write bucketing always needs a workload profile.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isWorkloadProfileNeeded() {
        // The upsert partitioner rejects a null profile, so one must always be computed.
        return true;
    }

    /** Message shared by the compaction entry points, which this table type does not support. */
    private static final String COMPACTION_UNSUPPORTED_MSG = "Compaction is not supported from a CopyOnWrite table";

    /**
     * Not supported: copy-on-write tables have no log files to compact.
     *
     * @throws HoodieNotSupportedException always
     */
    @Override
    public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String commitTime) {
        throw new HoodieNotSupportedException(COMPACTION_UNSUPPORTED_MSG);
    }

    /**
     * Not supported: copy-on-write tables have no log files to compact.
     *
     * @throws HoodieNotSupportedException always
     */
    @Override
    public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionInstantTime, HoodieCompactionPlan compactionPlan) {
        throw new HoodieNotSupportedException(COMPACTION_UNSUPPORTED_MSG);
    }

    /**
     * Merges a stream of incoming records into an existing file group.
     *
     * @param commitTime instant time of the write
     * @param fileId     file group receiving the updates
     * @param recordItr  records destined for {@code fileId}
     * @return a single-element iterator holding the write statuses. If {@code recordItr} is
     *         empty, the single element is an empty list and no merge handle is created.
     * @throws IOException if the merge fails
     */
    public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId, Iterator<HoodieRecord<T>> recordItr) throws IOException {
        if (!recordItr.hasNext()) {
            LOG.info("Empty partition with fileId => " + fileId);
            // Typed empty list: a raw EMPTY_LIST would make this an Iterator<List>, not Iterator<List<WriteStatus>>.
            return Collections.singletonList(Collections.<WriteStatus>emptyList()).iterator();
        }
        HoodieMergeHandle upsertHandle = this.getUpdateHandle(commitTime, fileId, recordItr);
        return this.handleUpdateInternal(upsertHandle, commitTime, fileId);
    }

    /**
     * Merges an already-keyed map of incoming records into the given base file.
     *
     * @param commitTime      instant time of the write
     * @param fileId          file group receiving the updates
     * @param keyToNewRecords incoming records keyed by record key
     * @param oldDataFile     base file to merge the records into
     * @return a single-element iterator holding the merge handle's write status
     * @throws IOException if the merge fails
     */
    public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId, Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
        HoodieMergeHandle upsertHandle = this.getUpdateHandle(commitTime, fileId, keyToNewRecords, oldDataFile);
        return this.handleUpdateInternal(upsertHandle, commitTime, fileId);
    }

    /**
     * Rewrites the old base file of a file group through the given merge handle.
     *
     * <p>Records are read from the old parquet file with the handle's writer schema. They are
     * pushed through a {@link SparkBoundedInMemoryExecutor} into an {@link UpdateHandler}, which
     * calls {@code upsertHandle.write(record)} for each one. The handle is always closed afterwards.
     *
     * @param upsertHandle merge handle for the file group
     * @param commitTime   instant time of the write (used in error messages)
     * @param fileId       file group id (used in error messages)
     * @return a single-element iterator containing the handle's write status
     * @throws HoodieUpsertException if the handle has no old file path
     * @throws HoodieException       wrapping any failure while reading or merging
     * @throws IOException           declared for callers/overrides
     */
    protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, String commitTime, String fileId) throws IOException {
        if (upsertHandle.getOldFilePath() == null) {
            throw new HoodieUpsertException("Error in finding the old file path at commit " + commitTime + " for fileId: " + fileId);
        }
        AvroReadSupport.setAvroReadSchema(this.getHadoopConf(), upsertHandle.getWriterSchema());
        SparkBoundedInMemoryExecutor<Object, Object, Void> wrapper = null;
        try (ParquetReader reader = AvroParquetReader.builder(upsertHandle.getOldFilePath()).withConf(this.getHadoopConf()).build()) {
            wrapper = new SparkBoundedInMemoryExecutor<Object, Object, Void>(this.config, new ParquetReaderIterator(reader), new UpdateHandler(upsertHandle), x -> x);
            wrapper.execute();
        } catch (Exception e) {
            throw new HoodieException(e);
        } finally {
            // Nested so the executor is shut down even when closing the handle throws.
            // Otherwise the executor (and presumably its worker threads) would be leaked.
            try {
                upsertHandle.close();
            } finally {
                if (wrapper != null) {
                    wrapper.shutdownNow();
                }
            }
        }
        if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
            LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " + upsertHandle.getWriteStatus());
        }
        return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();
    }

    /**
     * Creates the merge handle used for iterator-based updates.
     * The method is protected so other table types can supply a different handle.
     *
     * @param commitTime instant time of the write
     * @param fileId     file group receiving the updates
     * @param recordItr  incoming records for {@code fileId}
     * @return a new {@link HoodieMergeHandle}
     */
    protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileId, Iterator<HoodieRecord<T>> recordItr) {
        final HoodieMergeHandle<T> mergeHandle = new HoodieMergeHandle<T>(config, commitTime, this, recordItr, fileId);
        return mergeHandle;
    }

    /**
     * Creates the merge handle used when the incoming records are already keyed and the
     * base file to merge into is known.
     *
     * @param commitTime         instant time of the write
     * @param fileId             file group receiving the updates
     * @param keyToNewRecords    incoming records keyed by record key
     * @param dataFileToBeMerged base file to merge into
     * @return a new {@link HoodieMergeHandle}
     */
    protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileId, Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
        final HoodieMergeHandle<T> mergeHandle = new HoodieMergeHandle<T>(config, commitTime, this, keyToNewRecords, fileId, dataFileToBeMerged);
        return mergeHandle;
    }

    /**
     * Writes a stream of new records into new files whose ids start with {@code idPfx}.
     *
     * @param commitTime instant time of the write
     * @param idPfx      file id prefix for the files created
     * @param recordItr  records to insert
     * @return a lazy iterator of write-status batches. If {@code recordItr} is empty, a
     *         single-element iterator holding an empty list is returned instead.
     * @throws Exception propagated from the insert path
     */
    public Iterator<List<WriteStatus>> handleInsert(String commitTime, String idPfx, Iterator<HoodieRecord<T>> recordItr) throws Exception {
        if (!recordItr.hasNext()) {
            LOG.info("Empty partition");
            // Typed empty list: a raw EMPTY_LIST would make this an Iterator<List>, not Iterator<List<WriteStatus>>.
            return Collections.singletonList(Collections.<WriteStatus>emptyList()).iterator();
        }
        return new CopyOnWriteLazyInsertIterable<T>(recordItr, this.config, commitTime, this, idPfx);
    }

    /**
     * Eagerly writes all records into a single new file via a {@link HoodieCreateHandle}.
     *
     * @param commitTime    instant time of the write
     * @param partitionPath partition the file belongs to
     * @param fileId        id of the file to create
     * @param recordItr     records to write
     * @return a single-element iterator holding the status returned by closing the handle
     */
    public Iterator<List<WriteStatus>> handleInsert(String commitTime, String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
        HoodieCreateHandle<T> createHandle = new HoodieCreateHandle<T>(this.config, commitTime, this, partitionPath, fileId, recordItr);
        createHandle.write();
        return Collections.singletonList(Collections.singletonList(createHandle.close())).iterator();
    }

    /**
     * Writes one Spark partition (bucket) of an upsert, dispatching on the bucket's type.
     *
     * @param commitTime  instant time of the write
     * @param partition   bucket number assigned by the {@link UpsertPartitioner}
     * @param recordItr   records routed to this bucket
     * @param partitioner must be the {@link UpsertPartitioner} that produced {@code partition}
     * @return write-status batches for the bucket
     * @throws HoodieUpsertException wrapping any failure, including an unknown bucket type
     */
    @Override
    public Iterator<List<WriteStatus>> handleUpsertPartition(String commitTime, Integer partition, Iterator recordItr, Partitioner partitioner) {
        final BucketInfo bucket = ((UpsertPartitioner) partitioner).getBucketInfo(partition);
        final BucketType btype = bucket.bucketType;
        try {
            switch (btype) {
                case INSERT:
                    return handleInsert(commitTime, bucket.fileIdPrefix, recordItr);
                case UPDATE:
                    return handleUpdate(commitTime, bucket.fileIdPrefix, recordItr);
                default:
                    throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
            }
        } catch (Throwable failure) {
            final String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
            LOG.error(msg, failure);
            throw new HoodieUpsertException(msg, failure);
        }
    }

    /**
     * Inserts are bucketed exactly like upserts on a copy-on-write table.
     *
     * @see #handleUpsertPartition(String, Integer, Iterator, Partitioner)
     */
    @Override
    public Iterator<List<WriteStatus>> handleInsertPartition(String commitTime, Integer partition, Iterator recordItr, Partitioner partitioner) {
        return handleUpsertPartition(commitTime, partition, recordItr, partitioner);
    }

    /**
     * Computes which files each partition should delete, without deleting anything.
     *
     * <p>The {@link HoodieCleanHelper} picks the earliest commit to retain and the partitions
     * needing cleanup. The per-partition delete lists are then computed in parallel on Spark.
     *
     * @param jsc Spark context used to compute delete paths per partition
     * @return the cleaner plan. If no partition needs cleaning, the plan only sets the
     *         KEEP_LATEST_COMMITS policy name.
     * @throws HoodieIOException if listing or planning fails with an {@link IOException}
     */
    @Override
    public HoodieCleanerPlan scheduleClean(JavaSparkContext jsc) {
        try {
            HoodieCleanHelper cleaner = new HoodieCleanHelper(this, this.config);
            Option<HoodieInstant> earliestInstant = cleaner.getEarliestCommitToRetain();
            List<String> partitionsToClean = cleaner.getPartitionPathsToClean(earliestInstant);
            if (partitionsToClean.isEmpty()) {
                LOG.info("Nothing to clean here. It is already clean");
                return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
            }
            LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + this.config.getCleanerPolicy());
            int cleanerParallelism = Math.min(partitionsToClean.size(), this.config.getCleanerParallelism());
            LOG.info("Using cleanerParallelism: " + cleanerParallelism);
            // Typed lambda (Spark's Function is already Serializable). The raw intersection cast made the
            // whole pipeline raw, so its result could not be assigned to Map<String, List<String>>.
            Map<String, List<String>> cleanOps = jsc.parallelize(partitionsToClean, cleanerParallelism)
                    .map(partitionPathToClean -> Pair.of(partitionPathToClean, cleaner.getDeletePaths(partitionPathToClean)))
                    .collect()
                    .stream()
                    .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
            HoodieActionInstant earliestInstantToRetain = earliestInstant
                    .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name()))
                    .orElse(null);
            return new HoodieCleanerPlan(earliestInstantToRetain, this.config.getCleanerPolicy().name(), cleanOps, 1);
        } catch (IOException e) {
            throw new HoodieIOException("Failed to schedule clean operation", e);
        }
    }

    /**
     * Executes a scheduled clean.
     *
     * <p>Every file listed in the plan is deleted in parallel. Results are returned as one
     * {@link HoodieCleanStat} per partition key in the plan. Partitions with no delete results
     * get an empty stat.
     *
     * @param jsc          Spark context used to fan out the deletes
     * @param cleanInstant instant of the clean being executed (not read by this implementation)
     * @param cleanerPlan  plan produced by {@link #scheduleClean(JavaSparkContext)}
     * @return one clean stat per partition in the plan
     */
    @Override
    public List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) {
        Map<String, List<String>> filesToBeDeletedPerPartition = cleanerPlan.getFilesToBeDeletedPerPartition();
        // Parallelism tracks the number of files to delete (the sum of list sizes). The previous
        // `.count()` only counted partitions. Clamp to >= 1 because Spark rejects a non-positive
        // number of slices, which an empty plan would otherwise produce.
        int totalFilesToDelete = filesToBeDeletedPerPartition.values().stream().mapToInt(List::size).sum();
        int cleanerParallelism = Math.max(1, Math.min(totalFilesToDelete, this.config.getCleanerParallelism()));
        LOG.info("Using cleanerParallelism: " + cleanerParallelism);

        List<Tuple2<String, String>> partitionFilePairs = filesToBeDeletedPerPartition.entrySet().stream()
                .flatMap(entry -> entry.getValue().stream()
                        .map(fileName -> new Tuple2<String, String>(entry.getKey(), fileName)))
                .collect(Collectors.toList());
        List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc.parallelize(partitionFilePairs, cleanerParallelism)
                .mapPartitionsToPair(deleteFilesFunc(this))
                .reduceByKey((left, right) -> left.merge(right))
                .collect();
        Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats.stream()
                .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));

        // The earliest retained instant is identical for every partition, so build it once.
        HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
        Option<HoodieInstant> earliestCommitRetained = Option.ofNullable(actionInstant != null
                ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()), actionInstant.getAction(), actionInstant.getTimestamp())
                : null);

        return filesToBeDeletedPerPartition.keySet().stream().map(partitionPath -> {
            PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
                    ? partitionCleanStatsMap.get(partitionPath)
                    : new PartitionCleanStat(partitionPath);
            return HoodieCleanStat.newBuilder()
                    .withPolicy(this.config.getCleanerPolicy())
                    .withPartitionPath(partitionPath)
                    .withEarliestCommitRetained(earliestCommitRetained)
                    .withDeletePathPattern(partitionCleanStat.deletePathPatterns)
                    .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles)
                    .withFailedDeletes(partitionCleanStat.failedDeleteFiles)
                    .build();
        }).collect(Collectors.toList());
    }

    /**
     * Rolls back the given instant.
     *
     * <p>A completed instant is first reverted to inflight. Unless the instant is still only
     * requested, the data and log files it produced are then deleted in every partition. Finally
     * its marker directory (and, optionally, its pending instant files) are removed.
     *
     * @param jsc            Spark context used by the rollback executor
     * @param instant        instant to roll back
     * @param deleteInstants whether to delete the pending instant file(s) from the timeline
     * @return rollback stats; empty if the instant was only requested
     * @throws IOException if listing partitions fails
     */
    @Override
    public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, HoodieInstant instant, boolean deleteInstants) throws IOException {
        final long rollbackStart = System.currentTimeMillis();
        final HoodieActiveTimeline timeline = this.getActiveTimeline();
        HoodieInstant target = instant;
        if (target.isCompleted()) {
            LOG.info("Unpublishing instant " + target);
            target = timeline.revertToInflight(target);
        }
        final List<HoodieRollbackStat> rollbackStats;
        if (target.isRequested()) {
            // Nothing was written yet for a merely requested instant.
            rollbackStats = new ArrayList<>();
        } else {
            final String commit = target.getTimestamp();
            LOG.info("Clean out all parquet files generated for commit: " + commit);
            final List<RollbackRequest> requests = generateRollbackRequests(target);
            rollbackStats = new RollbackExecutor(this.metaClient, this.config).performRollback(jsc, target, requests);
        }
        deleteInflightAndRequestedInstant(deleteInstants, timeline, target);
        LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - rollbackStart));
        return rollbackStats;
    }

    /**
     * Builds one "delete data and log files" rollback request per partition of the table.
     *
     * @param instantToRollback instant whose files should be removed
     * @return one request per partition path found under the table base path
     * @throws IOException if listing partitions fails
     */
    private List<RollbackRequest> generateRollbackRequests(HoodieInstant instantToRollback) throws IOException {
        final List<String> partitionPaths = FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(), this.config.shouldAssumeDatePartitioning());
        final List<RollbackRequest> requests = new ArrayList<>(partitionPaths.size());
        for (String partitionPath : partitionPaths) {
            requests.add(RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback));
        }
        return requests;
    }

    /**
     * Removes the marker directory of an instant and, when requested, its pending instant files.
     *
     * <p>The marker directory is always deleted. When {@code deleteInstant} is true, the pending
     * instant is deleted from the timeline. If it was inflight and the timeline layout version is
     * not the null version, the matching REQUESTED instant is deleted as well.
     *
     * @param deleteInstant      whether to delete the pending instant file(s)
     * @param activeTimeline     timeline to delete from
     * @param instantToBeDeleted instant being cleaned up
     */
    protected void deleteInflightAndRequestedInstant(boolean deleteInstant, HoodieActiveTimeline activeTimeline, HoodieInstant instantToBeDeleted) {
        this.deleteMarkerDir(instantToBeDeleted.getTimestamp());
        if (!deleteInstant) {
            LOG.warn("Rollback finished without deleting inflight instant file. Instant=" + instantToBeDeleted);
            return;
        }
        LOG.info("Deleting instant=" + instantToBeDeleted);
        activeTimeline.deletePending(instantToBeDeleted);
        HoodieInstant lastDeleted = instantToBeDeleted;
        final boolean alsoDeleteRequested = instantToBeDeleted.isInflight()
                && !this.metaClient.getTimelineLayoutVersion().isNullVersion();
        if (alsoDeleteRequested) {
            lastDeleted = new HoodieInstant(HoodieInstant.State.REQUESTED, instantToBeDeleted.getAction(), instantToBeDeleted.getTimestamp());
            activeTimeline.deletePending(lastDeleted);
        }
        LOG.info("Deleted pending commit " + lastDeleted);
    }

    /**
     * Rolling statistics handed to the {@link UpsertPartitioner}; this table type provides none.
     * Protected so other table types can supply their own.
     *
     * @return always {@code null} here
     */
    protected HoodieRollingStatMetadata getRollingStats() {
        return null;
    }

    /**
     * Estimates the average record size in bytes from the commit history.
     *
     * <p>Commits are scanned newest first. The first one reporting both positive bytes written
     * and positive records written determines the estimate (rounded up). If the timeline is
     * empty, no commit qualifies, or anything goes wrong (the error is logged), the default
     * estimate is returned instead.
     *
     * @param commitTimeline            timeline of completed commits
     * @param defaultRecordSizeEstimate fallback size in bytes
     * @return the estimated average record size in bytes
     */
    protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, int defaultRecordSizeEstimate) {
        try {
            if (!commitTimeline.empty()) {
                Iterator<HoodieInstant> newestFirst = commitTimeline.getReverseOrderedInstants().iterator();
                while (newestFirst.hasNext()) {
                    HoodieInstant candidate = newestFirst.next();
                    HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(candidate).get(), HoodieCommitMetadata.class);
                    long bytesWritten = metadata.fetchTotalBytesWritten();
                    long recordsWritten = metadata.fetchTotalRecordsWritten();
                    if (bytesWritten > 0L && recordsWritten > 0L) {
                        return (long) Math.ceil((1.0 * bytesWritten) / recordsWritten);
                    }
                }
            }
        } catch (Throwable t) {
            // Deliberately best-effort: any failure falls back to the configured estimate.
            LOG.error("Error trying to compute average bytes/record ", t);
        }
        return defaultRecordSizeEstimate;
    }

    /**
     * Spark {@link Partitioner} that routes every tagged record to a write bucket.
     *
     * <p>Buckets are numbered {@code 0 .. totalBuckets - 1}. There are two kinds:
     * <ul>
     *   <li>UPDATE buckets, one per file group that receives updates, plus one per small file
     *       chosen to absorb inserts.</li>
     *   <li>INSERT buckets for inserts that do not fit into small files.</li>
     * </ul>
     *
     * <p>Records that already have a location go to the UPDATE bucket of their file id. New
     * records are spread over their partition's insert buckets by hashing the record key.
     */
    class UpsertPartitioner
    extends Partitioner {
        /** All small files found by {@link #getSmallFiles(String)}, accumulated across partitions. */
        List<SmallFile> smallFiles = new ArrayList<>();
        /** Number of buckets created so far; also the number of the next bucket. */
        private int totalBuckets = 0;
        private WorkloadStat globalStat;
        /** File id -> UPDATE bucket number. */
        private Map<String, Integer> updateLocationToBucket = new HashMap<>();
        /** Partition path -> weighted buckets that the partition's inserts are spread over. */
        private Map<String, List<InsertBucket>> partitionPathToInsertBuckets = new HashMap<>();
        /** Bucket number -> bucket description. */
        private Map<Integer, BucketInfo> bucketInfoMap = new HashMap<>();
        protected HoodieRollingStatMetadata rollingStatMetadata;

        UpsertPartitioner(WorkloadProfile profile) {
            this.globalStat = profile.getGlobalStat();
            this.rollingStatMetadata = HoodieCopyOnWriteTable.this.getRollingStats();
            this.assignUpdates(profile);
            this.assignInserts(profile);
            LOG.info("Total Buckets :" + this.totalBuckets + ", buckets info => " + this.bucketInfoMap + ", \nPartition to insert buckets => " + this.partitionPathToInsertBuckets + ", \nUpdateLocations mapped to buckets =>" + this.updateLocationToBucket);
        }

        /** Creates one UPDATE bucket for every file group that receives updates in this batch. */
        private void assignUpdates(WorkloadProfile profile) {
            WorkloadStat gStat = profile.getGlobalStat();
            for (Map.Entry<String, Pair<String, Long>> updateLocEntry : gStat.getUpdateLocationToCount().entrySet()) {
                this.addUpdateBucket(updateLocEntry.getKey());
            }
        }

        /**
         * Registers a new UPDATE bucket for the given file id.
         *
         * @param fileIdHint file id the bucket writes to
         * @return the new bucket's number
         */
        private int addUpdateBucket(String fileIdHint) {
            int bucket = this.totalBuckets;
            this.updateLocationToBucket.put(fileIdHint, bucket);
            BucketInfo bucketInfo = new BucketInfo();
            bucketInfo.bucketType = BucketType.UPDATE;
            bucketInfo.fileIdPrefix = fileIdHint;
            this.bucketInfoMap.put(bucket, bucketInfo);
            ++this.totalBuckets;
            return bucket;
        }

        /**
         * Assigns the inserts of every partition to buckets.
         *
         * <p>Small files are topped up first, each toward the max parquet file size. Any remainder
         * is split into new INSERT buckets. Each bucket's weight is its share of the partition's
         * inserts.
         */
        private void assignInserts(WorkloadProfile profile) {
            Set<String> partitionPaths = profile.getPartitionPaths();
            long averageRecordSize = HoodieCopyOnWriteTable.averageBytesPerRecord(HoodieCopyOnWriteTable.this.metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(), HoodieCopyOnWriteTable.this.config.getCopyOnWriteRecordSizeEstimate());
            LOG.info("AvgRecordSize => " + averageRecordSize);
            for (String partitionPath : partitionPaths) {
                WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
                if (pStat.getNumInserts() <= 0L) {
                    continue;
                }
                List<SmallFile> partitionSmallFiles = this.getSmallFiles(partitionPath);
                LOG.info("For partitionPath : " + partitionPath + " Small Files => " + partitionSmallFiles);
                long totalUnassignedInserts = pStat.getNumInserts();
                List<Integer> bucketNumbers = new ArrayList<>();
                List<Long> recordsPerBucket = new ArrayList<>();

                // 1) Top up existing small files, reusing an UPDATE bucket if the file already has one.
                for (SmallFile smallFile : partitionSmallFiles) {
                    long recordsToAppend = Math.min((HoodieCopyOnWriteTable.this.config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize, totalUnassignedInserts);
                    if (recordsToAppend <= 0L || totalUnassignedInserts <= 0L) {
                        continue;
                    }
                    String fileId = smallFile.location.getFileId();
                    int bucket;
                    if (this.updateLocationToBucket.containsKey(fileId)) {
                        bucket = this.updateLocationToBucket.get(fileId);
                        LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
                    } else {
                        bucket = this.addUpdateBucket(fileId);
                        LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
                    }
                    bucketNumbers.add(bucket);
                    recordsPerBucket.add(recordsToAppend);
                    totalUnassignedInserts -= recordsToAppend;
                }

                // 2) Spread the remaining inserts over freshly created INSERT buckets.
                if (totalUnassignedInserts > 0L) {
                    long insertRecordsPerBucket = HoodieCopyOnWriteTable.this.config.getCopyOnWriteInsertSplitSize();
                    if (HoodieCopyOnWriteTable.this.config.shouldAutoTuneInsertSplits()) {
                        insertRecordsPerBucket = HoodieCopyOnWriteTable.this.config.getParquetMaxFileSize() / averageRecordSize;
                    }
                    int insertBuckets = (int) Math.ceil(1.0 * totalUnassignedInserts / insertRecordsPerBucket);
                    LOG.info("After small file assignment: unassignedInserts => " + totalUnassignedInserts + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket);
                    for (int b = 0; b < insertBuckets; ++b) {
                        bucketNumbers.add(this.totalBuckets);
                        recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
                        BucketInfo bucketInfo = new BucketInfo();
                        bucketInfo.bucketType = BucketType.INSERT;
                        bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
                        this.bucketInfoMap.put(this.totalBuckets, bucketInfo);
                        ++this.totalBuckets;
                    }
                }

                // 3) Weight = bucket's share of this partition's inserts; getPartition walks these cumulatively.
                List<InsertBucket> partitionInsertBuckets = new ArrayList<>();
                for (int i = 0; i < bucketNumbers.size(); ++i) {
                    InsertBucket bkt = new InsertBucket();
                    bkt.bucketNumber = bucketNumbers.get(i);
                    bkt.weight = 1.0 * recordsPerBucket.get(i) / pStat.getNumInserts();
                    partitionInsertBuckets.add(bkt);
                }
                LOG.info("Total insert buckets for partition path " + partitionPath + " => " + partitionInsertBuckets);
                this.partitionPathToInsertBuckets.put(partitionPath, partitionInsertBuckets);
            }
        }

        /**
         * Lists the latest base files of a partition, as of the last completed commit, that are
         * smaller than the configured small-file limit. Each one found is also appended to
         * {@link #smallFiles}.
         *
         * @param partitionPath partition to inspect
         * @return the small files of the partition; empty if there is no completed commit
         */
        protected List<SmallFile> getSmallFiles(String partitionPath) {
            List<SmallFile> smallFileLocations = new ArrayList<>();
            HoodieTimeline commitTimeline = HoodieCopyOnWriteTable.this.getCompletedCommitsTimeline();
            if (!commitTimeline.empty()) {
                HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
                // Typed list: iterating a raw List as HoodieBaseFile does not compile.
                List<HoodieBaseFile> allFiles = HoodieCopyOnWriteTable.this.getBaseFileOnlyView()
                        .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp())
                        .collect(Collectors.toList());
                for (HoodieBaseFile file : allFiles) {
                    if (file.getFileSize() >= (long) HoodieCopyOnWriteTable.this.config.getParquetSmallFileLimit()) {
                        continue;
                    }
                    String filename = file.getFileName();
                    SmallFile sf = new SmallFile();
                    sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
                    sf.sizeBytes = file.getFileSize();
                    smallFileLocations.add(sf);
                    this.smallFiles.add(sf);
                }
            }
            return smallFileLocations;
        }

        /** @return the description of the given bucket, or {@code null} if unknown */
        public BucketInfo getBucketInfo(int bucketNumber) {
            return this.bucketInfoMap.get(bucketNumber);
        }

        /** @return the weighted insert buckets of a partition, or {@code null} if it has no inserts */
        public List<InsertBucket> getInsertBuckets(String partitionPath) {
            return this.partitionPathToInsertBuckets.get(partitionPath);
        }

        @Override
        public int numPartitions() {
            return this.totalBuckets;
        }

        /**
         * Picks the bucket for a record.
         *
         * @param key a {@code Tuple2<HoodieKey, Option<HoodieRecordLocation>>}
         * @return the UPDATE bucket of the record's file id if it has a location. Otherwise, an
         *         insert bucket of its partition chosen by hashing the record key.
         */
        @Override
        @SuppressWarnings("unchecked")
        public int getPartition(Object key) {
            Tuple2<HoodieKey, Option<HoodieRecordLocation>> keyLocation = (Tuple2<HoodieKey, Option<HoodieRecordLocation>>) key;
            Option<HoodieRecordLocation> location = keyLocation._2();
            if (location.isPresent()) {
                return this.updateLocationToBucket.get(location.get().getFileId());
            }
            HoodieKey hoodieKey = keyLocation._1();
            List<InsertBucket> targetBuckets = this.partitionPathToInsertBuckets.get(hoodieKey.getPartitionPath());
            // Map the record key deterministically onto [0, 1) and pick the first bucket whose
            // cumulative weight covers it.
            long totalInserts = Math.max(1L, this.globalStat.getNumInserts());
            long hashOfKey = Hashing.md5().hashString(hoodieKey.getRecordKey(), StandardCharsets.UTF_8).asLong();
            double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
            double totalWeight = 0.0;
            for (InsertBucket insertBucket : targetBuckets) {
                totalWeight += insertBucket.weight;
                if (r <= totalWeight) {
                    return insertBucket.bucketNumber;
                }
            }
            // Per-bucket record counts are rounded down in assignInserts, so the cumulative weight
            // can fall short of 1. A key landing past the last bucket goes to the first one.
            return targetBuckets.get(0).bucketNumber;
        }
    }

    /**
     * Describes one write bucket: its type and the file id it writes to.
     * For UPDATE buckets the id is an existing file id. For INSERT buckets it is a newly generated prefix.
     */
    class BucketInfo implements Serializable {
        BucketType bucketType;
        String fileIdPrefix;

        @Override
        public String toString() {
            return "BucketInfo {bucketType=" + bucketType + ", fileIdPrefix=" + fileIdPrefix + "}";
        }
    }

    /**
     * A bucket that receives a share of a partition's inserts.
     * {@code weight} is the fraction of the partition's inserts assigned to {@code bucketNumber}.
     */
    class InsertBucket
    implements Serializable {
        int bucketNumber;
        double weight;

        InsertBucket() {
        }

        /** Renders the bucket for logging; the label previously misidentified the class as "WorkloadStat". */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder("InsertBucket {");
            sb.append("bucketNumber=").append(this.bucketNumber).append(", ");
            sb.append("weight=").append(this.weight);
            sb.append('}');
            return sb.toString();
        }
    }

    /** A base file of a partition whose size is below the configured small-file limit. */
    static class SmallFile implements Serializable {
        HoodieRecordLocation location;
        long sizeBytes;

        @Override
        public String toString() {
            return "SmallFile {location=" + location + ", sizeBytes=" + sizeBytes + "}";
        }
    }

    /**
     * Per-partition outcome of a clean.
     *
     * <p>Records the file names attempted and whether each delete succeeded or failed. Instances
     * for the same partition are combined with {@link #merge(PartitionCleanStat)}.
     */
    private static class PartitionCleanStat implements Serializable {
        private final String partitionPath;
        private final List<String> deletePathPatterns = new ArrayList<>();
        private final List<String> successDeleteFiles = new ArrayList<>();
        private final List<String> failedDeleteFiles = new ArrayList<>();

        private PartitionCleanStat(String partitionPath) {
            this.partitionPath = partitionPath;
        }

        /** Files the outcome of one delete under successes or failures (result must be non-null). */
        private void addDeletedFileResult(String deletePathStr, Boolean deletedFileResult) {
            final List<String> destination = deletedFileResult ? successDeleteFiles : failedDeleteFiles;
            destination.add(deletePathStr);
        }

        /** Remembers a file name that a delete was attempted for. */
        private void addDeleteFilePatterns(String deletePathStr) {
            deletePathPatterns.add(deletePathStr);
        }

        /**
         * Appends {@code other}'s results into this stat.
         *
         * @return this instance
         * @throws RuntimeException if the two stats belong to different partitions
         */
        private PartitionCleanStat merge(PartitionCleanStat other) {
            final boolean samePartition = partitionPath.equals(other.partitionPath);
            if (samePartition) {
                successDeleteFiles.addAll(other.successDeleteFiles);
                deletePathPatterns.addAll(other.deletePathPatterns);
                failedDeleteFiles.addAll(other.failedDeleteFiles);
                return this;
            }
            throw new RuntimeException(String.format("partitionPath is not a match: (%s, %s)", partitionPath, other.partitionPath));
        }
    }

    /**
     * Queue consumer that forwards every record read from the old base file to the merge handle.
     */
    private static class UpdateHandler extends BoundedInMemoryQueueConsumer<GenericRecord, Void> {
        private final HoodieMergeHandle upsertHandle;

        private UpdateHandler(HoodieMergeHandle upsertHandle) {
            this.upsertHandle = upsertHandle;
        }

        @Override
        protected void consumeOneRecord(GenericRecord record) {
            upsertHandle.write(record);
        }

        @Override
        protected void finish() {
            // Intentionally empty: handleUpdateInternal closes the merge handle itself.
        }

        @Override
        protected Void getResult() {
            // This consumer produces no result value.
            return null;
        }
    }

    /** Kind of write a bucket performs (nested enums are implicitly static). */
    enum BucketType {
        /** Merge records into an existing file group (handled by handleUpdate). */
        UPDATE,
        /** Write records into new files (handled by handleInsert). */
        INSERT
    }
}

