/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieSparkSqlWriter;
import org.apache.hudi.HoodieStreamingSink;
import org.apache.hudi.callback.common.WriteStatusValidator;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieDuplicateKeyException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class DataSourceUtils {
    private static final Logger LOG = LoggerFactory.getLogger(DataSourceUtils.class);

    public static String getTablePath(HoodieStorage storage, List<StoragePath> userProvidedPaths) throws IOException {
        LOG.info("Getting table path..");
        for (StoragePath path : userProvidedPaths) {
            try {
                Option tablePath = TablePathUtils.getTablePath((HoodieStorage)storage, (StoragePath)path);
                if (!tablePath.isPresent()) continue;
                return ((StoragePath)tablePath.get()).toString();
            }
            catch (HoodieException he) {
                LOG.warn("Error trying to get table path from {}", (Object)path.toString(), (Object)he);
            }
        }
        throw new TableNotFoundException(userProvidedPaths.stream().map(StoragePath::toString).collect(Collectors.joining(",")));
    }

    public static Option<BulkInsertPartitioner> createUserDefinedBulkInsertPartitioner(HoodieWriteConfig config) throws HoodieException {
        String bulkInsertPartitionerClass = config.getUserDefinedBulkInsertPartitionerClass();
        try {
            return StringUtils.isNullOrEmpty((String)bulkInsertPartitionerClass) ? Option.empty() : Option.of((Object)((BulkInsertPartitioner)ReflectionUtils.loadClass((String)bulkInsertPartitionerClass, (Object[])new Object[]{config})));
        }
        catch (Throwable e) {
            throw new HoodieException("Could not create UserDefinedBulkInsertPartitioner class " + bulkInsertPartitionerClass, e);
        }
    }

    public static Option<BulkInsertPartitioner<Dataset<Row>>> createUserDefinedBulkInsertPartitionerWithRows(HoodieWriteConfig config) throws HoodieException {
        String bulkInsertPartitionerClass = config.getUserDefinedBulkInsertPartitionerClass();
        try {
            return StringUtils.isNullOrEmpty((String)bulkInsertPartitionerClass) ? Option.empty() : Option.of((Object)((BulkInsertPartitioner)ReflectionUtils.loadClass((String)bulkInsertPartitionerClass, (Object[])new Object[]{config})));
        }
        catch (Throwable e) {
            throw new HoodieException("Could not create UserDefinedBulkInsertPartitionerRows class " + bulkInsertPartitionerClass, e);
        }
    }

    public static Map<String, String> getExtraMetadata(Map<String, String> properties) {
        HashMap<String, String> extraMetadataMap = new HashMap<String, String>();
        if (properties.containsKey(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX().key())) {
            properties.entrySet().forEach(entry -> {
                if (((String)entry.getKey()).startsWith((String)properties.get(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX().key()))) {
                    extraMetadataMap.put((String)entry.getKey(), (String)entry.getValue());
                }
            });
        }
        if (properties.containsKey(HoodieSparkSqlWriter.SPARK_STREAMING_BATCH_ID())) {
            extraMetadataMap.put(HoodieStreamingSink.SINK_CHECKPOINT_KEY(), CommitUtils.getCheckpointValueAsString((String)properties.getOrDefault(DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER().key(), (String)DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER().defaultValue()), (String)properties.get(HoodieSparkSqlWriter.SPARK_STREAMING_BATCH_ID())));
        }
        return extraMetadataMap;
    }

    public static HoodieWriteConfig createHoodieConfig(String schemaStr, String basePath, String tblName, Map<String, String> parameters) {
        boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key()));
        boolean inlineCompact = false;
        if (parameters.containsKey(HoodieCompactionConfig.INLINE_COMPACT.key())) {
            inlineCompact = Boolean.parseBoolean(parameters.get(HoodieCompactionConfig.INLINE_COMPACT.key()));
        }
        if (!inlineCompact) {
            inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE().key()).equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
        }
        boolean combineInserts = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INSERT_DROP_DUPS().key()));
        HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder().withPath(basePath).combineInput(combineInserts, true);
        if (schemaStr != null) {
            builder = builder.withSchema(schemaStr);
        }
        return builder.forTable(tblName).withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(Boolean.valueOf(inlineCompact)).build()).withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadClass(parameters.getOrDefault(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key(), parameters.getOrDefault(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), HoodieTableConfig.getDefaultPayloadClassName()))).withPayloadOrderingFields(ConfigUtils.getOrderingFieldsStrDuringWrite(parameters)).build()).withProps(parameters).build();
    }

    public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath, String tblName, Map<String, String> parameters) {
        return new SparkRDDWriteClient((HoodieEngineContext)new HoodieSparkEngineContext(jssc), DataSourceUtils.createHoodieConfig(schemaStr, basePath, tblName, parameters));
    }

    public static HoodieWriteResult doWriteOperation(SparkRDDWriteClient client, JavaRDD<HoodieRecord> hoodieRecords, String instantTime, WriteOperationType operation, Boolean isPrepped) throws HoodieException {
        switch (operation) {
            case BULK_INSERT: {
                Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner = DataSourceUtils.createUserDefinedBulkInsertPartitioner(client.getConfig());
                return new HoodieWriteResult(client.bulkInsert(hoodieRecords, instantTime, userDefinedBulkInsertPartitioner));
            }
            case INSERT: {
                return new HoodieWriteResult(client.insert(hoodieRecords, instantTime));
            }
            case UPSERT: {
                if (isPrepped.booleanValue()) {
                    return new HoodieWriteResult(client.upsertPreppedRecords(hoodieRecords, instantTime));
                }
                return new HoodieWriteResult(client.upsert(hoodieRecords, instantTime));
            }
            case INSERT_OVERWRITE: {
                return client.insertOverwrite(hoodieRecords, instantTime);
            }
            case INSERT_OVERWRITE_TABLE: {
                return client.insertOverwriteTable(hoodieRecords, instantTime);
            }
        }
        throw new HoodieException("Not a valid operation type for doWriteOperation: " + operation);
    }

    public static HoodieWriteResult doDeleteOperation(SparkRDDWriteClient client, JavaRDD<Tuple2<HoodieKey, Option<HoodieRecordLocation>>> hoodieKeysAndLocations, String instantTime, boolean isPrepped) {
        if (isPrepped) {
            HoodieRecord.HoodieRecordType recordType = client.getConfig().getRecordMerger().getRecordType();
            JavaRDD records = hoodieKeysAndLocations.map((Function & Serializable)tuple -> {
                HoodieEmptyRecord record = new HoodieEmptyRecord((HoodieKey)tuple._1, recordType);
                record.setCurrentLocation((HoodieRecordLocation)((Option)tuple._2).get());
                return record;
            });
            return new HoodieWriteResult(client.deletePrepped(records, instantTime));
        }
        return new HoodieWriteResult(client.delete(hoodieKeysAndLocations.map((Function & Serializable)tuple -> (HoodieKey)tuple._1()), instantTime));
    }

    public static HoodieWriteResult doDeletePartitionsOperation(SparkRDDWriteClient client, List<String> partitionsToDelete, String instantTime) {
        return client.deletePartitions(partitionsToDelete, instantTime);
    }

    public static JavaRDD<HoodieRecord> handleDuplicates(HoodieSparkEngineContext engineContext, JavaRDD<HoodieRecord> incomingHoodieRecords, HoodieWriteConfig writeConfig, boolean failOnDuplicates) {
        try {
            SparkRDDReadClient client = new SparkRDDReadClient(engineContext, writeConfig);
            return client.tagLocation(incomingHoodieRecords).filter((Function & Serializable)r -> DataSourceUtils.shouldIncludeRecord((HoodieRecord)r, failOnDuplicates));
        }
        catch (TableNotFoundException e) {
            return incomingHoodieRecords;
        }
    }

    private static boolean shouldIncludeRecord(HoodieRecord<?> record, boolean failOnDuplicates) {
        if (!record.isCurrentLocationKnown()) {
            return true;
        }
        if (failOnDuplicates) {
            throw new HoodieDuplicateKeyException(record.getRecordKey());
        }
        return false;
    }

    public static JavaRDD<HoodieRecord> resolveDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords, Map<String, String> parameters, boolean failOnDuplicates) {
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(parameters.get("path")).withProps(parameters).build();
        return DataSourceUtils.handleDuplicates(new HoodieSparkEngineContext(jssc), incomingHoodieRecords, writeConfig, failOnDuplicates);
    }

    static class SparkDataSourceWriteStatusValidator
    implements WriteStatusValidator {
        private final WriteOperationType writeOperationType;
        private final AtomicBoolean hasErrored;

        public SparkDataSourceWriteStatusValidator(WriteOperationType writeOperationType, AtomicBoolean hasErrored) {
            this.writeOperationType = writeOperationType;
            this.hasErrored = hasErrored;
        }

        public boolean validate(long totalRecords, long totalErroredRecords, Option<HoodieData<WriteStatus>> writeStatusesOpt) {
            if (totalErroredRecords > 0L) {
                this.hasErrored.set(true);
                ValidationUtils.checkArgument((boolean)writeStatusesOpt.isPresent(), (String)"RDD <WriteStatus> expected to be present when there are errors");
                long errorCount = HoodieJavaRDD.getJavaRDD((HoodieData)((HoodieData)writeStatusesOpt.get())).filter(WriteStatus::hasErrors).count();
                String errorSummary = String.format("%s operation failed with %d error(s).%n%nTotal write statuses with errors: %d%n%nCheck the driver logs for error stacktraces which provide more information on the failure.", this.writeOperationType, totalErroredRecords, errorCount);
                LOG.error(errorSummary);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Printing out the top 100 errors");
                    HoodieJavaRDD.getJavaRDD((HoodieData)((HoodieData)writeStatusesOpt.get())).filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
                        LOG.trace("Global error:", ws.getGlobalError());
                        if (!ws.getErrors().isEmpty()) {
                            ws.getErrors().forEach((k, v) -> LOG.trace("Error for key {}: {}", k, v));
                        }
                    });
                }
                return false;
            }
            return true;
        }
    }
}

