/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.util;

import java.io.IOException;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieClusteringStrategy;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieSliceInfo;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * Static helpers for reading pending clustering plans off the timeline and for building new
 * {@link HoodieClusteringPlan}s from groups of {@link FileSlice}s.
 */
public class ClusteringUtils {
    private static final Logger LOG = LogManager.getLogger(ClusteringUtils.class);

    /** Metric key: estimated megabytes to read (base files plus log files) for a clustering group. */
    public static final String TOTAL_IO_READ_MB = "TOTAL_IO_READ_MB";
    /** Metric key: summed size of the log files in a clustering group. Note the key text is plural ("FILES"). */
    public static final String TOTAL_LOG_FILE_SIZE = "TOTAL_LOG_FILES_SIZE";
    /** Metric key: number of log files in a clustering group. */
    public static final String TOTAL_LOG_FILES = "TOTAL_LOG_FILES";

    /**
     * Returns every pending replace instant that carries a clustering plan, paired with that plan.
     *
     * @param metaClient meta client whose active timeline is scanned
     * @return lazily evaluated stream of (instant, clustering plan) pairs; instants without a
     *         clustering plan are dropped
     */
    public static Stream<Pair<HoodieInstant, HoodieClusteringPlan>> getAllPendingClusteringPlans(HoodieTableMetaClient metaClient) {
        List<HoodieInstant> pendingReplaceInstants = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants();
        return pendingReplaceInstants.stream()
            .map(instant -> ClusteringUtils.getClusteringPlan(metaClient, instant))
            .filter(Option::isPresent)
            .map(Option::get);
    }

    /**
     * Reads the requested-replace metadata that belongs to the given pending replace instant.
     *
     * @return the deserialized metadata, or empty when the requested instant has no (or zero-length) content
     * @throws IOException if reading or deserializing the instant details fails
     */
    private static Option<HoodieRequestedReplaceMetadata> getRequestedReplaceMetadata(HoodieTableMetaClient metaClient, HoodieInstant pendingReplaceInstant) throws IOException {
        // The metadata is always looked up on the "requested" instant, so map any other state back to it.
        HoodieInstant requestedInstant = pendingReplaceInstant.isRequested()
            ? pendingReplaceInstant
            : HoodieTimeline.getReplaceCommitRequestedInstant(pendingReplaceInstant.getTimestamp());
        Option<byte[]> content = metaClient.getActiveTimeline().getInstantDetails(requestedInstant);
        if (!content.isPresent() || content.get().length == 0) {
            return Option.empty();
        }
        return Option.of(TimelineMetadataUtils.deserializeRequestedReplaceMetadata(content.get()));
    }

    /**
     * Returns the clustering plan attached to a pending replace instant, if there is one.
     *
     * @param metaClient            meta client used to read the instant details
     * @param pendingReplaceInstant pending replace instant to inspect
     * @return (instant, plan) when the requested metadata exists and its operation type is
     *         {@link WriteOperationType#CLUSTER}; empty otherwise
     * @throws HoodieIOException if the metadata cannot be read
     */
    public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant pendingReplaceInstant) {
        try {
            Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata = ClusteringUtils.getRequestedReplaceMetadata(metaClient, pendingReplaceInstant);
            if (requestedReplaceMetadata.isPresent()
                && WriteOperationType.CLUSTER.name().equals(requestedReplaceMetadata.get().getOperationType())) {
                return Option.of(Pair.of(pendingReplaceInstant, requestedReplaceMetadata.get().getClusteringPlan()));
            }
            return Option.empty();
        } catch (IOException e) {
            throw new HoodieIOException("Error reading clustering plan " + pendingReplaceInstant.getTimestamp(), e);
        }
    }

    /**
     * Builds a map from every file group referenced by a pending clustering plan to the instant
     * of that plan.
     *
     * @param metaClient meta client whose pending clustering plans are scanned
     * @return file group id to pending clustering instant
     * @throws HoodieException if the same file group appears in more than one pending plan, or if
     *                         anything else fails while collecting the plans
     */
    public static Map<HoodieFileGroupId, HoodieInstant> getAllFileGroupsInPendingClusteringPlans(HoodieTableMetaClient metaClient) {
        Stream<Pair<HoodieInstant, HoodieClusteringPlan>> pendingClusteringPlans = ClusteringUtils.getAllPendingClusteringPlans(metaClient);
        Stream<Map.Entry<HoodieFileGroupId, HoodieInstant>> resultStream = pendingClusteringPlans.flatMap(clusteringPlan ->
            ClusteringUtils.getFileGroupEntriesInClusteringPlan(clusteringPlan.getLeft(), clusteringPlan.getRight()));

        Map<HoodieFileGroupId, HoodieInstant> resultMap;
        try {
            // toMap without a merge function throws IllegalStateException on a duplicate key.
            resultMap = resultStream.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        } catch (Exception e) {
            // Guard against a null message so the diagnostic path itself cannot throw an NPE.
            String message = e.getMessage();
            if (e instanceof IllegalStateException && message != null && message.contains("Duplicate key")) {
                throw new HoodieException("Found duplicate file groups pending clustering. If you're running deltastreamer in continuous mode, consider adding delay using --min-sync-interval-seconds. Or consider setting write concurrency mode to optimistic_concurrency_control.", e);
            }
            throw new HoodieException("Error getting all file groups in pending clustering", e);
        }
        LOG.info("Found " + resultMap.size() + " files in pending clustering operations");
        return resultMap;
    }

    /**
     * Pairs every file group referenced by the given clustering plan with the given instant.
     *
     * @param instant        instant to associate with each file group
     * @param clusteringPlan plan whose input groups are expanded
     * @return stream of (file group id, instant) pairs
     */
    public static Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClusteringInstant(HoodieInstant instant, HoodieClusteringPlan clusteringPlan) {
        Stream<HoodieFileGroupId> fileGroupIds = clusteringPlan.getInputGroups().stream()
            .flatMap(ClusteringUtils::getFileGroupsFromClusteringGroup);
        return fileGroupIds.map(fileGroupId -> Pair.of(fileGroupId, instant));
    }

    /**
     * Same as {@link #getFileGroupsInPendingClusteringInstant} but expressed as map entries so the
     * result can be fed straight into {@code Collectors.toMap}.
     */
    private static Stream<Map.Entry<HoodieFileGroupId, HoodieInstant>> getFileGroupEntriesInClusteringPlan(HoodieInstant instant, HoodieClusteringPlan clusteringPlan) {
        return ClusteringUtils.getFileGroupsInPendingClusteringInstant(instant, clusteringPlan)
            .map(entry -> new AbstractMap.SimpleEntry<>(entry.getLeft(), entry.getRight()));
    }

    /**
     * Returns the ids of all file groups referenced by any input group of the plan.
     *
     * @param clusteringPlan plan to expand
     * @return stream of file group ids
     */
    public static Stream<HoodieFileGroupId> getFileGroupsFromClusteringPlan(HoodieClusteringPlan clusteringPlan) {
        return clusteringPlan.getInputGroups().stream().flatMap(ClusteringUtils::getFileGroupsFromClusteringGroup);
    }

    /**
     * Returns one file group id per slice of the clustering group, built from the slice's
     * partition path and file id.
     *
     * @param group clustering group to expand
     * @return stream of file group ids
     */
    public static Stream<HoodieFileGroupId> getFileGroupsFromClusteringGroup(HoodieClusteringGroup group) {
        return group.getSlices().stream().map(slice -> new HoodieFileGroupId(slice.getPartitionPath(), slice.getFileId()));
    }

    /**
     * Assembles a clustering plan from groups of file slices.
     *
     * @param strategyClassName class name recorded in the plan's strategy
     * @param strategyParams    parameters recorded in the plan's strategy
     * @param fileSliceGroups   one list of file slices per clustering group
     * @param extraMetadata     extra metadata stored on the plan
     * @return the built plan, with one input group (slices plus metrics) per element of {@code fileSliceGroups}
     */
    public static HoodieClusteringPlan createClusteringPlan(String strategyClassName, Map<String, String> strategyParams, List<FileSlice>[] fileSliceGroups, Map<String, String> extraMetadata) {
        List<HoodieClusteringGroup> clusteringGroups = Arrays.stream(fileSliceGroups).map(fileSliceGroup -> {
            Map<String, Double> groupMetrics = ClusteringUtils.buildMetrics(fileSliceGroup);
            List<HoodieSliceInfo> sliceInfos = ClusteringUtils.getFileSliceInfo(fileSliceGroup);
            return HoodieClusteringGroup.newBuilder().setSlices(sliceInfos).setMetrics(groupMetrics).build();
        }).collect(Collectors.toList());

        HoodieClusteringStrategy strategy = HoodieClusteringStrategy.newBuilder()
            .setStrategyClassName(strategyClassName)
            .setStrategyParams(strategyParams)
            .build();

        return HoodieClusteringPlan.newBuilder()
            .setInputGroups(clusteringGroups)
            .setExtraMetadata(extraMetadata)
            .setStrategy(strategy)
            .build();
    }

    /**
     * Converts file slices into their Avro {@link HoodieSliceInfo} form. The data file path and
     * the bootstrap file path are {@code null} when the slice has no base file (or the base file
     * has no bootstrap base file); delta file paths are the log files' path names.
     */
    private static List<HoodieSliceInfo> getFileSliceInfo(List<FileSlice> slices) {
        return slices.stream().map(slice -> HoodieSliceInfo.newBuilder()
            .setPartitionPath(slice.getPartitionPath())
            .setFileId(slice.getFileId())
            .setDataFilePath(slice.getBaseFile().map(BaseFile::getPath).orElse(null))
            .setDeltaFilePaths(slice.getLogFiles().map(f -> f.getPath().getName()).collect(Collectors.toList()))
            .setBootstrapFilePath(slice.getBaseFile().map(bf -> bf.getBootstrapBaseFile().map(BaseFile::getPath).orElse(null)).orElse(null))
            .build()).collect(Collectors.toList());
    }

    /**
     * Computes the metrics stored on a clustering group: number of log files, summed log file
     * size, and total IO to read (base files plus log files, converted with
     * {@link FSUtils#getSizeInMB}).
     *
     * <p>Base file sizes are accumulated across <em>all</em> slices and converted once at the
     * end. Previously the IO figure was reassigned on each iteration, so only the last slice's
     * base file was counted.
     */
    private static Map<String, Double> buildMetrics(List<FileSlice> fileSlices) {
        long numLogFiles = 0L;
        long totalLogFileSize = 0L;
        long totalBaseFileSize = 0L;
        for (FileSlice slice : fileSlices) {
            numLogFiles += slice.getLogFiles().count();
            // Negative sizes are skipped (presumably "size unknown" markers - confirm against HoodieLogFile).
            totalLogFileSize += slice.getLogFiles()
                .map(HoodieLogFile::getFileSize)
                .filter(size -> size >= 0L)
                .reduce(Long::sum)
                .orElse(0L);
            if (slice.getBaseFile().isPresent()) {
                totalBaseFileSize += slice.getBaseFile().get().getFileSize();
            }
        }
        // Total read is every base file plus every log file in the group.
        long totalIORead = FSUtils.getSizeInMB(totalBaseFileSize + totalLogFileSize);

        Map<String, Double> metrics = new HashMap<>();
        metrics.put(TOTAL_IO_READ_MB, Double.valueOf(totalIORead));
        metrics.put(TOTAL_LOG_FILE_SIZE, Double.valueOf(totalLogFileSize));
        metrics.put(TOTAL_LOG_FILES, Double.valueOf(numLogFiles));
        return metrics;
    }

    /**
     * Returns the pending replace instants that are clustering instants.
     *
     * @param metaClient meta client whose active timeline is scanned
     * @return pending replace instants for which {@link #isPendingClusteringInstant} holds
     */
    public static List<HoodieInstant> getPendingClusteringInstantTimes(HoodieTableMetaClient metaClient) {
        return metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstantsAsStream()
            .filter(instant -> ClusteringUtils.isPendingClusteringInstant(metaClient, instant))
            .collect(Collectors.toList());
    }

    /**
     * Tells whether the given instant has a clustering plan.
     *
     * @return {@code true} when {@link #getClusteringPlan} yields a plan for the instant
     */
    public static boolean isPendingClusteringInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) {
        return ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent();
    }

    /**
     * Determines the oldest instant that must be retained on behalf of clustering.
     *
     * <p>When the timeline has replace commits: if a completed clean exists, the candidate is the
     * first replace instant at or after the clean plan's earliest-instant-to-retain (falling back
     * to the clean instant's own timestamp when the plan records none); otherwise it is the first
     * replace instant. If there is an inflight replace instant and a completed commit precedes it,
     * that commit replaces the candidate whenever the candidate is absent or compares greater.
     *
     * @param activeTimeline timeline to inspect
     * @param metaClient     meta client used to load the cleaner plan
     * @return the oldest instant to retain, or empty when there are no replace commits (or no
     *         replace instant satisfies the bounds above)
     * @throws IOException if the cleaner plan cannot be read
     */
    public static Option<HoodieInstant> getOldestInstantToRetainForClustering(HoodieActiveTimeline activeTimeline, HoodieTableMetaClient metaClient) throws IOException {
        Option<HoodieInstant> oldestInstantToRetain = Option.empty();
        HoodieTimeline replaceTimeline = activeTimeline.getTimelineOfActions(CollectionUtils.createSet("replacecommit"));
        if (replaceTimeline.empty()) {
            return oldestInstantToRetain;
        }

        Option<HoodieInstant> cleanInstantOpt = activeTimeline.getCleanerTimeline().filterCompletedInstants().lastInstant();
        if (cleanInstantOpt.isPresent()) {
            HoodieInstant cleanInstant = cleanInstantOpt.get();
            HoodieInstant cleanRequestedInstant = cleanInstant.isRequested()
                ? cleanInstant
                : HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp());
            HoodieActionInstant earliestInstantToRetain = CleanerUtils.getCleanerPlan(metaClient, cleanRequestedInstant).getEarliestInstantToRetain();
            String retainLowerBound = earliestInstantToRetain != null && !StringUtils.isNullOrEmpty(earliestInstantToRetain.getTimestamp())
                ? earliestInstantToRetain.getTimestamp()
                : cleanInstant.getTimestamp();
            oldestInstantToRetain = replaceTimeline
                .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, retainLowerBound))
                .firstInstant();
        } else {
            oldestInstantToRetain = replaceTimeline.firstInstant();
        }

        Option<HoodieInstant> pendingInstantOpt = replaceTimeline.filterInflights().firstInstant();
        if (pendingInstantOpt.isPresent()) {
            // Last completed commit before the first inflight replace instant.
            Option<HoodieInstant> beforePendingInstant = activeTimeline.getCommitsTimeline()
                .filterCompletedInstants()
                .findInstantsBefore(pendingInstantOpt.get().getTimestamp())
                .lastInstant();
            if (beforePendingInstant.isPresent()
                && oldestInstantToRetain.map(instant -> instant.compareTo(beforePendingInstant.get()) > 0).orElse(true)) {
                oldestInstantToRetain = beforePendingInstant;
            }
        }
        return oldestInstantToRetain;
    }
}

