/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi;

import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hudi.AbstractHoodieClient;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

/**
 * Read-side client for a Hudi table.
 *
 * <p>Looks up record locations through a {@link HoodieIndex} built from the write config. When a
 * {@link SQLContext} was supplied at construction time, it can also read the matching rows back as
 * a DataFrame from the latest base files.
 *
 * @param <T> payload type of the records handled by the index
 */
public class HoodieReadClient<T extends HoodieRecordPayload>
extends AbstractHoodieClient {
    private static final Logger LOG = LogManager.getLogger(HoodieReadClient.class);
    // Transient: excluded from Java serialization when `this` is captured by Spark closures.
    private final transient HoodieIndex<T> index;
    // NOTE(review): assigned in the constructor but never read in this class.
    private final HoodieTimeline commitTimeline;
    // Not transient: convertToDataFilePath reads it inside executor-side closures.
    private final HoodieTable hoodieTable;
    private transient Option<SQLContext> sqlContextOpt;

    /**
     * Creates a client for the table at {@code basePath}, using a BLOOM index.
     *
     * @param jsc             Spark context
     * @param basePath        base path of the Hudi table
     * @param timelineService optional embedded timeline service, passed to the parent client
     */
    public HoodieReadClient(JavaSparkContext jsc, String basePath, Option<EmbeddedTimelineService> timelineService) {
        this(jsc, HoodieWriteConfig.newBuilder()
                .withPath(basePath)
                .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
                .build(), timelineService);
    }

    /**
     * Creates a client for the table at {@code basePath}, with a BLOOM index and no timeline service.
     */
    public HoodieReadClient(JavaSparkContext jsc, String basePath) {
        this(jsc, basePath, Option.empty());
    }

    /**
     * Creates a client for the table at {@code basePath} that can also serve DataFrame reads
     * ({@link #readROView}).
     */
    public HoodieReadClient(JavaSparkContext jsc, String basePath, SQLContext sqlContext) {
        this(jsc, basePath);
        this.sqlContextOpt = Option.of(sqlContext);
    }

    /**
     * Creates a client from an explicit write config, with no timeline service.
     */
    public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) {
        this(jsc, clientConfig, Option.empty());
    }

    /**
     * Creates a client from an explicit write config.
     *
     * <p>Builds a meta client for the configured base path. Resolves the table and its completed
     * commit timeline, and creates the index described by {@code clientConfig}.
     */
    public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, Option<EmbeddedTimelineService> timelineService) {
        super(jsc, clientConfig, timelineService);
        String basePath = clientConfig.getBasePath();
        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
        this.hoodieTable = HoodieTable.getHoodieTable(metaClient, clientConfig, jsc);
        this.commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants();
        this.index = HoodieIndex.createIndex(clientConfig, jsc);
        this.sqlContextOpt = Option.empty();
    }

    /**
     * Sets {@code spark.sql.hive.convertMetastoreParquet} to {@code false} on {@code conf}.
     *
     * <p>Presumably this makes Hive-registered tables go through their declared input format
     * instead of Spark's built-in parquet conversion. Confirm against the Hudi docs.
     *
     * @param conf the Spark configuration to mutate
     * @return the same {@code conf} instance, for chaining
     */
    public static SparkConf addHoodieSupport(SparkConf conf) {
        conf.set("spark.sql.hive.convertMetastoreParquet", "false");
        return conf;
    }

    /**
     * Fails fast when a DataFrame operation is requested without a {@link SQLContext}.
     *
     * @throws IllegalStateException if no SQLContext was supplied at construction time
     */
    private void assertSqlContext() {
        if (!this.sqlContextOpt.isPresent()) {
            throw new IllegalStateException("SQLContext must be set, when performing dataframe operations");
        }
    }

    /**
     * Resolves an index lookup result to the path of the latest base file of that file group.
     *
     * @param partitionPathFileIDPair (partition path, file id) from the index, or empty if not found
     * @return the base file path. Empty when the key was not found, or when the file group has no
     *     base file. The previous code called {@code get()} unconditionally and threw in that case.
     */
    private Option<String> convertToDataFilePath(Option<Pair<String, String>> partitionPathFileIDPair) {
        if (!partitionPathFileIDPair.isPresent()) {
            return Option.empty();
        }
        Pair<String, String> partitionPathFileID = partitionPathFileIDPair.get();
        Option<HoodieBaseFile> latestBaseFile = this.hoodieTable.getBaseFileOnlyView()
                .getLatestBaseFile(partitionPathFileID.getLeft(), partitionPathFileID.getRight());
        if (!latestBaseFile.isPresent()) {
            return Option.empty();
        }
        return Option.of(latestBaseFile.get().getPath());
    }

    /**
     * Looks up each key in the index and maps it to the path of its latest base file.
     *
     * <p>The lambda calls {@link #convertToDataFilePath}, so {@code this} is captured and serialized
     * into the Spark closure.
     *
     * @param hoodieKeys keys to look up
     * @return key to base-file path, empty when not found or when no base file exists
     */
    private JavaPairRDD<HoodieKey, Option<String>> lookupDataFilePaths(JavaRDD<HoodieKey> hoodieKeys) {
        JavaPairRDD<HoodieKey, Option<Pair<String, String>>> lookupResultRDD =
                this.index.fetchRecordLocation(hoodieKeys, this.jsc, this.hoodieTable);
        return lookupResultRDD.mapToPair(
                r -> new Tuple2<HoodieKey, Option<String>>(r._1(), this.convertToDataFilePath(r._2())));
    }

    /**
     * Reads the rows for {@code hoodieKeys} from the latest base files that contain them.
     *
     * <p>Every distinct base file holding at least one requested key is read in full. The rows are
     * then joined back against the requested keys on (record key, partition path).
     *
     * <p>NOTE(review): the lookup RDD is not persisted, so it is evaluated twice (for collect and
     * for join). If no key resolves to a base file, the parquet read gets zero paths, and Spark may
     * fail to infer a schema. Confirm whether callers rely on that.
     *
     * @param hoodieKeys  keys to read
     * @param parallelism number of partitions for the final join
     * @return a DataFrame with the base files' schema, holding only the requested keys' rows
     * @throws IllegalStateException if no SQLContext was supplied
     */
    public Dataset<Row> readROView(JavaRDD<HoodieKey> hoodieKeys, int parallelism) {
        this.assertSqlContext();
        JavaPairRDD<HoodieKey, Option<String>> keyToFileRDD = this.lookupDataFilePaths(hoodieKeys);
        List<String> paths = keyToFileRDD
                .filter(keyFile -> keyFile._2().isPresent())
                .map(keyFile -> keyFile._2().get())
                .collect();
        HashSet<String> uniquePaths = new HashSet<>(paths);
        SQLContext sqlContext = this.sqlContextOpt.get();
        Dataset<Row> originalDF = sqlContext.read().parquet(uniquePaths.toArray(new String[uniquePaths.size()]));
        StructType schema = originalDF.schema();
        JavaPairRDD<HoodieKey, Row> keyRowRDD = originalDF.javaRDD().mapToPair(row -> {
            HoodieKey key = new HoodieKey(
                    row.<String>getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD),
                    row.<String>getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
            return new Tuple2<HoodieKey, Row>(key, row);
        });
        // The inner join keeps only rows whose key was requested; the file-path side is dropped.
        JavaRDD<Row> rowRDD = keyRowRDD.join(keyToFileRDD, parallelism).map(tuple -> tuple._2()._1());
        return sqlContext.createDataFrame(rowRDD, schema);
    }

    /**
     * Checks which keys exist in the table.
     *
     * <p>Index results are mapped to file paths so that the values really are
     * {@code Option<String>}. The previous code returned the index result
     * ({@code Option<Pair<String, String>>}) directly. That compiled only through unchecked
     * conversion on the raw {@code hoodieTable}, and it leaked {@code Pair} values to callers.
     *
     * @param hoodieKeys keys to check
     * @return key to path of the latest base file containing it. The value is empty when the index
     *     does not find the key, or when its file group has no base file.
     */
    public JavaPairRDD<HoodieKey, Option<String>> checkExists(JavaRDD<HoodieKey> hoodieKeys) {
        return this.lookupDataFilePaths(hoodieKeys);
    }

    /**
     * Returns only the records whose location the index does not know.
     *
     * @param hoodieRecords records to filter
     * @return records for which {@code isCurrentLocationKnown()} is false after tagging
     */
    public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
        JavaRDD<HoodieRecord<T>> recordsWithLocation = this.tagLocation(hoodieRecords);
        return recordsWithLocation.filter(record -> !record.isCurrentLocationKnown());
    }

    /**
     * Tags each record with its current location, as determined by the index.
     *
     * @param hoodieRecords records to tag
     * @return the tagged records
     * @throws HoodieIndexException as declared; propagated from the index
     */
    public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> hoodieRecords) throws HoodieIndexException {
        return this.index.tagLocation(hoodieRecords, this.jsc, this.hoodieTable);
    }

    /**
     * Lists the pending compactions.
     *
     * <p>A new meta client is built for the table's base path rather than reusing the table's
     * own. Presumably this re-reads the timeline to pick up instants created since construction;
     * confirm.
     *
     * @return (instant timestamp, compaction plan) pairs
     */
    public List<Pair<String, HoodieCompactionPlan>> getPendingCompactions() {
        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
                this.jsc.hadoopConfiguration(), this.hoodieTable.getMetaClient().getBasePath(), true);
        return CompactionUtils.getAllPendingCompactionPlans(metaClient).stream()
                .map(instantPlanPair -> Pair.of(instantPlanPair.getKey().getTimestamp(), instantPlanPair.getValue()))
                .collect(Collectors.toList());
    }
}

