/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.io.storage.HoodieWrapperFileSystem;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.RollbackRequest;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class RollbackExecutor
implements Serializable {
    private static final Logger LOG = LogManager.getLogger(RollbackExecutor.class);
    private final HoodieTableMetaClient metaClient;
    private final HoodieWriteConfig config;

    public RollbackExecutor(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
        this.metaClient = metaClient;
        this.config = config;
    }

    public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc, HoodieInstant instantToRollback, List<RollbackRequest> rollbackRequests) {
        SerializablePathFilter filter = path -> {
            if (path.toString().contains(".parquet")) {
                String fileCommitTime = FSUtils.getCommitTime(path.getName());
                return instantToRollback.getTimestamp().equals(fileCommitTime);
            }
            if (path.toString().contains(".log")) {
                String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
                return instantToRollback.getTimestamp().equals(fileCommitTime);
            }
            return false;
        };
        int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), this.config.getRollbackParallelism()), 1);
        return jsc.parallelize(rollbackRequests, sparkPartitions).mapToPair((PairFunction & Serializable)rollbackRequest -> {
            HashMap<FileStatus, Boolean> filesToDeletedStatus = new HashMap<FileStatus, Boolean>();
            switch (rollbackRequest.getRollbackAction()) {
                case DELETE_DATA_FILES_ONLY: {
                    this.deleteCleanedFiles(this.metaClient, this.config, filesToDeletedStatus, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath());
                    return new Tuple2((Object)rollbackRequest.getPartitionPath(), (Object)HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()).withDeletedFileResults(filesToDeletedStatus).build());
                }
                case DELETE_DATA_AND_LOG_FILES: {
                    this.deleteCleanedFiles(this.metaClient, this.config, filesToDeletedStatus, rollbackRequest.getPartitionPath(), filter);
                    return new Tuple2((Object)rollbackRequest.getPartitionPath(), (Object)HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()).withDeletedFileResults(filesToDeletedStatus).build());
                }
                case APPEND_ROLLBACK_BLOCK: {
                    Closeable writer = null;
                    try {
                        writer = HoodieLogFormat.newWriterBuilder().onParentPath(FSUtils.getPartitionPath(this.metaClient.getBasePath(), rollbackRequest.getPartitionPath())).withFileId(rollbackRequest.getFileId().get()).overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(this.metaClient.getFs()).withFileExtension(".log").build();
                        Map<HoodieLogBlock.HeaderMetadataType, String> header = this.generateHeader(instantToRollback.getTimestamp());
                        writer = writer.appendBlock(new HoodieCommandBlock(header));
                    }
                    catch (IOException | InterruptedException io) {
                        throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
                    }
                    finally {
                        try {
                            if (writer != null) {
                                writer.close();
                            }
                        }
                        catch (IOException io) {
                            throw new UncheckedIOException(io);
                        }
                    }
                    HashMap<FileStatus, Long> filesToNumBlocksRollback = new HashMap<FileStatus, Long>();
                    filesToNumBlocksRollback.put(this.metaClient.getFs().getFileStatus(((HoodieLogFormat.Writer)Preconditions.checkNotNull((Object)writer)).getLogFile().getPath()), 1L);
                    return new Tuple2((Object)rollbackRequest.getPartitionPath(), (Object)HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()).withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
                }
            }
            throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
        }).reduceByKey(this::mergeRollbackStat).map(Tuple2::_2).collect();
    }

    private HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRollbackStat stat2) {
        Preconditions.checkArgument((boolean)stat1.getPartitionPath().equals(stat2.getPartitionPath()));
        ArrayList<String> successDeleteFiles = new ArrayList<String>();
        ArrayList<String> failedDeleteFiles = new ArrayList<String>();
        HashMap<FileStatus, Long> commandBlocksCount = new HashMap<FileStatus, Long>();
        if (stat1.getSuccessDeleteFiles() != null) {
            successDeleteFiles.addAll(stat1.getSuccessDeleteFiles());
        }
        if (stat2.getSuccessDeleteFiles() != null) {
            successDeleteFiles.addAll(stat2.getSuccessDeleteFiles());
        }
        if (stat1.getFailedDeleteFiles() != null) {
            failedDeleteFiles.addAll(stat1.getFailedDeleteFiles());
        }
        if (stat2.getFailedDeleteFiles() != null) {
            failedDeleteFiles.addAll(stat2.getFailedDeleteFiles());
        }
        if (stat1.getCommandBlocksCount() != null) {
            commandBlocksCount.putAll(stat1.getCommandBlocksCount());
        }
        if (stat2.getCommandBlocksCount() != null) {
            commandBlocksCount.putAll(stat2.getCommandBlocksCount());
        }
        return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount);
    }

    private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, Map<FileStatus, Boolean> results, String partitionPath, PathFilter filter) throws IOException {
        FileStatus[] toBeDeleted;
        LOG.info((Object)("Cleaning path " + partitionPath));
        HoodieWrapperFileSystem fs = metaClient.getFs();
        for (FileStatus file : toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter)) {
            boolean success = fs.delete(file.getPath(), false);
            results.put(file, success);
            LOG.info((Object)("Delete file " + file.getPath() + "\t" + success));
        }
        return results;
    }

    private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, Map<FileStatus, Boolean> results, String commit, String partitionPath) throws IOException {
        FileStatus[] toBeDeleted;
        LOG.info((Object)("Cleaning path " + partitionPath));
        HoodieWrapperFileSystem fs = metaClient.getFs();
        PathFilter filter = path -> {
            if (path.toString().contains(".parquet")) {
                String fileCommitTime = FSUtils.getCommitTime(path.getName());
                return commit.equals(fileCommitTime);
            }
            return false;
        };
        for (FileStatus file : toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter)) {
            boolean success = fs.delete(file.getPath(), false);
            results.put(file, success);
            LOG.info((Object)("Delete file " + file.getPath() + "\t" + success));
        }
        return results;
    }

    private Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String commit) {
        HashMap header = Maps.newHashMap();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, this.metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit);
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
        return header;
    }

    public static interface SerializablePathFilter
    extends PathFilter,
    Serializable {
    }
}

