/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.cli.commands;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.commands.SparkMain;
import org.apache.hudi.cli.utils.CommitUtil;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.client.CompactionAdminClient;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.action.compact.OperationResult;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.util.Utils;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
import scala.collection.JavaConversions;
import scala.collection.Map;

@Component
public class CompactionCommand
implements CommandMarker {
    // Logger for this command class.
    private static final Logger LOG = LogManager.getLogger(CompactionCommand.class);
    // Directory prefix used when naming the temporary ".ser" result files (see getTmpSerializerFile).
    private static final String TMP_DIR = "/tmp/";

    /**
     * Obtains the table meta client from {@link HoodieCLI} and verifies that the table is MERGE_ON_READ.
     *
     * @return the meta client returned by {@code HoodieCLI.getTableMetaClient()}
     * @throws HoodieException if the table type is anything other than MERGE_ON_READ
     */
    private HoodieTableMetaClient checkAndGetMetaClient() {
        HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
        boolean isMergeOnRead = metaClient.getTableType() == HoodieTableType.MERGE_ON_READ;
        if (isMergeOnRead) {
            return metaClient;
        }
        throw new HoodieException("Compactions can only be run for table type : MERGE_ON_READ");
    }

    /**
     * Renders the compactions found on the active timeline as a table.
     *
     * @param includeExtraMetadata whether the plan's extra metadata is added as a column
     * @param limit row limit handed to the printer (default -1)
     * @param sortByField header name to sort by (empty by default)
     * @param descending sort-order flag handed to the printer
     * @param headerOnly whether only the header should be printed
     * @return the formatted table
     * @throws IOException declared by the command signature
     */
    @CliCommand(value={"compactions show all"}, help="Shows all compactions that are in active timeline")
    public String compactionsAll(
            @CliOption(key={"includeExtraMetadata"}, help="Include extra metadata", unspecifiedDefaultValue="false") boolean includeExtraMetadata,
            @CliOption(key={"limit"}, help="Limit commits", unspecifiedDefaultValue="-1") Integer limit,
            @CliOption(key={"sortBy"}, help="Sorting Field", unspecifiedDefaultValue="") String sortByField,
            @CliOption(key={"desc"}, help="Ordering", unspecifiedDefaultValue="false") boolean descending,
            @CliOption(key={"headeronly"}, help="Print Header Only", unspecifiedDefaultValue="false") boolean headerOnly) throws IOException {
        HoodieActiveTimeline timeline = this.checkAndGetMetaClient().getActiveTimeline();
        // Bind the active-timeline reader to this timeline so the printer only needs the instant.
        Function<HoodieInstant, HoodieCompactionPlan> planReader =
                this.compactionPlanReader(this::readCompactionPlanForActiveTimeline, timeline);
        return this.printAllCompactions(timeline, planReader, includeExtraMetadata, sortByField, descending, limit, headerOnly);
    }

    /**
     * Prints the operations of the compaction plan stored for one compaction instant on the active timeline.
     *
     * @param compactionInstantTime timestamp of the compaction instant whose requested plan is read
     * @param limit row limit handed to the printer (default -1)
     * @param sortByField header name to sort by (empty by default)
     * @param descending sort-order flag handed to the printer
     * @param headerOnly whether only the header should be printed
     * @return the formatted table of compaction operations
     * @throws Exception if the plan cannot be read or deserialized
     */
    @CliCommand(value={"compaction show"}, help="Shows compaction details for a specific compaction instant")
    public String compactionShow(
            // The help text used to read "Base path for the target hoodie table", which does not describe this option.
            @CliOption(key={"instant"}, mandatory=true, help="Compaction instant time") String compactionInstantTime,
            @CliOption(key={"limit"}, help="Limit commits", unspecifiedDefaultValue="-1") Integer limit,
            @CliOption(key={"sortBy"}, help="Sorting Field", unspecifiedDefaultValue="") String sortByField,
            @CliOption(key={"desc"}, help="Ordering", unspecifiedDefaultValue="false") boolean descending,
            @CliOption(key={"headeronly"}, help="Print Header Only", unspecifiedDefaultValue="false") boolean headerOnly) throws Exception {
        HoodieTableMetaClient client = this.checkAndGetMetaClient();
        HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
        HoodieInstant requestedInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
        HoodieCompactionPlan compactionPlan =
                TimelineMetadataUtils.deserializeCompactionPlan(activeTimeline.readCompactionPlanAsBytes(requestedInstant).get());
        return this.printCompaction(compactionPlan, sortByField, descending, limit, headerOnly);
    }

    /**
     * Renders the compactions found on the archived timeline within a time window as a table.
     *
     * <p>Instant details for the window are loaded into memory before printing and cleared again
     * afterwards, even when printing fails.
     *
     * @param includeExtraMetadata whether the plan's extra metadata is added as a column
     * @param startTs window start; when null or empty, {@code CommitUtil.getTimeDaysAgo(10)} is used
     * @param endTs window end; when null or empty, {@code CommitUtil.getTimeDaysAgo(1)} is used
     * @param limit row limit handed to the printer (default -1)
     * @param sortByField header name to sort by (empty by default)
     * @param descending sort-order flag handed to the printer
     * @param headerOnly whether only the header should be printed
     * @return the formatted table
     * @throws Exception if loading or printing fails
     */
    @CliCommand(value={"compactions showarchived"}, help="Shows compaction details for specified time window")
    public String compactionsShowArchived(
            @CliOption(key={"includeExtraMetadata"}, help="Include extra metadata", unspecifiedDefaultValue="false") boolean includeExtraMetadata,
            @CliOption(key={"startTs"}, mandatory=false, help="start time for compactions, default: now - 10 days") String startTs,
            @CliOption(key={"endTs"}, mandatory=false, help="end time for compactions, default: now - 1 day") String endTs,
            @CliOption(key={"limit"}, help="Limit compactions", unspecifiedDefaultValue="-1") Integer limit,
            @CliOption(key={"sortBy"}, help="Sorting Field", unspecifiedDefaultValue="") String sortByField,
            @CliOption(key={"desc"}, help="Ordering", unspecifiedDefaultValue="false") boolean descending,
            @CliOption(key={"headeronly"}, help="Print Header Only", unspecifiedDefaultValue="false") boolean headerOnly) throws Exception {
        // Resolve the window bounds first, falling back to the documented defaults.
        String windowStart = StringUtils.isNullOrEmpty(startTs) ? CommitUtil.getTimeDaysAgo(10) : startTs;
        String windowEnd = StringUtils.isNullOrEmpty(endTs) ? CommitUtil.getTimeDaysAgo(1) : endTs;

        HoodieArchivedTimeline archived = this.checkAndGetMetaClient().getArchivedTimeline();
        archived.loadInstantDetailsInMemory(windowStart, windowEnd);
        try {
            Function<HoodieInstant, HoodieCompactionPlan> planReader =
                    this.compactionPlanReader(this::readCompactionPlanForArchivedTimeline, archived);
            return this.printAllCompactions(archived, planReader, includeExtraMetadata, sortByField, descending, limit, headerOnly);
        } finally {
            archived.clearInstantDetailsFromMemory(windowStart, windowEnd);
        }
    }

    /**
     * Prints the operations of the compaction plan stored for one archived compaction instant.
     *
     * <p>Archived instant details are loaded for the window from one hour before to one hour after
     * the given instant time, and cleared again once printing finishes or fails.
     *
     * @param compactionInstantTime timestamp of the archived compaction instant
     * @param limit row limit handed to the printer (default -1)
     * @param sortByField header name to sort by (empty by default)
     * @param descending sort-order flag handed to the printer
     * @param headerOnly whether only the header should be printed
     * @return the formatted table of compaction operations
     * @throws Exception if the plan cannot be loaded or deserialized
     */
    @CliCommand(value={"compaction showarchived"}, help="Shows compaction details for a specific compaction instant")
    public String compactionShowArchived(
            @CliOption(key={"instant"}, mandatory=true, help="instant time") String compactionInstantTime,
            @CliOption(key={"limit"}, help="Limit commits", unspecifiedDefaultValue="-1") Integer limit,
            @CliOption(key={"sortBy"}, help="Sorting Field", unspecifiedDefaultValue="") String sortByField,
            @CliOption(key={"desc"}, help="Ordering", unspecifiedDefaultValue="false") boolean descending,
            @CliOption(key={"headeronly"}, help="Print Header Only", unspecifiedDefaultValue="false") boolean headerOnly) throws Exception {
        HoodieArchivedTimeline archived = this.checkAndGetMetaClient().getArchivedTimeline();
        HoodieInstant target = new HoodieInstant(HoodieInstant.State.COMPLETED, "compaction", compactionInstantTime);
        String windowStart = CommitUtil.addHours(compactionInstantTime, -1);
        String windowEnd = CommitUtil.addHours(compactionInstantTime, 1);
        try {
            archived.loadInstantDetailsInMemory(windowStart, windowEnd);
            byte[] planBytes = archived.getInstantDetails(target).get();
            HoodieCompactionPlan plan = TimelineMetadataUtils.deserializeCompactionPlan(planBytes);
            return this.printCompaction(plan, sortByField, descending, limit, headerOnly);
        } finally {
            archived.clearInstantDetailsFromMemory(windowStart, windowEnd);
        }
    }

    /**
     * Launches a Spark job ({@code SparkMain.SparkCommand.COMPACT_SCHEDULE}) that schedules a compaction
     * at a newly created instant time, and waits for it to finish.
     *
     * @param sparkMemory Spark executor memory passed to the job (default "1G")
     * @param propsFilePath path to a properties file passed to the job (empty by default)
     * @param configs extra configurations validated and added to the launcher
     * @return a message naming the instant time and whether the job exited with code 0
     * @throws Exception if the launcher cannot be set up, the process cannot be started, or waiting is interrupted
     */
    @CliCommand(value={"compaction schedule"}, help="Schedule Compaction")
    public String scheduleCompact(
            @CliOption(key={"sparkMemory"}, unspecifiedDefaultValue="1G", help="Spark executor memory") String sparkMemory,
            @CliOption(key={"propsFilePath"}, help="path to properties file on localfs or dfs with configurations for hoodie client for compacting", unspecifiedDefaultValue="") String propsFilePath,
            @CliOption(key={"hoodieConfigs"}, help="Any configuration that can be set in the properties file can be passed here in the form of an array", unspecifiedDefaultValue="") String[] configs) throws Exception {
        HoodieTableMetaClient client = this.checkAndGetMetaClient();
        boolean initialized = HoodieCLI.initConf();
        HoodieCLI.initFS(initialized);

        String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
        String sparkPropertiesPath = Utils.getDefaultPropertiesFile(JavaConversions.propertiesAsScalaMap(System.getProperties()));
        SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
        sparkLauncher.addAppArgs(
                SparkMain.SparkCommand.COMPACT_SCHEDULE.toString(),
                client.getBasePath(),
                client.getTableConfig().getTableName(),
                compactionInstantTime,
                sparkMemory,
                propsFilePath);
        UtilHelpers.validateAndAddProperties(configs, sparkLauncher);

        Process process = sparkLauncher.launch();
        InputStreamConsumer.captureOutput(process);
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            // This command schedules (it does not run) a compaction; the message previously said "run".
            return "Failed to schedule compaction for " + compactionInstantTime;
        }
        return "Attempted to schedule compaction for " + compactionInstantTime;
    }

    /**
     * Launches a Spark job ({@code SparkMain.SparkCommand.COMPACT_RUN}) that runs a compaction and waits for it.
     *
     * <p>When no instant is given, the first instant with action "compaction" on the reloaded active
     * timeline is used; if there is none, nothing is launched.
     *
     * @param parallelism parallelism passed to the job
     * @param schemaFilePath Avro schema file path passed to the job
     * @param sparkMemory Spark executor memory passed to the job (default "4G")
     * @param retry number of retries passed to the job (default "1")
     * @param compactionInstantTime instant to compact, or null to pick the first pending compaction
     * @param propsFilePath path to a properties file passed to the job (empty by default)
     * @param configs extra configurations validated and added to the launcher
     * @return a status message for the chosen instant, or "NO PENDING COMPACTION TO RUN"
     * @throws Exception if the launcher cannot be set up, the process cannot be started, or waiting is interrupted
     */
    @CliCommand(value={"compaction run"}, help="Run Compaction for given instant time")
    public String compact(
            @CliOption(key={"parallelism"}, mandatory=true, help="Parallelism for hoodie compaction") String parallelism,
            @CliOption(key={"schemaFilePath"}, mandatory=true, help="Path for Avro schema file") String schemaFilePath,
            @CliOption(key={"sparkMemory"}, unspecifiedDefaultValue="4G", help="Spark executor memory") String sparkMemory,
            @CliOption(key={"retry"}, unspecifiedDefaultValue="1", help="Number of retries") String retry,
            // The help text used to read "Base path for the target hoodie table", which does not describe this option.
            @CliOption(key={"compactionInstant"}, help="Compaction instant time to run; defaults to the first pending compaction") String compactionInstantTime,
            @CliOption(key={"propsFilePath"}, help="path to properties file on localfs or dfs with configurations for hoodie client for compacting", unspecifiedDefaultValue="") String propsFilePath,
            @CliOption(key={"hoodieConfigs"}, help="Any configuration that can be set in the properties file can be passed here in the form of an array", unspecifiedDefaultValue="") String[] configs) throws Exception {
        HoodieTableMetaClient client = this.checkAndGetMetaClient();
        boolean initialized = HoodieCLI.initConf();
        HoodieCLI.initFS(initialized);

        // Resolve the instant into a local instead of reassigning the parameter.
        String instantToCompact = compactionInstantTime;
        if (instantToCompact == null) {
            Option<String> firstPendingInstant = client.reloadActiveTimeline()
                    .filterCompletedAndCompactionInstants()
                    .filter(instant -> "compaction".equals(instant.getAction()))
                    .firstInstant()
                    .map(HoodieInstant::getTimestamp);
            if (!firstPendingInstant.isPresent()) {
                return "NO PENDING COMPACTION TO RUN";
            }
            instantToCompact = firstPendingInstant.get();
        }

        String sparkPropertiesPath = Utils.getDefaultPropertiesFile(JavaConversions.propertiesAsScalaMap(System.getProperties()));
        SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
        sparkLauncher.addAppArgs(
                SparkMain.SparkCommand.COMPACT_RUN.toString(),
                client.getBasePath(),
                client.getTableConfig().getTableName(),
                instantToCompact,
                parallelism,
                schemaFilePath,
                sparkMemory,
                retry,
                propsFilePath);
        UtilHelpers.validateAndAddProperties(configs, sparkLauncher);

        Process process = sparkLauncher.launch();
        InputStreamConsumer.captureOutput(process);
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            return "Failed to run compaction for " + instantToCompact;
        }
        return "Compaction successfully completed for " + instantToCompact;
    }

    /**
     * Builds a table with one row per instant of the timeline's write timeline (reverse order) for
     * which the reader yields a compaction plan.
     *
     * <p>An instant is reported as COMPLETED when it is among the completed instants of the commit
     * timeline; otherwise its own state is shown. A plan without extra metadata is printed as
     * {@code {}} rather than failing with a NullPointerException.
     *
     * @param timeline timeline whose instants are listed
     * @param compactionPlanReader maps an instant to its plan; a null result skips the instant
     * @param includeExtraMetadata whether to add the "Extra Metadata" column
     * @param sortByField header name to sort by
     * @param descending sort-order flag handed to the printer
     * @param limit row limit handed to the printer
     * @param headerOnly whether only the header should be printed
     * @return the formatted table
     */
    private String printAllCompactions(HoodieDefaultTimeline timeline, Function<HoodieInstant, HoodieCompactionPlan> compactionPlanReader, boolean includeExtraMetadata, String sortByField, boolean descending, int limit, boolean headerOnly) {
        Stream<HoodieInstant> instantsStream = timeline.getWriteTimeline().getReverseOrderedInstants();
        List<Pair<HoodieInstant, HoodieCompactionPlan>> compactionPlans = instantsStream
                .map(instant -> Pair.of(instant, compactionPlanReader.apply(instant)))
                .filter(pair -> pair.getRight() != null)
                .collect(Collectors.toList());
        Set<HoodieInstant> committedInstants = timeline.getCommitTimeline().filterCompletedInstants().getInstants().collect(Collectors.toSet());

        List<Comparable[]> rows = new ArrayList<>();
        for (Pair<HoodieInstant, HoodieCompactionPlan> entry : compactionPlans) {
            HoodieInstant instant = entry.getLeft();
            HoodieCompactionPlan plan = entry.getRight();
            HoodieInstant.State state = committedInstants.contains(instant) ? HoodieInstant.State.COMPLETED : instant.getState();
            Integer numOperations = plan.getOperations() == null ? 0 : plan.getOperations().size();
            if (includeExtraMetadata) {
                // Guard against plans that carry no extra metadata.
                Object extraMetadata = plan.getExtraMetadata();
                String extraMetadataText = extraMetadata == null ? "{}" : extraMetadata.toString();
                rows.add(new Comparable[]{instant.getTimestamp(), state.toString(), numOperations, extraMetadataText});
            } else {
                rows.add(new Comparable[]{instant.getTimestamp(), state.toString(), numOperations});
            }
        }

        HashMap<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
        TableHeader header = new TableHeader()
                .addTableHeaderField("Compaction Instant Time")
                .addTableHeaderField("State")
                .addTableHeaderField("Total FileIds to be Compacted");
        if (includeExtraMetadata) {
            header = header.addTableHeaderField("Extra Metadata");
        }
        return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
    }

    /**
     * Binds a (timeline, instant) plan reader to a fixed timeline, producing a reader that only needs the instant.
     *
     * @param f function reading a compaction plan for an instant of the given timeline
     * @param timeline timeline passed as the first argument on every call
     * @return a function applying {@code f} to {@code timeline} and the supplied instant
     */
    private <T extends HoodieDefaultTimeline, U extends HoodieInstant, V extends HoodieCompactionPlan> Function<HoodieInstant, HoodieCompactionPlan> compactionPlanReader(BiFunction<T, HoodieInstant, HoodieCompactionPlan> f, T timeline) {
        Function<HoodieInstant, HoodieCompactionPlan> boundReader = instant -> f.apply(timeline, instant);
        return boundReader;
    }

    /**
     * Reads the compaction plan of an archived instant.
     *
     * @param archivedTimeline timeline providing the instant details
     * @param instant instant to read
     * @return the deserialized plan, or null when the instant's action is not "compaction"
     * @throws HoodieIOException wrapping any IOException raised while deserializing
     */
    private HoodieCompactionPlan readCompactionPlanForArchivedTimeline(HoodieArchivedTimeline archivedTimeline, HoodieInstant instant) {
        boolean isCompaction = "compaction".equals(instant.getAction());
        if (isCompaction) {
            try {
                byte[] details = archivedTimeline.getInstantDetails(instant).get();
                return TimelineMetadataUtils.deserializeCompactionPlan(details);
            } catch (IOException e) {
                throw new HoodieIOException(e.getMessage(), e);
            }
        }
        return null;
    }

    /**
     * Reads the requested compaction plan stored for the given instant's timestamp on the active timeline.
     *
     * <p>For an instant whose action is "compaction", a {@link HoodieIOException} raised by the read
     * propagates to the caller. For any other action the read is attempted on a best-effort basis
     * and a {@link HoodieIOException} yields null instead.
     *
     * @param activeTimeline timeline used to read the plan bytes
     * @param instant instant whose timestamp identifies the requested compaction
     * @return the deserialized plan, or null when a non-compaction instant has no readable plan
     * @throws HoodieIOException wrapping any IOException raised while deserializing
     */
    private HoodieCompactionPlan readCompactionPlanForActiveTimeline(HoodieActiveTimeline activeTimeline, HoodieInstant instant) {
        boolean isCompactionAction = "compaction".equals(instant.getAction());
        try {
            HoodieInstant requested = HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp());
            byte[] planBytes = activeTimeline.readCompactionPlanAsBytes(requested).get();
            return TimelineMetadataUtils.deserializeCompactionPlan(planBytes);
        } catch (HoodieIOException ioe) {
            if (isCompactionAction) {
                // A compaction instant is expected to have a plan, so the failure is surfaced.
                throw ioe;
            }
            // Other instants may simply have no requested plan; skip them.
            return null;
        } catch (IOException e) {
            throw new HoodieIOException(e.getMessage(), e);
        }
    }

    /**
     * Formats the operations of a compaction plan as a table, one row per operation.
     *
     * @param compactionPlan plan to print; a null plan or null operation list gives an empty table
     * @param sortByField header name to sort by
     * @param descending sort-order flag handed to the printer
     * @param limit row limit handed to the printer
     * @param headerOnly whether only the header should be printed
     * @return the formatted table
     */
    private String printCompaction(HoodieCompactionPlan compactionPlan, String sortByField, boolean descending, int limit, boolean headerOnly) {
        List<HoodieCompactionOperation> operations = null;
        if (compactionPlan != null) {
            operations = compactionPlan.getOperations();
        }

        List<Comparable[]> rows;
        if (operations == null) {
            rows = new ArrayList<>();
        } else {
            rows = operations.stream()
                    .map(op -> new Comparable[]{
                        op.getPartitionPath(),
                        op.getFileId(),
                        op.getBaseInstantTime(),
                        op.getDataFilePath(),
                        Integer.valueOf(op.getDeltaFilePaths().size()),
                        op.getMetrics() == null ? "" : op.getMetrics().toString()})
                    .collect(Collectors.toCollection(ArrayList::new));
        }

        TableHeader header = new TableHeader()
                .addTableHeaderField("Partition Path")
                .addTableHeaderField("File Id")
                .addTableHeaderField("Base Instant")
                .addTableHeaderField("Data File Path")
                .addTableHeaderField("Total Delta Files")
                .addTableHeaderField("getMetrics");
        HashMap<String, Function<Object, String>> converters = new HashMap<>();
        return HoodiePrintHelper.print(header, converters, sortByField, descending, limit, headerOnly, rows);
    }

    /**
     * Builds a unique temporary file name of the form {@code TMP_DIR + <random UUID> + ".ser"}.
     *
     * @return the generated path string
     */
    private static String getTmpSerializerFile() {
        String uniqueName = UUID.randomUUID().toString();
        return TMP_DIR + uniqueName + ".ser";
    }

    /**
     * Reads a single Java-serialized object from the given path and returns it cast to the caller's type.
     *
     * <p>Both streams are managed by try-with-resources, so the underlying file stream is closed
     * even if wrapping it in an {@link ObjectInputStream} fails or closing the wrapper throws.
     *
     * <p>NOTE(review): this uses native Java deserialization. The callers in this class pass a
     * temp path they generated themselves, but the file must never come from an untrusted source.
     *
     * @param inputP path of the serialized file
     * @param fs file system used to open the path
     * @param <T> type the deserialized object is cast to (unchecked)
     * @return the deserialized object
     * @throws Exception if the file cannot be opened or the object cannot be read
     */
    @SuppressWarnings("unchecked")
    private <T> T deSerializeOperationResult(String inputP, FileSystem fs) throws Exception {
        Path inputPath = new Path(inputP);
        try (FSDataInputStream fsDataInputStream = fs.open(inputPath);
             ObjectInputStream in = new ObjectInputStream(fsDataInputStream)) {
            // Unchecked by nature: the caller decides which type the file is expected to hold.
            T result = (T) in.readObject();
            LOG.info("Result : " + result);
            return result;
        }
    }

    /**
     * Launches a Spark job ({@code SparkMain.SparkCommand.COMPACT_VALIDATE}) for a compaction instant
     * and prints the per-operation results.
     *
     * <p>The job is given a temporary output path; after a zero exit code the results are
     * deserialized from that path (presumably written there by the job). The path is deleted if it
     * exists once the command finishes, whether it succeeded or not.
     *
     * @param compactionInstant compaction instant to validate
     * @param parallelism parallelism passed to the job (default "3")
     * @param master Spark master passed to the job (empty by default)
     * @param sparkMemory executor memory passed to the job (default "2G")
     * @param limit row limit handed to the printer (default -1)
     * @param sortByField header name to sort by (empty by default)
     * @param descending sort-order flag handed to the printer
     * @param headerOnly whether only the header should be printed
     * @return a VALID/INVALID banner followed by the result table, or a failure message
     * @throws Exception if launching, waiting, deserializing or cleanup fails
     */
    @CliCommand(value={"compaction validate"}, help="Validate Compaction")
    public String validateCompaction(
            @CliOption(key={"instant"}, mandatory=true, help="Compaction Instant") String compactionInstant,
            @CliOption(key={"parallelism"}, unspecifiedDefaultValue="3", help="Parallelism") String parallelism,
            @CliOption(key={"sparkMaster"}, unspecifiedDefaultValue="", help="Spark Master ") String master,
            @CliOption(key={"sparkMemory"}, unspecifiedDefaultValue="2G", help="executor memory") String sparkMemory,
            @CliOption(key={"limit"}, help="Limit commits", unspecifiedDefaultValue="-1") Integer limit,
            @CliOption(key={"sortBy"}, help="Sorting Field", unspecifiedDefaultValue="") String sortByField,
            @CliOption(key={"desc"}, help="Ordering", unspecifiedDefaultValue="false") boolean descending,
            @CliOption(key={"headeronly"}, help="Print Header Only", unspecifiedDefaultValue="false") boolean headerOnly) throws Exception {
        HoodieTableMetaClient metaClient = this.checkAndGetMetaClient();
        HoodieCLI.initFS(HoodieCLI.initConf());

        String resultFile = CompactionCommand.getTmpSerializerFile();
        Path resultPath = new Path(resultFile);
        try {
            String sparkPropertiesPath = Utils.getDefaultPropertiesFile(JavaConversions.propertiesAsScalaMap(System.getProperties()));
            SparkLauncher launcher = SparkUtil.initLauncher(sparkPropertiesPath);
            launcher.addAppArgs(
                    SparkMain.SparkCommand.COMPACT_VALIDATE.toString(),
                    master,
                    sparkMemory,
                    metaClient.getBasePath(),
                    compactionInstant,
                    resultFile,
                    parallelism);
            Process sparkJob = launcher.launch();
            InputStreamConsumer.captureOutput(sparkJob);
            if (sparkJob.waitFor() != 0) {
                return "Failed to validate compaction for " + compactionInstant;
            }

            List<OperationResult<CompactionOperation>> results = this.deSerializeOperationResult(resultFile, HoodieCLI.fs);
            // An empty result list counts as valid.
            boolean valid = results.stream().allMatch(OperationResult::isSuccess);
            String banner = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n";

            List<Comparable[]> rows = new ArrayList<>();
            for (OperationResult<CompactionOperation> result : results) {
                CompactionOperation op = result.getOperation();
                String baseDataFile = op.getDataFileName().isPresent() ? op.getDataFileName().get() : "";
                String error = result.getException().isPresent() ? result.getException().get().getMessage() : "";
                rows.add(new Comparable[]{
                    op.getFileId(),
                    op.getBaseInstantTime(),
                    baseDataFile,
                    Integer.valueOf(op.getDeltaFileNames().size()),
                    Boolean.valueOf(result.isSuccess()),
                    error});
            }

            TableHeader header = new TableHeader()
                    .addTableHeaderField("File Id")
                    .addTableHeaderField("Base Instant Time")
                    .addTableHeaderField("Base Data File")
                    .addTableHeaderField("Num Delta Files")
                    .addTableHeaderField("Valid")
                    .addTableHeaderField("Error");
            HashMap<String, Function<Object, String>> converters = new HashMap<>();
            return banner + HoodiePrintHelper.print(header, converters, sortByField, descending, limit, headerOnly, rows);
        } finally {
            if (HoodieCLI.fs.exists(resultPath)) {
                HoodieCLI.fs.delete(resultPath, false);
            }
        }
    }

    /**
     * Launches a Spark job ({@code SparkMain.SparkCommand.COMPACT_UNSCHEDULE_PLAN}) for a compaction
     * instant and prints the resulting file renames.
     *
     * <p>The job is given a temporary output path; after a zero exit code the rename results are
     * deserialized from that path (presumably written there by the job). The path is deleted if it
     * exists once the command finishes, whether it succeeded or not.
     *
     * @param compactionInstant compaction instant to unschedule
     * @param parallelism parallelism passed to the job (default "3")
     * @param master Spark master passed to the job (empty by default)
     * @param sparkMemory executor memory passed to the job (default "2G")
     * @param skipV skip-validation flag passed to the job
     * @param dryRun dry-run flag passed to the job
     * @param limit row limit handed to the printer (default -1)
     * @param sortByField header name to sort by (empty by default)
     * @param descending sort-order flag handed to the printer
     * @param headerOnly whether only the header should be printed
     * @return the rename report, or a failure message when the job exits non-zero
     * @throws Exception if launching, waiting, deserializing or cleanup fails
     */
    @CliCommand(value={"compaction unschedule"}, help="Unschedule Compaction")
    public String unscheduleCompaction(
            @CliOption(key={"instant"}, mandatory=true, help="Compaction Instant") String compactionInstant,
            @CliOption(key={"parallelism"}, unspecifiedDefaultValue="3", help="Parallelism") String parallelism,
            @CliOption(key={"sparkMaster"}, unspecifiedDefaultValue="", help="Spark Master ") String master,
            @CliOption(key={"sparkMemory"}, unspecifiedDefaultValue="2G", help="executor memory") String sparkMemory,
            @CliOption(key={"skipValidation"}, help="skip validation", unspecifiedDefaultValue="false") boolean skipV,
            @CliOption(key={"dryRun"}, help="Dry Run Mode", unspecifiedDefaultValue="false") boolean dryRun,
            @CliOption(key={"limit"}, help="Limit commits", unspecifiedDefaultValue="-1") Integer limit,
            @CliOption(key={"sortBy"}, help="Sorting Field", unspecifiedDefaultValue="") String sortByField,
            @CliOption(key={"desc"}, help="Ordering", unspecifiedDefaultValue="false") boolean descending,
            @CliOption(key={"headeronly"}, help="Print Header Only", unspecifiedDefaultValue="false") boolean headerOnly) throws Exception {
        HoodieTableMetaClient metaClient = this.checkAndGetMetaClient();
        HoodieCLI.initFS(HoodieCLI.initConf());

        String resultFile = CompactionCommand.getTmpSerializerFile();
        Path resultPath = new Path(resultFile);
        try {
            String sparkPropertiesPath = Utils.getDefaultPropertiesFile(JavaConversions.propertiesAsScalaMap(System.getProperties()));
            SparkLauncher launcher = SparkUtil.initLauncher(sparkPropertiesPath);
            launcher.addAppArgs(
                    SparkMain.SparkCommand.COMPACT_UNSCHEDULE_PLAN.toString(),
                    master,
                    sparkMemory,
                    metaClient.getBasePath(),
                    compactionInstant,
                    resultFile,
                    parallelism,
                    String.valueOf(skipV),
                    String.valueOf(dryRun));
            Process sparkJob = launcher.launch();
            InputStreamConsumer.captureOutput(sparkJob);
            if (sparkJob.waitFor() != 0) {
                return "Failed to unschedule compaction for " + compactionInstant;
            }
            List<CompactionAdminClient.RenameOpResult> renames = this.deSerializeOperationResult(resultFile, HoodieCLI.fs);
            return this.getRenamesToBePrinted(renames, limit, sortByField, descending, headerOnly, "unschedule pending compaction");
        } finally {
            if (HoodieCLI.fs.exists(resultPath)) {
                HoodieCLI.fs.delete(resultPath, false);
            }
        }
    }

    /**
     * Launches a Spark job ({@code SparkMain.SparkCommand.COMPACT_UNSCHEDULE_FILE}) for a single
     * file id and prints the resulting file renames.
     *
     * <p>The job is given a temporary output path and a fixed parallelism argument of "1"; after a
     * zero exit code the rename results are deserialized from that path (presumably written there by
     * the job). The path is deleted if it exists once the command finishes, whether it succeeded or not.
     *
     * @param fileId file id to remove from pending compaction
     * @param master Spark master passed to the job (empty by default)
     * @param sparkMemory executor memory passed to the job (default "2G")
     * @param skipV skip-validation flag passed to the job
     * @param dryRun dry-run flag passed to the job
     * @param limit row limit handed to the printer (default -1)
     * @param sortByField header name to sort by (empty by default)
     * @param descending sort-order flag handed to the printer
     * @param headerOnly whether only the header should be printed
     * @return the rename report, or a failure message when the job exits non-zero
     * @throws Exception if launching, waiting, deserializing or cleanup fails
     */
    @CliCommand(value={"compaction unscheduleFileId"}, help="UnSchedule Compaction for a fileId")
    public String unscheduleCompactFile(
            @CliOption(key={"fileId"}, mandatory=true, help="File Id") String fileId,
            @CliOption(key={"sparkMaster"}, unspecifiedDefaultValue="", help="Spark Master ") String master,
            @CliOption(key={"sparkMemory"}, unspecifiedDefaultValue="2G", help="executor memory") String sparkMemory,
            @CliOption(key={"skipValidation"}, help="skip validation", unspecifiedDefaultValue="false") boolean skipV,
            @CliOption(key={"dryRun"}, help="Dry Run Mode", unspecifiedDefaultValue="false") boolean dryRun,
            @CliOption(key={"limit"}, help="Limit commits", unspecifiedDefaultValue="-1") Integer limit,
            @CliOption(key={"sortBy"}, help="Sorting Field", unspecifiedDefaultValue="") String sortByField,
            @CliOption(key={"desc"}, help="Ordering", unspecifiedDefaultValue="false") boolean descending,
            @CliOption(key={"headeronly"}, help="Header Only", unspecifiedDefaultValue="false") boolean headerOnly) throws Exception {
        HoodieTableMetaClient metaClient = this.checkAndGetMetaClient();
        HoodieCLI.initFS(HoodieCLI.initConf());

        String resultFile = CompactionCommand.getTmpSerializerFile();
        Path resultPath = new Path(resultFile);
        try {
            String sparkPropertiesPath = Utils.getDefaultPropertiesFile(JavaConversions.propertiesAsScalaMap(System.getProperties()));
            SparkLauncher launcher = SparkUtil.initLauncher(sparkPropertiesPath);
            launcher.addAppArgs(
                    SparkMain.SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(),
                    master,
                    sparkMemory,
                    metaClient.getBasePath(),
                    fileId,
                    resultFile,
                    "1",
                    String.valueOf(skipV),
                    String.valueOf(dryRun));
            Process sparkJob = launcher.launch();
            InputStreamConsumer.captureOutput(sparkJob);
            if (sparkJob.waitFor() != 0) {
                return "Failed to unschedule compaction for file " + fileId;
            }
            List<CompactionAdminClient.RenameOpResult> renames = this.deSerializeOperationResult(resultFile, HoodieCLI.fs);
            return this.getRenamesToBePrinted(renames, limit, sortByField, descending, headerOnly, "unschedule file from pending compaction");
        } finally {
            if (HoodieCLI.fs.exists(resultPath)) {
                HoodieCLI.fs.delete(resultPath, false);
            }
        }
    }

    /**
     * Launches a Spark job ({@code SparkMain.SparkCommand.COMPACT_REPAIR}) for a compaction instant
     * and prints the resulting file renames.
     *
     * <p>The job is given a temporary output path; after a zero exit code the rename results are
     * deserialized from that path (presumably written there by the job). The path is deleted if it
     * exists once the command finishes, whether it succeeded or not.
     *
     * @param compactionInstant compaction instant to repair
     * @param parallelism parallelism passed to the job (default "3")
     * @param master Spark master passed to the job (empty by default)
     * @param sparkMemory executor memory passed to the job (default "2G")
     * @param dryRun dry-run flag passed to the job
     * @param limit row limit handed to the printer (default -1)
     * @param sortByField header name to sort by (empty by default)
     * @param descending sort-order flag handed to the printer
     * @param headerOnly whether only the header should be printed
     * @return the rename report, or a failure message when the job exits non-zero
     * @throws Exception if launching, waiting, deserializing or cleanup fails
     */
    @CliCommand(value={"compaction repair"}, help="Renames the files to make them consistent with the timeline as dictated by Hoodie metadata. Use when compaction unschedule fails partially.")
    public String repairCompaction(
            @CliOption(key={"instant"}, mandatory=true, help="Compaction Instant") String compactionInstant,
            @CliOption(key={"parallelism"}, unspecifiedDefaultValue="3", help="Parallelism") String parallelism,
            @CliOption(key={"sparkMaster"}, unspecifiedDefaultValue="", help="Spark Master ") String master,
            @CliOption(key={"sparkMemory"}, unspecifiedDefaultValue="2G", help="executor memory") String sparkMemory,
            @CliOption(key={"dryRun"}, help="Dry Run Mode", unspecifiedDefaultValue="false") boolean dryRun,
            @CliOption(key={"limit"}, help="Limit commits", unspecifiedDefaultValue="-1") Integer limit,
            @CliOption(key={"sortBy"}, help="Sorting Field", unspecifiedDefaultValue="") String sortByField,
            @CliOption(key={"desc"}, help="Ordering", unspecifiedDefaultValue="false") boolean descending,
            @CliOption(key={"headeronly"}, help="Print Header Only", unspecifiedDefaultValue="false") boolean headerOnly) throws Exception {
        HoodieTableMetaClient client = this.checkAndGetMetaClient();
        boolean initialized = HoodieCLI.initConf();
        HoodieCLI.initFS(initialized);

        String outputPathStr = CompactionCommand.getTmpSerializerFile();
        Path outputPath = new Path(outputPathStr);
        try {
            String sparkPropertiesPath = Utils.getDefaultPropertiesFile(JavaConversions.propertiesAsScalaMap(System.getProperties()));
            SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
            sparkLauncher.addAppArgs(
                    SparkMain.SparkCommand.COMPACT_REPAIR.toString(),
                    master,
                    sparkMemory,
                    client.getBasePath(),
                    compactionInstant,
                    outputPathStr,
                    parallelism,
                    String.valueOf(dryRun));
            Process process = sparkLauncher.launch();
            InputStreamConsumer.captureOutput(process);
            int exitCode = process.waitFor();
            if (exitCode != 0) {
                // This is the repair command; the message previously said "unschedule" (copy-paste slip).
                return "Failed to repair compaction for " + compactionInstant;
            }
            List<CompactionAdminClient.RenameOpResult> res = this.deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
            return this.getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, "repair compaction");
        } finally {
            if (HoodieCLI.fs.exists(outputPath)) {
                HoodieCLI.fs.delete(outputPath, false);
            }
        }
    }

    /**
     * Reports the outcome of a list of rename operations.
     *
     * <p>With no results, a "no renames needed" message is returned. Otherwise a summary is written
     * to standard output (all renames count as successful only if every one was both executed and
     * successful) and a table with one row per rename is returned.
     *
     * @param res rename results to report
     * @param limit row limit handed to the printer
     * @param sortByField header name to sort by
     * @param descending sort-order flag handed to the printer
     * @param headerOnly whether only the header should be printed
     * @param operation description of the operation, embedded in the messages
     * @return the formatted table, or the "no renames needed" message
     */
    private String getRenamesToBePrinted(List<CompactionAdminClient.RenameOpResult> res, Integer limit, String sortByField, boolean descending, boolean headerOnly, String operation) {
        if (res.isEmpty()) {
            return "No File renames needed to " + operation + ". Operation successful.";
        }

        System.out.println("There were some file renames that needed to be done to " + operation);
        boolean allRenamed = res.stream().allMatch(r -> r.isExecuted() && r.isSuccess());
        if (allRenamed) {
            System.out.println("All renames successfully completed to " + operation + " done !!");
        } else {
            System.out.println("Some renames failed. table could be in inconsistent-state. Try running compaction repair");
        }

        List<Comparable[]> rows = new ArrayList<>();
        for (CompactionAdminClient.RenameOpResult rename : res) {
            CompactionAdminClient.RenameInfo info = rename.getOperation();
            String error = rename.getException().isPresent() ? rename.getException().get().getMessage() : "";
            rows.add(new Comparable[]{
                info.fileId,
                info.srcPath,
                info.destPath,
                Boolean.valueOf(rename.isExecuted()),
                Boolean.valueOf(rename.isSuccess()),
                error});
        }

        TableHeader header = new TableHeader()
                .addTableHeaderField("File Id")
                .addTableHeaderField("Source File Path")
                .addTableHeaderField("Destination File Path")
                .addTableHeaderField("Rename Executed?")
                .addTableHeaderField("Rename Succeeded?")
                .addTableHeaderField("Error");
        HashMap<String, Function<Object, String>> converters = new HashMap<>();
        return HoodiePrintHelper.print(header, converters, sortByField, descending, limit, headerOnly, rows);
    }
}

