/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.utils;

import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.validator.SparkPreCommitValidator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.view.HoodieTablePreCommitFileSystemView;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import scala.collection.JavaConverters;
import scala.collection.Seq;

/**
 * Static helpers that run the configured Spark pre-commit validators against the
 * records of a write that has not been committed yet.
 *
 * <p>The entry point is {@link #runValidators}; the remaining public methods build the
 * "before" and "after" datasets handed to each validator.
 */
public class SparkValidatorUtils {
    // Log under this class's own name so messages are attributed to the validator utilities.
    private static final Logger LOG = LogManager.getLogger(SparkValidatorUtils.class);

    /**
     * Runs every validator listed in {@code config.getPreCommitValidators()} (a comma-separated
     * list of class names) and fails if any of them reports a validation failure.
     *
     * <p>If {@code writeMetadata} carries no write stats yet, they are derived from the write
     * statuses and stored on {@code writeMetadata} before validation starts. The table's active
     * timeline is reloaded before the datasets are built.
     *
     * <p>All validators are submitted before any result is awaited, so they may run concurrently.
     * The two datasets are cached for the duration of the validation and unpersisted afterwards.
     *
     * @param config        write config providing the validator class names
     * @param writeMetadata metadata of the pending write; may be updated with write stats
     * @param context       engine context used to obtain the Spark context
     * @param table         table being written to
     * @param instantTime   instant time of the pending commit
     * @throws HoodieValidationException if at least one validator fails; any other exception
     *         thrown inside a validator surfaces as a
     *         {@link java.util.concurrent.CompletionException} from the join
     */
    public static void runValidators(HoodieWriteConfig config, HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata, HoodieEngineContext context, HoodieTable table, String instantTime) {
        if (StringUtils.isNullOrEmpty(config.getPreCommitValidators())) {
            LOG.info("no validators configured.");
            return;
        }
        if (!writeMetadata.getWriteStats().isPresent()) {
            writeMetadata.setWriteStats(writeMetadata.getWriteStatuses().map(WriteStatus::getStat).collectAsList());
        }
        Set<String> partitionsModified = writeMetadata.getWriteStats().get().stream().map(HoodieWriteStat::getPartitionPath).collect(Collectors.toSet());
        SQLContext sqlContext = new SQLContext(HoodieSparkEngineContext.getSparkContext(context));
        table.getMetaClient().reloadActiveTimeline();

        boolean allSuccess;
        Dataset<Row> beforeState = SparkValidatorUtils.getRecordsFromCommittedFiles(sqlContext, partitionsModified, table).cache();
        try {
            Dataset<Row> afterState = SparkValidatorUtils.getRecordsFromPendingCommits(sqlContext, partitionsModified, writeMetadata, table, instantTime).cache();
            try {
                List<SparkPreCommitValidator> validators = SparkValidatorUtils.loadValidators(config, context, table);
                allSuccess = SparkValidatorUtils.runAllValidators(validators, writeMetadata, beforeState, afterState, instantTime);
            } finally {
                // Release the cache registered above; validators have finished with it by now.
                afterState.unpersist();
            }
        } finally {
            beforeState.unpersist();
        }

        if (!allSuccess) {
            LOG.error("At least one pre-commit validation failed");
            throw new HoodieValidationException("At least one pre-commit validation failed");
        }
        LOG.info("All validations succeeded");
    }

    /**
     * Instantiates one validator per non-blank entry of the comma-separated validator list.
     * Entries are trimmed so that values such as {@code "A, B"} resolve to the classes A and B.
     */
    private static List<SparkPreCommitValidator> loadValidators(HoodieWriteConfig config, HoodieEngineContext context, HoodieTable table) {
        return Arrays.stream(config.getPreCommitValidators().split(","))
                .map(String::trim)
                .filter(validatorClass -> !validatorClass.isEmpty())
                .map(validatorClass -> (SparkPreCommitValidator)ReflectionUtils.loadClass(validatorClass, new Class[]{HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, new Object[]{table, context, config}))
                .collect(Collectors.toList());
    }

    /**
     * Submits every validator, waits for all of them to finish, and returns {@code true} only
     * if each one succeeded. An empty validator list yields {@code true}.
     */
    private static boolean runAllValidators(List<SparkPreCommitValidator> validators, HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata, Dataset<Row> beforeState, Dataset<Row> afterState, String instantTime) {
        // Collect the futures eagerly: joining inside the same lazy stream pipeline would
        // wait for each validator before the next one is even submitted.
        List<CompletableFuture<Boolean>> pending = validators.stream()
                .map(v -> SparkValidatorUtils.runValidatorAsync(v, writeMetadata, beforeState, afterState, instantTime))
                .collect(Collectors.toList());
        // Wait for every validator to complete (even if one ended exceptionally) so none of
        // them is still reading the datasets when the caller releases the cache.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
        return pending.stream().allMatch(CompletableFuture::join);
    }

    /**
     * Runs a single validator asynchronously.
     *
     * @return a future completing with {@code true} when validation passes and {@code false}
     *         when the validator throws {@link HoodieValidationException}
     */
    private static CompletableFuture<Boolean> runValidatorAsync(SparkPreCommitValidator validator, HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata, Dataset<Row> beforeState, Dataset<Row> afterState, String instantTime) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                validator.validate(instantTime, writeMetadata, beforeState, afterState);
                LOG.info("validation complete for " + validator.getClass().getName());
                return true;
            }
            catch (HoodieValidationException e) {
                // Keep the cause in the log; the caller only learns that "some" validator failed.
                LOG.error("validation failed for " + validator.getClass().getName(), e);
                return false;
            }
        });
    }

    /**
     * Reads the records of the latest base files, as reported by the table's base-file-only
     * view, for the given partitions.
     *
     * @param sqlContext         SQL context used for reading
     * @param partitionsAffected partition paths to look up
     * @param table              table whose base-file-only view is queried
     * @return the records of those files, or {@code sqlContext.emptyDataFrame()} when no base
     *         file is found
     */
    public static Dataset<Row> getRecordsFromCommittedFiles(SQLContext sqlContext, Set<String> partitionsAffected, HoodieTable table) {
        List<String> committedFiles = partitionsAffected.stream().flatMap(partition -> table.getBaseFileOnlyView().getLatestBaseFiles((String)partition).map(BaseFile::getPath)).collect(Collectors.toList());
        if (committedFiles.isEmpty()) {
            return sqlContext.emptyDataFrame();
        }
        return SparkValidatorUtils.readRecordsForBaseFiles(sqlContext, committedFiles);
    }

    /**
     * Reads the given base files as parquet into a single dataset.
     *
     * @param sqlContext    SQL context used for reading
     * @param baseFilePaths paths of the files to read
     * @return dataset over all rows of the given files
     */
    public static Dataset<Row> readRecordsForBaseFiles(SQLContext sqlContext, List<String> baseFilePaths) {
        // The Java list is converted to a Scala buffer and passed to the Seq-based parquet overload.
        return sqlContext.read().parquet((Seq)JavaConverters.asScalaBufferConverter(baseFilePaths).asScala());
    }

    /**
     * Reads the records of the latest base files for the given partitions as seen through a
     * {@link HoodieTablePreCommitFileSystemView} built from the table's meta client and view,
     * the write stats and replaced file ids of {@code writeMetadata}, and {@code instantTime}.
     *
     * <p>The write stats are read with {@code getWriteStats().get()} without a presence check,
     * so callers must make sure they are set ({@link #runValidators} does so).
     *
     * @param sqlContext         SQL context used for reading
     * @param partitionsAffected partition paths to look up
     * @param writeMetadata      metadata of the pending write; its write stats must be present
     * @param table              table being written to
     * @param instantTime        instant time of the pending commit
     * @return the records of those files, or {@code sqlContext.emptyDataFrame()} when no base
     *         file is found
     */
    public static Dataset<Row> getRecordsFromPendingCommits(SQLContext sqlContext, Set<String> partitionsAffected, HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata, HoodieTable table, String instantTime) {
        HoodieTablePreCommitFileSystemView fsView = new HoodieTablePreCommitFileSystemView(table.getMetaClient(), table.getHoodieView(), writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(), instantTime);
        List<String> newFiles = partitionsAffected.stream().flatMap(partition -> fsView.getLatestBaseFiles((String)partition).map(BaseFile::getPath)).collect(Collectors.toList());
        if (newFiles.isEmpty()) {
            return sqlContext.emptyDataFrame();
        }
        return SparkValidatorUtils.readRecordsForBaseFiles(sqlContext, newFiles);
    }
}

