/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.integ.testsuite.dag.nodes;

import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.Analyzer;
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
import scala.collection.mutable.Buffer;

/**
 * DAG node that checks the data written to Hudi against the raw input data.
 *
 * <p>The node reads every avro micro batch under the input base path, reduces it to one row per
 * (partition path value, record key value) pair by keeping the row with the highest value of the
 * source ordering field, and keeps only rows whose {@code _hoodie_is_deleted} column is NULL.
 * That snapshot is then required to be fully contained in
 * <ol>
 *   <li>the Hudi table loaded by path from the target base path, and</li>
 *   <li>the Hive table named by the writer properties,</li>
 * </ol>
 * after the Hudi meta columns have been dropped from both. If the node config asks for it, the
 * input micro batches are deleted once validation has passed.
 */
public class ValidateDatasetNode extends DagNode<Boolean> {
    private static final Logger log = LoggerFactory.getLogger(ValidateDatasetNode.class);

    /** Column compared to decide which of two rows with the same key is kept. */
    private static final String SOURCE_ORDERING_FIELD = "test_suite_source_ordering_field";

    /** Hudi meta columns removed before comparing against the input data. */
    private static final String[] HOODIE_META_COLUMNS = {
        "_hoodie_commit_time",
        "_hoodie_commit_seqno",
        "_hoodie_record_key",
        "_hoodie_partition_path",
        "_hoodie_file_name"
    };

    /**
     * Creates the node.
     *
     * @param config node configuration; {@code isDeleteInputData()} is consulted at the end of
     *               {@link #execute(ExecutionContext)}
     */
    public ValidateDatasetNode(DeltaConfig.Config config) {
        this.config = config;
    }

    /**
     * Runs the validation described on the class.
     *
     * @param context execution context supplying the Spark context, the test suite writer
     *                configuration (input/target base paths) and the writer properties
     * @throws AssertionError if some row of the input snapshot is missing from the Hudi table or
     *                        from the Hive table
     * @throws Exception      on any file system or Spark failure
     */
    @Override
    public void execute(ExecutionContext context) throws Exception {
        SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();

        String inputBasePath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
        String inputPath = inputBasePath + "/*/*";
        String hudiPath = context.getHoodieTestSuiteWriter().getCfg().targetBasePath + "/*/*/*";
        log.warn("ValidateDataset Node: Input path {}, hudi path {}", inputPath, hudiPath);

        Path inputRoot = new Path(inputBasePath);
        FileSystem fs = inputRoot.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
        for (FileStatus microBatch : fs.listStatus(inputRoot)) {
            log.debug("Listing all Micro batches to be validated :: {}", microBatch.getPath());
        }

        String recordKeyField = context.getWriterContext().getProps()
            .getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY());
        String partitionPathField = context.getWriterContext().getProps()
            .getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY());

        Dataset<Row> inputDf = session.read().format("avro").load(inputPath);
        Dataset<Row> inputSnapshotDf = buildInputSnapshot(inputDf, partitionPathField, recordKeyField);

        // Validate the Hudi table as read directly from storage.
        Dataset<Row> trimmedDf = dropHoodieMetaColumns(session.read().format("hudi").load(hudiPath));
        if (hasRowsMissingFrom(inputSnapshotDf, trimmedDf)) {
            log.error("Data set validation failed. Total count in hudi {}, input df count {}",
                trimmedDf.count(), inputSnapshotDf.count());
            throw new AssertionError((Object) "Hudi contents does not match contents input data. ");
        }

        // Validate the same data as exposed through the Hive table.
        String database = context.getWriterContext().getProps()
            .getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY());
        String tableName = context.getWriterContext().getProps()
            .getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY());
        log.warn("Validating hive table with db : {} and table : {}", database, tableName);
        Dataset<Row> trimmedCowDf = dropHoodieMetaColumns(session.sql("SELECT * FROM " + database + "." + tableName));
        // Compare against the Hive result itself; comparing against trimmedDf again would make
        // this second check a repeat of the first one and never look at the Hive table.
        if (hasRowsMissingFrom(inputSnapshotDf, trimmedCowDf)) {
            log.error("Data set validation failed for COW hive table. Total count in hudi {}, input df count {}",
                trimmedCowDf.count(), inputSnapshotDf.count());
            throw new AssertionError((Object) "Hudi hive table contents does not match contents input data. ");
        }

        if (this.config.isDeleteInputData()) {
            // Re-list so that exactly what is present now gets removed.
            for (FileStatus microBatch : fs.listStatus(inputRoot)) {
                log.debug("Micro batch to be deleted {}", microBatch.getPath());
                fs.delete(microBatch.getPath(), true);
            }
        }
    }

    /**
     * Collapses the raw input to one row per key and removes rows flagged as deleted.
     *
     * <p>Rows are grouped by the <em>values</em> of the partition path and record key fields.
     * Within a group the row with the strictly larger ordering value wins; on a tie the second
     * operand of the reduction is kept.
     */
    private Dataset<Row> buildInputSnapshot(Dataset<Row> inputDf, String partitionPathField, String recordKeyField) {
        ExpressionEncoder<Row> encoder = getEncoder(inputDf.schema());
        return inputDf
            .groupByKey(
                (MapFunction<Row, String>) row -> buildRowKey(row, partitionPathField, recordKeyField),
                Encoders.STRING())
            .reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
                int ts1 = v1.<Integer>getAs(SOURCE_ORDERING_FIELD);
                int ts2 = v2.<Integer>getAs(SOURCE_ORDERING_FIELD);
                return ts1 > ts2 ? v1 : v2;
            })
            .map((MapFunction<Tuple2<String, Row>, Row>) keyed -> keyed._2(), (Encoder<Row>) encoder)
            .filter("_hoodie_is_deleted is NULL");
    }

    /**
     * Builds the grouping key {@code <partition value>+<record key value>} for one row.
     * An empty partition path field name contributes an empty partition value instead of a
     * column lookup.
     */
    private static String buildRowKey(Row row, String partitionPathField, String recordKeyField) {
        Object partitionValue = partitionPathField.isEmpty() ? "" : row.<Object>getAs(partitionPathField);
        Object recordKeyValue = row.<Object>getAs(recordKeyField);
        return partitionValue + "+" + recordKeyValue;
    }

    /** Returns {@code df} without the Hudi meta columns listed in {@link #HOODIE_META_COLUMNS}. */
    private static Dataset<Row> dropHoodieMetaColumns(Dataset<Row> df) {
        Dataset<Row> trimmed = df;
        for (String column : HOODIE_META_COLUMNS) {
            trimmed = trimmed.drop(column);
        }
        return trimmed;
    }

    /** Returns true when at least one row of {@code expected} is not part of its intersection with {@code actual}. */
    private static boolean hasRowsMissingFrom(Dataset<Row> expected, Dataset<Row> actual) {
        Dataset<Row> intersectionDf = expected.intersect(actual);
        return expected.except(intersectionDf).count() != 0L;
    }

    /** Creates a row encoder for {@code schema}, resolved and bound against the schema's own attributes. */
    private ExpressionEncoder<Row> getEncoder(StructType schema) {
        List<Attribute> attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream()
            .map(Attribute::toAttribute)
            .collect(Collectors.toList());
        return RowEncoder.apply(schema)
            .resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(), SimpleAnalyzer$.MODULE$);
    }
}

