/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndexFactory;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

/**
 * Read-side client that uses a Hudi index to look up where records live in a table and,
 * optionally, to load the matching rows as a Spark {@link Dataset}.
 *
 * <p>Dataframe operations ({@link #readROView(JavaRDD, int)}) require that the client was
 * built with a {@link SQLContext}; index-only operations do not.
 *
 * @param <T> payload type of the {@link HoodieRecord}s handled by this client
 */
public class SparkRDDReadClient<T> implements Serializable {
    private static final long serialVersionUID = 1L;
    /** Index used to tag keys/records with their current file location. */
    private final transient HoodieIndex<?, ?> index;
    /** Table handle created from the write config passed to the constructor. */
    private HoodieTable hoodieTable;
    /** Present only when the client was constructed with a {@link SQLContext}. */
    private transient Option<SQLContext> sqlContextOpt;
    private final transient HoodieSparkEngineContext context;
    private final transient StorageConfiguration<?> storageConf;

    /**
     * Creates a client for the table at {@code basePath} using a BLOOM index and no SQLContext.
     *
     * @param context  Spark engine context
     * @param basePath base path of the Hudi table
     */
    public SparkRDDReadClient(HoodieSparkEngineContext context, String basePath) {
        this(context, HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
            .build());
    }

    /**
     * Creates a client for the table at {@code basePath} using a BLOOM index, with dataframe support.
     *
     * @param context    Spark engine context
     * @param basePath   base path of the Hudi table
     * @param sqlContext SQLContext used for dataframe operations
     */
    public SparkRDDReadClient(HoodieSparkEngineContext context, String basePath, SQLContext sqlContext) {
        this(context, basePath);
        this.sqlContextOpt = Option.of(sqlContext);
    }

    /**
     * Creates a client for the table at {@code basePath} using the given index type, with dataframe support.
     *
     * @param context    Spark engine context
     * @param basePath   base path of the Hudi table
     * @param sqlContext SQLContext used for dataframe operations
     * @param indexType  index implementation to use for lookups
     */
    public SparkRDDReadClient(HoodieSparkEngineContext context, String basePath, SQLContext sqlContext, HoodieIndex.IndexType indexType) {
        this(context, HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
            .build());
        this.sqlContextOpt = Option.of(sqlContext);
    }

    /**
     * Creates a client from a full write config. The table meta client is built with the active
     * timeline loaded, and no SQLContext is set.
     *
     * @param context      Spark engine context
     * @param clientConfig write config providing the base path and index settings
     */
    public SparkRDDReadClient(HoodieSparkEngineContext context, HoodieWriteConfig clientConfig) {
        this.context = context;
        this.storageConf = context.getStorageConf();
        String basePath = clientConfig.getBasePath();
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(this.storageConf.newInstance())
            .setBasePath(basePath)
            .setLoadActiveTimelineOnLoad(true)
            .build();
        this.hoodieTable = HoodieSparkTable.create(clientConfig, context, metaClient);
        this.index = SparkHoodieIndexFactory.createIndex(clientConfig);
        this.sqlContextOpt = Option.empty();
    }

    /**
     * Sets {@code spark.sql.hive.convertMetastoreParquet} to {@code false} on the given conf.
     *
     * @param conf Spark configuration to modify in place
     * @return the same {@code conf} instance
     */
    public static SparkConf addHoodieSupport(SparkConf conf) {
        conf.set("spark.sql.hive.convertMetastoreParquet", "false");
        return conf;
    }

    /**
     * Fails fast when a dataframe operation is attempted without a SQLContext.
     *
     * @throws IllegalStateException if no SQLContext was supplied at construction time
     */
    private void assertSqlContext() {
        if (!this.sqlContextOpt.isPresent()) {
            throw new IllegalStateException("SQLContext must be set, when performing dataframe operations");
        }
    }

    /**
     * Resolves a (partition path, file id) pair to the path of the latest base file of that file group.
     *
     * @param partitionPathFileIDPair location of a record, or empty when the record was not found
     * @return the base file path, or empty when the input is empty or the file group has no base file
     */
    private Option<String> convertToDataFilePath(Option<Pair<String, String>> partitionPathFileIDPair) {
        if (!partitionPathFileIDPair.isPresent()) {
            return Option.empty();
        }
        Pair<String, String> partitionAndFileId = partitionPathFileIDPair.get();
        Option<HoodieBaseFile> latestBaseFile = this.hoodieTable.getBaseFileOnlyView()
            .getLatestBaseFile(partitionAndFileId.getLeft(), partitionAndFileId.getRight());
        // A file group without a base file has nothing to read here; report it as absent
        // instead of failing on an unchecked get().
        if (!latestBaseFile.isPresent()) {
            return Option.empty();
        }
        return Option.of(latestBaseFile.get().getPath());
    }

    /**
     * Loads the rows for the given keys from the base files the index maps them to.
     *
     * @param hoodieKeys  keys to look up
     * @param parallelism number of partitions for the join between loaded rows and requested keys
     * @return a dataframe containing only rows whose (record key, partition path) match a requested key
     * @throws IllegalStateException         if this client has no SQLContext
     * @throws UnsupportedOperationException if the resolved base files are neither Parquet nor ORC
     */
    public Dataset<Row> readROView(JavaRDD<HoodieKey> hoodieKeys, int parallelism) {
        this.assertSqlContext();
        SQLContext sqlContext = this.sqlContextOpt.get();

        JavaPairRDD<HoodieKey, Option<Pair<String, String>>> lookupResultRDD = this.checkExists(hoodieKeys);
        JavaPairRDD<HoodieKey, Option<String>> keyToFileRDD =
            lookupResultRDD.mapToPair(r -> new Tuple2<>(r._1, this.convertToDataFilePath(r._2)));
        List<String> paths = keyToFileRDD
            .filter(keyFileTuple -> keyFileTuple._2().isPresent())
            .map(keyFileTuple -> keyFileTuple._2().get())
            .collect();

        // Several keys can live in the same file, so de-duplicate before reading.
        HashSet<String> uniquePaths = new HashSet<>(paths);
        String[] pathsToRead = uniquePaths.toArray(new String[0]);

        // The reader is chosen from the extension of the first resolved path; an empty
        // lookup result falls back to the Parquet reader.
        Dataset<Row> originalDF;
        if (paths.isEmpty() || paths.get(0).endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
            originalDF = sqlContext.read().parquet(pathsToRead);
        } else if (paths.get(0).endsWith(HoodieFileFormat.ORC.getFileExtension())) {
            originalDF = sqlContext.read().orc(pathsToRead);
        } else {
            // Previously this case left the dataframe null and failed later with a bare NPE.
            throw new UnsupportedOperationException("Unsupported base file format for path: " + paths.get(0));
        }

        StructType schema = originalDF.schema();
        JavaPairRDD<HoodieKey, Row> keyRowRDD = originalDF.javaRDD().mapToPair(row -> {
            HoodieKey key = new HoodieKey(
                row.<String>getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD),
                row.<String>getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
            return new Tuple2<>(key, row);
        });

        // The files contain more than the requested records; the join keeps only requested keys.
        JavaRDD<Row> rowRDD = keyRowRDD.join(keyToFileRDD, parallelism).map(tuple -> tuple._2()._1());
        return sqlContext.createDataFrame(rowRDD, schema);
    }

    /**
     * Looks up each key in the index.
     *
     * @param hoodieKeys keys to look up
     * @return each key paired with its (partition path, file id) when the index knows its
     *         current location, or with an empty {@link Option} otherwise
     */
    public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> checkExists(JavaRDD<HoodieKey> hoodieKeys) {
        return HoodieJavaRDD.getJavaRDD(
                this.index.tagLocation(
                    // Keys are wrapped in payload-less records because the index tags records, not keys.
                    HoodieJavaRDD.of(hoodieKeys.map(k -> new HoodieAvroRecord<>(k, null))),
                    this.context,
                    this.hoodieTable))
            .mapToPair(hr -> new Tuple2<>(
                hr.getKey(),
                hr.isCurrentLocationKnown()
                    ? Option.of(Pair.of(hr.getPartitionPath(), hr.getCurrentLocation().getFileId()))
                    : Option.empty()));
    }

    /**
     * Keeps only the records whose current location is not known to the index after tagging.
     *
     * @param hoodieRecords records to check
     * @return the subset of {@code hoodieRecords} for which the index found no existing location
     */
    public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
        JavaRDD<HoodieRecord<T>> recordsWithLocation = this.tagLocation(hoodieRecords);
        return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
    }

    /**
     * Tags the given records with their current location using this client's index.
     *
     * @param hoodieRecords records to tag
     * @return the records as returned by the index's {@code tagLocation}
     * @throws HoodieIndexException declared by this method; raised by the index on lookup failure
     */
    public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> hoodieRecords) throws HoodieIndexException {
        return HoodieJavaRDD.getJavaRDD(
            this.index.tagLocation(HoodieJavaRDD.of(hoodieRecords), this.context, this.hoodieTable));
    }

    /**
     * Lists all pending compactions of the table.
     *
     * <p>A new meta client is built on every call, with the active timeline loaded, rather than
     * reusing the one held by the table handle.
     *
     * @return pairs of (instant timestamp, compaction plan) for every pending compaction
     */
    public List<Pair<String, HoodieCompactionPlan>> getPendingCompactions() {
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(this.storageConf.newInstance())
            .setBasePath(this.hoodieTable.getMetaClient().getBasePath())
            .setLoadActiveTimelineOnLoad(true)
            .build();
        return CompactionUtils.getAllPendingCompactionPlans(metaClient).stream()
            .map(instantWorkloadPair -> Pair.of(instantWorkloadPair.getKey().getTimestamp(), instantWorkloadPair.getValue()))
            .collect(Collectors.toList());
    }
}

