/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Shared helpers for sources that ingest data files referenced by cloud-storage
 * event metadata (S3 or GCS).
 *
 * <p>The class covers three steps: building a SQL filter over the event metadata
 * ({@link #generateFilter}), turning the filtered metadata rows into
 * {@link CloudObjectMetadata} entries ({@link #getObjectMetadata}), and loading the
 * referenced files as a Spark dataset ({@link #loadAsDataset}).
 */
public class CloudObjectsSelectorCommon {
    private static final Logger LOG = LoggerFactory.getLogger(CloudObjectsSelectorCommon.class);
    public static final String S3_OBJECT_KEY = "s3.object.key";
    public static final String S3_OBJECT_SIZE = "s3.object.size";
    public static final String S3_BUCKET_NAME = "s3.bucket.name";
    public static final String GCS_OBJECT_KEY = "name";
    public static final String GCS_OBJECT_SIZE = "size";
    private static final String SPACE_DELIMITER = " ";
    private static final String GCS_PREFIX = "gs://";
    private final TypedProperties properties;

    /**
     * @param properties2 configuration consulted by {@link #loadAsDataset}
     */
    public CloudObjectsSelectorCommon(TypedProperties properties2) {
        this.properties = properties2;
    }

    /**
     * Builds the per-partition function that converts metadata rows into
     * {@link CloudObjectMetadata} entries.
     *
     * <p>Each row is read positionally: column 0 is the bucket, column 1 the object key
     * and column 2 the object size (a {@code String}, {@code Integer} or {@code Long}).
     *
     * @param storageUrlSchemePrefix scheme prefix prepended to the bucket, e.g. {@code gs://}
     * @param storageConf            storage configuration used when existence is checked
     * @param checkIfExists          when true, rows whose file does not exist are dropped
     * @return a function yielding one entry per retained row
     */
    public static MapPartitionsFunction<Row, CloudObjectMetadata> getCloudObjectMetadataPerPartition(String storageUrlSchemePrefix, StorageConfiguration<Configuration> storageConf, boolean checkIfExists) {
        return rows -> {
            List<CloudObjectMetadata> cloudObjectMetadataPerPartition = new ArrayList<>();
            while (rows.hasNext()) {
                Row row = rows.next();
                Option<String> filePathUrl = getUrlForFile(row, storageUrlSchemePrefix, storageConf, checkIfExists);
                if (filePathUrl.isPresent()) {
                    String url = filePathUrl.get();
                    LOG.info("Adding file: {}", url);
                    cloudObjectMetadataPerPartition.add(new CloudObjectMetadata(url, extractObjectSize(row.get(2))));
                }
            }
            return cloudObjectMetadataPerPartition.iterator();
        };
    }

    /**
     * Converts the raw size value of a metadata row into a {@code long}.
     *
     * @throws HoodieIOException if the value is null or not a String, Integer or Long
     */
    private static long extractObjectSize(Object rawSize) {
        if (rawSize instanceof String) {
            return Long.parseLong((String) rawSize);
        }
        if (rawSize instanceof Integer) {
            return ((Integer) rawSize).longValue();
        }
        if (rawSize instanceof Long) {
            return (Long) rawSize;
        }
        // A null size must produce the same descriptive error as any other unsupported
        // value instead of a NullPointerException from getClass().
        throw new HoodieIOException("unexpected object size's type in Cloud storage events: "
            + (rawSize == null ? "null" : rawSize.getClass()));
    }

    /**
     * Builds the URL-decoded path {@code <prefix><bucket>/<key>} for a metadata row.
     *
     * @return the decoded path, or empty when {@code checkIfExists} is set and the file is absent
     * @throws HoodieException if decoding or the existence check fails
     */
    private static Option<String> getUrlForFile(Row row, String storageUrlSchemePrefix, StorageConfiguration<Configuration> storageConf, boolean checkIfExists) {
        String bucket = row.getString(0);
        String filePath = storageUrlSchemePrefix + bucket + "/" + row.getString(1);
        try {
            String filePathUrl = URLDecoder.decode(filePath, StandardCharsets.UTF_8.name());
            if (!checkIfExists) {
                return Option.of(filePathUrl);
            }
            // Copy the configuration only when it is actually needed; this method runs once per row.
            Configuration configuration = storageConf.unwrapCopy();
            return checkIfFileExists(storageUrlSchemePrefix, bucket, filePathUrl, configuration)
                ? Option.of(filePathUrl)
                : Option.empty();
        } catch (Exception exception) {
            String errMsg = String.format("Failed to generate path to cloud file %s", filePath);
            LOG.warn(errMsg, exception);
            throw new HoodieException(errMsg, exception);
        }
    }

    /**
     * Checks whether {@code filePathUrl} exists on the file system resolved for the bucket.
     *
     * @throws HoodieIOException if the file system lookup fails with an {@link IOException}
     */
    private static boolean checkIfFileExists(String storageUrlSchemePrefix, String bucket, String filePathUrl, Configuration configuration) {
        try {
            FileSystem fs = HadoopFSUtils.getFs(storageUrlSchemePrefix + bucket, configuration);
            return fs.exists(new Path(filePathUrl));
        } catch (IOException ioe) {
            String errMsg = String.format("Error while checking path exists for %s ", filePathUrl);
            LOG.error(errMsg, ioe);
            throw new HoodieIOException(errMsg, ioe);
        }
    }

    /**
     * Builds the SQL filter expression applied to the cloud object metadata.
     *
     * <p>The filter always requires a positive object size. Depending on configuration it
     * also restricts the object key by prefix or regex, excludes keys by prefix or
     * substring, and requires a file extension (the configured extension, falling back to
     * the file format). For {@link Type#S3} the S3-specific prefix/ignore configs are used
     * when the generic ones are not set.
     *
     * @param type2 cloud provider whose column names are used in the filter
     * @param props configuration holding the filter settings
     * @return the filter expression
     */
    public static String generateFilter(Type type2, TypedProperties props) {
        String objectSizeKey;
        String objectKey;
        String fileFormat = CloudDataFetcher.getFileFormat(props);
        Option<String> selectRelativePathPrefix = getPropVal(props, CloudSourceConfig.SELECT_RELATIVE_PATH_PREFIX);
        Option<String> ignoreRelativePathPrefix = getPropVal(props, CloudSourceConfig.IGNORE_RELATIVE_PATH_PREFIX);
        Option<String> ignoreRelativePathSubStr = getPropVal(props, CloudSourceConfig.IGNORE_RELATIVE_PATH_SUBSTR);
        Option<String> selectRelativePathRegex = getPropVal(props, CloudSourceConfig.SELECT_RELATIVE_PATH_REGEX);
        if (type2.equals(Type.S3)) {
            objectKey = S3_OBJECT_KEY;
            objectSizeKey = S3_OBJECT_SIZE;
            selectRelativePathPrefix = selectRelativePathPrefix.or(() -> getPropVal(props, S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX));
            ignoreRelativePathPrefix = ignoreRelativePathPrefix.or(() -> getPropVal(props, S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX));
            ignoreRelativePathSubStr = ignoreRelativePathSubStr.or(() -> getPropVal(props, S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING));
        } else {
            objectKey = GCS_OBJECT_KEY;
            objectSizeKey = GCS_OBJECT_SIZE;
        }
        StringBuilder filter = new StringBuilder(String.format("%s > 0", objectSizeKey));
        if (selectRelativePathPrefix.isPresent() || selectRelativePathRegex.isPresent()) {
            String prefix = selectRelativePathPrefix.orElse("");
            String regex = selectRelativePathRegex.orElse("");
            if (!regex.isEmpty()) {
                // The regex is matched relative to the prefix, joined with exactly one '/'.
                String updatedPathRegex = prefix.isEmpty() || prefix.endsWith("/") ? prefix + regex : prefix + "/" + regex;
                filter.append(SPACE_DELIMITER).append(String.format("and %s rlike '%s'", objectKey, updatedPathRegex));
            } else if (!prefix.isEmpty()) {
                filter.append(SPACE_DELIMITER).append(String.format("and %s like '%s%%'", objectKey, prefix));
            }
        }
        if (ignoreRelativePathPrefix.isPresent()) {
            filter.append(SPACE_DELIMITER).append(String.format("and %s not like '%s%%'", objectKey, ignoreRelativePathPrefix.get()));
        }
        if (ignoreRelativePathSubStr.isPresent()) {
            filter.append(SPACE_DELIMITER).append(String.format("and %s not like '%%%s%%'", objectKey, ignoreRelativePathSubStr.get()));
        }
        getPropVal(props, CloudSourceConfig.CLOUD_DATAFILE_EXTENSION)
            .or(() -> Option.of(fileFormat))
            .ifPresent(extension -> filter.append(SPACE_DELIMITER).append(String.format("and %s like '%%%s'", objectKey, extension)));
        return filter.toString();
    }

    /**
     * Collects the distinct objects referenced by the metadata dataset.
     *
     * @param type2                 cloud provider; decides the column names and URL scheme
     * @param jsc                   Spark context supplying the Hadoop configuration
     * @param cloudObjectMetadataDF dataset of cloud object metadata
     * @param checkIfExists         when true, objects that no longer exist are dropped
     * @param props                 configuration (the S3 file-system prefix is read for S3)
     * @return the metadata of every distinct (bucket, key, size) row
     * @throws UnsupportedOperationException if {@code type2} is neither GCS nor S3
     */
    public static List<CloudObjectMetadata> getObjectMetadata(Type type2, JavaSparkContext jsc, Dataset<Row> cloudObjectMetadataDF, boolean checkIfExists, TypedProperties props) {
        StorageConfiguration<Configuration> storageConf = HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration());
        String bucketColumn;
        String keyColumn;
        String sizeColumn;
        String urlPrefix;
        if (type2 == Type.GCS) {
            bucketColumn = "bucket";
            keyColumn = GCS_OBJECT_KEY;
            sizeColumn = GCS_OBJECT_SIZE;
            urlPrefix = GCS_PREFIX;
        } else if (type2 == Type.S3) {
            String s3FS = ConfigUtils.getStringWithAltKeys((Properties)props, S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX, true).toLowerCase();
            bucketColumn = S3_BUCKET_NAME;
            keyColumn = S3_OBJECT_KEY;
            sizeColumn = S3_OBJECT_SIZE;
            urlPrefix = s3FS + "://";
        } else {
            throw new UnsupportedOperationException("Invalid cloud type " + type2);
        }
        // Column order matters: the per-partition function reads bucket, key and size by position.
        return cloudObjectMetadataDF
            .select(bucketColumn, keyColumn, sizeColumn)
            .distinct()
            .mapPartitions(getCloudObjectMetadataPerPartition(urlPrefix, storageConf, checkIfExists), Encoders.kryo(CloudObjectMetadata.class))
            .collectAsList();
    }

    /**
     * Loads the given cloud objects into a single dataset.
     *
     * <p>When the schema provider yields a usable source schema, it is applied to the
     * reader; if alias coalescing is enabled and the schema declares aliases, alias
     * columns are read as well and folded back into their primary fields. Reader options
     * come from the configured JSON datasource options. Columns listed in the path-based
     * partition fields config are derived from the input file path, and the result is
     * finally coalesced or repartitioned to {@code numPartitions}.
     *
     * @param spark                active Spark session
     * @param cloudObjectMetadata  objects to load; null or empty yields an empty result
     * @param fileFormat           Spark datasource format of the files
     * @param schemaProviderOption optional provider of the source schema
     * @param numPartitions        target number of partitions
     * @return the loaded dataset, or empty when there is nothing to load
     * @throws HoodieException if the datasource options are not valid JSON
     */
    public Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata, String fileFormat, Option<SchemaProvider> schemaProviderOption, int numPartitions) {
        // Guard first: the debug statement below dereferences the list.
        if (CollectionUtils.isNullOrEmpty(cloudObjectMetadata)) {
            return Option.empty();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Extracted distinct files " + cloudObjectMetadata.size() + " and some samples " + cloudObjectMetadata.stream().map(CloudObjectMetadata::getPath).limit(10L).collect(Collectors.toList()));
        }
        DataFrameReader reader = spark.read().format(fileFormat);
        String datasourceOpts = ConfigUtils.getStringWithAltKeys((Properties)this.properties, CloudSourceConfig.SPARK_DATASOURCE_OPTIONS, true);

        // Resolve the source schema once and reuse it for both the reader and the coalescing step.
        Schema sourceSchema = schemaProviderOption.isPresent() ? schemaProviderOption.get().getSourceSchema() : null;
        StructType rowSchema = null;
        boolean coalesceAliases = false;
        if (sourceSchema != null && !sourceSchema.equals(InputBatch.NULL_SCHEMA)) {
            rowSchema = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
            coalesceAliases = isCoalesceRequired(this.properties, sourceSchema);
            reader = reader.schema(coalesceAliases ? addAliasesToRowSchema(sourceSchema, rowSchema) : rowSchema);
        }

        if (StringUtils.isNullOrEmpty(datasourceOpts)) {
            datasourceOpts = ConfigUtils.getStringWithAltKeys((Properties)this.properties, S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
        }
        if (StringUtils.nonEmpty(datasourceOpts)) {
            reader = reader.options(parseDatasourceOptions(datasourceOpts));
        }

        List<String> paths = new ArrayList<>(cloudObjectMetadata.size());
        for (CloudObjectMetadata objectMetadata : cloudObjectMetadata) {
            paths.add(objectMetadata.getPath());
        }
        boolean isCommaSeparatedPathFormat = this.properties.getBoolean(CloudSourceConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT.key(), false);
        Dataset<Row> dataset = isCommaSeparatedPathFormat
            ? reader.load(String.join(",", paths))
            : reader.load(paths.toArray(new String[0]));

        if (coalesceAliases) {
            // Fold alias columns into their primary fields, then re-impose the alias-free row schema.
            dataset = spark.createDataFrame(coalesceAliasFields(dataset, sourceSchema).rdd(), rowSchema);
        }
        if (ConfigUtils.containsConfigProperty(this.properties, CloudSourceConfig.PATH_BASED_PARTITION_FIELDS)) {
            String[] partitionKeysToAdd = ConfigUtils.getStringWithAltKeys(this.properties, CloudSourceConfig.PATH_BASED_PARTITION_FIELDS).split(",");
            for (String partitionKey : partitionKeysToAdd) {
                String partitionPathPattern = String.format("%s=", partitionKey);
                LOG.info("Adding column {} to dataset", partitionKey);
                // Take the text between "<key>=" and the next '/' of the input file path.
                dataset = dataset.withColumn(partitionKey, functions.split(functions.split(functions.input_file_name(), partitionPathPattern).getItem(1), "/").getItem(0));
            }
        }
        dataset = IncrSourceHelper.coalesceOrRepartition(dataset, numPartitions);
        return Option.of(dataset);
    }

    /**
     * Parses the JSON datasource options string into a map of reader options.
     *
     * @throws HoodieException if the string cannot be parsed
     */
    @SuppressWarnings("unchecked")
    private static Map<String, String> parseDatasourceOptions(String datasourceOpts) {
        ObjectMapper mapper = new ObjectMapper();
        Map<String, String> sparkOptionsMap;
        try {
            sparkOptionsMap = mapper.readValue(datasourceOpts, Map.class);
        } catch (IOException e) {
            throw new HoodieException(String.format("Failed to parse sparkOptions: %s", datasourceOpts), e);
        }
        LOG.info("sparkOptions loaded: {}", sparkOptionsMap);
        return sparkOptionsMap;
    }

    /** Returns true when alias coalescing is enabled and the schema declares at least one alias. */
    private static boolean isCoalesceRequired(TypedProperties properties2, Schema sourceSchema) {
        return ConfigUtils.getBooleanWithAltKeys(properties2, CloudSourceConfig.SPARK_DATASOURCE_READER_COALESCE_ALIAS_COLUMNS)
            && sourceSchema != null
            && hasFieldWithAliases(sourceSchema);
    }

    /** Returns true when any field of the (possibly nested) record schema declares aliases. */
    private static boolean hasFieldWithAliases(Schema schema) {
        if (!isNestedRecord(schema)) {
            return false;
        }
        for (Schema.Field field : getRecordFields(schema)) {
            if (!field.aliases().isEmpty() || hasFieldWithAliases(field.schema())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Rebuilds the row schema following the Avro field order, adding one extra nullable
     * field per Avro alias (recursively for nested records).
     */
    private static StructType addAliasesToRowSchema(Schema avroSchema, StructType rowSchema) {
        Map<String, StructField> rowFieldsByName = Arrays.stream(rowSchema.fields())
            .collect(Collectors.toMap(StructField::name, Function.identity()));
        StructField[] modifiedFields = getRecordFields(avroSchema).stream()
            .flatMap(avroField -> generateRowFieldsWithAliases(avroField, rowFieldsByName.get(avroField.name())).stream())
            .toArray(StructField[]::new);
        return new StructType(modifiedFields);
    }

    /**
     * Returns the fields of a record schema, or of the first record member of a union;
     * any other schema yields an empty list.
     */
    private static List<Schema.Field> getRecordFields(Schema schema) {
        if (schema.getType() == Schema.Type.RECORD) {
            return schema.getFields();
        }
        if (schema.getType() == Schema.Type.UNION) {
            return schema.getTypes().stream()
                .filter(subSchema -> subSchema.getType() == Schema.Type.RECORD)
                .findFirst()
                .map(Schema::getFields)
                .orElse(Collections.emptyList());
        }
        return Collections.emptyList();
    }

    /**
     * Produces the row field(s) for one Avro field: the field itself, followed by one
     * field per alias when the field (or, for nested records, its sub-schema) needs them.
     */
    private static List<StructField> generateRowFieldsWithAliases(Schema.Field avroField, StructField rowField) {
        List<StructField> fieldList = new ArrayList<>();
        if (isNestedRecord(avroField.schema())) {
            StructType updatedSchema = addAliasesToRowSchema(avroField.schema(), (StructType) rowField.dataType());
            if (schemaModifiedOrHasAliases(avroField, updatedSchema, rowField)) {
                addFieldWithAliases(fieldList, avroField.name(), updatedSchema, rowField.metadata(), avroField.aliases());
            } else {
                fieldList.add(rowField);
            }
        } else if (!avroField.aliases().isEmpty()) {
            addFieldWithAliases(fieldList, avroField.name(), rowField.dataType(), rowField.metadata(), avroField.aliases());
        } else {
            fieldList.add(rowField);
        }
        return fieldList;
    }

    /** Appends a nullable field named {@code fieldName} plus one nullable field per alias. */
    private static void addFieldWithAliases(List<StructField> fieldList, String fieldName, DataType dataType, Metadata metadata2, Set<String> aliases) {
        fieldList.add(new StructField(fieldName, dataType, true, metadata2));
        for (String alias : aliases) {
            fieldList.add(new StructField(alias, dataType, true, metadata2));
        }
    }

    /** Coalesces alias columns into their primary fields, top level first and then nested. */
    private static Dataset<Row> coalesceAliasFields(Dataset<Row> dataset, Schema sourceSchema) {
        return coalesceNestedAliases(coalesceTopLevelAliases(dataset, sourceSchema), sourceSchema);
    }

    /** Coalesces and drops the alias columns of every top-level field that declares aliases. */
    private static Dataset<Row> coalesceTopLevelAliases(Dataset<Row> dataset, Schema sourceSchema) {
        Dataset<Row> result = dataset;
        for (Schema.Field field : getRecordFields(sourceSchema)) {
            if (!field.aliases().isEmpty()) {
                result = coalesceAndDropAliasFields(result, field.name(), field.aliases());
            }
        }
        return result;
    }

    /**
     * Replaces {@code fieldName} with the first non-null value among itself and its alias
     * columns, then drops the alias columns.
     */
    private static Dataset<Row> coalesceAndDropAliasFields(Dataset<Row> dataset, String fieldName, Set<String> aliases) {
        List<Column> columns = new ArrayList<>();
        columns.add(dataset.col(fieldName));
        for (String alias : aliases) {
            columns.add(dataset.col(alias));
        }
        return dataset
            .withColumn(fieldName, functions.coalesce(columns.toArray(new Column[0])))
            .drop(aliases.toArray(new String[0]));
    }

    /** Rebuilds every top-level record column whose sub-schema declares aliases. */
    private static Dataset<Row> coalesceNestedAliases(Dataset<Row> dataset, Schema sourceSchema) {
        Dataset<Row> result = dataset;
        for (Schema.Field field : getRecordFields(sourceSchema)) {
            if (isNestedRecord(field.schema()) && hasFieldWithAliases(field.schema())) {
                result = result.withColumn(field.name(), functions.struct(getNestedFields("", field, result)));
            }
        }
        return result;
    }

    /**
     * Builds the column expressions for the sub-fields of a record field, coalescing each
     * sub-field with its alias columns where aliases are declared.
     *
     * @param parentField dotted path of the enclosing field, or empty at top level
     */
    private static Column[] getNestedFields(String parentField, Schema.Field field, Dataset<Row> dataset) {
        String newParentField = getFullName(parentField, field.name());
        return getRecordFields(field.schema()).stream().map(avroField -> {
            Column primary = isNestedRecord(avroField.schema())
                ? functions.struct(getNestedFields(newParentField, avroField, dataset))
                : dataset.col(getFullName(newParentField, avroField.name()));
            if (avroField.aliases().isEmpty()) {
                return primary;
            }
            List<Column> candidates = new ArrayList<>();
            candidates.add(primary);
            for (String alias : avroField.aliases()) {
                candidates.add(dataset.col(getFullName(newParentField, alias)));
            }
            return functions.coalesce(candidates.toArray(new Column[0])).alias(avroField.name());
        }).toArray(Column[]::new);
    }

    /** Returns true for a record schema or a union containing a record. */
    private static boolean isNestedRecord(Schema schema) {
        Schema.Type schemaType = schema.getType();
        if (schemaType == Schema.Type.RECORD) {
            return true;
        }
        return schemaType == Schema.Type.UNION
            && schema.getTypes().stream().anyMatch(subSchema -> subSchema.getType() == Schema.Type.RECORD);
    }

    /** Joins a namespace and a field name with a dot; an empty namespace yields the bare name. */
    private static String getFullName(String namespace, String fieldName) {
        return namespace.isEmpty() ? fieldName : namespace + "." + fieldName;
    }

    /** Returns true when the nested schema changed or the Avro field itself declares aliases. */
    private static boolean schemaModifiedOrHasAliases(Schema.Field avroField, StructType modifiedNestedSchema, StructField rowField) {
        return !modifiedNestedSchema.equals(rowField.dataType()) || !avroField.aliases().isEmpty();
    }

    /** Reads a string config value, mapping null or empty values to an empty option. */
    private static Option<String> getPropVal(TypedProperties props, ConfigProperty<String> configProperty) {
        String value = ConfigUtils.getStringWithAltKeys((Properties)props, configProperty, true);
        return StringUtils.isNullOrEmpty(value) ? Option.empty() : Option.of(value);
    }

    /** Supported cloud providers. */
    public enum Type {
        S3,
        GCS;
    }
}

