/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Shared helpers for the S3 and GCS incremental cloud sources.
 *
 * <p>The helpers do three things:
 * <ul>
 *   <li>turn cloud-storage event rows (bucket, key, size) into file paths;</li>
 *   <li>build the filter expression applied to those events;</li>
 *   <li>load the referenced files into a Spark {@link Dataset}.</li>
 * </ul>
 */
public class CloudObjectsSelectorCommon {
    private static final Logger LOG = LoggerFactory.getLogger(CloudObjectsSelectorCommon.class);
    public static final String S3_OBJECT_KEY = "s3.object.key";
    public static final String S3_OBJECT_SIZE = "s3.object.size";
    public static final String S3_BUCKET_NAME = "s3.bucket.name";
    public static final String GCS_OBJECT_KEY = "name";
    public static final String GCS_OBJECT_SIZE = "size";
    private static final String SPACE_DELIMITER = " ";
    private static final String GCS_PREFIX = "gs://";
    /** Column holding the bucket name in GCS event records. */
    private static final String GCS_BUCKET_NAME = "bucket";
    /** Reused for every parse instead of building a new mapper per call. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final TypedProperties properties;

    public CloudObjectsSelectorCommon(TypedProperties properties) {
        this.properties = properties;
    }

    /**
     * Returns a partition-level mapper that converts event rows into {@link CloudObjectMetadata}.
     *
     * <p>Each input row must carry three columns:
     * <ul>
     *   <li>index 0: the bucket name;</li>
     *   <li>index 1: the object key;</li>
     *   <li>index 2: the object size.</li>
     * </ul>
     * When {@code checkIfExists} is true, rows whose file does not exist are dropped.
     *
     * @param storageUrlSchemePrefix scheme prefix prepended to the bucket, e.g. {@code "gs://"}
     * @param storageConf            storage configuration, used only when {@code checkIfExists} is true
     * @param checkIfExists          whether to verify each file's existence before emitting it
     * @return a serializable function suitable for {@code Dataset.mapPartitions}
     */
    public static MapPartitionsFunction<Row, CloudObjectMetadata> getCloudObjectMetadataPerPartition(
            String storageUrlSchemePrefix, StorageConfiguration<Configuration> storageConf, boolean checkIfExists) {
        // The intersection cast keeps the lambda explicitly Serializable so it can be shipped to executors.
        return (MapPartitionsFunction<Row, CloudObjectMetadata> & Serializable) rows -> {
            List<CloudObjectMetadata> cloudObjectMetadataPerPartition = new ArrayList<>();
            while (rows.hasNext()) {
                Row row = rows.next();
                Option<String> filePathUrl = getUrlForFile(row, storageUrlSchemePrefix, storageConf, checkIfExists);
                if (filePathUrl.isPresent()) {
                    String url = filePathUrl.get();
                    LOG.info("Adding file: {}", url);
                    cloudObjectMetadataPerPartition.add(new CloudObjectMetadata(url, getObjectSize(row.get(2))));
                }
            }
            return cloudObjectMetadataPerPartition.iterator();
        };
    }

    /**
     * Converts the raw object-size column value to a {@code long}.
     *
     * @param obj the size value; a numeric {@code String}, an {@code Integer} or a {@code Long}
     * @return the size as a long
     * @throws HoodieIOException if the value is null or of any other type
     */
    private static long getObjectSize(Object obj) {
        if (obj instanceof String) {
            return Long.parseLong((String) obj);
        }
        if (obj instanceof Integer) {
            return ((Integer) obj).longValue();
        }
        if (obj instanceof Long) {
            return (Long) obj;
        }
        // Report a null size explicitly rather than failing with an NPE on obj.getClass().
        String typeName = obj == null ? "null" : String.valueOf(obj.getClass());
        throw new HoodieIOException("unexpected object size's type in Cloud storage events: " + typeName);
    }

    /**
     * Builds the URL-decoded path {@code prefix + bucket + "/" + key} for a row.
     *
     * <p>When {@code checkIfExists} is true, returns empty if the file is missing.
     *
     * @throws HoodieException wrapping any failure while decoding or checking the path
     */
    private static Option<String> getUrlForFile(Row row, String storageUrlSchemePrefix,
            StorageConfiguration<Configuration> storageConf, boolean checkIfExists) {
        String bucket = row.getString(0);
        String filePath = storageUrlSchemePrefix + bucket + "/" + row.getString(1);
        try {
            String filePathUrl = URLDecoder.decode(filePath, StandardCharsets.UTF_8.name());
            if (!checkIfExists) {
                return Option.of(filePathUrl);
            }
            // Copy the Hadoop Configuration only when the file system is actually consulted.
            Configuration configuration = storageConf.unwrapCopy();
            boolean exists = checkIfFileExists(storageUrlSchemePrefix, bucket, filePathUrl, configuration);
            return exists ? Option.of(filePathUrl) : Option.empty();
        } catch (Exception exception) {
            String errMsg = String.format("Failed to generate path to cloud file %s", filePath);
            LOG.warn(errMsg, exception);
            throw new HoodieException(errMsg, exception);
        }
    }

    /**
     * Checks whether {@code filePathUrl} exists.
     *
     * <p>The check uses the file system resolved for {@code storageUrlSchemePrefix + bucket}.
     *
     * @throws HoodieIOException if the file system cannot be queried
     */
    private static boolean checkIfFileExists(String storageUrlSchemePrefix, String bucket, String filePathUrl,
            Configuration configuration) {
        try {
            FileSystem fs = HadoopFSUtils.getFs(storageUrlSchemePrefix + bucket, configuration);
            return fs.exists(new Path(filePathUrl));
        } catch (IOException ioe) {
            String errMsg = String.format("Error while checking path exists for %s ", filePathUrl);
            LOG.error(errMsg, ioe);
            throw new HoodieIOException(errMsg, ioe);
        }
    }

    /**
     * Builds the filter expression applied to cloud event records.
     *
     * <p>The expression always requires a positive object size. It then appends optional
     * {@code like} / {@code not like} conditions, one per configured item:
     * <ul>
     *   <li>selected key prefix;</li>
     *   <li>ignored key prefix;</li>
     *   <li>ignored key substring;</li>
     *   <li>data-file extension, which defaults to the file format.</li>
     * </ul>
     * For S3, the generic {@link CloudSourceConfig} keys fall back to the S3-specific keys when unset.
     *
     * @param type  cloud type; must not be null
     * @param props source properties
     * @return the filter expression
     */
    public static String generateFilter(Type type, TypedProperties props) {
        String fileFormat = CloudDataFetcher.getFileFormat(props);
        Option<String> selectRelativePathPrefix = getPropVal(props, CloudSourceConfig.SELECT_RELATIVE_PATH_PREFIX);
        Option<String> ignoreRelativePathPrefix = getPropVal(props, CloudSourceConfig.IGNORE_RELATIVE_PATH_PREFIX);
        Option<String> ignoreRelativePathSubStr = getPropVal(props, CloudSourceConfig.IGNORE_RELATIVE_PATH_SUBSTR);
        String objectKey;
        String objectSizeKey;
        // Switching on the enum keeps the original fail-fast NPE for a null type.
        switch (type) {
            case S3:
                objectKey = S3_OBJECT_KEY;
                objectSizeKey = S3_OBJECT_SIZE;
                selectRelativePathPrefix = selectRelativePathPrefix.or(
                        () -> getPropVal(props, S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX));
                ignoreRelativePathPrefix = ignoreRelativePathPrefix.or(
                        () -> getPropVal(props, S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX));
                ignoreRelativePathSubStr = ignoreRelativePathSubStr.or(
                        () -> getPropVal(props, S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING));
                break;
            case GCS:
            default:
                objectKey = GCS_OBJECT_KEY;
                objectSizeKey = GCS_OBJECT_SIZE;
                break;
        }
        StringBuilder filter = new StringBuilder(String.format("%s > 0", objectSizeKey));
        if (selectRelativePathPrefix.isPresent()) {
            filter.append(SPACE_DELIMITER)
                    .append(String.format("and %s like '%s%%'", objectKey, selectRelativePathPrefix.get()));
        }
        if (ignoreRelativePathPrefix.isPresent()) {
            filter.append(SPACE_DELIMITER)
                    .append(String.format("and %s not like '%s%%'", objectKey, ignoreRelativePathPrefix.get()));
        }
        if (ignoreRelativePathSubStr.isPresent()) {
            filter.append(SPACE_DELIMITER)
                    .append(String.format("and %s not like '%%%s%%'", objectKey, ignoreRelativePathSubStr.get()));
        }
        getPropVal(props, CloudSourceConfig.CLOUD_DATAFILE_EXTENSION)
                .or(() -> Option.of(fileFormat))
                .ifPresent(extension -> filter.append(SPACE_DELIMITER)
                        .append(String.format("and %s like '%%%s'", objectKey, extension)));
        return filter.toString();
    }

    /**
     * Resolves event rows into per-file metadata.
     *
     * <p>Selects the bucket/key/size columns for the given cloud type, removes duplicate rows, maps
     * each row to {@link CloudObjectMetadata} on the executors, and collects the result to the driver.
     *
     * @throws UnsupportedOperationException for an unknown cloud type
     */
    public static List<CloudObjectMetadata> getObjectMetadata(Type type, JavaSparkContext jsc,
            Dataset<Row> cloudObjectMetadataDF, boolean checkIfExists, TypedProperties props) {
        StorageConfiguration<Configuration> storageConf =
                HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration());
        if (type == Type.GCS) {
            return collectObjectMetadata(
                    cloudObjectMetadataDF.select(GCS_BUCKET_NAME, GCS_OBJECT_KEY, GCS_OBJECT_SIZE),
                    GCS_PREFIX, storageConf, checkIfExists);
        }
        if (type == Type.S3) {
            // Locale.ROOT keeps the scheme independent of the JVM default locale (e.g. Turkish dotless i).
            String s3FS = ConfigUtils.getStringWithAltKeys(props, S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX, true)
                    .toLowerCase(Locale.ROOT);
            return collectObjectMetadata(
                    cloudObjectMetadataDF.select(S3_BUCKET_NAME, S3_OBJECT_KEY, S3_OBJECT_SIZE),
                    s3FS + "://", storageConf, checkIfExists);
        }
        throw new UnsupportedOperationException("Invalid cloud type " + type);
    }

    /**
     * Resolves (bucket, key, size) rows into metadata and collects the result to the driver.
     *
     * <p>Duplicate rows are removed first, then each remaining row is resolved on the executors.
     */
    private static List<CloudObjectMetadata> collectObjectMetadata(Dataset<Row> bucketKeySize,
            String storageUrlSchemePrefix, StorageConfiguration<Configuration> storageConf, boolean checkIfExists) {
        return bucketKeySize.distinct()
                .mapPartitions(getCloudObjectMetadataPerPartition(storageUrlSchemePrefix, storageConf, checkIfExists),
                        Encoders.kryo(CloudObjectMetadata.class))
                .collectAsList();
    }

    /**
     * Loads the given cloud files as a Spark dataset.
     *
     * <p>The following are applied when present:
     * <ul>
     *   <li>the source schema from the schema provider;</li>
     *   <li>the configured datasource options;</li>
     *   <li>columns derived from {@code key=value} segments of each file path.</li>
     * </ul>
     * The result is then coalesced or repartitioned to {@code numPartitions}.
     *
     * @return the dataset, or empty when {@code cloudObjectMetadata} is null or empty
     * @throws HoodieException if the datasource options are not valid JSON
     */
    public Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata,
            String fileFormat, Option<SchemaProvider> schemaProviderOption, int numPartitions) {
        // Guard before logging: the debug line below would NPE on a null list.
        if (CollectionUtils.isNullOrEmpty(cloudObjectMetadata)) {
            return Option.empty();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Extracted distinct files {} and some samples {}", cloudObjectMetadata.size(),
                    cloudObjectMetadata.stream().map(CloudObjectMetadata::getPath).limit(10L)
                            .collect(Collectors.toList()));
        }
        DataFrameReader reader = spark.read().format(fileFormat);
        if (schemaProviderOption.isPresent()) {
            Schema sourceSchema = schemaProviderOption.get().getSourceSchema();
            if (sourceSchema != null && !sourceSchema.equals(InputBatch.NULL_SCHEMA)) {
                reader = reader.schema(AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema));
            }
        }
        String datasourceOpts = getDatasourceOptions();
        if (StringUtils.nonEmpty(datasourceOpts)) {
            Map<String, String> sparkOptions = parseSparkOptions(datasourceOpts);
            LOG.info("sparkOptions loaded: {}", sparkOptions);
            reader = reader.options(sparkOptions);
        }
        List<String> paths = cloudObjectMetadata.stream()
                .map(CloudObjectMetadata::getPath)
                .collect(Collectors.toList());
        boolean isCommaSeparatedPathFormat = properties.getBoolean(
                CloudSourceConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT.key(), false);
        Dataset<Row> dataset = isCommaSeparatedPathFormat
                ? reader.load(String.join(",", paths))
                : reader.load(paths.toArray(new String[0]));
        dataset = addPathBasedPartitionColumns(dataset);
        return Option.of(coalesceOrRepartition(dataset, numPartitions));
    }

    /** Reads the datasource-options JSON, falling back to the S3-specific key when the generic one is empty. */
    private String getDatasourceOptions() {
        String datasourceOpts =
                ConfigUtils.getStringWithAltKeys(properties, CloudSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
        if (StringUtils.isNullOrEmpty(datasourceOpts)) {
            datasourceOpts = ConfigUtils.getStringWithAltKeys(
                    properties, S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
        }
        return datasourceOpts;
    }

    /**
     * Parses a JSON object of Spark reader options into a String-to-String map.
     *
     * @throws HoodieException if the text is not valid JSON
     */
    private static Map<String, String> parseSparkOptions(String datasourceOpts) {
        Map<?, ?> rawOptions;
        try {
            rawOptions = OBJECT_MAPPER.readValue(datasourceOpts, Map.class);
        } catch (IOException e) {
            throw new HoodieException(String.format("Failed to parse sparkOptions: %s", datasourceOpts), e);
        }
        Map<String, String> sparkOptions = new LinkedHashMap<>();
        if (rawOptions != null) {
            for (Map.Entry<?, ?> entry : rawOptions.entrySet()) {
                Object value = entry.getValue();
                // JSON scalars such as true or 4 parse as Boolean/Integer; the reader API expects String values.
                sparkOptions.put(String.valueOf(entry.getKey()), value == null ? null : String.valueOf(value));
            }
        }
        return sparkOptions;
    }

    /**
     * Adds one column per configured path-based partition field.
     *
     * <p>For field {@code k}, the column value is the text after {@code "k="} in the input file path,
     * up to the next {@code '/'}. Field names are trimmed and blank entries are skipped.
     */
    private Dataset<Row> addPathBasedPartitionColumns(Dataset<Row> dataset) {
        if (!ConfigUtils.containsConfigProperty(properties, CloudSourceConfig.PATH_BASED_PARTITION_FIELDS)) {
            return dataset;
        }
        Dataset<Row> result = dataset;
        String[] partitionKeysToAdd =
                ConfigUtils.getStringWithAltKeys(properties, CloudSourceConfig.PATH_BASED_PARTITION_FIELDS).split(",");
        for (String rawPartitionKey : partitionKeysToAdd) {
            String partitionKey = rawPartitionKey.trim();
            if (partitionKey.isEmpty()) {
                continue;
            }
            // functions.split interprets its pattern as a regex; quote "key=" so it matches literally.
            String partitionPathPattern = Pattern.quote(partitionKey + "=");
            LOG.info("Adding column {} to dataset", partitionKey);
            Column afterKey = functions.split(functions.input_file_name(), partitionPathPattern).getItem(1);
            result = result.withColumn(partitionKey, functions.split(afterKey, "/").getItem(0));
        }
        return result;
    }

    /**
     * Adjusts the dataset's partition count toward {@code numPartitions}.
     *
     * <p>Uses {@code repartition} when the count must grow, otherwise {@code coalesce}.
     */
    private static Dataset<Row> coalesceOrRepartition(Dataset<Row> dataset, int numPartitions) {
        int existingNumPartitions = dataset.rdd().getNumPartitions();
        LOG.info("existing number of partitions={}, required number of partitions={}",
                existingNumPartitions, numPartitions);
        return existingNumPartitions < numPartitions
                ? dataset.repartition(numPartitions)
                : dataset.coalesce(numPartitions);
    }

    /** Returns the property value (with defaults applied) if it is non-empty, otherwise empty. */
    private static Option<String> getPropVal(TypedProperties props, ConfigProperty<String> configProperty) {
        String value = ConfigUtils.getStringWithAltKeys(props, configProperty, true);
        return StringUtils.isNullOrEmpty(value) ? Option.empty() : Option.of(value);
    }

    /** Supported cloud storage event sources. */
    public enum Type {
        S3,
        GCS
    }
}

