/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.util;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieClusteringStrategy;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieSliceInfo;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.timeline.TableFormatCompletionAction;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.timeline.versioning.v2.InstantGeneratorV2;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusteringUtils {
    private static final Logger LOG = LoggerFactory.getLogger(ClusteringUtils.class);
    public static final String TOTAL_IO_READ_MB = "TOTAL_IO_READ_MB";
    public static final String TOTAL_LOG_FILE_SIZE = "TOTAL_LOG_FILES_SIZE";
    public static final String TOTAL_LOG_FILES = "TOTAL_LOG_FILES";

    public static Stream<Pair<HoodieInstant, HoodieClusteringPlan>> getAllPendingClusteringPlans(HoodieTableMetaClient metaClient) {
        List<HoodieInstant> pendingClusterInstants = metaClient.getActiveTimeline().filterPendingReplaceOrClusteringTimeline().getInstants();
        return pendingClusterInstants.stream().map(instant -> ClusteringUtils.getClusteringPlan(metaClient, instant)).filter(Option::isPresent).map(Option::get);
    }

    public static Option<HoodieInstant> getInflightClusteringInstant(String timestamp, HoodieActiveTimeline activeTimeline, InstantGenerator factory) {
        HoodieInstant inflightInstant;
        HoodieTimeline pendingReplaceOrClusterTimeline = activeTimeline.filterPendingReplaceOrClusteringTimeline();
        if (pendingReplaceOrClusterTimeline.containsInstant(inflightInstant = factory.getClusteringCommitInflightInstant(timestamp))) {
            return Option.of((Object)inflightInstant);
        }
        inflightInstant = factory.getReplaceCommitInflightInstant(timestamp);
        return Option.ofNullable((Object)(pendingReplaceOrClusterTimeline.containsInstant(inflightInstant) ? inflightInstant : null));
    }

    public static Option<HoodieInstant> getRequestedClusteringInstant(String timestamp, HoodieActiveTimeline activeTimeline, InstantGenerator factory) {
        HoodieTimeline pendingReplaceOrClusterTimeline = activeTimeline.filterPendingReplaceOrClusteringTimeline();
        HoodieInstant requestedInstant = factory.getClusteringCommitRequestedInstant(timestamp);
        if (pendingReplaceOrClusterTimeline.containsInstant(timestamp)) {
            return Option.of((Object)requestedInstant);
        }
        requestedInstant = factory.getReplaceCommitRequestedInstant(timestamp);
        return Option.ofNullable((Object)(pendingReplaceOrClusterTimeline.containsInstant(requestedInstant) ? requestedInstant : null));
    }

    public static <T> void transitionClusteringOrReplaceInflightToComplete(boolean shouldLock, HoodieInstant clusteringInstant, HoodieReplaceCommitMetadata metadata, HoodieActiveTimeline activeTimeline, TableFormatCompletionAction tableFormatCompletionAction) {
        if (clusteringInstant.getAction().equals("clustering")) {
            activeTimeline.transitionClusterInflightToComplete(shouldLock, clusteringInstant, metadata, tableFormatCompletionAction);
        } else {
            activeTimeline.transitionReplaceInflightToComplete(shouldLock, clusteringInstant, metadata, tableFormatCompletionAction);
        }
    }

    public static <T> void transitionClusteringOrReplaceRequestedToInflight(HoodieInstant requestedClusteringInstant, Option<T> metadata, HoodieActiveTimeline activeTimeline) {
        if (requestedClusteringInstant.getAction().equals("clustering")) {
            activeTimeline.transitionClusterRequestedToInflight(requestedClusteringInstant, metadata);
        } else {
            activeTimeline.transitionReplaceRequestedToInflight(requestedClusteringInstant, metadata);
        }
    }

    public static boolean isClusteringOrReplaceCommitAction(String actionType) {
        return actionType.equals("clustering") || actionType.equals("replacecommit");
    }

    public static boolean isClusteringInstant(HoodieTimeline timeline, HoodieInstant replaceInstant, InstantGenerator factory) {
        return replaceInstant.getAction().equals("clustering") || replaceInstant.getAction().equals("replacecommit") && ClusteringUtils.getClusteringPlan(timeline, replaceInstant, factory).isPresent();
    }

    private static Option<HoodieRequestedReplaceMetadata> getRequestedReplaceMetadata(HoodieTimeline timeline, HoodieInstant pendingReplaceOrClusterInstant, InstantGenerator factory) throws IOException {
        HoodieInstant requestedInstant;
        if (pendingReplaceOrClusterInstant.isInflight()) {
            requestedInstant = factory.createNewInstant(HoodieInstant.State.REQUESTED, pendingReplaceOrClusterInstant.getAction(), pendingReplaceOrClusterInstant.requestedTime());
        } else if (pendingReplaceOrClusterInstant.isRequested()) {
            requestedInstant = pendingReplaceOrClusterInstant;
        } else {
            String action = factory instanceof InstantGeneratorV2 ? "clustering" : "replacecommit";
            requestedInstant = factory.createNewInstant(HoodieInstant.State.REQUESTED, action, pendingReplaceOrClusterInstant.requestedTime());
        }
        try {
            return ClusteringUtils.getRequestedReplaceMetadataOption(timeline, pendingReplaceOrClusterInstant, factory, requestedInstant);
        }
        catch (Exception ex) {
            if (ClusteringUtils.isEmptyReplaceOrClusteringInstant(timeline, pendingReplaceOrClusterInstant, factory, requestedInstant)) {
                return Option.empty();
            }
            throw ex;
        }
    }

    private static Option<HoodieRequestedReplaceMetadata> getRequestedReplaceMetadataOption(HoodieTimeline timeline, HoodieInstant pendingReplaceOrClusterInstant, InstantGenerator factory, HoodieInstant requestedInstant) throws IOException {
        try {
            return Option.of((Object)((Object)timeline.readRequestedReplaceMetadata(requestedInstant)));
        }
        catch (HoodieIOException e) {
            if (e.getCause() instanceof FileNotFoundException && pendingReplaceOrClusterInstant.isCompleted()) {
                requestedInstant = factory.createNewInstant(HoodieInstant.State.REQUESTED, "replacecommit", pendingReplaceOrClusterInstant.requestedTime());
                return Option.of((Object)((Object)timeline.readRequestedReplaceMetadata(requestedInstant)));
            }
            throw e;
        }
    }

    private static boolean isEmptyReplaceOrClusteringInstant(HoodieTimeline timeline, HoodieInstant pendingReplaceOrClusterInstant, InstantGenerator instantGenerator, HoodieInstant requestedInstant) {
        try {
            return timeline.isEmpty(requestedInstant);
        }
        catch (HoodieIOException e) {
            if (e.getCause() instanceof FileNotFoundException && pendingReplaceOrClusterInstant.isCompleted()) {
                requestedInstant = instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED, "replacecommit", pendingReplaceOrClusterInstant.requestedTime());
                return timeline.isEmpty(requestedInstant);
            }
            throw e;
        }
    }

    public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant pendingReplaceInstant) {
        return ClusteringUtils.getClusteringPlan(metaClient.getActiveTimeline(), pendingReplaceInstant, metaClient.getInstantGenerator());
    }

    public static HoodieClusteringPlan getPendingClusteringPlan(HoodieTableMetaClient metaClient, String clusteringCommitTime) {
        return (HoodieClusteringPlan)((Object)ClusteringUtils.getClusteringPlan(metaClient, (HoodieInstant)ClusteringUtils.getRequestedClusteringInstant(clusteringCommitTime, metaClient.getActiveTimeline(), metaClient.getInstantGenerator()).get()).map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException("Unable to read clustering plan for instant: " + clusteringCommitTime)));
    }

    public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPlan(HoodieTimeline timeline, HoodieInstant pendingReplaceInstant, InstantGenerator factory) {
        try {
            Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata = ClusteringUtils.getRequestedReplaceMetadata(timeline, pendingReplaceInstant, factory);
            if (requestedReplaceMetadata.isPresent() && WriteOperationType.CLUSTER.name().equals(((HoodieRequestedReplaceMetadata)((Object)requestedReplaceMetadata.get())).getOperationType())) {
                return Option.of(Pair.of(pendingReplaceInstant, ((HoodieRequestedReplaceMetadata)((Object)requestedReplaceMetadata.get())).getClusteringPlan()));
            }
            return Option.empty();
        }
        catch (IOException e) {
            throw new HoodieIOException("Error reading clustering plan " + pendingReplaceInstant.requestedTime(), e);
        }
    }

    public static Map<HoodieFileGroupId, HoodieInstant> getAllFileGroupsInPendingClusteringPlans(HoodieTableMetaClient metaClient) {
        Map<HoodieFileGroupId, HoodieInstant> resultMap;
        Stream<Pair<HoodieInstant, HoodieClusteringPlan>> pendingClusteringPlans = ClusteringUtils.getAllPendingClusteringPlans(metaClient);
        Stream<Map<HoodieFileGroupId, HoodieInstant>> resultStream = pendingClusteringPlans.flatMap(clusteringPlan -> ClusteringUtils.getFileGroupEntriesInClusteringPlan((HoodieInstant)clusteringPlan.getLeft(), (HoodieClusteringPlan)((Object)((Object)clusteringPlan.getRight()))));
        try {
            resultMap = resultStream.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        }
        catch (Exception e) {
            if (e instanceof IllegalStateException && e.getMessage().contains("Duplicate key")) {
                throw new HoodieException("Found duplicate file groups pending clustering. If you're running deltastreamer in continuous mode, consider adding delay using --min-sync-interval-seconds. Or consider setting write concurrency mode to OPTIMISTIC_CONCURRENCY_CONTROL.", (Throwable)e);
            }
            throw new HoodieException("Error getting all file groups in pending clustering", (Throwable)e);
        }
        LOG.info("Found " + resultMap.size() + " files in pending clustering operations");
        return resultMap;
    }

    public static Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClusteringInstant(HoodieInstant instant, HoodieClusteringPlan clusteringPlan) {
        Stream<Pair> partitionToFileIdLists = clusteringPlan.getInputGroups().stream().flatMap(ClusteringUtils::getFileGroupsFromClusteringGroup);
        return partitionToFileIdLists.map(e -> Pair.of(e, instant));
    }

    private static Stream<Map.Entry<HoodieFileGroupId, HoodieInstant>> getFileGroupEntriesInClusteringPlan(HoodieInstant instant, HoodieClusteringPlan clusteringPlan) {
        return ClusteringUtils.getFileGroupsInPendingClusteringInstant(instant, clusteringPlan).map(entry -> new AbstractMap.SimpleEntry(entry.getLeft(), entry.getRight()));
    }

    public static Stream<HoodieFileGroupId> getFileGroupsFromClusteringPlan(HoodieClusteringPlan clusteringPlan) {
        return clusteringPlan.getInputGroups().stream().flatMap(ClusteringUtils::getFileGroupsFromClusteringGroup);
    }

    public static Stream<HoodieFileGroupId> getFileGroupsFromClusteringGroup(HoodieClusteringGroup group) {
        return group.getSlices().stream().map(slice -> new HoodieFileGroupId(slice.getPartitionPath(), slice.getFileId()));
    }

    public static HoodieClusteringPlan createClusteringPlan(String strategyClassName, Map<String, String> strategyParams, List<FileSlice>[] fileSliceGroups, Map<String, String> extraMetadata) {
        List<HoodieClusteringGroup> clusteringGroups = Arrays.stream(fileSliceGroups).map(fileSliceGroup -> {
            Map<String, Double> groupMetrics = ClusteringUtils.buildMetrics(fileSliceGroup);
            List<HoodieSliceInfo> sliceInfos = ClusteringUtils.getFileSliceInfo(fileSliceGroup);
            return HoodieClusteringGroup.newBuilder().setSlices(sliceInfos).setMetrics(groupMetrics).build();
        }).collect(Collectors.toList());
        HoodieClusteringStrategy strategy = HoodieClusteringStrategy.newBuilder().setStrategyClassName(strategyClassName).setStrategyParams(strategyParams).build();
        return HoodieClusteringPlan.newBuilder().setInputGroups(clusteringGroups).setExtraMetadata(extraMetadata).setStrategy(strategy).setPreserveHoodieMetadata(true).build();
    }

    private static List<HoodieSliceInfo> getFileSliceInfo(List<FileSlice> slices) {
        return slices.stream().map(slice -> HoodieSliceInfo.newBuilder().setPartitionPath(slice.getPartitionPath()).setFileId(slice.getFileId()).setDataFilePath((String)slice.getBaseFile().map(BaseFile::getPath).orElse(null)).setDeltaFilePaths(slice.getLogFiles().map(f -> f.getPath().getName()).collect(Collectors.toList())).setBootstrapFilePath((String)slice.getBaseFile().map(bf -> (String)bf.getBootstrapBaseFile().map(BaseFile::getPath).orElse(null)).orElse(null)).build()).collect(Collectors.toList());
    }

    private static Map<String, Double> buildMetrics(List<FileSlice> fileSlices) {
        int numLogFiles = 0;
        long totalLogFileSize = 0L;
        long totalIORead = 0L;
        for (FileSlice slice : fileSlices) {
            numLogFiles = (int)((long)numLogFiles + slice.getLogFiles().count());
            totalIORead = FSUtils.getSizeInMB((slice.getBaseFile().isPresent() ? ((HoodieBaseFile)slice.getBaseFile().get()).getFileSize() : 0L) + (totalLogFileSize += slice.getLogFiles().map(HoodieLogFile::getFileSize).filter(size -> size >= 0L).reduce(Long::sum).orElse(0L).longValue()));
        }
        HashMap<String, Double> metrics = new HashMap<String, Double>();
        metrics.put(TOTAL_IO_READ_MB, Double.valueOf(totalIORead));
        metrics.put(TOTAL_LOG_FILE_SIZE, Double.valueOf(totalLogFileSize));
        metrics.put(TOTAL_LOG_FILES, Double.valueOf(numLogFiles));
        return metrics;
    }

    public static List<HoodieInstant> getPendingClusteringInstantTimes(HoodieTableMetaClient metaClient) {
        InstantGenerator factory = metaClient.getInstantGenerator();
        return metaClient.getActiveTimeline().filterPendingReplaceOrClusteringTimeline().getInstantsAsStream().filter(instant -> ClusteringUtils.isClusteringInstant(metaClient.getActiveTimeline(), instant, factory)).collect(Collectors.toList());
    }

    public static Option<HoodieInstant> getEarliestInstantToRetainForClustering(HoodieActiveTimeline activeTimeline, HoodieTableMetaClient metaClient, HoodieCleaningPolicy cleanerPolicy) throws IOException {
        Option<HoodieInstant> oldestInstantToRetain = Option.empty();
        HoodieTimeline replaceOrClusterTimeline = activeTimeline.getTimelineOfActions(CollectionUtils.createSet("replacecommit", "clustering"));
        if (!replaceOrClusterTimeline.empty()) {
            Option<HoodieInstant> cleanInstantOpt = activeTimeline.getCleanerTimeline().filterCompletedInstants().lastInstant();
            if (cleanInstantOpt.isPresent()) {
                HoodieInstant cleanInstant = (HoodieInstant)cleanInstantOpt.get();
                HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(metaClient, cleanInstant);
                Option earliestInstantToRetain = Option.ofNullable((Object)((Object)cleanerPlan.getEarliestInstantToRetain())).map(HoodieActionInstant::getTimestamp);
                Option<String> earliestReplacedSavepointInClean = ClusteringUtils.getEarliestReplacedSavepointInClean(activeTimeline, cleanerPolicy, cleanerPlan);
                String retainLowerBound = earliestReplacedSavepointInClean.isPresent() ? (String)earliestReplacedSavepointInClean.get() : (earliestInstantToRetain.isPresent() && !StringUtils.isNullOrEmpty((String)((String)earliestInstantToRetain.get())) ? (String)earliestInstantToRetain.get() : cleanInstant.requestedTime());
                oldestInstantToRetain = replaceOrClusterTimeline.findInstantsAfterOrEquals(retainLowerBound).firstInstant();
            } else {
                oldestInstantToRetain = replaceOrClusterTimeline.firstInstant();
            }
        }
        return oldestInstantToRetain;
    }

    public static Option<String> getEarliestReplacedSavepointInClean(HoodieActiveTimeline activeTimeline, HoodieCleaningPolicy cleanerPolicy, HoodieCleanerPlan cleanerPlan) {
        HoodieTimeline replaceTimeline;
        String earliestInstantToRetain = (String)Option.ofNullable((Object)((Object)cleanerPlan.getEarliestInstantToRetain())).map(HoodieActionInstant::getTimestamp).orElse(null);
        Option savepoints = Option.ofNullable(cleanerPlan.getExtraMetadata()).map(metadata -> metadata.getOrDefault("savepointed_timestamps", "").split(","));
        String earliestSavepoint = (String)savepoints.flatMap(arr -> Option.fromJavaOptional(Arrays.stream(arr).sorted().findFirst())).orElse(null);
        if (cleanerPolicy != HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS && !StringUtils.isNullOrEmpty((String)earliestInstantToRetain) && !StringUtils.isNullOrEmpty((String)earliestSavepoint) && InstantComparison.compareTimestamps(earliestInstantToRetain, InstantComparison.GREATER_THAN, earliestSavepoint) && !(replaceTimeline = activeTimeline.getCompletedReplaceTimeline().findInstantsInClosedRange(earliestSavepoint, earliestInstantToRetain)).empty()) {
            return Option.of((Object)earliestSavepoint);
        }
        return Option.empty();
    }

    public static boolean isCompletedClusteringInstant(HoodieInstant instant, HoodieTimeline timeline) {
        if (!instant.getAction().equals("replacecommit")) {
            return false;
        }
        try {
            return TimelineUtils.getCommitMetadata(instant, timeline).getOperationType().equals((Object)WriteOperationType.CLUSTER);
        }
        catch (IOException e) {
            throw new HoodieException("Resolve replace commit metadata error for instant: " + instant, (Throwable)e);
        }
    }

    public static boolean isInsertOverwriteInstant(HoodieInstant instant, HoodieTimeline timeline) {
        if (!instant.getAction().equals("replacecommit")) {
            return false;
        }
        try {
            WriteOperationType opType = TimelineUtils.getCommitMetadata(instant, timeline).getOperationType();
            return opType.equals((Object)WriteOperationType.INSERT_OVERWRITE) || opType.equals((Object)WriteOperationType.INSERT_OVERWRITE_TABLE);
        }
        catch (IOException e) {
            throw new HoodieException("Resolve replace commit metadata error for instant: " + instant, (Throwable)e);
        }
    }

    public static Option<HoodieFileReader> getBaseFileReader(HoodieStorage storage, HoodieRecord.HoodieRecordType recordType, HoodieConfig config, String dataFilePath) {
        if (StringUtils.isNullOrEmpty((String)dataFilePath)) {
            return Option.empty();
        }
        try {
            return Option.of((Object)HoodieIOFactory.getIOFactory(storage).getReaderFactory(recordType).getFileReader(config, new StoragePath(dataFilePath)));
        }
        catch (IOException e) {
            throw new HoodieIOException("Error reading base file " + dataFilePath, e);
        }
    }
}

