/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.cli.commands;

import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.commands.SparkMain;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.util.Utils;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
import scala.collection.JavaConverters;
import scala.collection.Map;

@Component
public class ClusteringCommand
implements CommandMarker {
    private static final Logger LOG = LogManager.getLogger(ClusteringCommand.class);

    @CliCommand(value={"clustering schedule"}, help="Schedule Clustering")
    public String scheduleClustering(@CliOption(key={"sparkMaster"}, unspecifiedDefaultValue="yarn", help="Spark master") String master, @CliOption(key={"sparkMemory"}, unspecifiedDefaultValue="1g", help="Spark executor memory") String sparkMemory, @CliOption(key={"propsFilePath"}, help="path to properties file on localfs or dfs with configurations for hoodie client for clustering", unspecifiedDefaultValue="") String propsFilePath, @CliOption(key={"hoodieConfigs"}, help="Any configuration that can be set in the properties file can be passed here in the form of an array", unspecifiedDefaultValue="") String[] configs) throws Exception {
        HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
        boolean initialized = HoodieCLI.initConf();
        HoodieCLI.initFS(initialized);
        String sparkPropertiesPath = Utils.getDefaultPropertiesFile((Map)((Map)JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala()));
        SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
        String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
        sparkLauncher.addAppArgs(new String[]{SparkMain.SparkCommand.CLUSTERING_SCHEDULE.toString(), master, sparkMemory, client.getBasePath(), client.getTableConfig().getTableName(), clusteringInstantTime, propsFilePath});
        UtilHelpers.validateAndAddProperties((String[])configs, (SparkLauncher)sparkLauncher);
        Process process = sparkLauncher.launch();
        InputStreamConsumer.captureOutput(process);
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            return "Failed to schedule clustering for " + clusteringInstantTime;
        }
        return "Succeeded to schedule clustering for " + clusteringInstantTime;
    }

    @CliCommand(value={"clustering run"}, help="Run Clustering")
    public String runClustering(@CliOption(key={"sparkMaster"}, unspecifiedDefaultValue="yarn", help="Spark master") String master, @CliOption(key={"sparkMemory"}, help="Spark executor memory", unspecifiedDefaultValue="4g") String sparkMemory, @CliOption(key={"parallelism"}, help="Parallelism for hoodie clustering", unspecifiedDefaultValue="1") String parallelism, @CliOption(key={"retry"}, help="Number of retries", unspecifiedDefaultValue="1") String retry, @CliOption(key={"clusteringInstant"}, help="Clustering instant time", mandatory=true) String clusteringInstantTime, @CliOption(key={"propsFilePath"}, help="path to properties file on localfs or dfs with configurations for hoodie client for compacting", unspecifiedDefaultValue="") String propsFilePath, @CliOption(key={"hoodieConfigs"}, help="Any configuration that can be set in the properties file can be passed here in the form of an array", unspecifiedDefaultValue="") String[] configs) throws Exception {
        HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
        boolean initialized = HoodieCLI.initConf();
        HoodieCLI.initFS(initialized);
        String sparkPropertiesPath = Utils.getDefaultPropertiesFile((Map)((Map)JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala()));
        SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
        sparkLauncher.addAppArgs(new String[]{SparkMain.SparkCommand.CLUSTERING_RUN.toString(), master, sparkMemory, client.getBasePath(), client.getTableConfig().getTableName(), clusteringInstantTime, parallelism, retry, propsFilePath});
        UtilHelpers.validateAndAddProperties((String[])configs, (SparkLauncher)sparkLauncher);
        Process process = sparkLauncher.launch();
        InputStreamConsumer.captureOutput(process);
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            return "Failed to run clustering for " + clusteringInstantTime;
        }
        return "Succeeded to run clustering for " + clusteringInstantTime;
    }
}

