/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.cli;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.Serializable;
import java.io.StringReader;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieJsonPayload;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Imports parquet files found under {@code srcPath} into a Hudi table at {@code targetPath}.
 *
 * <p>Each source record becomes a {@link HoodieAvroRecord} keyed by the {@code rowKey} field and
 * partitioned by the {@code partitionKey} field. The records are then written with upsert, bulk insert
 * or insert, depending on {@code command}.
 *
 * <p>The class is {@link Serializable} because the Spark closure in
 * {@link #buildHoodieRecordsForImport} reads instance fields and therefore captures {@code this}.
 */
public class HDFSParquetImporterUtils implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(HDFSParquetImporterUtils.class);
    // Numeric partition values are interpreted as epoch seconds.
    // They are rendered as yyyy/MM/dd in the JVM's default time zone.
    private static final DateTimeFormatter PARTITION_FORMATTER =
        DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneId.systemDefault());

    private final String command;
    private final String srcPath;
    private final String targetPath;
    private final String tableName;
    private final String tableType;
    private final String rowKey;
    private final String partitionKey;
    private final int parallelism;
    private final String schemaFile;
    // Number of additional attempts after a failed import; never mutated so every call gets the full budget.
    private final int retry;
    private final String propsFilePath;
    // key=value overrides; nothing in this class adds to the list, so it is empty as written.
    private final List<String> configs = new ArrayList<String>();
    // Write properties resolved by dataImport(JavaSparkContext) before any attempt runs.
    private TypedProperties props;

    public HDFSParquetImporterUtils(String command, String srcPath, String targetPath, String tableName,
                                    String tableType, String rowKey, String partitionKey, int parallelism,
                                    String schemaFile, int retry, String propsFilePath) {
        this.command = command;
        this.srcPath = srcPath;
        this.targetPath = targetPath;
        this.tableName = tableName;
        this.tableType = tableType;
        this.rowKey = rowKey;
        this.partitionKey = partitionKey;
        this.parallelism = parallelism;
        this.schemaFile = schemaFile;
        this.retry = retry;
        this.propsFilePath = propsFilePath;
    }

    /**
     * Returns whether the configured command is {@code upsert}, ignoring case.
     *
     * @return {@code true} for an upsert import; {@code false} otherwise, including a {@code null} command
     */
    public boolean isUpsert() {
        return "upsert".equalsIgnoreCase(this.command);
    }

    /**
     * Runs the import, retrying a failed attempt up to {@code retry} additional times.
     *
     * <p>Write properties are read from {@code propsFilePath} when it is set. Otherwise they are built
     * from the global properties. If the target path already exists and the command is not an upsert,
     * the import is refused and -1 is returned.
     *
     * @param jsc the Spark context to run the import with
     * @return 0 on success, -1 on failure
     */
    public int dataImport(JavaSparkContext jsc) {
        FileSystem fs = HadoopFSUtils.getFs(this.targetPath, jsc.hadoopConfiguration());
        this.props = this.propsFilePath == null || this.propsFilePath.isEmpty()
            ? buildProperties(this.configs)
            : readConfig(fs.getConf(), new StoragePath(this.propsFilePath), this.configs).getProps(true);
        LOG.info("Starting data import with configs : {}", this.props);
        int ret = -1;
        try {
            if (fs.exists(new Path(this.targetPath)) && !this.isUpsert()) {
                throw new HoodieIOException(String.format("Make sure %s is not present.", this.targetPath));
            }
            // Count down a local copy so repeated calls do not share (and exhaust) one retry budget.
            int retriesLeft = this.retry;
            while ((ret = this.dataImport(jsc, fs)) != 0 && retriesLeft-- > 0) {
                LOG.warn("Import attempt failed, retrying ({} retries left after this one)", retriesLeft);
            }
        } catch (Throwable t) {
            LOG.error("dataImport failed", t);
        }
        return ret;
    }

    /**
     * Performs a single import attempt.
     *
     * <p>Steps:
     * <ol>
     *   <li>Delete an existing target path, unless the command is an upsert.</li>
     *   <li>Initialize the table if the target path does not exist.</li>
     *   <li>Read, convert and write the source records.</li>
     * </ol>
     * Any error is logged and reported as -1 rather than thrown.
     *
     * @param jsc the Spark context
     * @param fs  file system used for the target path and the schema file
     * @return 0 when no write status reports errors, -1 otherwise
     */
    public int dataImport(JavaSparkContext jsc, FileSystem fs) {
        try {
            Path target = new Path(this.targetPath);
            if (fs.exists(target) && !this.isUpsert()) {
                fs.delete(target, true);
            }
            if (!fs.exists(target)) {
                HoodieTableMetaClient.newTableBuilder()
                    .setTableName(this.tableName)
                    .setTableType(this.tableType)
                    .initTable(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()), this.targetPath);
            }
            String schemaStr = parseSchema(fs, this.schemaFile);
            // One client per attempt; close it so retries do not accumulate open write clients.
            try (SparkRDDWriteClient<HoodieRecordPayload> client = createHoodieClient(
                    jsc, this.targetPath, schemaStr, this.parallelism, Option.empty(), this.props)) {
                JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords =
                    this.buildHoodieRecordsForImport(jsc, schemaStr);
                String instantTime = client.startCommit();
                JavaRDD<WriteStatus> writeResponse = this.load(client, instantTime, hoodieRecords);
                return handleErrors(jsc, instantTime, writeResponse);
            }
        } catch (Throwable t) {
            LOG.error("Error occurred.", t);
            return -1;
        }
    }

    /**
     * Reads the parquet files under {@code srcPath} recursively and converts each record into a Hudi record.
     *
     * <p>The record key is the string value of the {@code rowKey} field. The partition path is the string
     * value of the {@code partitionKey} field. A numeric partition value is treated as epoch seconds and
     * formatted as {@code yyyy/MM/dd}. The payload is the record's {@code toString()} wrapped in a
     * {@link HoodieJsonPayload}.
     *
     * @param jsc       the Spark context
     * @param schemaStr Avro schema (JSON) used as the parquet read schema
     * @return the records to write; evaluated lazily by Spark
     * @throws IOException if the Hadoop job cannot be created
     */
    public JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport(JavaSparkContext jsc,
                                                                                   String schemaStr) throws IOException {
        Job job = Job.getInstance(jsc.hadoopConfiguration());
        Configuration jobConf = job.getConfiguration();
        // Find parquet files in nested directories.
        jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
        // List input file status in parallel.
        jobConf.set("mapreduce.input.fileinputformat.list-status.num-threads", "1024");
        // Job.getInstance copies the Spark Hadoop configuration.
        // The read schema must therefore be set on the job's own copy to reach the reader (and to avoid
        // mutating the shared Spark conf, which made retries behave differently from the first attempt).
        AvroReadSupport.setAvroReadSchema(jobConf, new Schema.Parser().parse(schemaStr));
        ParquetInputFormat.setReadSupportClass(job, AvroReadSupport.class);
        HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
        context.setJobStatus(this.getClass().getSimpleName(), "Build records for import: " + this.tableName);
        return jsc.newAPIHadoopFile(this.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class, jobConf)
            // Cap the number of partitions.
            .coalesce(16 * this.parallelism)
            .map(entry -> {
                // ParquetInputFormat is a raw type here, so the pair RDD is raw and the entry needs a cast.
                GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2();
                Object partitionField = genericRecord.get(this.partitionKey);
                if (partitionField == null) {
                    throw new HoodieIOException("partition key is missing. :" + this.partitionKey);
                }
                Object rowField = genericRecord.get(this.rowKey);
                if (rowField == null) {
                    throw new HoodieIOException("row field is missing. :" + this.rowKey);
                }
                String partitionPath = partitionField.toString();
                LOG.debug("Row Key : {}, Partition Path is ({})", rowField, partitionPath);
                if (partitionField instanceof Number) {
                    try {
                        long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000.0);
                        partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
                    } catch (NumberFormatException nfe) {
                        LOG.warn("Unable to parse date from partition field. Assuming partition as ("
                            + partitionField + ")");
                    }
                }
                return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), partitionPath),
                    new HoodieJsonPayload(genericRecord.toString()));
            });
    }

    /**
     * Writes the records using the operation selected by {@code command}.
     *
     * <p>{@code upsert} selects upsert and {@code bulkinsert} selects bulk insert; matching ignores case.
     * Any other command falls back to insert.
     *
     * @param client        the write client
     * @param instantTime   the commit instant returned by {@code startCommit}
     * @param hoodieRecords the records to write
     * @return the write statuses
     */
    public <T extends HoodieRecordPayload> JavaRDD<WriteStatus> load(SparkRDDWriteClient<T> client, String instantTime,
                                                                     JavaRDD<HoodieRecord<T>> hoodieRecords) {
        // Locale.ROOT keeps the match locale-independent ("BULKINSERT" breaks under a Turkish default locale).
        switch (this.command.toLowerCase(Locale.ROOT)) {
            case "upsert":
                return client.upsert(hoodieRecords, instantTime);
            case "bulkinsert":
                return client.bulkInsert(hoodieRecords, instantTime);
            default:
                return client.insert(hoodieRecords, instantTime);
        }
    }

    /**
     * Builds properties from {@link DFSPropertiesConfiguration#getGlobalProps()} plus the given overrides.
     *
     * <p>Each entry must have the form {@code key=value}. Only the first {@code =} separates the key, so
     * values may themselves contain {@code =}. NOTE(review): whether {@code getGlobalProps()} returns a
     * fresh copy is not visible here.
     *
     * @param props {@code key=value} entries
     * @return the combined properties
     * @throws IllegalArgumentException if an entry contains no {@code =}
     */
    public static TypedProperties buildProperties(List<String> props) {
        TypedProperties properties = DFSPropertiesConfiguration.getGlobalProps();
        for (String entry : props) {
            String[] kv = entry.split("=", 2);
            ValidationUtils.checkArgument(kv.length == 2, "Invalid property, expected key=value: " + entry);
            properties.setProperty(kv[0], kv[1]);
        }
        return properties;
    }

    /**
     * Loads a properties file and appends the given {@code key=value} overrides, one per line.
     *
     * @param hadoopConfig    Hadoop configuration used to access {@code cfgPath}
     * @param cfgPath         the properties file
     * @param overriddenProps extra {@code key=value} lines; may be empty
     * @return the loaded configuration
     * @throws HoodieIOException if the overrides cannot be added
     */
    public static DFSPropertiesConfiguration readConfig(Configuration hadoopConfig, StoragePath cfgPath,
                                                        List<String> overriddenProps) {
        DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(hadoopConfig, cfgPath);
        try {
            if (!overriddenProps.isEmpty()) {
                LOG.info("Adding overridden properties to file properties.");
                conf.addPropsFromStream(new BufferedReader(new StringReader(String.join("\n", overriddenProps))), cfgPath);
            }
        } catch (IOException ioe) {
            throw new HoodieIOException("Unexpected error adding config overrides", ioe);
        }
        return conf;
    }

    /**
     * Creates a write client for the table at {@code basePath}.
     *
     * <p>The client uses a BLOOM index, has inline compaction disabled, and uses the same parallelism
     * for all operations. {@code properties} are passed through {@code withProps}.
     *
     * @param jsc                     the Spark context
     * @param basePath                table base path
     * @param schemaStr               Avro schema (JSON) of the records
     * @param parallelism             parallelism for insert/upsert/bulk-insert/delete
     * @param compactionStrategyClass optional compaction strategy class name, loaded reflectively
     * @param properties              additional write properties
     * @return a new write client; the caller is responsible for closing it
     */
    public static SparkRDDWriteClient<HoodieRecordPayload> createHoodieClient(JavaSparkContext jsc, String basePath,
                                                                              String schemaStr, int parallelism,
                                                                              Option<String> compactionStrategyClass,
                                                                              TypedProperties properties) {
        Option<CompactionStrategy> strategyOpt = compactionStrategyClass.map(ReflectionUtils::loadClass);
        HoodieCompactionConfig compactionConfig = strategyOpt
            .map(strategy -> HoodieCompactionConfig.newBuilder()
                .withInlineCompaction(false)
                .withCompactionStrategy(strategy)
                .build())
            .orElseGet(() -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .withParallelism(parallelism, parallelism)
            .withBulkInsertParallelism(parallelism)
            .withDeleteParallelism(parallelism)
            .withSchema(schemaStr)
            .combineInput(true, true)
            .withCompactionConfig(compactionConfig)
            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
            .withProps(properties)
            .build();
        return new SparkRDDWriteClient<HoodieRecordPayload>(new HoodieSparkEngineContext(jsc), config);
    }

    /**
     * Reads the whole schema file and decodes it as UTF-8.
     *
     * @param fs         file system holding the schema file
     * @param schemaFile path of the schema file
     * @return the file contents
     * @throws Exception if the file does not exist, is larger than 2 GiB, or cannot be read
     */
    public static String parseSchema(FileSystem fs, String schemaFile) throws Exception {
        Path p = new Path(schemaFile);
        if (!fs.exists(p)) {
            throw new Exception(String.format("Could not find - %s - schema file.", schemaFile));
        }
        long len = fs.getFileStatus(p).getLen();
        // Guard the int cast below; a silently truncated length would produce a corrupt schema or a negative size.
        if (len > Integer.MAX_VALUE) {
            throw new IOException(String.format("Schema file %s is too large (%d bytes).", schemaFile, len));
        }
        byte[] bytes = new byte[(int) len];
        try (FSDataInputStream inputStream = fs.open(p)) {
            inputStream.readFully(0L, bytes, 0, bytes.length);
        }
        return StringUtils.fromUTF8Bytes(bytes);
    }

    /**
     * Counts write statuses that report errors, logging each one.
     *
     * @param jsc           the Spark context used to create the error accumulator
     * @param instantTime   the commit instant, used in the success message
     * @param writeResponse the write statuses; traversed once with a Spark action
     * @return 0 if no status has errors, -1 otherwise
     */
    public static int handleErrors(JavaSparkContext jsc, String instantTime, JavaRDD<WriteStatus> writeResponse) {
        LongAccumulator errors = jsc.sc().longAccumulator();
        writeResponse.foreach(writeStatus -> {
            if (writeStatus.hasErrors()) {
                errors.add(1L);
                LOG.error(String.format("Error processing records :writeStatus:%s", writeStatus.getStat().toString()));
            }
        });
        long errorCount = errors.value();
        if (errorCount == 0L) {
            LOG.info(String.format("Table imported into hoodie with %s instant time.", instantTime));
            return 0;
        }
        LOG.error(String.format("Import failed with %d errors.", errorCount));
        return -1;
    }
}

