/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.cli.commands;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.commands.SparkMain;
import org.apache.hudi.cli.utils.CommitUtil;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.client.CompactionAdminClient;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.action.compact.OperationResult;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.util.Utils;
import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
import org.springframework.shell.standard.ShellOption;
import scala.collection.JavaConversions;
import scala.collection.Map;

@ShellComponent
public class CompactionCommand {
    private static final Logger LOG = LogManager.getLogger(CompactionCommand.class);
    private static final String TMP_DIR = "/tmp/";

    private HoodieTableMetaClient checkAndGetMetaClient() {
        HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
        if (client.getTableType() != HoodieTableType.MERGE_ON_READ) {
            throw new HoodieException("Compactions can only be run for table type : MERGE_ON_READ");
        }
        return client;
    }

    @ShellMethod(key={"compactions show all"}, value="Shows all compactions that are in active timeline")
    public String compactionsAll(@ShellOption(value={"--includeExtraMetadata"}, help="Include extra metadata", defaultValue="false") boolean includeExtraMetadata, @ShellOption(value={"--limit"}, help="Limit commits", defaultValue="-1") Integer limit, @ShellOption(value={"--sortBy"}, help="Sorting Field", defaultValue="") String sortByField, @ShellOption(value={"--desc"}, help="Ordering", defaultValue="false") boolean descending, @ShellOption(value={"--headeronly"}, help="Print Header Only", defaultValue="false") boolean headerOnly) {
        HoodieTableMetaClient client = this.checkAndGetMetaClient();
        HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
        return CompactionCommand.printAllCompactions((HoodieDefaultTimeline)activeTimeline, this.compactionPlanReader(this::readCompactionPlanForActiveTimeline, activeTimeline), includeExtraMetadata, sortByField, descending, limit, headerOnly);
    }

    @ShellMethod(key={"compaction show"}, value="Shows compaction details for a specific compaction instant")
    public String compactionShow(@ShellOption(value={"--instant"}, help="Base path for the target hoodie table") String compactionInstantTime, @ShellOption(value={"--limit"}, help="Limit commits", defaultValue="-1") Integer limit, @ShellOption(value={"--sortBy"}, help="Sorting Field", defaultValue="") String sortByField, @ShellOption(value={"--desc"}, help="Ordering", defaultValue="false") boolean descending, @ShellOption(value={"--headeronly"}, help="Print Header Only", defaultValue="false") boolean headerOnly, @ShellOption(value={"--partition"}, help="Partition value", defaultValue="__NULL__") String partition) throws Exception {
        HoodieTableMetaClient client = this.checkAndGetMetaClient();
        HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
        HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan((byte[])((byte[])activeTimeline.readCompactionPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant((String)compactionInstantTime)).get()));
        return CompactionCommand.printCompaction(compactionPlan, sortByField, descending, limit, headerOnly, partition);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ShellMethod(key={"compactions showarchived"}, value="Shows compaction details for specified time window")
    public String compactionsShowArchived(@ShellOption(value={"--includeExtraMetadata"}, help="Include extra metadata", defaultValue="false") boolean includeExtraMetadata, @ShellOption(value={"--startTs"}, defaultValue="__NULL__", help="start time for compactions, default: now - 10 days") String startTs, @ShellOption(value={"--endTs"}, defaultValue="__NULL__", help="end time for compactions, default: now - 1 day") String endTs, @ShellOption(value={"--limit"}, help="Limit compactions", defaultValue="-1") Integer limit, @ShellOption(value={"--sortBy"}, help="Sorting Field", defaultValue="") String sortByField, @ShellOption(value={"--desc"}, help="Ordering", defaultValue="false") boolean descending, @ShellOption(value={"--headeronly"}, help="Print Header Only", defaultValue="false") boolean headerOnly) {
        if (StringUtils.isNullOrEmpty((String)startTs)) {
            startTs = CommitUtil.getTimeDaysAgo(10);
        }
        if (StringUtils.isNullOrEmpty((String)endTs)) {
            endTs = CommitUtil.getTimeDaysAgo(1);
        }
        HoodieTableMetaClient client = this.checkAndGetMetaClient();
        HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
        archivedTimeline.loadCompactionDetailsInMemory(startTs, endTs);
        try {
            String string = CompactionCommand.printAllCompactions((HoodieDefaultTimeline)archivedTimeline, this.compactionPlanReader(this::readCompactionPlanForArchivedTimeline, archivedTimeline), includeExtraMetadata, sortByField, descending, limit, headerOnly);
            return string;
        }
        finally {
            archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ShellMethod(key={"compaction showarchived"}, value="Shows compaction details for a specific compaction instant")
    public String compactionShowArchived(@ShellOption(value={"--instant"}, help="instant time") String compactionInstantTime, @ShellOption(value={"--limit"}, help="Limit commits", defaultValue="-1") Integer limit, @ShellOption(value={"--sortBy"}, help="Sorting Field", defaultValue="") String sortByField, @ShellOption(value={"--desc"}, help="Ordering", defaultValue="false") boolean descending, @ShellOption(value={"--headeronly"}, help="Print Header Only", defaultValue="false") boolean headerOnly, @ShellOption(value={"--partition"}, help="Partition value", defaultValue="__NULL__") String partition) throws Exception {
        HoodieTableMetaClient client = this.checkAndGetMetaClient();
        HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
        HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED, "compaction", compactionInstantTime);
        try {
            archivedTimeline.loadCompactionDetailsInMemory(compactionInstantTime);
            HoodieCompactionPlan compactionPlan = (HoodieCompactionPlan)TimelineMetadataUtils.deserializeAvroRecordMetadata((byte[])((byte[])archivedTimeline.getInstantDetails(instant).get()), (Schema)HoodieCompactionPlan.getClassSchema());
            String string = CompactionCommand.printCompaction(compactionPlan, sortByField, descending, limit, headerOnly, partition);
            return string;
        }
        finally {
            archivedTimeline.clearInstantDetailsFromMemory(compactionInstantTime);
        }
    }

    @ShellMethod(key={"compaction schedule"}, value="Schedule Compaction")
    public String scheduleCompact(@ShellOption(value={"--sparkMemory"}, defaultValue="1G", help="Spark executor memory") String sparkMemory, @ShellOption(value={"--propsFilePath"}, help="path to properties file on localfs or dfs with configurations for hoodie client for compacting", defaultValue="") String propsFilePath, @ShellOption(value={"--hoodieConfigs"}, help="Any configuration that can be set in the properties file can be passed here in the form of an array", defaultValue="") String[] configs, @ShellOption(value={"--sparkMaster"}, defaultValue="local", help="Spark Master") String master) throws Exception {
        HoodieTableMetaClient client = this.checkAndGetMetaClient();
        boolean initialized = HoodieCLI.initConf();
        HoodieCLI.initFS(initialized);
        String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
        String sparkPropertiesPath = Utils.getDefaultPropertiesFile((Map)JavaConversions.propertiesAsScalaMap((Properties)System.getProperties()));
        SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
        String cmd = SparkMain.SparkCommand.COMPACT_SCHEDULE.toString();
        sparkLauncher.addAppArgs(new String[]{cmd, master, sparkMemory, client.getBasePath(), client.getTableConfig().getTableName(), compactionInstantTime, propsFilePath});
        UtilHelpers.validateAndAddProperties((String[])configs, (SparkLauncher)sparkLauncher);
        Process process = sparkLauncher.launch();
        InputStreamConsumer.captureOutput(process);
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            return "Failed to run compaction for " + compactionInstantTime;
        }
        return "Attempted to schedule compaction for " + compactionInstantTime;
    }

    @ShellMethod(key={"compaction run"}, value="Run Compaction for given instant time")
    public String compact(@ShellOption(value={"--parallelism"}, defaultValue="3", help="Parallelism for hoodie compaction") String parallelism, @ShellOption(value={"--schemaFilePath"}, help="Path for Avro schema file", defaultValue="") String schemaFilePath, @ShellOption(value={"--sparkMaster"}, defaultValue="local", help="Spark Master") String master, @ShellOption(value={"--sparkMemory"}, defaultValue="4G", help="Spark executor memory") String sparkMemory, @ShellOption(value={"--retry"}, defaultValue="1", help="Number of retries") String retry, @ShellOption(value={"--compactionInstant"}, help="Instant of compaction.request", defaultValue="__NULL__") String compactionInstantTime, @ShellOption(value={"--propsFilePath"}, help="path to properties file on localfs or dfs with configurations for hoodie client for compacting", defaultValue="") String propsFilePath, @ShellOption(value={"--hoodieConfigs"}, help="Any configuration that can be set in the properties file can be passed here in the form of an array", defaultValue="") String[] configs) throws Exception {
        HoodieTableMetaClient client = this.checkAndGetMetaClient();
        boolean initialized = HoodieCLI.initConf();
        HoodieCLI.initFS(initialized);
        if (null == compactionInstantTime) {
            Option firstPendingInstant = client.reloadActiveTimeline().filterCompletedAndCompactionInstants().filter(instant -> instant.getAction().equals("compaction")).firstInstant().map(HoodieInstant::getTimestamp);
            if (!firstPendingInstant.isPresent()) {
                return "NO PENDING COMPACTION TO RUN";
            }
            compactionInstantTime = (String)firstPendingInstant.get();
        }
        String sparkPropertiesPath = Utils.getDefaultPropertiesFile((Map)JavaConversions.propertiesAsScalaMap((Properties)System.getProperties()));
        SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
        sparkLauncher.addAppArgs(new String[]{SparkMain.SparkCommand.COMPACT_RUN.toString(), master, sparkMemory, client.getBasePath(), client.getTableConfig().getTableName(), compactionInstantTime, parallelism, schemaFilePath, retry, propsFilePath});
        UtilHelpers.validateAndAddProperties((String[])configs, (SparkLauncher)sparkLauncher);
        Process process = sparkLauncher.launch();
        InputStreamConsumer.captureOutput(process);
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            return "Failed to run compaction for " + compactionInstantTime;
        }
        return "Compaction successfully completed for " + compactionInstantTime;
    }

    @ShellMethod(key={"compaction scheduleAndExecute"}, value="Schedule compaction plan and execute this plan")
    public String compact(@ShellOption(value={"--parallelism"}, defaultValue="3", help="Parallelism for hoodie compaction") String parallelism, @ShellOption(value={"--schemaFilePath"}, help="Path for Avro schema file", defaultValue="__NULL__") String schemaFilePath, @ShellOption(value={"--sparkMaster"}, defaultValue="local", help="Spark Master") String master, @ShellOption(value={"--sparkMemory"}, defaultValue="4G", help="Spark executor memory") String sparkMemory, @ShellOption(value={"--retry"}, defaultValue="1", help="Number of retries") String retry, @ShellOption(value={"--propsFilePath"}, help="path to properties file on localfs or dfs with configurations for hoodie client for compacting", defaultValue="") String propsFilePath, @ShellOption(value={"--hoodieConfigs"}, help="Any configuration that can be set in the properties file can be passed here in the form of an array", defaultValue="") String[] configs) throws Exception {
        HoodieTableMetaClient client = this.checkAndGetMetaClient();
        boolean initialized = HoodieCLI.initConf();
        HoodieCLI.initFS(initialized);
        String sparkPropertiesPath = Utils.getDefaultPropertiesFile((Map)JavaConversions.propertiesAsScalaMap((Properties)System.getProperties()));
        SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
        sparkLauncher.addAppArgs(new String[]{SparkMain.SparkCommand.COMPACT_SCHEDULE_AND_EXECUTE.toString(), master, sparkMemory, client.getBasePath(), client.getTableConfig().getTableName(), parallelism, schemaFilePath, retry, propsFilePath});
        UtilHelpers.validateAndAddProperties((String[])configs, (SparkLauncher)sparkLauncher);
        Process process = sparkLauncher.launch();
        InputStreamConsumer.captureOutput(process);
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            return "Failed to schedule and execute compaction ";
        }
        return "Schedule and execute compaction successfully completed";
    }

    private static String printAllCompactions(HoodieDefaultTimeline timeline, Function<HoodieInstant, HoodieCompactionPlan> compactionPlanReader, boolean includeExtraMetadata, String sortByField, boolean descending, int limit, boolean headerOnly) {
        Stream instantsStream = timeline.getWriteTimeline().getReverseOrderedInstants();
        List compactionPlans = instantsStream.map(instant -> Pair.of((Object)instant, compactionPlanReader.apply((HoodieInstant)instant))).filter(pair -> pair.getRight() != null).collect(Collectors.toList());
        Set committedInstants = timeline.getCommitTimeline().filterCompletedInstants().getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
        ArrayList<Comparable[]> rows = new ArrayList<Comparable[]>();
        for (Pair compactionPlan : compactionPlans) {
            HoodieCompactionPlan plan = (HoodieCompactionPlan)compactionPlan.getRight();
            HoodieInstant instant2 = (HoodieInstant)compactionPlan.getLeft();
            HoodieInstant.State state = committedInstants.contains(instant2.getTimestamp()) ? HoodieInstant.State.COMPLETED : instant2.getState();
            if (includeExtraMetadata) {
                rows.add(new Comparable[]{instant2.getTimestamp(), state.toString(), Integer.valueOf(plan.getOperations() == null ? 0 : plan.getOperations().size()), plan.getExtraMetadata().toString()});
                continue;
            }
            rows.add(new Comparable[]{instant2.getTimestamp(), state.toString(), Integer.valueOf(plan.getOperations() == null ? 0 : plan.getOperations().size())});
        }
        HashMap<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<String, Function<Object, String>>();
        TableHeader header = new TableHeader().addTableHeaderField("Compaction Instant Time").addTableHeaderField("State").addTableHeaderField("Total FileIds to be Compacted");
        if (includeExtraMetadata) {
            header = header.addTableHeaderField("Extra Metadata");
        }
        return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
    }

    private <T extends HoodieDefaultTimeline, U extends HoodieInstant, V extends HoodieCompactionPlan> Function<HoodieInstant, HoodieCompactionPlan> compactionPlanReader(BiFunction<T, HoodieInstant, HoodieCompactionPlan> f, T timeline) {
        return y -> (HoodieCompactionPlan)f.apply(timeline, (HoodieInstant)y);
    }

    private HoodieCompactionPlan readCompactionPlanForArchivedTimeline(HoodieArchivedTimeline archivedTimeline, HoodieInstant instant) {
        if ("compaction".equals(instant.getAction()) && HoodieInstant.State.INFLIGHT.equals((Object)instant.getState())) {
            try {
                return (HoodieCompactionPlan)TimelineMetadataUtils.deserializeAvroRecordMetadata((byte[])((byte[])archivedTimeline.getInstantDetails(instant).get()), (Schema)HoodieCompactionPlan.getClassSchema());
            }
            catch (Exception e) {
                throw new HoodieException(e.getMessage(), (Throwable)e);
            }
        }
        return null;
    }

    private HoodieCompactionPlan readCompactionPlanForActiveTimeline(HoodieActiveTimeline activeTimeline, HoodieInstant instant) {
        try {
            if (!"compaction".equals(instant.getAction())) {
                try {
                    return TimelineMetadataUtils.deserializeCompactionPlan((byte[])((byte[])activeTimeline.readCompactionPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant((String)instant.getTimestamp())).get()));
                }
                catch (HoodieIOException ioe) {
                    return null;
                }
            }
            return TimelineMetadataUtils.deserializeCompactionPlan((byte[])((byte[])activeTimeline.readCompactionPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant((String)instant.getTimestamp())).get()));
        }
        catch (IOException e) {
            throw new HoodieIOException(e.getMessage(), e);
        }
    }

    protected static String printCompaction(HoodieCompactionPlan compactionPlan, String sortByField, boolean descending, int limit, boolean headerOnly, String partition) {
        ArrayList<Comparable[]> rows = new ArrayList<Comparable[]>();
        if (null != compactionPlan && null != compactionPlan.getOperations()) {
            for (HoodieCompactionOperation op : compactionPlan.getOperations()) {
                if (!StringUtils.isNullOrEmpty((String)partition) && !partition.equals(op.getPartitionPath())) continue;
                rows.add(new Comparable[]{op.getPartitionPath(), op.getFileId(), op.getBaseInstantTime(), op.getDataFilePath(), Integer.valueOf(op.getDeltaFilePaths().size()), op.getMetrics() == null ? "" : op.getMetrics().toString()});
            }
        }
        HashMap<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<String, Function<Object, String>>();
        TableHeader header = new TableHeader().addTableHeaderField("Partition Path").addTableHeaderField("FileId").addTableHeaderField("Base-Instant").addTableHeaderField("Data File Path").addTableHeaderField("Total Delta Files").addTableHeaderField("getMetrics");
        return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
    }

    private static String getTmpSerializerFile() {
        return TMP_DIR + UUID.randomUUID().toString() + ".ser";
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> T deSerializeOperationResult(String inputP, FileSystem fs) throws Exception {
        Path inputPath = new Path(inputP);
        FSDataInputStream fsDataInputStream = fs.open(inputPath);
        ObjectInputStream in = new ObjectInputStream((InputStream)fsDataInputStream);
        try {
            Object result = in.readObject();
            LOG.info("Result : " + result);
            Object object = result;
            return (T)object;
        }
        finally {
            in.close();
            fsDataInputStream.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ShellMethod(key={"compaction validate"}, value="Validate Compaction")
    public String validateCompaction(@ShellOption(value={"--instant"}, help="Compaction Instant") String compactionInstant, @ShellOption(value={"--parallelism"}, defaultValue="3", help="Parallelism") String parallelism, @ShellOption(value={"--sparkMaster"}, defaultValue="local", help="Spark Master") String master, @ShellOption(value={"--sparkMemory"}, defaultValue="2G", help="executor memory") String sparkMemory, @ShellOption(value={"--limit"}, help="Limit commits", defaultValue="-1") Integer limit, @ShellOption(value={"--sortBy"}, help="Sorting Field", defaultValue="") String sortByField, @ShellOption(value={"--desc"}, help="Ordering", defaultValue="false") boolean descending, @ShellOption(value={"--headeronly"}, help="Print Header Only", defaultValue="false") boolean headerOnly) throws Exception {
        String output;
        HoodieTableMetaClient client = this.checkAndGetMetaClient();
        boolean initialized = HoodieCLI.initConf();
        HoodieCLI.initFS(initialized);
        String outputPathStr = CompactionCommand.getTmpSerializerFile();
        Path outputPath = new Path(outputPathStr);
        try {
            String sparkPropertiesPath = Utils.getDefaultPropertiesFile((Map)JavaConversions.propertiesAsScalaMap((Properties)System.getProperties()));
            SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
            sparkLauncher.addAppArgs(new String[]{SparkMain.SparkCommand.COMPACT_VALIDATE.toString(), master, sparkMemory, client.getBasePath(), compactionInstant, outputPathStr, parallelism});
            Process process = sparkLauncher.launch();
            InputStreamConsumer.captureOutput(process);
            int exitCode = process.waitFor();
            if (exitCode != 0) {
                String string = "Failed to validate compaction for " + compactionInstant;
                return string;
            }
            List res = (List)this.deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
            boolean valid = res.stream().map(OperationResult::isSuccess).reduce(Boolean::logicalAnd).orElse(true);
            String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n";
            ArrayList<Comparable[]> rows = new ArrayList<Comparable[]>();
            res.forEach(r -> {
                Comparable[] row = new Comparable[]{((CompactionOperation)r.getOperation()).getFileId(), ((CompactionOperation)r.getOperation()).getBaseInstantTime(), ((CompactionOperation)r.getOperation()).getDataFileName().isPresent() ? (Comparable)((CompactionOperation)r.getOperation()).getDataFileName().get() : "", Integer.valueOf(((CompactionOperation)r.getOperation()).getDeltaFileNames().size()), Boolean.valueOf(r.isSuccess()), r.getException().isPresent() ? ((Exception)r.getException().get()).getMessage() : ""};
                rows.add(row);
            });
            HashMap<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<String, Function<Object, String>>();
            TableHeader header = new TableHeader().addTableHeaderField("FileId").addTableHeaderField("Base Instant Time").addTableHeaderField("Base Data File").addTableHeaderField("Num Delta Files").addTableHeaderField("Valid").addTableHeaderField("Error");
            output = message + HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
        }
        finally {
            if (HoodieCLI.fs.exists(outputPath)) {
                HoodieCLI.fs.delete(outputPath, false);
            }
        }
        return output;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ShellMethod(key={"compaction unschedule"}, value="Unschedule Compaction")
    public String unscheduleCompaction(@ShellOption(value={"--instant"}, help="Compaction Instant") String compactionInstant, @ShellOption(value={"--parallelism"}, defaultValue="3", help="Parallelism") String parallelism, @ShellOption(value={"--sparkMaster"}, defaultValue="local", help="Spark Master") String master, @ShellOption(value={"--sparkMemory"}, defaultValue="2G", help="executor memory") String sparkMemory, @ShellOption(value={"--skipValidation"}, help="skip validation", defaultValue="false") boolean skipV, @ShellOption(value={"--dryRun"}, help="Dry Run Mode", defaultValue="false") boolean dryRun, @ShellOption(value={"--limit"}, help="Limit commits", defaultValue="-1") Integer limit, @ShellOption(value={"--sortBy"}, help="Sorting Field", defaultValue="") String sortByField, @ShellOption(value={"--desc"}, help="Ordering", defaultValue="false") boolean descending, @ShellOption(value={"--headeronly"}, help="Print Header Only", defaultValue="false") boolean headerOnly) throws Exception {
        String output;
        HoodieTableMetaClient client = this.checkAndGetMetaClient();
        boolean initialized = HoodieCLI.initConf();
        HoodieCLI.initFS(initialized);
        String outputPathStr = CompactionCommand.getTmpSerializerFile();
        Path outputPath = new Path(outputPathStr);
        try {
            String sparkPropertiesPath = Utils.getDefaultPropertiesFile((Map)JavaConversions.propertiesAsScalaMap((Properties)System.getProperties()));
            SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
            sparkLauncher.addAppArgs(new String[]{SparkMain.SparkCommand.COMPACT_UNSCHEDULE_PLAN.toString(), master, sparkMemory, client.getBasePath(), compactionInstant, outputPathStr, parallelism, Boolean.valueOf(skipV).toString(), Boolean.valueOf(dryRun).toString()});
            Process process = sparkLauncher.launch();
            InputStreamConsumer.captureOutput(process);
            int exitCode = process.waitFor();
            if (exitCode != 0) {
                String string = "Failed to unschedule compaction for " + compactionInstant;
                return string;
            }
            List res = (List)this.deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
            output = this.getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, "unschedule pending compaction");
        }
        finally {
            if (HoodieCLI.fs.exists(outputPath)) {
                HoodieCLI.fs.delete(outputPath, false);
            }
        }
        return output;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ShellMethod(key={"compaction unscheduleFileId"}, value="UnSchedule Compaction for a fileId")
    public String unscheduleCompactFile(@ShellOption(value={"--fileId"}, help="File Id") String fileId, @ShellOption(value={"--partitionPath"}, defaultValue="", help="partition path") String partitionPath, @ShellOption(value={"--sparkMaster"}, defaultValue="local", help="Spark Master") String master, @ShellOption(value={"--sparkMemory"}, defaultValue="2G", help="executor memory") String sparkMemory, @ShellOption(value={"--skipValidation"}, help="skip validation", defaultValue="false") boolean skipV, @ShellOption(value={"--dryRun"}, help="Dry Run Mode", defaultValue="false") boolean dryRun, @ShellOption(value={"--limit"}, help="Limit commits", defaultValue="-1") Integer limit, @ShellOption(value={"--sortBy"}, help="Sorting Field", defaultValue="") String sortByField, @ShellOption(value={"--desc"}, help="Ordering", defaultValue="false") boolean descending, @ShellOption(value={"--headeronly"}, help="Header Only", defaultValue="false") boolean headerOnly) throws Exception {
        String output;
        HoodieTableMetaClient client = this.checkAndGetMetaClient();
        boolean initialized = HoodieCLI.initConf();
        HoodieCLI.initFS(initialized);
        String outputPathStr = CompactionCommand.getTmpSerializerFile();
        Path outputPath = new Path(outputPathStr);
        try {
            String sparkPropertiesPath = Utils.getDefaultPropertiesFile((Map)JavaConversions.propertiesAsScalaMap((Properties)System.getProperties()));
            SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
            sparkLauncher.addAppArgs(new String[]{SparkMain.SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(), master, sparkMemory, client.getBasePath(), fileId, partitionPath, outputPathStr, "1", Boolean.valueOf(skipV).toString(), Boolean.valueOf(dryRun).toString()});
            Process process = sparkLauncher.launch();
            InputStreamConsumer.captureOutput(process);
            int exitCode = process.waitFor();
            if (exitCode != 0) {
                String string = "Failed to unschedule compaction for file " + fileId;
                return string;
            }
            List res = (List)this.deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
            output = this.getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, "unschedule file from pending compaction");
        }
        finally {
            if (HoodieCLI.fs.exists(outputPath)) {
                HoodieCLI.fs.delete(outputPath, false);
            }
        }
        return output;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ShellMethod(key={"compaction repair"}, value="Renames the files to make them consistent with the timeline as dictated by Hoodie metadata. Use when compaction unschedule fails partially.")
    public String repairCompaction(@ShellOption(value={"--instant"}, help="Compaction Instant") String compactionInstant, @ShellOption(value={"--parallelism"}, defaultValue="3", help="Parallelism") String parallelism, @ShellOption(value={"--sparkMaster"}, defaultValue="local", help="Spark Master") String master, @ShellOption(value={"--sparkMemory"}, defaultValue="2G", help="executor memory") String sparkMemory, @ShellOption(value={"--dryRun"}, help="Dry Run Mode", defaultValue="false") boolean dryRun, @ShellOption(value={"--limit"}, help="Limit commits", defaultValue="-1") Integer limit, @ShellOption(value={"--sortBy"}, help="Sorting Field", defaultValue="") String sortByField, @ShellOption(value={"--desc"}, help="Ordering", defaultValue="false") boolean descending, @ShellOption(value={"--headeronly"}, help="Print Header Only", defaultValue="false") boolean headerOnly) throws Exception {
        String output;
        HoodieTableMetaClient client = this.checkAndGetMetaClient();
        boolean initialized = HoodieCLI.initConf();
        HoodieCLI.initFS(initialized);
        String outputPathStr = CompactionCommand.getTmpSerializerFile();
        Path outputPath = new Path(outputPathStr);
        try {
            String sparkPropertiesPath = Utils.getDefaultPropertiesFile((Map)JavaConversions.propertiesAsScalaMap((Properties)System.getProperties()));
            SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
            sparkLauncher.addAppArgs(new String[]{SparkMain.SparkCommand.COMPACT_REPAIR.toString(), master, sparkMemory, client.getBasePath(), compactionInstant, outputPathStr, parallelism, Boolean.valueOf(dryRun).toString()});
            Process process = sparkLauncher.launch();
            InputStreamConsumer.captureOutput(process);
            int exitCode = process.waitFor();
            if (exitCode != 0) {
                String string = "Failed to unschedule compaction for " + compactionInstant;
                return string;
            }
            List res = (List)this.deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
            output = this.getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, "repair compaction");
        }
        finally {
            if (HoodieCLI.fs.exists(outputPath)) {
                HoodieCLI.fs.delete(outputPath, false);
            }
        }
        return output;
    }

    private String getRenamesToBePrinted(List<CompactionAdminClient.RenameOpResult> res, Integer limit, String sortByField, boolean descending, boolean headerOnly, String operation) {
        Option result = Option.fromJavaOptional(res.stream().map(r -> r.isExecuted() && r.isSuccess()).reduce(Boolean::logicalAnd));
        if (result.isPresent()) {
            System.out.println("There were some file renames that needed to be done to " + operation);
            if (((Boolean)result.get()).booleanValue()) {
                System.out.println("All renames successfully completed to " + operation + " done !!");
            } else {
                System.out.println("Some renames failed. table could be in inconsistent-state. Try running compaction repair");
            }
            ArrayList<Comparable[]> rows = new ArrayList<Comparable[]>();
            res.forEach(r -> {
                Comparable[] row = new Comparable[]{((CompactionAdminClient.RenameInfo)r.getOperation()).fileId, ((CompactionAdminClient.RenameInfo)r.getOperation()).srcPath, ((CompactionAdminClient.RenameInfo)r.getOperation()).destPath, Boolean.valueOf(r.isExecuted()), Boolean.valueOf(r.isSuccess()), r.getException().isPresent() ? ((Exception)r.getException().get()).getMessage() : ""};
                rows.add(row);
            });
            HashMap<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<String, Function<Object, String>>();
            TableHeader header = new TableHeader().addTableHeaderField("FileId").addTableHeaderField("Source File Path").addTableHeaderField("Destination File Path").addTableHeaderField("Rename Executed?").addTableHeaderField("Rename Succeeded?").addTableHeaderField("Error");
            return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
        }
        return "No File renames needed to " + operation + ". Operation successful.";
    }
}

