/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import java.util.Collections;
import java.util.List;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
import org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryRunner {
    private final SparkSession sparkSession;
    private final TypedProperties props;
    private final String sourcePath;
    private static final Logger LOG = LoggerFactory.getLogger(QueryRunner.class);

    public QueryRunner(SparkSession sparkSession, TypedProperties props) {
        this.sparkSession = sparkSession;
        this.props = props;
        ConfigUtils.checkRequiredConfigProperties(props, Collections.singletonList(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH));
        this.sourcePath = ConfigUtils.getStringWithAltKeys(props, HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH);
    }

    public Pair<QueryInfo, Dataset<Row>> run(QueryInfo queryInfo, Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitterOption) {
        if (queryInfo.isIncremental()) {
            return this.runIncrementalQuery(queryInfo);
        }
        if (queryInfo.isSnapshot()) {
            return this.runSnapshotQuery(queryInfo, snapshotLoadQuerySplitterOption);
        }
        throw new HoodieException("Unknown query type " + queryInfo.getQueryType());
    }

    public static Dataset<Row> applyOrdering(Dataset<Row> dataset, List<String> orderByColumns) {
        if (orderByColumns != null && !orderByColumns.isEmpty()) {
            LOG.debug("Applying ordering " + orderByColumns);
            return dataset.orderBy((Column[])orderByColumns.stream().map(functions::col).toArray(Column[]::new));
        }
        return dataset;
    }

    public Pair<QueryInfo, Dataset<Row>> runIncrementalQuery(QueryInfo queryInfo) {
        LOG.info("Running incremental query");
        return Pair.of(queryInfo, this.sparkSession.read().format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()).option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), queryInfo.getPreviousInstant()).option(DataSourceReadOptions.END_INSTANTTIME().key(), queryInfo.getEndInstant()).option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), this.props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().defaultValue())).load(this.sourcePath));
    }

    public Pair<QueryInfo, Dataset<Row>> runSnapshotQuery(QueryInfo queryInfo, Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitterOption) {
        LOG.info("Running snapshot query");
        Dataset snapshot = this.sparkSession.read().format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()).load(this.sourcePath);
        QueryInfo snapshotQueryInfo = snapshotLoadQuerySplitterOption.map(snapshotLoadQuerySplitter -> snapshotLoadQuerySplitter.getNextCheckpoint((Dataset<Row>)snapshot, queryInfo)).orElse(queryInfo);
        return Pair.of(snapshotQueryInfo, this.applySnapshotQueryFilters((Dataset<Row>)snapshot, snapshotQueryInfo));
    }

    public Dataset<Row> applySnapshotQueryFilters(Dataset<Row> snapshot, QueryInfo snapshotQueryInfo) {
        return snapshot.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, snapshotQueryInfo.getStartInstant())).filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, snapshotQueryInfo.getEndInstant()));
    }
}

