/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.cli.commands;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieBootstrapConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.utilities.HDFSParquetImporter;
import org.apache.hudi.utilities.HoodieCleaner;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.HoodieCompactionAdminTool;
import org.apache.hudi.utilities.HoodieCompactor;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.streamer.BootstrapExecutor;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.hudi.DeDupeType;
import org.apache.spark.sql.hudi.DedupeSparkJob;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkMain {
    /** SLF4J logger shared by all static methods of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(SparkMain.class);

    /**
     * Command-line entry point: runs one Spark-backed CLI command and exits with its return code.
     *
     * <p>Argument layout, as read by this method:
     * <ul>
     *   <li>{@code args[0]} - command name, resolved with {@code SparkCommand.valueOf}</li>
     *   <li>{@code args[1]}, {@code args[2]} - wrapped in {@code Option} and passed to
     *       {@code SparkUtil.initJavaSparkContext}; {@code args[2]} is also forwarded as
     *       {@code sparkMemory} by the clustering cases (presumably Spark master and executor
     *       memory - TODO confirm against {@code SparkUtil})</li>
     *   <li>{@code args[3..]} - command-specific arguments, see the individual cases</li>
     * </ul>
     *
     * <p>Any {@code Throwable} raised while dispatching is logged and turned into return code
     * {@code -1}. The Spark context is always stopped before {@code System.exit} is called.
     * The per-case argument-count checks are plain assertions, so they only fire when the JVM
     * runs with assertions enabled.
     *
     * <p>NOTE(review): this body is raw CFR decompiler output that could not be fully structured
     * (CFR warning: "Removed try catching itself - possible behaviour change"). Locals are
     * assigned without declarations, the switch goes through a synthetic {@code $SwitchMap}
     * ordinal table that is not visible here, and the {@code ** break;} / {@code lblNN:} lines
     * are decompiler artifacts. The numeric case labels therefore cannot be mapped back to
     * {@code SparkCommand} constants from this file alone.
     *
     * @param args command name, two Spark context settings, then command-specific arguments;
     *             at least four entries are required
     * @throws Exception declared by the signature; failures inside the dispatch itself are
     *                   caught and reported through the exit code instead
     */
    public static void main(String[] args) throws Exception {
        ValidationUtils.checkArgument((boolean)(args.length >= 4));
        commandString = args[0];
        SparkMain.LOG.info("Invoking SparkMain: " + commandString);
        cmd = SparkCommand.valueOf(commandString);
        jsc = SparkUtil.initJavaSparkContext("hoodie-cli-" + commandString, (Option<String>)Option.of((Object)args[1]), (Option<String>)Option.of((Object)args[2]));
        returnCode = 0;
        try {
            switch (1.$SwitchMap$org$apache$hudi$cli$commands$SparkMain$SparkCommand[cmd.ordinal()]) {
                // Rollback of a single instant; asserts exactly 6 args.
                case 1: {
                    if (!SparkMain.$assertionsDisabled && args.length != 6) {
                        throw new AssertionError();
                    }
                    returnCode = SparkMain.rollback(jsc, args[3], args[4], Boolean.parseBoolean(args[5]));
                    ** break;
lbl14:
                    // 1 sources

                    break;
                }
                // De-duplication of one partition path; asserts exactly 8 args.
                case 2: {
                    if (!SparkMain.$assertionsDisabled && args.length != 8) {
                        throw new AssertionError();
                    }
                    returnCode = SparkMain.deduplicatePartitionPath(jsc, args[3], args[4], args[5], Boolean.parseBoolean(args[6]), args[7]);
                    ** break;
lbl20:
                    // 1 sources

                    break;
                }
                // Restore to a savepoint; asserts exactly 6 args.
                case 3: {
                    if (!SparkMain.$assertionsDisabled && args.length != 6) {
                        throw new AssertionError();
                    }
                    returnCode = SparkMain.rollbackToSavepoint(jsc, args[3], args[4], Boolean.parseBoolean(args[5]));
                    ** break;
lbl26:
                    // 1 sources

                    break;
                }
                // Two commands share the data-load path; the command string itself is forwarded.
                // args[12] is the optional properties file, args[13..] are extra configs.
                case 4: 
                case 5: {
                    if (!SparkMain.$assertionsDisabled && args.length < 13) {
                        throw new AssertionError();
                    }
                    propsFilePath = null;
                    if (!StringUtils.isNullOrEmpty((String)args[12])) {
                        propsFilePath = args[12];
                    }
                    configs = new ArrayList<String>();
                    if (args.length > 13) {
                        configs.addAll(Arrays.asList(args).subList(13, args.length));
                    }
                    returnCode = SparkMain.dataLoad(jsc, commandString, args[3], args[4], args[5], args[6], args[7], args[8], Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs);
                    ** break;
lbl39:
                    // 1 sources

                    break;
                }
                // Compaction in "execute" mode; args[9] is the optional properties file,
                // args[10..] are extra configs.
                case 6: {
                    if (!SparkMain.$assertionsDisabled && args.length < 10) {
                        throw new AssertionError();
                    }
                    propsFilePath = null;
                    if (!StringUtils.isNullOrEmpty((String)args[9])) {
                        propsFilePath = args[9];
                    }
                    configs = new ArrayList<String>();
                    if (args.length > 10) {
                        configs.addAll(Arrays.asList(args).subList(10, args.length));
                    }
                    returnCode = SparkMain.compact(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[7], Integer.parseInt(args[8]), "execute", propsFilePath, configs);
                    ** break;
lbl52:
                    // 1 sources

                    break;
                }
                // Compaction in "scheduleandexecute" mode; no compaction instant is passed (null).
                case 7: {
                    if (!SparkMain.$assertionsDisabled && args.length < 9) {
                        throw new AssertionError();
                    }
                    propsFilePath = null;
                    if (!StringUtils.isNullOrEmpty((String)args[8])) {
                        propsFilePath = args[8];
                    }
                    configs = new ArrayList<String>();
                    if (args.length > 9) {
                        configs.addAll(Arrays.asList(args).subList(9, args.length));
                    }
                    returnCode = SparkMain.compact(jsc, args[3], args[4], null, Integer.parseInt(args[5]), args[6], Integer.parseInt(args[7]), "scheduleandexecute", propsFilePath, configs);
                    ** break;
lbl65:
                    // 1 sources

                    break;
                }
                // Compaction in "schedule" mode; parallelism 1, empty schema file and 0 retries
                // are hard-coded here.
                case 8: {
                    if (!SparkMain.$assertionsDisabled && args.length < 7) {
                        throw new AssertionError();
                    }
                    propsFilePath = null;
                    if (!StringUtils.isNullOrEmpty((String)args[6])) {
                        propsFilePath = args[6];
                    }
                    configs = new ArrayList<String>();
                    if (args.length > 7) {
                        configs.addAll(Arrays.asList(args).subList(7, args.length));
                    }
                    returnCode = SparkMain.compact(jsc, args[3], args[4], args[5], 1, "", 0, "schedule", propsFilePath, configs);
                    ** break;
lbl78:
                    // 1 sources

                    break;
                }
                // Compaction validation; asserts exactly 7 args. The helper returns nothing,
                // so success is reported as 0.
                case 9: {
                    if (!SparkMain.$assertionsDisabled && args.length != 7) {
                        throw new AssertionError();
                    }
                    SparkMain.doCompactValidate(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]));
                    returnCode = 0;
                    ** break;
lbl85:
                    // 1 sources

                    break;
                }
                // Compaction repair; asserts exactly 8 args, args[7] is the dry-run flag.
                case 10: {
                    if (!SparkMain.$assertionsDisabled && args.length != 8) {
                        throw new AssertionError();
                    }
                    SparkMain.doCompactRepair(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), Boolean.parseBoolean(args[7]));
                    returnCode = 0;
                    ** break;
lbl92:
                    // 1 sources

                    break;
                }
                // Unschedule compaction for a single file; asserts exactly 10 args.
                case 11: {
                    if (!SparkMain.$assertionsDisabled && args.length != 10) {
                        throw new AssertionError();
                    }
                    SparkMain.doCompactUnscheduleFile(jsc, args[3], args[4], args[5], args[6], Integer.parseInt(args[7]), Boolean.parseBoolean(args[8]), Boolean.parseBoolean(args[9]));
                    returnCode = 0;
                    ** break;
lbl99:
                    // 1 sources

                    break;
                }
                // Unschedule a whole compaction plan; asserts exactly 9 args.
                case 12: {
                    if (!SparkMain.$assertionsDisabled && args.length != 9) {
                        throw new AssertionError();
                    }
                    SparkMain.doCompactUnschedule(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
                    returnCode = 0;
                    ** break;
lbl106:
                    // 1 sources

                    break;
                }
                // Clustering in "execute" mode; args[2] is forwarded as the sparkMemory argument.
                case 13: {
                    if (!SparkMain.$assertionsDisabled && args.length < 9) {
                        throw new AssertionError();
                    }
                    propsFilePath = null;
                    if (!StringUtils.isNullOrEmpty((String)args[8])) {
                        propsFilePath = args[8];
                    }
                    configs = new ArrayList<String>();
                    if (args.length > 9) {
                        configs.addAll(Arrays.asList(args).subList(9, args.length));
                    }
                    returnCode = SparkMain.cluster(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[2], Integer.parseInt(args[7]), "execute", propsFilePath, configs);
                    ** break;
lbl119:
                    // 1 sources

                    break;
                }
                // Clustering in "scheduleandexecute" mode; no clustering instant is passed (null).
                case 14: {
                    if (!SparkMain.$assertionsDisabled && args.length < 8) {
                        throw new AssertionError();
                    }
                    propsFilePath = null;
                    if (!StringUtils.isNullOrEmpty((String)args[7])) {
                        propsFilePath = args[7];
                    }
                    configs = new ArrayList<String>();
                    if (args.length > 8) {
                        configs.addAll(Arrays.asList(args).subList(8, args.length));
                    }
                    returnCode = SparkMain.cluster(jsc, args[3], args[4], null, Integer.parseInt(args[5]), args[2], Integer.parseInt(args[6]), "scheduleandexecute", propsFilePath, configs);
                    ** break;
lbl132:
                    // 1 sources

                    break;
                }
                // Clustering in "schedule" mode; parallelism 1 and 0 retries are hard-coded here.
                case 15: {
                    if (!SparkMain.$assertionsDisabled && args.length < 7) {
                        throw new AssertionError();
                    }
                    propsFilePath = null;
                    if (!StringUtils.isNullOrEmpty((String)args[6])) {
                        propsFilePath = args[6];
                    }
                    configs = new ArrayList<String>();
                    if (args.length > 7) {
                        configs.addAll(Arrays.asList(args).subList(7, args.length));
                    }
                    returnCode = SparkMain.cluster(jsc, args[3], args[4], args[5], 1, args[2], 0, "schedule", propsFilePath, configs);
                    ** break;
lbl145:
                    // 1 sources

                    break;
                }
                // Clean; clean() returns nothing and returnCode is not reassigned, so it keeps
                // its initial value of 0 unless an exception is thrown.
                case 16: {
                    if (!SparkMain.$assertionsDisabled && args.length < 5) {
                        throw new AssertionError();
                    }
                    propsFilePath = null;
                    if (!StringUtils.isNullOrEmpty((String)args[4])) {
                        propsFilePath = args[4];
                    }
                    configs = new ArrayList<String>();
                    if (args.length > 5) {
                        configs.addAll(Arrays.asList(args).subList(5, args.length));
                    }
                    SparkMain.clean(jsc, args[3], propsFilePath, configs);
                    ** break;
lbl158:
                    // 1 sources

                    break;
                }
                // Create a savepoint; asserts exactly 7 args.
                case 17: {
                    if (!SparkMain.$assertionsDisabled && args.length != 7) {
                        throw new AssertionError();
                    }
                    returnCode = SparkMain.createSavepoint(jsc, args[3], args[4], args[5], args[6]);
                    ** break;
lbl164:
                    // 1 sources

                    break;
                }
                // Delete the marker directory of an instant; asserts exactly 5 args.
                case 18: {
                    if (!SparkMain.$assertionsDisabled && args.length != 5) {
                        throw new AssertionError();
                    }
                    returnCode = SparkMain.deleteMarker(jsc, args[3], args[4]);
                    ** break;
lbl170:
                    // 1 sources

                    break;
                }
                // Delete a savepoint; asserts exactly 5 args.
                case 19: {
                    if (!SparkMain.$assertionsDisabled && args.length != 5) {
                        throw new AssertionError();
                    }
                    returnCode = SparkMain.deleteSavepoint(jsc, args[3], args[4]);
                    ** break;
lbl176:
                    // 1 sources

                    break;
                }
                // Bootstrap; args[3..16] are passed positionally, args[17] is the optional
                // properties file, args[18..] are extra configs.
                case 20: {
                    if (!SparkMain.$assertionsDisabled && args.length < 18) {
                        throw new AssertionError();
                    }
                    propsFilePath = null;
                    if (!StringUtils.isNullOrEmpty((String)args[17])) {
                        propsFilePath = args[17];
                    }
                    configs = new ArrayList<String>();
                    if (args.length > 18) {
                        configs.addAll(Arrays.asList(args).subList(18, args.length));
                    }
                    returnCode = SparkMain.doBootstrap(jsc, args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], args[11], args[12], args[13], args[14], args[15], args[16], propsFilePath, configs);
                    ** break;
lbl189:
                    // 1 sources

                    break;
                }
                // Two commands share the upgrade/downgrade path; asserts exactly 5 args.
                case 21: 
                case 22: {
                    if (!SparkMain.$assertionsDisabled && args.length != 5) {
                        throw new AssertionError();
                    }
                    returnCode = SparkMain.upgradeOrDowngradeTable(jsc, args[3], args[4]);
                    ** break;
lbl195:
                    // 1 sources

                    break;
                }
                // Repair of the deprecated default partition; asserts exactly 4 args.
                case 23: {
                    if (!SparkMain.$assertionsDisabled && args.length != 4) {
                        throw new AssertionError();
                    }
                    returnCode = SparkMain.repairDeprecatedPartition(jsc, args[3]);
                    ** break;
lbl201:
                    // 1 sources

                    break;
                }
                // Rename a partition; asserts exactly 6 args.
                case 24: {
                    if (!SparkMain.$assertionsDisabled && args.length != 6) {
                        throw new AssertionError();
                    }
                    returnCode = SparkMain.renamePartition(jsc, args[3], args[4], args[5]);
                    ** break;
lbl207:
                    // 1 sources

                    break;
                }
                // Timeline archival; asserts exactly 8 args.
                case 25: {
                    if (!SparkMain.$assertionsDisabled && args.length != 8) {
                        throw new AssertionError();
                    }
                    returnCode = SparkMain.archive(jsc, Integer.parseInt(args[3]), Integer.parseInt(args[4]), Integer.parseInt(args[5]), Boolean.parseBoolean(args[6]), args[7]);
                    ** break;
lbl213:
                    // 1 sources

                    break;
                }
                // Any other command falls through without doing work; returnCode stays 0.
                ** default:
lbl215:
                // 1 sources

                break;
            }
        }
        catch (Throwable throwable) {
            // NOTE(review): the message contains the literal text "commandString", not the
            // value of the commandString variable.
            SparkMain.LOG.error("Fail to execute commandString", throwable);
            returnCode = -1;
        }
        finally {
            // Always release the Spark context, on success and on failure.
            jsc.stop();
        }
        System.exit(returnCode);
    }

    /**
     * Runs the Hudi cleaner against the table at {@code basePath}.
     *
     * @param jsc           Spark context handed to the cleaner
     * @param basePath      base path of the table to clean
     * @param propsFilePath properties file path stored on the cleaner config; may be {@code null}
     * @param configs       extra configuration entries stored on the cleaner config
     */
    protected static void clean(JavaSparkContext jsc, String basePath, String propsFilePath, List<String> configs) {
        HoodieCleaner.Config cleanerConfig = new HoodieCleaner.Config();
        cleanerConfig.configs = configs;
        cleanerConfig.propsFilePath = propsFilePath;
        cleanerConfig.basePath = basePath;

        HoodieCleaner cleaner = new HoodieCleaner(cleanerConfig, jsc);
        cleaner.run();
    }

    /**
     * Quietly deletes the marker directory that belongs to {@code instantTime}.
     *
     * @param jsc         Spark context used to build the write client
     * @param instantTime instant whose marker directory is removed
     * @param basePath    base path of the table
     * @return {@code 0} on success, {@code -1} if any exception was raised (it is logged, not rethrown)
     */
    protected static int deleteMarker(JavaSparkContext jsc, String instantTime, String basePath) {
        try (SparkRDDWriteClient writeClient = createHoodieClient(jsc, basePath, false)) {
            HoodieWriteConfig writeConfig = writeClient.getConfig();
            HoodieEngineContext engineContext = writeClient.getEngineContext();
            HoodieSparkTable sparkTable = HoodieSparkTable.create(writeConfig, engineContext);
            WriteMarkersFactory
                .get(writeConfig.getMarkersType(), sparkTable, instantTime)
                .quietDeleteMarkerDir(engineContext, writeConfig.getMarkersDeleteParallelism());
            return 0;
        }
        catch (Exception failure) {
            String message = String.format("Failed: Could not clean marker instantTime: \"%s\".", instantTime);
            LOG.warn(message, failure);
            return -1;
        }
    }

    /**
     * Copies the arguments onto an {@code HDFSParquetImporter.Config} and runs the import.
     *
     * @param jsc           Spark context used by the importer
     * @param command       command name stored on the importer config
     * @param srcPath       source path of the data to load
     * @param targetPath    target path stored on the importer config
     * @param tableName     name of the target table
     * @param tableType     type of the target table
     * @param rowKey        row key setting stored on the importer config
     * @param partitionKey  partition key setting stored on the importer config
     * @param parallelism   parallelism stored on the importer config
     * @param schemaFile    schema file setting stored on the importer config
     * @param retry         retry count passed to {@code dataImport}
     * @param propsFilePath properties file path; may be {@code null}
     * @param configs       extra configuration entries
     * @return the value returned by {@code HDFSParquetImporter.dataImport}
     */
    private static int dataLoad(JavaSparkContext jsc, String command, String srcPath, String targetPath, String tableName, String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile, int retry, String propsFilePath, List<String> configs) {
        HDFSParquetImporter.Config importConfig = new HDFSParquetImporter.Config();

        // What to run and where the data comes from / goes to.
        importConfig.command = command;
        importConfig.srcPath = srcPath;
        importConfig.targetPath = targetPath;

        // Target table description.
        importConfig.tableName = tableName;
        importConfig.tableType = tableType;
        importConfig.rowKey = rowKey;
        importConfig.partitionKey = partitionKey;
        importConfig.schemaFile = schemaFile;

        // Execution settings and overrides.
        importConfig.parallelism = parallelism;
        importConfig.propsFilePath = propsFilePath;
        importConfig.configs = configs;

        HDFSParquetImporter importer = new HDFSParquetImporter(importConfig);
        return importer.dataImport(jsc, retry);
    }

    /**
     * Runs the compaction admin tool with the {@code VALIDATE} operation.
     *
     * @param jsc               Spark context the tool runs on
     * @param basePath          base path of the table
     * @param compactionInstant compaction instant time to validate
     * @param outputPath        output path stored on the tool config
     * @param parallelism       parallelism stored on the tool config
     * @throws Exception if the admin tool fails
     */
    private static void doCompactValidate(JavaSparkContext jsc, String basePath, String compactionInstant, String outputPath, int parallelism) throws Exception {
        HoodieCompactionAdminTool.Config toolConfig = new HoodieCompactionAdminTool.Config();
        toolConfig.operation = HoodieCompactionAdminTool.Operation.VALIDATE;
        toolConfig.basePath = basePath;
        toolConfig.compactionInstantTime = compactionInstant;
        toolConfig.outputPath = outputPath;
        toolConfig.parallelism = parallelism;

        HoodieCompactionAdminTool adminTool = new HoodieCompactionAdminTool(toolConfig);
        adminTool.run(jsc);
    }

    /**
     * Runs the compaction admin tool with the {@code REPAIR} operation.
     *
     * @param jsc               Spark context the tool runs on
     * @param basePath          base path of the table
     * @param compactionInstant compaction instant time to repair
     * @param outputPath        output path stored on the tool config
     * @param parallelism       parallelism stored on the tool config
     * @param dryRun            dry-run flag stored on the tool config
     * @throws Exception if the admin tool fails
     */
    private static void doCompactRepair(JavaSparkContext jsc, String basePath, String compactionInstant, String outputPath, int parallelism, boolean dryRun) throws Exception {
        HoodieCompactionAdminTool.Config toolConfig = new HoodieCompactionAdminTool.Config();
        toolConfig.operation = HoodieCompactionAdminTool.Operation.REPAIR;
        toolConfig.basePath = basePath;
        toolConfig.compactionInstantTime = compactionInstant;
        toolConfig.outputPath = outputPath;
        toolConfig.parallelism = parallelism;
        toolConfig.dryRun = dryRun;

        HoodieCompactionAdminTool adminTool = new HoodieCompactionAdminTool(toolConfig);
        adminTool.run(jsc);
    }

    /**
     * Runs the compaction admin tool with the {@code UNSCHEDULE_PLAN} operation.
     *
     * @param jsc               Spark context the tool runs on
     * @param basePath          base path of the table
     * @param compactionInstant compaction instant time of the plan to unschedule
     * @param outputPath        output path stored on the tool config
     * @param parallelism       parallelism stored on the tool config
     * @param skipValidation    skip-validation flag stored on the tool config
     * @param dryRun            dry-run flag stored on the tool config
     * @throws Exception if the admin tool fails
     */
    private static void doCompactUnschedule(JavaSparkContext jsc, String basePath, String compactionInstant, String outputPath, int parallelism, boolean skipValidation, boolean dryRun) throws Exception {
        HoodieCompactionAdminTool.Config toolConfig = new HoodieCompactionAdminTool.Config();
        toolConfig.operation = HoodieCompactionAdminTool.Operation.UNSCHEDULE_PLAN;
        toolConfig.basePath = basePath;
        toolConfig.compactionInstantTime = compactionInstant;
        toolConfig.outputPath = outputPath;
        toolConfig.parallelism = parallelism;
        toolConfig.skipValidation = skipValidation;
        toolConfig.dryRun = dryRun;

        HoodieCompactionAdminTool adminTool = new HoodieCompactionAdminTool(toolConfig);
        adminTool.run(jsc);
    }

    /**
     * Runs the compaction admin tool with the {@code UNSCHEDULE_FILE} operation.
     *
     * @param jsc            Spark context the tool runs on
     * @param basePath       base path of the table
     * @param fileId         file id stored on the tool config
     * @param partitionPath  partition path stored on the tool config
     * @param outputPath     output path stored on the tool config
     * @param parallelism    parallelism stored on the tool config
     * @param skipValidation skip-validation flag stored on the tool config
     * @param dryRun         dry-run flag stored on the tool config
     * @throws Exception if the admin tool fails
     */
    private static void doCompactUnscheduleFile(JavaSparkContext jsc, String basePath, String fileId, String partitionPath, String outputPath, int parallelism, boolean skipValidation, boolean dryRun) throws Exception {
        HoodieCompactionAdminTool.Config toolConfig = new HoodieCompactionAdminTool.Config();
        toolConfig.operation = HoodieCompactionAdminTool.Operation.UNSCHEDULE_FILE;
        toolConfig.basePath = basePath;
        toolConfig.fileId = fileId;
        toolConfig.partitionPath = partitionPath;
        toolConfig.outputPath = outputPath;
        toolConfig.parallelism = parallelism;
        toolConfig.skipValidation = skipValidation;
        toolConfig.dryRun = dryRun;

        HoodieCompactionAdminTool adminTool = new HoodieCompactionAdminTool(toolConfig);
        adminTool.run(jsc);
    }

    /**
     * Configures a {@code HoodieCompactor} from the arguments and runs it.
     *
     * <p>The compaction strategy is always set to {@code UnBoundedCompactionStrategy}.
     *
     * @param jsc               Spark context the compactor runs on
     * @param basePath          base path of the table
     * @param tableName         name of the table
     * @param compactionInstant compaction instant time; callers may pass {@code null}
     * @param parallelism       parallelism stored on the compactor config
     * @param schemaFile        schema file setting stored on the compactor config
     * @param retry             retry count passed to {@code compact}
     * @param mode              running mode stored on the compactor config
     * @param propsFilePath     properties file path; may be {@code null}
     * @param configs           extra configuration entries
     * @return the value returned by {@code HoodieCompactor.compact}
     */
    private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant, int parallelism, String schemaFile, int retry, String mode, String propsFilePath, List<String> configs) {
        HoodieCompactor.Config compactorConfig = new HoodieCompactor.Config();

        // Which table and which instant.
        compactorConfig.basePath = basePath;
        compactorConfig.tableName = tableName;
        compactorConfig.compactionInstantTime = compactionInstant;

        // How to run.
        compactorConfig.runningMode = mode;
        compactorConfig.strategyClassName = UnBoundedCompactionStrategy.class.getCanonicalName();
        compactorConfig.parallelism = parallelism;
        compactorConfig.schemaFile = schemaFile;

        // Property overrides.
        compactorConfig.propsFilePath = propsFilePath;
        compactorConfig.configs = configs;

        HoodieCompactor compactor = new HoodieCompactor(jsc, compactorConfig);
        return compactor.compact(retry);
    }

    /**
     * Configures a {@code HoodieClusteringJob} from the arguments and runs it.
     *
     * <p>Before the job is created, {@code spark.executor.memory} is set on the Spark
     * configuration of {@code jsc} to {@code sparkMemory}.
     *
     * @param jsc               Spark context the job runs on
     * @param basePath          base path of the table
     * @param tableName         name of the table
     * @param clusteringInstant clustering instant time; callers may pass {@code null}
     * @param parallelism       parallelism stored on the job config
     * @param sparkMemory       value written to {@code spark.executor.memory}
     * @param retry             retry count passed to {@code cluster}
     * @param runningMode       running mode stored on the job config
     * @param propsFilePath     properties file path; may be {@code null}
     * @param configs           extra configuration entries
     * @return the value returned by {@code HoodieClusteringJob.cluster}
     */
    private static int cluster(JavaSparkContext jsc, String basePath, String tableName, String clusteringInstant, int parallelism, String sparkMemory, int retry, String runningMode, String propsFilePath, List<String> configs) {
        HoodieClusteringJob.Config jobConfig = new HoodieClusteringJob.Config();

        // Which table and which instant.
        jobConfig.basePath = basePath;
        jobConfig.tableName = tableName;
        jobConfig.clusteringInstantTime = clusteringInstant;

        // How to run.
        jobConfig.runningMode = runningMode;
        jobConfig.parallelism = parallelism;

        // Property overrides.
        jobConfig.propsFilePath = propsFilePath;
        jobConfig.configs = configs;

        jsc.getConf().set("spark.executor.memory", sparkMemory);

        HoodieClusteringJob clusteringJob = new HoodieClusteringJob(jsc, jobConfig);
        return clusteringJob.cluster(retry);
    }

    /**
     * Builds a {@code DedupeSparkJob} for one partition path and runs {@code fixDuplicates}.
     *
     * @param jsc                     Spark context used for the SQL context and Hadoop configuration
     * @param duplicatedPartitionPath partition path containing the duplicates
     * @param repairedOutputPath      output path passed to the job
     * @param basePath                base path of the table
     * @param dryRun                  flag passed to {@code fixDuplicates}
     * @param dedupeType              name resolved through {@code DeDupeType.withName}
     * @return always {@code 0}; failures surface as exceptions
     */
    private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplicatedPartitionPath, String repairedOutputPath, String basePath, boolean dryRun, String dedupeType) {
        SQLContext sqlContext = new SQLContext(jsc);
        FileSystem fileSystem = FSUtils.getFs(basePath, jsc.hadoopConfiguration());

        DedupeSparkJob dedupeJob = new DedupeSparkJob(
            basePath,
            duplicatedPartitionPath,
            repairedOutputPath,
            sqlContext,
            fileSystem,
            DeDupeType.withName(dedupeType));
        dedupeJob.fixDuplicates(dryRun);
        return 0;
    }

    /**
     * Moves the records found under the {@code default} partition directory into the
     * {@code __HIVE_DEFAULT_PARTITION__} partition and then deletes the {@code default} partition.
     *
     * <p>Nothing is written when no records are found under {@code default}.
     *
     * @param jsc      Spark context used for reading and for the Hadoop configuration
     * @param basePath base path of the table
     * @return always {@code 0}; failures surface as exceptions
     */
    public static int repairDeprecatedPartition(JavaSparkContext jsc, String basePath) {
        SQLContext sqlContext = new SQLContext(jsc);
        Dataset<Row> deprecatedRecords = getRecordsToRewrite(basePath, "default", sqlContext);
        if (deprecatedRecords.isEmpty()) {
            return 0;
        }

        // The same dataset feeds both the rewrite and the partition delete below.
        deprecatedRecords.cache();
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(jsc.hadoopConfiguration())
            .setBasePath(basePath)
            .build();
        Map<String, String> writeProps = getPropsForRewrite(metaClient);
        rewriteRecordsToNewPartition(basePath, "__HIVE_DEFAULT_PARTITION__", deprecatedRecords, metaClient, writeProps);
        deleteOlderPartition(basePath, "default", deprecatedRecords, writeProps);
        return 0;
    }

    /**
     * Rewrites the records of {@code oldPartition}, issues a delete-partition write for
     * {@code oldPartition}, and finally removes the old partition directory from storage.
     *
     * <p>Nothing is written or deleted when no records are found under {@code oldPartition}.
     * Removing the directory is best-effort: an {@code IOException} is logged (with the path
     * that could not be deleted and the cause) and does not change the return value.
     *
     * @param jsc          Spark context used for reading and for the Hadoop configuration
     * @param basePath     base path of the table
     * @param oldPartition partition path, relative to {@code basePath}, to move records out of
     * @param newPartition partition name forwarded to {@code rewriteRecordsToNewPartition}
     * @return always {@code 0}; failures other than the directory delete surface as exceptions
     */
    public static int renamePartition(JavaSparkContext jsc, String basePath, String oldPartition, String newPartition) {
        SQLContext sqlContext = new SQLContext(jsc);
        Dataset<Row> recordsToRewrite = SparkMain.getRecordsToRewrite(basePath, oldPartition, sqlContext);
        if (recordsToRewrite.isEmpty()) {
            return 0;
        }

        // The same dataset feeds both the rewrite and the partition delete below.
        recordsToRewrite.cache();
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
        Map<String, String> propsMap = SparkMain.getPropsForRewrite(metaClient);
        SparkMain.rewriteRecordsToNewPartition(basePath, newPartition, recordsToRewrite, metaClient, propsMap);
        SparkMain.deleteOlderPartition(basePath, oldPartition, recordsToRewrite, propsMap);

        FileSystem fs = FSUtils.getFs(new Path(basePath), metaClient.getHadoopConf());
        Path oldPartitionPath = new Path(basePath, oldPartition);
        try {
            fs.delete(oldPartitionPath, true);
        }
        catch (IOException e) {
            // Best-effort cleanup: report the path that failed and keep the cause for diagnosis.
            LOG.warn("Failed to delete older partition " + oldPartitionPath, e);
        }
        return 0;
    }

    /**
     * Issues a {@code DELETE_PARTITION} write for {@code oldPartition} against the table at
     * {@code basePath}.
     *
     * <p>Side effect: adds {@code hoodie.datasource.write.partitions.to.delete} to the caller's
     * {@code propsMap}.
     *
     * @param basePath         base path of the table
     * @param oldPartition     partition to delete
     * @param recordsToRewrite dataset whose writer is used to issue the operation
     * @param propsMap         write options; mutated as described above
     */
    private static void deleteOlderPartition(String basePath, String oldPartition, Dataset<Row> recordsToRewrite, Map<String, String> propsMap) {
        propsMap.put("hoodie.datasource.write.partitions.to.delete", oldPartition);

        String deletePartitionOperation = WriteOperationType.DELETE_PARTITION.value();
        recordsToRewrite.write()
            .options(propsMap)
            .option("hoodie.datasource.write.operation", deletePartitionOperation)
            .format("hudi")
            .mode("Append")
            .save(basePath);
    }

    /**
     * Bulk-inserts {@code recordsToRewrite} into the table at {@code basePath} in append mode,
     * after overwriting the partition column with a {@code null} cast to that column's type.
     *
     * <p>NOTE(review): {@code newPartition} is not read anywhere in this method; the partition
     * column is always set to {@code null}. Confirm whether that is intended for callers that
     * pass an explicit target partition.
     *
     * @param basePath         base path of the table
     * @param newPartition     target partition name (currently unused, see note above)
     * @param recordsToRewrite records to write
     * @param metaClient       meta client providing the partition field name
     * @param propsMap         write options applied before the operation option
     */
    private static void rewriteRecordsToNewPartition(String basePath, String newPartition, Dataset<Row> recordsToRewrite, HoodieTableMetaClient metaClient, Map<String, String> propsMap) {
        String partitionColumn = metaClient.getTableConfig().getPartitionFieldProp();
        StructType schema = recordsToRewrite.schema();
        int partitionColumnIndex = schema.fieldIndex(partitionColumn);

        // Keep the column's original data type so the schema of the rewritten rows is unchanged.
        Dataset<Row> rewritten = recordsToRewrite.withColumn(
            partitionColumn,
            functions.lit(null).cast(schema.apply(partitionColumnIndex).dataType()));

        rewritten.write()
            .options(propsMap)
            .option("hoodie.datasource.write.operation", WriteOperationType.BULK_INSERT.value())
            .format("hudi")
            .mode("Append")
            .save(basePath);
    }

    /**
     * Loads the records stored under {@code basePath/oldPartition} through the {@code hudi}
     * data source and strips the five Hudi metadata columns.
     *
     * @param basePath     base path of the table
     * @param oldPartition partition path, relative to {@code basePath}
     * @param sqlContext   SQL context used for reading
     * @return the loaded records without record-key, partition-path, commit-seqno, file-name
     *         and commit-time metadata columns
     */
    private static Dataset<Row> getRecordsToRewrite(String basePath, String oldPartition, SQLContext sqlContext) {
        String partitionLocation = basePath + "/" + oldPartition;
        Dataset<Row> records = sqlContext.read().format("hudi").load(partitionLocation);

        String[] metadataColumns = {
            HoodieRecord.RECORD_KEY_METADATA_FIELD,
            HoodieRecord.PARTITION_PATH_METADATA_FIELD,
            HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
            HoodieRecord.FILENAME_METADATA_FIELD,
            HoodieRecord.COMMIT_TIME_METADATA_FIELD
        };
        for (String metadataColumn : metadataColumns) {
            records = records.drop(metadataColumn);
        }
        return records;
    }

    /**
     * Builds the write options used when rewriting records: every table-config property as a
     * string, plus the skip-default-partition-validation flag and the record key, partition
     * path and key generator class taken from the table config.
     *
     * <p>The explicit entries are added last, so they win over table-config properties that
     * use the same key.
     *
     * @param metaClient meta client whose table config is read
     * @return a new mutable map of write options
     */
    private static Map<String, String> getPropsForRewrite(HoodieTableMetaClient metaClient) {
        Map<String, String> writeOptions = new HashMap<>();
        metaClient.getTableConfig().getProps().forEach((key, value) -> {
            writeOptions.put(key.toString(), value.toString());
        });

        writeOptions.put(HoodieWriteConfig.SKIP_DEFAULT_PARTITION_VALIDATION.key(), "true");
        writeOptions.put(
            DataSourceWriteOptions.RECORDKEY_FIELD().key(),
            metaClient.getTableConfig().getRecordKeyFieldProp());
        writeOptions.put(
            DataSourceWriteOptions.PARTITIONPATH_FIELD().key(),
            metaClient.getTableConfig().getPartitionFieldProp());
        writeOptions.put(
            DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
            metaClient.getTableConfig().getKeyGeneratorClassName());
        return writeOptions;
    }

    /**
     * Assembles bootstrap properties and a {@code HoodieStreamer.Config}, then runs a
     * {@code BootstrapExecutor}.
     *
     * <p>Properties come from {@code configs} alone when {@code propsFilePath} is {@code null},
     * otherwise from the properties file combined with {@code configs}. When
     * {@code keyGenerator}, upper-cased, is one of {@code KeyGeneratorType.getNames()}, it is
     * stored as the key generator type; otherwise it is stored verbatim as the key generator
     * class name.
     *
     * @param jsc                        Spark context the executor runs on
     * @param tableName                  target table name
     * @param tableType                  target table type
     * @param basePath                   target base path
     * @param sourcePath                 value stored under the bootstrap base-path property
     * @param recordKeyCols              record key field setting
     * @param partitionFields            partition path field setting
     * @param parallelism                bootstrap parallelism setting
     * @param schemaProviderClass        schema provider class name
     * @param bootstrapIndexClass        bootstrap index class name
     * @param selectorClass              bootstrap mode selector class name
     * @param keyGenerator               key generator type name or class name
     * @param fullBootstrapInputProvider full-bootstrap input provider class name
     * @param payloadClassName           payload class name
     * @param enableHiveSync             parsed with {@code Boolean.valueOf}
     * @param propsFilePath              properties file path, or {@code null}
     * @param configs                    extra configuration entries
     * @return always {@code 0}; failures surface as exceptions
     * @throws IOException declared by the signature
     */
    private static int doBootstrap(JavaSparkContext jsc, String tableName, String tableType, String basePath, String sourcePath, String recordKeyCols, String partitionFields, String parallelism, String schemaProviderClass, String bootstrapIndexClass, String selectorClass, String keyGenerator, String fullBootstrapInputProvider, String payloadClassName, String enableHiveSync, String propsFilePath, List<String> configs) throws IOException {
        TypedProperties bootstrapProps;
        if (propsFilePath != null) {
            bootstrapProps = UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(propsFilePath), configs).getProps(true);
        } else {
            bootstrapProps = UtilHelpers.buildProperties(configs);
        }

        bootstrapProps.setProperty(HoodieBootstrapConfig.BASE_PATH.key(), sourcePath);

        // A recognised key generator type name wins; anything else is treated as a class name.
        String keyGeneratorTypeName = StringUtils.isNullOrEmpty(keyGenerator) ? null : keyGenerator.toUpperCase(Locale.ROOT);
        if (keyGeneratorTypeName != null && KeyGeneratorType.getNames().contains(keyGeneratorTypeName)) {
            bootstrapProps.setProperty(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), keyGeneratorTypeName);
        } else {
            bootstrapProps.setProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), keyGenerator);
        }

        bootstrapProps.setProperty(HoodieBootstrapConfig.FULL_BOOTSTRAP_INPUT_PROVIDER_CLASS_NAME.key(), fullBootstrapInputProvider);
        bootstrapProps.setProperty(HoodieBootstrapConfig.PARALLELISM_VALUE.key(), parallelism);
        bootstrapProps.setProperty(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(), selectorClass);
        bootstrapProps.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKeyCols);
        bootstrapProps.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), partitionFields);

        HoodieStreamer.Config streamerConfig = new HoodieStreamer.Config();
        streamerConfig.targetBasePath = basePath;
        streamerConfig.targetTableName = tableName;
        streamerConfig.tableType = tableType;
        streamerConfig.payloadClassName = payloadClassName;
        streamerConfig.schemaProviderClassName = schemaProviderClass;
        streamerConfig.bootstrapIndexClass = bootstrapIndexClass;
        streamerConfig.enableHiveSync = Boolean.valueOf(enableHiveSync);

        BootstrapExecutor executor = new BootstrapExecutor(
            streamerConfig,
            jsc,
            FSUtils.getFs(basePath, jsc.hadoopConfiguration()),
            jsc.hadoopConfiguration(),
            bootstrapProps);
        executor.execute();
        return 0;
    }

    /**
     * Rolls back the commit identified by {@code instantTime}.
     *
     * <p>The write client is opened in a try-with-resources block so it is closed whether the
     * rollback succeeds, fails, or throws, matching the other client-based methods in this
     * class.
     *
     * @param jsc                  Spark context used to build the write client
     * @param instantTime          instant time of the commit to roll back
     * @param basePath             base path of the table
     * @param rollbackUsingMarkers forwarded to {@code createHoodieClient}
     * @return {@code 0} if the client reports the rollback as done, {@code -1} otherwise
     * @throws Exception if creating, using, or closing the client fails
     */
    private static int rollback(JavaSparkContext jsc, String instantTime, String basePath, Boolean rollbackUsingMarkers) throws Exception {
        try (SparkRDDWriteClient client = SparkMain.createHoodieClient(jsc, basePath, rollbackUsingMarkers, false)) {
            if (client.rollback(instantTime)) {
                LOG.info(String.format("The commit \"%s\" rolled back.", instantTime));
                return 0;
            }
            LOG.warn(String.format("The commit \"%s\" failed to roll back.", instantTime));
            return -1;
        }
    }

    /**
     * Creates a savepoint at {@code commitTime}.
     *
     * <p>A {@code HoodieSavepointException} is logged together with its stack trace and
     * reported as {@code -1}; any other exception propagates to the caller.
     *
     * @param jsc        Spark context used to build the write client
     * @param commitTime commit time to savepoint
     * @param user       user recorded with the savepoint
     * @param comments   comments recorded with the savepoint
     * @param basePath   base path of the table
     * @return {@code 0} on success, {@code -1} if the savepoint could not be created
     * @throws Exception if creating or closing the client fails, or on any non-savepoint error
     */
    private static int createSavepoint(JavaSparkContext jsc, String commitTime, String user, String comments, String basePath) throws Exception {
        try (SparkRDDWriteClient client = SparkMain.createHoodieClient(jsc, basePath, false)) {
            client.savepoint(commitTime, user, comments);
            LOG.info(String.format("The commit \"%s\" has been savepointed.", commitTime));
            return 0;
        }
        catch (HoodieSavepointException se) {
            // Pass the exception to the logger so the failure cause is not lost.
            LOG.warn(String.format("Failed: Could not create savepoint \"%s\".", commitTime), se);
            return -1;
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Restores the table at {@code basePath} to the savepoint {@code savepointTime}.
     *
     * @param jsc Spark context used to create the write client
     * @param savepointTime savepoint to restore to
     * @param basePath base path of the table
     * @param lazyCleanPolicy forwarded to the write config used to build the client
     * @return {@code 0} on success, {@code -1} when any {@link Exception} is caught
     *         (the failure is logged with its stack trace)
     * @throws Exception declared for signature compatibility; every {@code Exception}
     *         raised inside the try-with-resources statement is caught here
     */
    private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTime, String basePath, boolean lazyCleanPolicy) throws Exception {
        int exitCode;
        try (SparkRDDWriteClient writeClient = SparkMain.createHoodieClient(jsc, basePath, lazyCleanPolicy)) {
            writeClient.restoreToSavepoint(savepointTime);
            LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime));
            exitCode = 0;
        } catch (Exception failure) {
            // Also reached when closing the client fails after a successful restore.
            LOG.warn(String.format("The commit \"%s\" failed to roll back.", savepointTime), (Throwable)failure);
            exitCode = -1;
        }
        return exitCode;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Deletes the savepoint {@code savepointTime} from the table at {@code basePath}.
     *
     * @param jsc Spark context used to create the write client
     * @param savepointTime savepoint to delete
     * @param basePath base path of the table
     * @return {@code 0} on success, {@code -1} when any {@link Exception} is caught
     *         (the failure is logged with its stack trace)
     * @throws Exception declared for signature compatibility; every {@code Exception}
     *         raised inside the try-with-resources statement is caught here
     */
    private static int deleteSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) throws Exception {
        try (SparkRDDWriteClient writeClient = SparkMain.createHoodieClient(jsc, basePath, false)) {
            writeClient.deleteSavepoint(savepointTime);
            String deletedMessage = String.format("Savepoint \"%s\" deleted.", savepointTime);
            LOG.info(deletedMessage);
            return 0;
        } catch (Exception failure) {
            String failedMessage = String.format("Failed: Could not delete savepoint \"%s\".", savepointTime);
            LOG.warn(failedMessage, (Throwable)failure);
            return -1;
        }
    }

    /**
     * Runs {@code UpgradeDowngrade} to move the table at {@code basePath} to
     * {@code toVersion}.
     *
     * <p>A write config is first built for {@code basePath} with the default
     * rollback-using-markers setting; a meta client is then created from it, and the
     * config is rebuilt with the table name read from that meta client.
     *
     * @param jsc Spark context supplying the Hadoop configuration and engine context
     * @param basePath base path of the table
     * @param toVersion name of the target {@code HoodieTableVersion} constant
     * @return {@code 0} on success, {@code -1} when any {@link Exception} is raised
     *         inside the try block (including an unparseable {@code toVersion})
     */
    protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePath, String toVersion) {
        boolean defaultRollbackUsingMarkers = Boolean.parseBoolean((String)HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.defaultValue());
        HoodieWriteConfig baseConfig = SparkMain.getWriteConfig(basePath, defaultRollbackUsingMarkers, false);

        HoodieTableMetaClient tableMetaClient = HoodieTableMetaClient.builder()
                .setConf(jsc.hadoopConfiguration())
                .setBasePath(baseConfig.getBasePath())
                .setLoadActiveTimelineOnLoad(false)
                .setConsistencyGuardConfig(baseConfig.getConsistencyGuardConfig())
                .setLayoutVersion(Option.of((Object)new TimelineLayoutVersion(baseConfig.getTimelineLayoutVersion())))
                .setFileSystemRetryConfig(baseConfig.getFileSystemRetryConfig())
                .build();

        // Same properties as the base config, plus the table name taken from the meta client.
        HoodieWriteConfig tableScopedConfig = HoodieWriteConfig.newBuilder()
                .withProps((Map)baseConfig.getProps())
                .forTable(tableMetaClient.getTableConfig().getTableName())
                .build();

        try {
            HoodieEngineContext engineContext = new HoodieSparkEngineContext(jsc);
            UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade(tableMetaClient, tableScopedConfig, engineContext, (SupportsUpgradeDowngrade)SparkUpgradeDowngradeHelper.getInstance());
            upgradeDowngrade.run(HoodieTableVersion.valueOf(toVersion), null);
            LOG.info(String.format("Table at \"%s\" upgraded / downgraded to version \"%s\".", basePath, toVersion));
            return 0;
        } catch (Exception failure) {
            LOG.warn(String.format("Failed: Could not upgrade/downgrade table at \"%s\" to version \"%s\".", basePath, toVersion), (Throwable)failure);
            return -1;
        }
    }

    /**
     * Builds a {@code SparkRDDWriteClient} for the table at {@code basePath}.
     *
     * @param jsc Spark context wrapped in a {@code HoodieSparkEngineContext}
     * @param basePath base path of the table
     * @param rollbackUsingMarkers forwarded to {@code getWriteConfig}
     * @param lazyCleanPolicy forwarded to {@code getWriteConfig}
     * @return a new client; the caller is responsible for closing it
     * @throws Exception declared for callers; raised by config or client construction
     */
    private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, Boolean rollbackUsingMarkers, boolean lazyCleanPolicy) throws Exception {
        HoodieWriteConfig writeConfig = SparkMain.getWriteConfig(basePath, rollbackUsingMarkers, lazyCleanPolicy);
        HoodieEngineContext engineContext = new HoodieSparkEngineContext(jsc);
        return new SparkRDDWriteClient(engineContext, writeConfig);
    }

    /**
     * Convenience overload that uses the default value of
     * {@code HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE} for the
     * rollback-using-markers flag.
     *
     * @param jsc Spark context used to create the client
     * @param basePath base path of the table
     * @param lazyCleanPolicy forwarded to the four-argument overload
     * @return a new client; the caller is responsible for closing it
     * @throws Exception propagated from the four-argument overload
     */
    private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, boolean lazyCleanPolicy) throws Exception {
        boolean defaultRollbackUsingMarkers = Boolean.parseBoolean((String)HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.defaultValue());
        return SparkMain.createHoodieClient(jsc, basePath, defaultRollbackUsingMarkers, lazyCleanPolicy);
    }

    /**
     * Builds the write config shared by the client-based commands in this class.
     *
     * <p>The config sets the table path, the rollback-using-markers flag, a
     * failed-writes cleaning policy ({@code LAZY} when {@code lazyCleanPolicy} is
     * true, {@code EAGER} otherwise) and a {@code BLOOM} index type.
     *
     * @param basePath base path of the table
     * @param rollbackUsingMarkers unboxed with {@code booleanValue()}, so it must not be {@code null}
     * @param lazyCleanPolicy selects the failed-writes cleaning policy
     * @return the assembled write config
     */
    private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbackUsingMarkers, boolean lazyCleanPolicy) {
        HoodieFailedWritesCleaningPolicy failedWritesPolicy;
        if (lazyCleanPolicy) {
            failedWritesPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
        } else {
            failedWritesPolicy = HoodieFailedWritesCleaningPolicy.EAGER;
        }
        return HoodieWriteConfig.newBuilder()
                .withPath(basePath)
                .withRollbackUsingMarkers(rollbackUsingMarkers.booleanValue())
                .withCleanConfig(HoodieCleanConfig.newBuilder()
                        .withFailedWritesCleaningPolicy(failedWritesPolicy)
                        .build())
                .withIndexConfig(HoodieIndexConfig.newBuilder()
                        .withIndexType(HoodieIndex.IndexType.BLOOM)
                        .build())
                .build();
    }

    /**
     * Runs the timeline archiver against the table at {@code basePath}.
     *
     * <p>The write config is built with the embedded timeline server disabled.
     *
     * @param jsc Spark context wrapped in a {@code HoodieSparkEngineContext}
     * @param minCommits first argument to {@code archiveCommitsWith}
     * @param maxCommits second argument to {@code archiveCommitsWith}
     * @param commitsRetained passed to {@code HoodieCleanConfig.retainCommits}
     * @param enableMetadata passed to {@code HoodieMetadataConfig.enable}
     * @param basePath base path of the table
     * @return {@code 0} when archiving completes, {@code -1} when an
     *         {@link IOException} is raised by the archiver
     */
    private static int archive(JavaSparkContext jsc, int minCommits, int maxCommits, int commitsRetained, boolean enableMetadata, String basePath) {
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
                .withPath(basePath)
                .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(minCommits, maxCommits).build())
                .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(commitsRetained).build())
                .withEmbeddedTimelineServerEnabled(false)
                .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build())
                .build();
        HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)context);
        try {
            HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, (HoodieTable)table);
            // NOTE(review): meaning of the boolean argument is not visible from here; confirm against HoodieTimelineArchiver.
            archiver.archiveIfRequired((HoodieEngineContext)context, true);
        } catch (IOException ioe) {
            // Message text is unchanged; the throwable is passed as well so the stack trace is logged.
            LOG.error("Failed to archive with IOException: " + ioe, (Throwable)ioe);
            return -1;
        }
        return 0;
    }

    /**
     * Command identifiers declared by {@code SparkMain}.
     *
     * <p>Constants stay in their original declaration order because
     * {@code values()} and {@code ordinal()} depend on it. A nested enum is
     * implicitly static, so no explicit modifier is needed.
     */
    enum SparkCommand {
        BOOTSTRAP,
        ROLLBACK,
        DEDUPLICATE,
        ROLLBACK_TO_SAVEPOINT,
        SAVEPOINT,
        IMPORT,
        UPSERT,
        // Compaction commands.
        COMPACT_SCHEDULE,
        COMPACT_RUN,
        COMPACT_SCHEDULE_AND_EXECUTE,
        COMPACT_UNSCHEDULE_PLAN,
        COMPACT_UNSCHEDULE_FILE,
        COMPACT_VALIDATE,
        COMPACT_REPAIR,
        // Clustering commands.
        CLUSTERING_SCHEDULE,
        CLUSTERING_RUN,
        CLUSTERING_SCHEDULE_AND_EXECUTE,
        CLEAN,
        DELETE_MARKER,
        DELETE_SAVEPOINT,
        UPGRADE,
        DOWNGRADE,
        REPAIR_DEPRECATED_PARTITION,
        RENAME_PARTITION,
        ARCHIVE
    }
}

