/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import java.io.IOException;
import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.com.beust.jcommander.IValueValidator;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.com.beust.jcommander.ParameterException;
import org.apache.hudi.common.HoodieJsonPayload;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

/**
 * Imports Parquet files from a source path into a Hudi table at a target path using Spark.
 *
 * <p>Each source record is keyed by the configured row-key field and assigned a partition path
 * derived from the configured partition-key field. Depending on {@link Config#command} the records
 * are written with an insert, upsert or bulk-insert operation.
 */
public class HDFSParquetImporter implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LogManager.getLogger(HDFSParquetImporter.class);
    /** Formats a numeric partition value as a {@code yyyy/MM/dd} path in the JVM's default time zone. */
    private static final DateTimeFormatter PARTITION_FORMATTER = DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneId.systemDefault());
    private final Config cfg;
    /** File system of the target path; initialised at the start of {@link #dataImport(JavaSparkContext, int)}. */
    private transient FileSystem fs;
    /** Write-client properties; initialised at the start of {@link #dataImport(JavaSparkContext, int)}. */
    private TypedProperties props;

    /**
     * Creates an importer for the given configuration.
     *
     * @param cfg import settings (paths, table name/type, key fields, write command, ...)
     */
    public HDFSParquetImporter(Config cfg) {
        this.cfg = cfg;
    }

    /**
     * Command-line entry point: parses the arguments into a {@link Config}, builds a Spark context,
     * runs the import with the configured number of retries and always stops the Spark context.
     *
     * <p>Prints usage and exits with status 1 when help is requested or no arguments are given.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        final Config cfg = new Config();
        JCommander cmd = new JCommander(cfg, null, args);
        if (cfg.help || args.length == 0) {
            cmd.usage();
            System.exit(1);
        }
        HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
        JavaSparkContext jssc = UtilHelpers.buildSparkContext("data-importer-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
        try {
            int result = dataImporter.dataImport(jssc, cfg.retry);
            if (result != 0) {
                // The exit status is intentionally left unchanged; make the failure visible in the logs.
                LOG.error("Data import failed with return code " + result);
            }
        } finally {
            jssc.stop();
        }
    }

    /** Returns {@code true} when the configured command is "upsert" (case-insensitive). */
    private boolean isUpsert() {
        return "upsert".equalsIgnoreCase(this.cfg.command);
    }

    /**
     * Runs the import, repeating it up to {@code retry} additional times while it reports failure.
     *
     * <p>Before the first attempt the target path must not exist unless the command is upsert;
     * otherwise the import is not attempted. Any error is logged and reflected in the return value
     * rather than propagated.
     *
     * @param jsc   Spark context used for reading, writing and Hadoop configuration
     * @param retry number of additional attempts after the first one fails
     * @return {@code 0} when an attempt succeeded, otherwise the non-zero code of the last attempt
     *         ({@code -1} when no attempt completed)
     */
    public int dataImport(JavaSparkContext jsc, int retry) {
        this.fs = FSUtils.getFs(this.cfg.targetPath, jsc.hadoopConfiguration());
        this.props = this.cfg.propsFilePath == null
                ? UtilHelpers.buildProperties(this.cfg.configs)
                : UtilHelpers.readConfig(this.fs.getConf(), new Path(this.cfg.propsFilePath), this.cfg.configs).getProps(true);
        LOG.info("Starting data import with configs : " + this.props.toString());
        int ret = -1;
        try {
            // Refuse to overwrite an existing target unless we are upserting into it.
            if (this.fs.exists(new Path(this.cfg.targetPath)) && !this.isUpsert()) {
                throw new HoodieIOException(String.format("Make sure %s is not present.", this.cfg.targetPath));
            }
            int retriesLeft = retry;
            do {
                ret = this.dataImport(jsc);
            } while (ret != 0 && retriesLeft-- > 0);
        } catch (Throwable t) {
            // Pass the throwable as the second argument so the stack trace is logged too.
            LOG.error("Data import failed.", t);
        }
        return ret;
    }

    /**
     * Performs a single import attempt.
     *
     * <p>For non-upsert commands an existing target path is deleted first (this cleans up after a
     * previously failed attempt). The table is initialised when the target path does not exist, the
     * source records are loaded and then written under a newly started commit.
     *
     * @param jsc Spark context used for reading, writing and Hadoop configuration
     * @return the result of {@code UtilHelpers.handleErrors} for the write, or {@code -1} when any
     *         error occurred (the error is logged, not thrown)
     * @throws IOException declared for overriding implementations; this implementation catches all errors
     */
    protected int dataImport(JavaSparkContext jsc) throws IOException {
        try {
            Path target = new Path(this.cfg.targetPath);
            if (this.fs.exists(target) && !this.isUpsert()) {
                this.fs.delete(target, true);
            }
            if (!this.fs.exists(target)) {
                Properties properties = HoodieTableMetaClient.withPropertyBuilder()
                        .setTableName(this.cfg.tableName)
                        .setTableType(this.cfg.tableType)
                        .build();
                HoodieTableMetaClient.initTableAndGetMetaClient(jsc.hadoopConfiguration(), this.cfg.targetPath, properties);
            }
            String schemaStr = UtilHelpers.parseSchema(this.fs, this.cfg.schemaFile);
            // Close the write client once the attempt is finished so its resources are released,
            // including between retries.
            try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, this.cfg.targetPath, schemaStr, this.cfg.parallelism, Option.empty(), this.props)) {
                JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords = this.buildHoodieRecordsForImport(jsc, schemaStr);
                String instantTime = client.startCommit();
                JavaRDD<WriteStatus> writeResponse = this.load(client, instantTime, hoodieRecords);
                return UtilHelpers.handleErrors(jsc, instantTime, writeResponse);
            }
        } catch (Throwable t) {
            LOG.error("Error occurred.", t);
            return -1;
        }
    }

    /**
     * Reads the Parquet files under the source path (recursively) and converts every record into a
     * Hudi record.
     *
     * <p>The record key is the string value of the row-key field. The partition path is the string
     * value of the partition-key field, except for numeric values, which are treated as epoch seconds
     * and formatted as {@code yyyy/MM/dd}.
     *
     * @param jsc       Spark context used to read the source files
     * @param schemaStr Avro schema (JSON) used as the Parquet read schema
     * @return an RDD of Hudi records carrying the source record as a JSON payload
     * @throws IOException if the Hadoop job configuration cannot be created
     */
    protected JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport(JavaSparkContext jsc, String schemaStr) throws IOException {
        Job job = Job.getInstance(jsc.hadoopConfiguration());
        // Allow nested directories to be picked up and parallelise file-status listing.
        job.getConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
        job.getConfiguration().set("mapreduce.input.fileinputformat.list-status.num-threads", "1024");
        // NOTE(review): the read schema is set on the Spark context's Hadoop configuration, not on
        // job.getConfiguration() which is what is passed to newAPIHadoopFile below - confirm intended.
        AvroReadSupport.setAvroReadSchema(jsc.hadoopConfiguration(), new Schema.Parser().parse(schemaStr));
        ParquetInputFormat.setReadSupportClass(job, AvroReadSupport.class);
        HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
        context.setJobStatus(this.getClass().getSimpleName(), "Build records for import: " + this.cfg.tableName);
        // Copy the field names into locals so the closure below does not capture (and serialize) "this".
        final String partitionKey = this.cfg.partitionKey;
        final String rowKey = this.cfg.rowKey;
        // coalesce() reduces the number of tasks produced by the input splits.
        return jsc.newAPIHadoopFile(this.cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class, job.getConfiguration()).coalesce(16 * this.cfg.parallelism).map((Function & Serializable)entry -> {
            GenericRecord genericRecord = (GenericRecord)((Tuple2)entry)._2();
            Object partitionField = genericRecord.get(partitionKey);
            if (partitionField == null) {
                throw new HoodieIOException("partition key is missing. :" + partitionKey);
            }
            Object rowField = genericRecord.get(rowKey);
            if (rowField == null) {
                throw new HoodieIOException("row field is missing. :" + rowKey);
            }
            String partitionPath = partitionField.toString();
            // Runs once per record: avoid building the message unless debug logging is on.
            if (LOG.isDebugEnabled()) {
                LOG.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
            }
            if (partitionField instanceof Number) {
                try {
                    // Numeric partition values are seconds since the epoch; convert to milliseconds.
                    long ts = (long)(Double.parseDouble(partitionField.toString()) * 1000.0);
                    partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
                } catch (NumberFormatException nfe) {
                    LOG.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
                }
            }
            return new HoodieAvroRecord<HoodieJsonPayload>(new HoodieKey(rowField.toString(), partitionPath), new HoodieJsonPayload(genericRecord.toString()));
        });
    }

    /**
     * Writes the records with the operation selected by {@link Config#command}.
     *
     * @param client        write client to use
     * @param instantTime   commit instant the write belongs to
     * @param hoodieRecords records to write
     * @param <T>           payload type of the records
     * @return the write statuses returned by the client; any command other than "upsert" or
     *         "bulkinsert" is executed as an insert
     */
    protected <T extends HoodieRecordPayload> JavaRDD<WriteStatus> load(SparkRDDWriteClient<T> client, String instantTime, JavaRDD<HoodieRecord<T>> hoodieRecords) {
        // Locale.ROOT keeps the match independent of the JVM's default locale
        // (e.g. Turkish lower-casing of 'I' would otherwise break "INSERT"/"BULKINSERT").
        switch (this.cfg.command.toLowerCase(Locale.ROOT)) {
            case "upsert": {
                return client.upsert(hoodieRecords, instantTime);
            }
            case "bulkinsert": {
                return client.bulkInsert(hoodieRecords, instantTime);
            }
            default: {
                return client.insert(hoodieRecords, instantTime);
            }
        }
    }

    /** Command-line options of the importer, populated by JCommander. */
    public static class Config implements Serializable {
        @Parameter(names={"--command", "-c"}, description="Write command Valid values are insert(default)/upsert/bulkinsert", validateValueWith={CommandValidator.class})
        public String command = "INSERT";
        @Parameter(names={"--src-path", "-sp"}, description="Base path for the input table", required=true)
        public String srcPath = null;
        @Parameter(names={"--target-path", "-tp"}, description="Base path for the target hoodie table", required=true)
        public String targetPath = null;
        @Parameter(names={"--table-name", "-tn"}, description="Table name", required=true)
        public String tableName = null;
        @Parameter(names={"--table-type", "-tt"}, description="Table type", required=true)
        public String tableType = null;
        @Parameter(names={"--row-key-field", "-rk"}, description="Row key field name", required=true)
        public String rowKey = null;
        @Parameter(names={"--partition-key-field", "-pk"}, description="Partition key field name", required=true)
        public String partitionKey = null;
        @Parameter(names={"--parallelism", "-pl"}, description="Parallelism for hoodie insert(default)/upsert/bulkinsert", required=true)
        public int parallelism = 1;
        @Parameter(names={"--schema-file", "-sf"}, description="path for Avro schema file", required=true)
        public String schemaFile = null;
        @Parameter(names={"--format", "-f"}, description="Format for the input data.", validateValueWith={FormatValidator.class})
        public String format = null;
        @Parameter(names={"--spark-master", "-ms"}, description="Spark master")
        public String sparkMaster = null;
        @Parameter(names={"--spark-memory", "-sm"}, description="spark memory to use", required=true)
        public String sparkMemory = null;
        @Parameter(names={"--retry", "-rt"}, description="number of retries")
        public int retry = 0;
        @Parameter(names={"--props"}, description="path to properties file on localfs or dfs, with configurations for hoodie client for importing")
        public String propsFilePath = null;
        @Parameter(names={"--hoodie-conf"}, description="Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter=IdentitySplitter.class)
        public List<String> configs = new ArrayList<String>();
        @Parameter(names={"--help", "-h"}, help=true)
        public Boolean help = false;
    }

    /** Validates {@code --format}: only the exact value "parquet" is accepted. */
    public static class FormatValidator implements IValueValidator<String> {
        List<String> validFormats = Collections.singletonList("parquet");

        /**
         * @param name  name of the option being validated
         * @param value value supplied for the option
         * @throws ParameterException if {@code value} is {@code null} or not a supported format
         */
        @Override
        public void validate(String name, String value) {
            if (value == null || !this.validFormats.contains(value)) {
                throw new ParameterException(String.format("Invalid format type: value:%s: supported formats:%s", value, this.validFormats));
            }
        }
    }

    /** Validates {@code --command}: insert, upsert or bulkinsert, compared case-insensitively. */
    public static class CommandValidator implements IValueValidator<String> {
        List<String> validCommands = Arrays.asList("insert", "upsert", "bulkinsert");

        /**
         * @param name  name of the option being validated
         * @param value value supplied for the option
         * @throws ParameterException if {@code value} is {@code null} or not a supported command
         */
        @Override
        public void validate(String name, String value) {
            // Locale.ROOT: lower-casing must not depend on the JVM's default locale.
            if (value == null || !this.validCommands.contains(value.toLowerCase(Locale.ROOT))) {
                throw new ParameterException(String.format("Invalid command: value:%s: supported commands:%s", value, this.validCommands));
            }
        }
    }
}

