/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.testutils.sources;

import java.io.Serializable;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.config.SourceTestConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistributedTestDataSource
extends AbstractBaseTestSource {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedTestDataSource.class);
    private final int numTestSourcePartitions;

    public DistributedTestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        super(props, sparkContext, sparkSession, schemaProvider);
        this.numTestSourcePartitions = ConfigUtils.getIntWithAltKeys((TypedProperties)props, SourceTestConfig.NUM_SOURCE_PARTITIONS_PROP);
    }

    protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
        int nextCommitNum = (Integer)lastCkptStr.map(s -> Integer.parseInt(s) + 1).orElse((Object)0);
        String instantTime = String.format("%05d", nextCommitNum);
        LOG.info("Source Limit is set to {}", (Object)sourceLimit);
        if (sourceLimit <= 0L) {
            return new InputBatch(Option.empty(), instantTime);
        }
        TypedProperties newProps = new TypedProperties();
        newProps.putAll((Map)this.props);
        int maxUniqueRecords = ConfigUtils.getIntWithAltKeys((TypedProperties)this.props, SourceTestConfig.MAX_UNIQUE_RECORDS_PROP);
        String maxUniqueRecordsPerPartition = String.valueOf(Math.max(1, maxUniqueRecords / this.numTestSourcePartitions));
        newProps.setProperty(SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), maxUniqueRecordsPerPartition);
        int perPartitionSourceLimit = Math.max(1, (int)(sourceLimit / (long)this.numTestSourcePartitions));
        JavaRDD avroRDD = this.sparkContext.parallelize(IntStream.range(0, this.numTestSourcePartitions).boxed().collect(Collectors.toList()), this.numTestSourcePartitions).mapPartitionsWithIndex((Function2 & Serializable)(p, idx) -> {
            LOG.info("Initializing source with newProps={}", (Object)newProps);
            if (!dataGeneratorMap.containsKey(p)) {
                DistributedTestDataSource.initDataGen(newProps, p);
            }
            return DistributedTestDataSource.fetchNextBatch(newProps, perPartitionSourceLimit, instantTime, p).iterator();
        }, true);
        return new InputBatch(Option.of((Object)avroRDD), instantTime);
    }
}

