/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.integ.testsuite.dag.nodes;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.Analyzer;
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
import scala.collection.mutable.Buffer;

/**
 * DAG node that validates the contents of a Hudi dataset against the generated input data.
 *
 * <p>The input side is read as Avro from the entries two directory levels below the configured
 * input base path. It is deduplicated per (partition path, record key), keeping the row with the
 * larger {@code test_suite_source_ordering_field}. Rows whose {@code _hoodie_is_deleted} is
 * {@code true} are then dropped. Subclasses supply the Hudi side via
 * {@link #getDatasetToValidate}.
 */
public abstract class BaseValidateDatasetNode
extends DagNode<Boolean> {
    /** Commit-metadata key under which DeltaStreamer records its checkpoint. */
    private static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
    /** Column used to pick the winning row among duplicates of the same key. */
    private static final String ORDERING_FIELD = "test_suite_source_ordering_field";
    /** Delay between polls while waiting for DeltaStreamer to catch up. */
    private static final long CATCHUP_POLL_INTERVAL_MS = 20000L;

    public BaseValidateDatasetNode(DeltaConfig.Config config) {
        this.config = config;
    }

    /** Returns the logger used by the concrete validation node. */
    public abstract Logger getLogger();

    /**
     * Returns the Hudi-side dataset to compare against the input data.
     *
     * @param session active Spark session
     * @param context execution context of the running DAG
     * @param inputSchema schema of the deduplicated input data
     * @return dataset to validate
     */
    public abstract Dataset<Row> getDatasetToValidate(SparkSession session, ExecutionContext context, StructType inputSchema);

    /**
     * Validates the Hudi dataset against the input data when this iteration is due for validation.
     *
     * @param context execution context of the running DAG
     * @param curItrCount current iteration number
     * @throws AssertionError if the Hudi contents (or the Hive table, when enabled) do not match
     *     the input data, or if DeltaStreamer does not catch up in continuous mode
     */
    @Override
    public void execute(ExecutionContext context, int curItrCount) throws Exception {
        if (!shouldValidateInIteration(curItrCount)) {
            return;
        }
        String inputBasePath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
        FileSystem fs = new Path(inputBasePath).getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
        if (context.getHoodieTestSuiteWriter().getCfg().testContinousMode) {
            awaitUntilDeltaStreamerCaughtUp(context, context.getHoodieTestSuiteWriter().getCfg().targetBasePath, fs, inputBasePath);
        }
        SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
        // The glob picks up every entry two directory levels below the input base path.
        String inputPath = inputBasePath + "/*/*";
        log.info("Validation using data from input path " + inputPath);
        if (log.isDebugEnabled()) {
            FileStatus[] fileStatuses = fs.listStatus(new Path(inputBasePath));
            log.debug("fileStatuses length: " + fileStatuses.length);
            for (FileStatus fileStatus : fileStatuses) {
                log.debug("Listing all Micro batches to be validated :: " + fileStatus.getPath().toString());
            }
        }
        Dataset<Row> inputSnapshotDf = getInputDf(context, session, inputPath);
        Dataset<Row> trimmedHudiDf = getDatasetToValidate(session, context, inputSnapshotDf.schema());
        if (config.isValidateFullData()) {
            validateFullData(inputSnapshotDf, trimmedHudiDf);
        } else {
            long inputCount = validateInputContainedInHudi(inputSnapshotDf, trimmedHudiDf);
            if (config.isValidateHive()) {
                validateHiveTable(context, session, inputSnapshotDf, inputCount);
            }
            // NOTE(review): input data is only deleted on this (non full-data) path; confirm that
            // skipping deletion after full-data validation is intended.
            if (config.isDeleteInputData()) {
                deleteInputData(fs, inputBasePath);
            }
        }
    }

    /** Returns whether validation should run in the given iteration. */
    private boolean shouldValidateInIteration(int curItrCount) {
        int itrCountToExecute = config.getIterationCountToExecute();
        if (itrCountToExecute != -1) {
            // A specific iteration was configured: run only in that one.
            return itrCountToExecute == curItrCount;
        }
        return curItrCount % config.validateOnceEveryIteration() == 0;
    }

    /** Asserts that input and Hudi datasets contain exactly the same rows, in both directions. */
    private void validateFullData(Dataset<Row> inputSnapshotDf, Dataset<Row> trimmedHudiDf) {
        log.debug("Validating full dataset");
        long exceptInputCount = inputSnapshotDf.except(trimmedHudiDf).count();
        long exceptHudiCount = trimmedHudiDf.except(inputSnapshotDf).count();
        log.debug("Except input df count " + exceptInputCount + ", except hudi count " + exceptHudiCount);
        if (exceptInputCount != 0L || exceptHudiCount != 0L) {
            log.error("Data set validation failed. Total count in hudi " + trimmedHudiDf.count() + ", input df count " + inputSnapshotDf.count() + ". InputDf except hudi df = " + exceptInputCount + ", Hudi df except Input df " + exceptHudiCount);
            throw new AssertionError("Hudi contents does not match contents input data. ");
        }
    }

    /**
     * Asserts that both datasets are non-empty and every input row is present in the Hudi dataset.
     *
     * @return number of rows in the input dataset
     */
    private long validateInputContainedInHudi(Dataset<Row> inputSnapshotDf, Dataset<Row> trimmedHudiDf) {
        Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf);
        long inputCount = inputSnapshotDf.count();
        long outputCount = trimmedHudiDf.count();
        log.debug("Input count: " + inputCount + "; output count: " + outputCount);
        // The intersection must equal the input; otherwise some input rows are missing from Hudi.
        if (outputCount == 0L || inputCount == 0L || inputSnapshotDf.except(intersectionDf).count() != 0L) {
            log.error("Data set validation failed. Total count in hudi " + outputCount + ", input df count " + inputCount);
            throw new AssertionError("Hudi contents does not match contents input data. ");
        }
        return inputCount;
    }

    /**
     * Asserts that the Hive-synced table is non-empty and contains every input row (projected to
     * the columns listed in the query below).
     */
    private void validateHiveTable(ExecutionContext context, SparkSession session, Dataset<Row> inputSnapshotDf, long inputCount) {
        String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
        String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
        log.warn("Validating hive table with db : " + database + " and table : " + tableName);
        session.sql("REFRESH TABLE " + database + "." + tableName);
        Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, test_suite_source_ordering_field FROM " + database + "." + tableName);
        Dataset<Row> reorderedInputDf = inputSnapshotDf.select("_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare", "_hoodie_is_deleted", "test_suite_source_ordering_field");
        Dataset<Row> intersectedHiveDf = reorderedInputDf.intersect(cowDf);
        // Count the Hive table itself; the Hudi dataset was already checked to be non-empty.
        long hiveCount = cowDf.count();
        log.warn("Input count: " + inputCount + "; output count: " + hiveCount);
        if (hiveCount == 0L || reorderedInputDf.except(intersectedHiveDf).count() != 0L) {
            log.error("Data set validation failed for COW hive table. Total count in hudi " + hiveCount + ", input df count " + inputCount);
            throw new AssertionError("Hudi hive table contents does not match contents input data. ");
        }
    }

    /** Recursively deletes every entry directly under the input base path. */
    private void deleteInputData(FileSystem fs, String inputBasePath) throws IOException {
        for (FileStatus fileStatus : fs.listStatus(new Path(inputBasePath))) {
            log.debug("Micro batch to be deleted " + fileStatus.getPath().toString());
            fs.delete(fileStatus.getPath(), true);
        }
    }

    /**
     * Blocks until the latest DeltaStreamer checkpoint in the Hudi table equals the name of the
     * newest input sub-directory. Sub-directory names are parsed as longs, so they are assumed to
     * be numeric batch ids.
     *
     * @throws AssertionError if the checkpoint has not caught up within the configured max wait
     */
    private void awaitUntilDeltaStreamerCaughtUp(ExecutionContext context, String hudiTablePath, FileSystem fs, String inputPath) throws IOException, InterruptedException {
        String latestSubDir = Arrays.stream(fs.listStatus(new Path(inputPath)))
            .map(status -> status.getPath().getName())
            .max(Comparator.<String>comparingLong(Long::parseLong))
            .orElse(null);
        if (latestSubDir == null) {
            // Nothing has been ingested yet, so there is no checkpoint to wait for.
            log.warn("No sub directories found under input path " + inputPath + "; not waiting for deltastreamer");
            return;
        }
        HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(hudiTablePath).build();
        Option<String> latestCheckpoint = getLatestCheckpoint(meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
        log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer " + latestCheckpoint.orElse("none"));
        long maxWaitTime = config.maxWaitTimeForDeltastreamerToCatchupMs();
        long waitedSoFar = 0L;
        while (!isCaughtUp(latestCheckpoint, latestSubDir)) {
            log.warn("Sleeping for " + (CATCHUP_POLL_INTERVAL_MS / 1000) + " secs awaiting for deltastreamer to catch up with ingested data");
            Thread.sleep(CATCHUP_POLL_INTERVAL_MS);
            waitedSoFar += CATCHUP_POLL_INTERVAL_MS;
            meta.reloadActiveTimeline();
            latestCheckpoint = getLatestCheckpoint(meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
            // Only give up if the checkpoint is still behind after this poll.
            if (!isCaughtUp(latestCheckpoint, latestSubDir) && waitedSoFar >= maxWaitTime) {
                throw new AssertionError("DeltaStreamer has not caught up after " + waitedSoFar + " ms of wait time. Last known checkpoint " + latestCheckpoint.orElse("none") + ", expected checkpoint to have caught up " + latestSubDir);
            }
            log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer " + latestCheckpoint.orElse("none"));
        }
    }

    /** Returns whether the checkpoint is present and equal to the expected sub-directory name. */
    private static boolean isCaughtUp(Option<String> checkpoint, String latestSubDir) {
        return checkpoint.isPresent() && checkpoint.get().equals(latestSubDir);
    }

    /**
     * Returns the most recent non-empty DeltaStreamer checkpoint recorded in the timeline's
     * commit metadata, scanning instants newest first.
     *
     * @throws HoodieIOException if an instant's commit metadata cannot be parsed
     */
    private Option<String> getLatestCheckpoint(HoodieTimeline timeline) {
        return timeline.getReverseOrderedInstants().map(instant -> {
            try {
                HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
                String checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
                if (StringUtils.isNullOrEmpty(checkpoint)) {
                    return Option.<String>empty();
                }
                return Option.of(checkpoint);
            } catch (IOException e) {
                throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e);
            }
        }).filter(Option::isPresent).findFirst().orElse(Option.empty());
    }

    /**
     * Reads the Avro input data and reduces it to one row per key.
     *
     * <p>The key is the record key, or "partitionPath+recordKey" when a partition path field is
     * configured. Among duplicates the row with the larger ordering field wins (ties keep the
     * second row). Rows flagged {@code _hoodie_is_deleted} are removed. Input whose partition path
     * starts with the configured skip value is filtered out first.
     */
    private Dataset<Row> getInputDf(ExecutionContext context, SparkSession session, String inputPath) {
        String recordKeyField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key());
        String partitionPathField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD().key());
        Dataset<Row> inputDf = session.read().format("avro").load(inputPath);
        String partitionsToSkip = config.inputPartitonsToSkipWithValidate();
        // instr(...) == 1 means the partition path starts with the skip value.
        Dataset<Row> trimmedDf = partitionsToSkip.isEmpty()
            ? inputDf
            : inputDf.filter("instr(" + partitionPathField + ", '" + partitionsToSkip + "') != 1");
        ExpressionEncoder<Row> encoder = getEncoder(inputDf.schema());
        return trimmedDf
            .groupByKey((MapFunction<Row, String>) row -> partitionPathField.isEmpty()
                    ? row.<String>getAs(recordKeyField)
                    : row.getAs(partitionPathField) + "+" + row.getAs(recordKeyField),
                Encoders.STRING())
            .reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
                int ts1 = v1.<Integer>getAs(ORDERING_FIELD);
                int ts2 = v2.<Integer>getAs(ORDERING_FIELD);
                return ts1 > ts2 ? v1 : v2;
            })
            .map((MapFunction<Tuple2<String, Row>, Row>) keyedRow -> keyedRow._2, encoder)
            .filter("_hoodie_is_deleted != true");
    }

    /** Builds a row encoder for {@code schema}, resolved and bound to the schema's attributes. */
    private ExpressionEncoder<Row> getEncoder(StructType schema) {
        List<Attribute> attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream()
            .map(Attribute::toAttribute)
            .collect(Collectors.toList());
        return RowEncoder.apply(schema)
            .resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(), SimpleAnalyzer$.MODULE$);
    }
}

