/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.testutils.sources;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.collection.RocksDBBasedMap;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.config.SourceTestConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.AvroSource;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractBaseTestSource
extends AvroSource {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractBaseTestSource.class);
    public static final int DEFAULT_PARTITION_NUM = 0;
    public static transient Map<Integer, HoodieTestDataGenerator> dataGeneratorMap = new HashMap<Integer, HoodieTestDataGenerator>();

    public static void initDataGen() {
        dataGeneratorMap.putIfAbsent(0, new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS));
    }

    public static void initDataGen(TypedProperties props, int partition) {
        try {
            boolean useRocksForTestDataGenKeys = ConfigUtils.getBooleanWithAltKeys((Properties)props, SourceTestConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS);
            String baseStoreDir = ConfigUtils.getStringWithAltKeys((TypedProperties)props, SourceTestConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS, (String)File.createTempFile("test_data_gen", ".keys").getParent()) + "/" + partition;
            LOG.info("useRocksForTestDataGenKeys={}, BaseStoreDir={}", (Object)useRocksForTestDataGenKeys, (Object)baseStoreDir);
            dataGeneratorMap.put(partition, new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, (Map)(useRocksForTestDataGenKeys ? new RocksDBBasedMap(baseStoreDir) : new HashMap())));
        }
        catch (IOException e) {
            throw new HoodieIOException(e.getMessage(), e);
        }
    }

    public static void initDataGen(SQLContext sqlContext, String globParquetPath, int partition) {
        List rows = sqlContext.read().format("hudi").load(globParquetPath).select("_hoodie_record_key", new String[]{"_hoodie_partition_path"}).collectAsList();
        Map keyPartitionMap = IntStream.range(0, rows.size()).boxed().collect(Collectors.toMap(Function.identity(), i -> {
            Row r = (Row)rows.get((int)i);
            HoodieTestDataGenerator.KeyPartition kp = new HoodieTestDataGenerator.KeyPartition();
            kp.key = new HoodieKey(r.getString(0), r.getString(1));
            kp.partitionPath = r.getString(1);
            return kp;
        }));
        dataGeneratorMap.put(partition, new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, keyPartitionMap));
    }

    public static void resetDataGen() {
        for (HoodieTestDataGenerator dataGenerator : dataGeneratorMap.values()) {
            dataGenerator.close();
        }
        dataGeneratorMap.clear();
    }

    protected AbstractBaseTestSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        super(props, sparkContext, sparkSession, schemaProvider);
    }

    protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String instantTime, int partition) {
        Stream<GenericRecord> updateStream;
        int maxUniqueKeys = ConfigUtils.getIntWithAltKeys((TypedProperties)props, SourceTestConfig.MAX_UNIQUE_RECORDS_PROP);
        HoodieTestDataGenerator dataGenerator = dataGeneratorMap.get(partition);
        int numExistingKeys = dataGenerator.getNumExistingKeys("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}");
        LOG.info("NumExistingKeys={}", (Object)numExistingKeys);
        int numUpdates = Math.min(numExistingKeys, sourceLimit / 2);
        int numInserts = sourceLimit - numUpdates;
        LOG.info("Before adjustments => numInserts={}, numUpdates={}", (Object)numInserts, (Object)numUpdates);
        boolean reachedMax = false;
        if (numInserts + numExistingKeys > maxUniqueKeys) {
            numInserts = Math.max(0, maxUniqueKeys - numExistingKeys);
            reachedMax = true;
        }
        if (numInserts + numUpdates < sourceLimit) {
            numUpdates = Math.min(numExistingKeys, sourceLimit - numInserts);
        }
        Stream<Object> deleteStream = Stream.empty();
        long memoryUsage1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
        LOG.info("Before DataGen. Memory Usage={}, Total Memory={}, Free Memory={}", new Object[]{memoryUsage1, Runtime.getRuntime().totalMemory(), Runtime.getRuntime().freeMemory()});
        if (!reachedMax && numUpdates >= 50) {
            LOG.info("After adjustments => NumInserts={}, NumUpdates={}, NumDeletes=50, maxUniqueRecords={}", new Object[]{numInserts, numUpdates - 50, maxUniqueKeys});
            deleteStream = dataGenerator.generateUniqueDeleteRecordStream(instantTime, Integer.valueOf(50)).map(AbstractBaseTestSource::toGenericRecord);
            updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, Integer.valueOf(numUpdates - 50), "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").map(AbstractBaseTestSource::toGenericRecord);
        } else {
            LOG.info("After adjustments => NumInserts={}, NumUpdates={}, maxUniqueRecords={}", new Object[]{numInserts, numUpdates, maxUniqueKeys});
            updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, Integer.valueOf(numUpdates), "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").map(AbstractBaseTestSource::toGenericRecord);
        }
        Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(instantTime, Integer.valueOf(numInserts), false, "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").map(AbstractBaseTestSource::toGenericRecord);
        if (Boolean.valueOf(props.getOrDefault((Object)"hoodie.test.source.generate.inserts", (Object)"false").toString()).booleanValue()) {
            return insertStream;
        }
        return Stream.concat(deleteStream, Stream.concat(updateStream, insertStream));
    }

    private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord) {
        try {
            RawTripTestPayload payload = (RawTripTestPayload)hoodieRecord.getData();
            return (GenericRecord)payload.getRecordToInsert(HoodieTestDataGenerator.AVRO_SCHEMA);
        }
        catch (IOException e) {
            return null;
        }
    }
}

