/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.rollback;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.timeline.MetadataConversionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathFilter;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ListingBasedRollbackStrategy
implements BaseRollbackPlanActionExecutor.RollbackStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(ListingBasedRollbackStrategy.class);
    protected final HoodieTable<?, ?, ?, ?> table;
    protected final transient HoodieEngineContext context;
    protected final HoodieWriteConfig config;
    protected final String instantTime;
    protected final Boolean isRestore;

    public ListingBasedRollbackStrategy(HoodieTable<?, ?, ?, ?> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime, boolean isRestore) {
        this.table = table;
        this.context = context;
        this.config = config;
        this.instantTime = instantTime;
        this.isRestore = isRestore;
    }

    @Override
    public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRollback) {
        try {
            HoodieTableMetaClient metaClient = this.table.getMetaClient();
            boolean isTableVersionLessThanEight = metaClient.getTableConfig().getTableVersion().lesserThan(HoodieTableVersion.EIGHT);
            List partitionPaths = FSUtils.getAllPartitionPaths((HoodieEngineContext)this.context, (HoodieStorage)this.table.getStorage(), (StoragePath)this.table.getMetaClient().getBasePath(), (boolean)false);
            int numPartitions = Math.max(Math.min(partitionPaths.size(), this.config.getRollbackParallelism()), 1);
            this.context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing Rollback Plan: " + this.config.getTableName());
            HoodieTableType tableType = this.table.getMetaClient().getTableType();
            String baseFileExtension = this.table.getBaseFileExtension();
            Option commitMetadataOptional = MetadataConversionUtils.getHoodieCommitMetadata((HoodieTableMetaClient)metaClient, (HoodieInstant)instantToRollback);
            Boolean isCommitMetadataCompleted = this.checkCommitMetadataCompleted(instantToRollback, (Option<HoodieCommitMetadata>)commitMetadataOptional);
            AtomicBoolean isCompaction = new AtomicBoolean(false);
            if (commitMetadataOptional.isPresent()) {
                isCompaction.set(((HoodieCommitMetadata)commitMetadataOptional.get()).getOperationType() == WriteOperationType.COMPACT);
            }
            AtomicBoolean isLogCompaction = new AtomicBoolean(false);
            if (commitMetadataOptional.isPresent()) {
                isLogCompaction.set(((HoodieCommitMetadata)commitMetadataOptional.get()).getOperationType() == WriteOperationType.LOG_COMPACT);
            }
            return this.context.flatMap(partitionPaths, (SerializableFunction & Serializable)partitionPath -> {
                ArrayList<HoodieRollbackRequest> hoodieRollbackRequests;
                block18: {
                    block19: {
                        Supplier<List> filesToDelete;
                        block17: {
                            hoodieRollbackRequests = new ArrayList<HoodieRollbackRequest>(partitionPaths.size());
                            filesToDelete = () -> {
                                try {
                                    return this.fetchFilesFromInstant(instantToRollback, (String)partitionPath, metaClient.getBasePath().toString(), baseFileExtension, metaClient.getStorage(), (Option<HoodieCommitMetadata>)commitMetadataOptional, isCommitMetadataCompleted, tableType);
                                }
                                catch (IOException e) {
                                    throw new HoodieIOException("Fetching files to delete error", e);
                                }
                            };
                            if (HoodieTableType.COPY_ON_WRITE != tableType) break block17;
                            hoodieRollbackRequests.addAll(this.getHoodieRollbackRequests((String)partitionPath, filesToDelete.get()));
                            break block18;
                        }
                        if (HoodieTableType.MERGE_ON_READ != tableType) break block19;
                        this.table.getMetaClient().reloadActiveTimeline();
                        String action = instantToRollback.getAction();
                        if (isCompaction.get()) {
                            action = "compaction";
                        }
                        if (isLogCompaction.get()) {
                            action = "logcompaction";
                        }
                        switch (action) {
                            case "commit": 
                            case "replacecommit": 
                            case "clustering": {
                                hoodieRollbackRequests.addAll(this.getHoodieRollbackRequests((String)partitionPath, filesToDelete.get()));
                                break;
                            }
                            case "compaction": {
                                if (!this.isRestore.booleanValue()) {
                                    hoodieRollbackRequests.addAll(this.getHoodieRollbackRequests((String)partitionPath, this.listBaseFilesToBeDeleted(instantToRollback.requestedTime(), baseFileExtension, (String)partitionPath, metaClient.getStorage())));
                                    break;
                                }
                                hoodieRollbackRequests.addAll(this.getHoodieRollbackRequests((String)partitionPath, isTableVersionLessThanEight ? filesToDelete.get() : this.listAllFilesSinceCommit(instantToRollback.requestedTime(), baseFileExtension, (String)partitionPath, metaClient)));
                                break;
                            }
                            case "deltacommit": 
                            case "logcompaction": {
                                HoodieCommitMetadata commitMetadata;
                                hoodieRollbackRequests.addAll(this.getHoodieRollbackRequests((String)partitionPath, filesToDelete.get()));
                                if (isTableVersionLessThanEight && (commitMetadata = (HoodieCommitMetadata)commitMetadataOptional.get()).getPartitionToWriteStats().containsKey(partitionPath)) {
                                    hoodieRollbackRequests.addAll(ListingBasedRollbackStrategy.getRollbackRequestToAppendForVersionSix(partitionPath, instantToRollback, commitMetadata, this.table));
                                    break;
                                }
                                break block18;
                            }
                            default: {
                                throw new HoodieRollbackException("Unknown listing type, during rollback of " + instantToRollback);
                            }
                        }
                        break block18;
                    }
                    throw new HoodieRollbackException(String.format("Unsupported table type: %s, during listing rollback of %s", tableType, instantToRollback));
                }
                return hoodieRollbackRequests.stream();
            }, numPartitions);
        }
        catch (Exception e) {
            LOG.error("Generating rollback requests failed for " + instantToRollback.requestedTime(), (Throwable)e);
            throw new HoodieRollbackException("Generating rollback requests failed for " + instantToRollback.requestedTime(), e);
        }
    }

    public static List<HoodieRollbackRequest> getRollbackRequestToAppendForVersionSix(String partitionPath, HoodieInstant rollbackInstant, HoodieCommitMetadata commitMetadata, HoodieTable<?, ?, ?, ?> table) {
        ArrayList<HoodieRollbackRequest> hoodieRollbackRequests = new ArrayList<HoodieRollbackRequest>();
        ValidationUtils.checkArgument((boolean)table.version().lesserThan(HoodieTableVersion.EIGHT));
        ValidationUtils.checkArgument((boolean)rollbackInstant.getAction().equals("deltacommit"));
        Map latestFileSlices = table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.requestedTime(), true).collect(Collectors.toMap(FileSlice::getFileId, Function.identity()));
        List hoodieWriteStats = (List)Option.ofNullable(commitMetadata.getPartitionToWriteStats().get(partitionPath)).orElse(Collections.emptyList());
        hoodieWriteStats = hoodieWriteStats.stream().filter(writeStat -> {
            boolean validForRollback;
            boolean bl = validForRollback = writeStat != null && !writeStat.getPrevCommit().equals("null") && writeStat.getPrevCommit() != null && latestFileSlices.containsKey(writeStat.getFileId());
            if (!validForRollback) {
                return false;
            }
            FileSlice latestFileSlice = (FileSlice)latestFileSlices.get(writeStat.getFileId());
            ValidationUtils.checkArgument((boolean)InstantComparison.compareTimestamps((String)latestFileSlice.getBaseInstantTime(), (BiPredicate)InstantComparison.LESSER_THAN_OR_EQUALS, (String)rollbackInstant.requestedTime()), (String)"Log-file base-instant could not be less than the instant being rolled back");
            return InstantComparison.compareTimestamps((String)latestFileSlice.getBaseInstantTime(), (BiPredicate)InstantComparison.LESSER_THAN, (String)rollbackInstant.requestedTime());
        }).collect(Collectors.toList());
        for (HoodieWriteStat writeStat2 : hoodieWriteStats.stream().filter(hoodieWriteStat -> !StringUtils.isNullOrEmpty((String)hoodieWriteStat.getFileId())).collect(Collectors.toList())) {
            FileSlice latestFileSlice = (FileSlice)latestFileSlices.get(writeStat2.getFileId());
            String fileId = writeStat2.getFileId();
            String latestBaseInstant = latestFileSlice.getBaseInstantTime();
            Path fullLogFilePath = HadoopFSUtils.constructAbsolutePathInHadoopPath((String)table.getConfig().getBasePath(), (String)writeStat2.getPath());
            Map<String, Long> logFilesWithBlocksToRollback = Collections.singletonMap(fullLogFilePath.toString(), writeStat2.getTotalWriteBytes() > 0L ? writeStat2.getTotalWriteBytes() : 1L);
            hoodieRollbackRequests.add(new HoodieRollbackRequest(partitionPath, fileId, latestBaseInstant, Collections.emptyList(), logFilesWithBlocksToRollback));
        }
        return hoodieRollbackRequests;
    }

    private List<StoragePathInfo> listAllFilesSinceCommit(String commit, String baseFileExtension, String partitionPath, HoodieTableMetaClient metaClient) throws IOException {
        LOG.info("Collecting files to be cleaned/rolledback up for path " + partitionPath + " and commit " + commit);
        CompletionTimeQueryView completionTimeQueryView = metaClient.getTimelineLayout().getTimelineFactory().createCompletionTimeQueryView(metaClient);
        StoragePathFilter & Serializable filter = (StoragePathFilter & Serializable)path -> {
            if (path.toString().contains(baseFileExtension)) {
                String fileCommitTime = FSUtils.getCommitTime((String)path.getName());
                return InstantComparison.compareTimestamps((String)commit, (BiPredicate)InstantComparison.LESSER_THAN_OR_EQUALS, (String)fileCommitTime);
            }
            if (FSUtils.isLogFile((StoragePath)path)) {
                String fileCommitTime = FSUtils.getDeltaCommitTimeFromLogPath((StoragePath)path);
                return completionTimeQueryView.isSlicedAfterOrOn(commit, fileCommitTime);
            }
            return false;
        };
        return metaClient.getStorage().listDirectEntries(FSUtils.constructAbsolutePath((String)this.config.getBasePath(), (String)partitionPath), (StoragePathFilter)filter);
    }

    @NotNull
    private List<HoodieRollbackRequest> getHoodieRollbackRequests(String partitionPath, List<StoragePathInfo> filesToDeletedStatus) {
        return filesToDeletedStatus.stream().map(pathInfo -> {
            String dataFileToBeDeleted = pathInfo.getPath().toString();
            return ListingBasedRollbackStrategy.formatDeletePath(dataFileToBeDeleted);
        }).map(s -> new HoodieRollbackRequest(partitionPath, "", "", Collections.singletonList(s), Collections.emptyMap())).collect(Collectors.toList());
    }

    private static String formatDeletePath(String path) {
        return path.substring(path.indexOf(":") + 1);
    }

    private List<StoragePathInfo> listBaseFilesToBeDeleted(String commit, String basefileExtension, String partitionPath, HoodieStorage storage) throws IOException {
        LOG.info("Collecting files to be cleaned/rolledback up for path " + partitionPath + " and commit " + commit);
        StoragePathFilter & Serializable filter = (StoragePathFilter & Serializable)path -> {
            if (path.toString().contains(basefileExtension)) {
                String fileCommitTime = FSUtils.getCommitTime((String)path.getName());
                return commit.equals(fileCommitTime);
            }
            return false;
        };
        return storage.listDirectEntries(FSUtils.constructAbsolutePath((String)this.config.getBasePath(), (String)partitionPath), (StoragePathFilter)filter);
    }

    private List<StoragePathInfo> fetchFilesFromInstant(HoodieInstant instantToRollback, String partitionPath, String basePath, String baseFileExtension, HoodieStorage storage, Option<HoodieCommitMetadata> commitMetadataOptional, Boolean isCommitMetadataCompleted, HoodieTableType tableType) throws IOException {
        if (isCommitMetadataCompleted.booleanValue() && tableType == HoodieTableType.COPY_ON_WRITE) {
            return this.fetchFilesFromCommitMetadata(instantToRollback, partitionPath, basePath, (HoodieCommitMetadata)commitMetadataOptional.get(), baseFileExtension, storage);
        }
        return this.fetchFilesFromListFiles(instantToRollback, partitionPath, basePath, baseFileExtension, storage);
    }

    private List<StoragePathInfo> fetchFilesFromCommitMetadata(HoodieInstant instantToRollback, String partitionPath, String basePath, HoodieCommitMetadata commitMetadata, String baseFileExtension, HoodieStorage storage) throws IOException {
        StoragePathFilter pathFilter = ListingBasedRollbackStrategy.getPathFilter(baseFileExtension, instantToRollback.requestedTime());
        List filePaths = ListingBasedRollbackStrategy.getFilesFromCommitMetadata(basePath, commitMetadata, partitionPath).filter(entry -> {
            try {
                return storage.exists(entry);
            }
            catch (Exception e) {
                LOG.error("Exists check failed for " + entry.toString(), (Throwable)e);
                return true;
            }
        }).collect(Collectors.toList());
        return storage.listDirectEntries(filePaths, pathFilter);
    }

    private List<StoragePathInfo> fetchFilesFromListFiles(HoodieInstant instantToRollback, String partitionPath, String basePath, String baseFileExtension, HoodieStorage storage) throws IOException {
        StoragePathFilter pathFilter = ListingBasedRollbackStrategy.getPathFilter(baseFileExtension, instantToRollback.requestedTime());
        List<StoragePath> filePaths = ListingBasedRollbackStrategy.listFilesToBeDeleted(basePath, partitionPath);
        return storage.listDirectEntries(filePaths, pathFilter);
    }

    private Boolean checkCommitMetadataCompleted(HoodieInstant instantToRollback, Option<HoodieCommitMetadata> commitMetadataOptional) {
        return commitMetadataOptional.isPresent() && instantToRollback.isCompleted() && !WriteOperationType.UNKNOWN.equals((Object)((HoodieCommitMetadata)commitMetadataOptional.get()).getOperationType());
    }

    private static List<StoragePath> listFilesToBeDeleted(String basePath, String partitionPath) {
        return Collections.singletonList(FSUtils.constructAbsolutePath((String)basePath, (String)partitionPath));
    }

    private static Stream<StoragePath> getFilesFromCommitMetadata(String basePath, HoodieCommitMetadata commitMetadata, String partitionPath) {
        List fullPaths = commitMetadata.getFullPathsByPartitionPath(basePath, partitionPath);
        return fullPaths.stream().map(StoragePath::new);
    }

    @NotNull
    private static StoragePathFilter getPathFilter(String basefileExtension, String commit) {
        return (StoragePathFilter & Serializable)path -> {
            if (path.toString().endsWith(basefileExtension)) {
                String fileCommitTime = FSUtils.getCommitTime((String)path.getName());
                return commit.equals(fileCommitTime);
            }
            if (FSUtils.isLogFile((StoragePath)path)) {
                String fileCommitTime = FSUtils.getDeltaCommitTimeFromLogPath((StoragePath)path);
                return commit.equals(fileCommitTime);
            }
            return false;
        };
    }
}

