/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import java.io.Serializable;
import java.util.List;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fetches the data referenced by a batch of cloud-object events for the cloud incremental sources.
 *
 * <p>Given the rows of an incremental query, this class:
 * <ol>
 *   <li>applies the configured object filter;</li>
 *   <li>trims the batch to the source limit and computes the new checkpoint;</li>
 *   <li>resolves the object metadata;</li>
 *   <li>loads the referenced objects as a Spark {@link Dataset}, partitioned by the configured
 *       (or source-profile provided) bytes per partition.</li>
 * </ol>
 */
public class CloudDataFetcher
implements Serializable {
    private static final String EMPTY_STRING = "";
    private static final Logger LOG = LoggerFactory.getLogger(CloudDataFetcher.class);
    private static final long serialVersionUID = 1L;
    /**
     * Multiplier applied to the raw object size to account for the Hudi meta fields added to every
     * record when estimating how many partitions are needed.
     */
    private static final double HOODIE_META_FIELDS_SIZE_FACTOR = 1.1;

    private transient TypedProperties props;
    private transient JavaSparkContext sparkContext;
    private transient SparkSession sparkSession;
    private transient CloudObjectsSelectorCommon cloudObjectsSelectorCommon;
    private final HoodieIngestionMetrics metrics;

    /**
     * Creates a fetcher that uses a {@link CloudObjectsSelectorCommon} built from {@code props}.
     *
     * @param props        source configuration
     * @param jsc          Spark context used when resolving object metadata
     * @param sparkSession Spark session used to load the objects
     * @param metrics      metrics sink for ingested bytes and parallelism
     */
    public CloudDataFetcher(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession, HoodieIngestionMetrics metrics) {
        this(props, jsc, sparkSession, metrics, new CloudObjectsSelectorCommon(props));
    }

    /**
     * Creates a fetcher with an explicitly supplied {@link CloudObjectsSelectorCommon}.
     *
     * @param props                      source configuration
     * @param jsc                        Spark context used when resolving object metadata
     * @param sparkSession               Spark session used to load the objects
     * @param metrics                    metrics sink for ingested bytes and parallelism
     * @param cloudObjectsSelectorCommon helper used to load the objects as a dataset
     */
    public CloudDataFetcher(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession, HoodieIngestionMetrics metrics, CloudObjectsSelectorCommon cloudObjectsSelectorCommon) {
        this.props = props;
        this.sparkContext = jsc;
        this.sparkSession = sparkSession;
        this.metrics = metrics;
        this.cloudObjectsSelectorCommon = cloudObjectsSelectorCommon;
    }

    /**
     * Returns the file format of the cloud objects.
     *
     * <p>{@link CloudSourceConfig#DATAFILE_FORMAT} wins when it is set and non-empty. Otherwise the
     * value of {@link HoodieIncrSourceConfig#SOURCE_FILE_FORMAT} is returned.
     *
     * @param props source configuration
     * @return the configured file format
     */
    public static String getFileFormat(TypedProperties props) {
        String dataFileFormat = ConfigUtils.getStringWithAltKeys(props, CloudSourceConfig.DATAFILE_FORMAT, EMPTY_STRING);
        if (StringUtils.isNullOrEmpty(dataFileFormat)) {
            return ConfigUtils.getStringWithAltKeys(props, HoodieIncrSourceConfig.SOURCE_FILE_FORMAT, true);
        }
        return dataFileFormat;
    }

    /**
     * Filters the incremental query result, trims it to the source limit and loads the referenced
     * cloud objects.
     *
     * <p>When a source profile is available, its max source bytes replace {@code sourceLimit}. A
     * positive numeric source-specific context replaces the configured bytes per partition.
     *
     * @param cloudType                 type of cloud store; drives the filter and metadata extraction
     * @param cloudObjectIncrCheckpoint checkpoint the batch starts from
     * @param sourceProfileSupplier     optional supplier of a source profile overriding limits
     * @param queryInfoDatasetPair      query info and the rows returned by the incremental query
     * @param schemaProvider            optional schema provider used when loading the objects
     * @param sourceLimit               maximum number of bytes to ingest in this batch
     * @return the loaded dataset (empty when nothing is left after filtering) and the new checkpoint
     */
    public Pair<Option<Dataset<Row>>, String> fetchPartitionedSource(CloudObjectsSelectorCommon.Type cloudType, CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint, Option<SourceProfileSupplier> sourceProfileSupplier, Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair, Option<SchemaProvider> schemaProvider, long sourceLimit) {
        boolean isSourceProfileSupplierAvailable = sourceProfileSupplier.isPresent() && sourceProfileSupplier.get().getSourceProfile() != null;
        if (isSourceProfileSupplierAvailable) {
            long sourceLimitFromProfile = sourceProfileSupplier.get().getSourceProfile().getMaxSourceBytes();
            LOG.debug("Using source limit from source profile sourceLimitFromConfig {} sourceLimitFromProfile {}", sourceLimit, sourceLimitFromProfile);
            sourceLimit = sourceLimitFromProfile;
        }

        QueryInfo queryInfo = queryInfoDatasetPair.getLeft();
        String filter = CloudObjectsSelectorCommon.generateFilter(cloudType, this.props);
        LOG.info("Adding filter string to Dataset: {}", filter);
        Dataset<Row> filteredSourceData = queryInfoDatasetPair.getRight().filter(filter);

        LOG.info("Adjusting end checkpoint:{} based on sourceLimit :{}", queryInfo.getEndInstant(), sourceLimit);
        Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
            IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(filteredSourceData, sourceLimit, queryInfo, cloudObjectIncrCheckpoint);
        if (!checkPointAndDataset.getRight().isPresent()) {
            LOG.info("Empty source, returning endpoint:{}", checkPointAndDataset.getLeft());
            return Pair.of(Option.empty(), checkPointAndDataset.getLeft().toString());
        }
        LOG.info("Adjusted end checkpoint :{}", checkPointAndDataset.getLeft());

        boolean checkIfFileExists = ConfigUtils.getBooleanWithAltKeys(this.props, CloudSourceConfig.ENABLE_EXISTS_CHECK);
        List<CloudObjectMetadata> cloudObjectMetadata = CloudObjectsSelectorCommon.getObjectMetadata(cloudType, this.sparkContext, checkPointAndDataset.getRight().get(), checkIfFileExists, this.props);
        LOG.info("Total number of files to process :{}", cloudObjectMetadata.size());

        long bytesPerPartition = getBytesPerPartitionFromConfig();
        if (isSourceProfileSupplierAvailable) {
            Object sourceSpecificContext = sourceProfileSupplier.get().getSourceProfile().getSourceSpecificContext();
            // Only a numeric context can carry bytes-per-partition. A null or non-numeric context falls
            // back to the configured value instead of failing with an NPE or ClassCastException.
            if (sourceSpecificContext instanceof Number) {
                long bytesPerPartitionFromProfile = ((Number) sourceSpecificContext).longValue();
                if (bytesPerPartitionFromProfile > 0L) {
                    LOG.debug("Using bytesPerPartition from source profile bytesPerPartitionFromConfig {} bytesPerPartitionFromProfile {}", bytesPerPartition, bytesPerPartitionFromProfile);
                    bytesPerPartition = bytesPerPartitionFromProfile;
                }
            }
        }

        Option<Dataset<Row>> datasetOption = this.getCloudObjectDataDF(cloudObjectMetadata, schemaProvider, bytesPerPartition);
        return Pair.of(datasetOption, checkPointAndDataset.getLeft().toString());
    }

    /**
     * Reads the bytes-per-partition setting from the configuration.
     *
     * <p>Uses {@link CloudSourceConfig#SOURCE_MAX_BYTES_PER_PARTITION} when present; otherwise uses
     * the parquet max file size (or its default).
     */
    private long getBytesPerPartitionFromConfig() {
        String maxBytesPerPartitionKey = CloudSourceConfig.SOURCE_MAX_BYTES_PER_PARTITION.key();
        if (this.props.containsKey(maxBytesPerPartitionKey)) {
            return this.props.getLong(maxBytesPerPartitionKey);
        }
        return this.props.getLong(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), Long.parseLong(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue()));
    }

    /**
     * Loads the given objects as a dataset, sizing the partition count from their total size.
     *
     * <p>Also reports the total object size and the resulting parallelism to the metrics.
     */
    private Option<Dataset<Row>> getCloudObjectDataDF(List<CloudObjectMetadata> cloudObjectMetadata, Option<SchemaProvider> schemaProviderOption, long bytesPerPartition) {
        long totalSize = 0L;
        for (CloudObjectMetadata metadata : cloudObjectMetadata) {
            totalSize += metadata.getSize();
        }
        this.metrics.updateStreamerSourceBytesToBeIngestedInSyncRound(totalSize);
        int numPartitions = computeNumPartitions(totalSize, bytesPerPartition);
        this.metrics.updateStreamerSourceParallelism(numPartitions);
        return this.cloudObjectsSelectorCommon.loadAsDataset(this.sparkSession, cloudObjectMetadata, CloudDataFetcher.getFileFormat(this.props), schemaProviderOption, numPartitions);
    }

    /**
     * Computes the number of partitions needed for {@code totalSize} bytes, including the estimated
     * meta-field overhead, at {@code bytesPerPartition} bytes each. The result is always at least 1.
     *
     * <p>A non-positive {@code bytesPerPartition} yields a single partition. Dividing by zero would
     * otherwise produce {@code Integer.MAX_VALUE} partitions (or 0 for an empty batch).
     */
    private static int computeNumPartitions(long totalSize, long bytesPerPartition) {
        if (bytesPerPartition <= 0L) {
            LOG.warn("Non-positive bytesPerPartition {}; falling back to a single partition", bytesPerPartition);
            return 1;
        }
        double totalSizeWithHoodieMetaFields = (double) totalSize * HOODIE_META_FIELDS_SIZE_FACTOR;
        return (int) Math.max(Math.ceil(totalSizeWithHoodieMetaFields / (double) bytesPerPartition), 1.0);
    }
}

