/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.feathr.offline.evaluator.datasource;

import com.linkedin.feathr.common.AnchorExtractor;
import com.linkedin.feathr.common.DateTimeResolution$;
import com.linkedin.feathr.common.exception.ErrorLabel;
import com.linkedin.feathr.common.exception.FeathrConfigException;
import com.linkedin.feathr.compute.AnyNode;
import com.linkedin.feathr.compute.DataSourceType;
import com.linkedin.feathr.compute.KeyExpressionType;
import com.linkedin.feathr.core.config.producer.common.KeyListExtractor;
import com.linkedin.feathr.offline.client.plugins.AnchorExtractorAdaptor;
import com.linkedin.feathr.offline.client.plugins.FeathrUdfPluginContext$;
import com.linkedin.feathr.offline.client.plugins.SourceKeyExtractorAdaptor;
import com.linkedin.feathr.offline.client.plugins.UdfAdaptor;
import com.linkedin.feathr.offline.config.ConfigLoaderUtils$;
import com.linkedin.feathr.offline.config.JoinConfigSettings;
import com.linkedin.feathr.offline.config.JoinTimeSetting;
import com.linkedin.feathr.offline.evaluator.NodeEvaluator;
import com.linkedin.feathr.offline.graph.DataframeAndColumnMetadata;
import com.linkedin.feathr.offline.graph.DataframeAndColumnMetadata$;
import com.linkedin.feathr.offline.graph.FCMGraphTraverser;
import com.linkedin.feathr.offline.source.DataSource;
import com.linkedin.feathr.offline.source.DataSource$;
import com.linkedin.feathr.offline.source.SourceFormatType$;
import com.linkedin.feathr.offline.source.TimeWindowParams;
import com.linkedin.feathr.offline.source.accessor.DataPathHandler;
import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor;
import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor$;
import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler;
import com.linkedin.feathr.offline.source.pathutil.PathChecker;
import com.linkedin.feathr.offline.source.pathutil.PathChecker$;
import com.linkedin.feathr.offline.source.pathutil.PathInfo;
import com.linkedin.feathr.offline.source.pathutil.TimeBasedHdfsPathAnalyzer;
import com.linkedin.feathr.offline.swa.SlidingWindowFeatureUtils$;
import com.linkedin.feathr.offline.util.datetime.DateTimeInterval;
import com.linkedin.feathr.offline.util.datetime.OfflineDateTimeUtils$;
import com.linkedin.feathr.sparkcommon.SourceKeyExtractor;
import java.io.Serializable;
import java.time.Duration;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Enumeration;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.HashMap$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

public final class DataSourceNodeEvaluator$
implements NodeEvaluator {
    public static DataSourceNodeEvaluator$ MODULE$;
    private final Logger log;

    static {
        new DataSourceNodeEvaluator$();
    }

    public Logger log() {
        return this.log;
    }

    private DataframeAndColumnMetadata processContextNode(Dataset<Row> contextDataFrame, com.linkedin.feathr.compute.DataSource dataSource) {
        String colName = dataSource.getExternalSourceRef();
        return new DataframeAndColumnMetadata(contextDataFrame, (Seq<String>)((Seq)new .colon.colon((Object)colName, (List)Nil$.MODULE$)), DataframeAndColumnMetadata$.MODULE$.apply$default$3(), DataframeAndColumnMetadata$.MODULE$.apply$default$4(), DataframeAndColumnMetadata$.MODULE$.apply$default$5());
    }

    private DataframeAndColumnMetadata processEventNode(SparkSession ss, com.linkedin.feathr.compute.DataSource dataSourceNode, Option<DateTimeInterval> timeRange, List<DataPathHandler> dataPathHandlers) {
        Tuple3 tuple3;
        Tuple3 tuple32;
        Predef$.MODULE$.assert(dataSourceNode.hasConcreteKey());
        Predef$.MODULE$.assert(((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter((java.util.List)dataSourceNode.getConcreteKey().getKey()).asScala()).nonEmpty());
        String path = dataSourceNode.getExternalSourceRef();
        DataSource source = DataSource$.MODULE$.apply(path, SourceFormatType$.MODULE$.TIME_SERIES_PATH(), (Option<TimeWindowParams>)(dataSourceNode.hasTimestampColumnInfo() ? new Some((Object)new TimeWindowParams(dataSourceNode.getTimestampColumnInfo().getExpression(), dataSourceNode.getTimestampColumnInfo().getFormat())) : None$.MODULE$), (Option<String>)(dataSourceNode.hasFilePartitionFormat() ? new Some((Object)dataSourceNode.getFilePartitionFormat()) : None$.MODULE$), DataSource$.MODULE$.apply$default$5());
        TimeWindowParams timeWindowParam = dataSourceNode.hasTimestampColumnInfo() ? new TimeWindowParams(dataSourceNode.getTimestampColumnInfo().getExpression(), dataSourceNode.getTimestampColumnInfo().getFormat()) : new TimeWindowParams(SlidingWindowFeatureUtils$.MODULE$.TIMESTAMP_PARTITION_COLUMN(), "epoch");
        String timeStampExpr = SlidingWindowFeatureUtils$.MODULE$.constructTimeStampExpr(timeWindowParam.timestampColumn(), timeWindowParam.timestampColumnFormat(), SlidingWindowFeatureUtils$.MODULE$.constructTimeStampExpr$default$3());
        boolean needTimestampColumn = !dataSourceNode.hasTimestampColumnInfo();
        SparkSession x$1 = ss;
        DataSource x$2 = source;
        Option<DateTimeInterval> x$3 = timeRange;
        None$ x$4 = None$.MODULE$;
        boolean x$5 = false;
        boolean x$6 = needTimestampColumn;
        List<DataPathHandler> x$7 = dataPathHandlers;
        boolean x$8 = DataSourceAccessor$.MODULE$.apply$default$7();
        DataSourceAccessor dataSourceAccessor = DataSourceAccessor$.MODULE$.apply(x$1, x$2, x$3, (Option<Class<?>>)x$4, x$5, x$6, x$8, x$7);
        Dataset<Row> sourceDF = dataSourceAccessor.get();
        KeyExpressionType keyExpressionType = dataSourceNode.getKeyExpressionType();
        KeyExpressionType keyExpressionType2 = KeyExpressionType.UDF;
        if (!(keyExpressionType != null ? !keyExpressionType.equals(keyExpressionType2) : keyExpressionType2 != null)) {
            SourceKeyExtractor sourceKeyExtractor;
            Class<?> className = Class.forName(dataSourceNode.getKeyExpression());
            Object obj = className.newInstance();
            if (obj instanceof SourceKeyExtractor) {
                SourceKeyExtractor sourceKeyExtractor2;
                sourceKeyExtractor = sourceKeyExtractor2 = (SourceKeyExtractor)obj;
            } else {
                Some some;
                UdfAdaptor adaptor;
                Option<UdfAdaptor<?>> option = FeathrUdfPluginContext$.MODULE$.getRegisteredUdfAdaptor(className);
                if (!(option instanceof Some) || !((adaptor = (UdfAdaptor)(some = (Some)option).value()) instanceof SourceKeyExtractorAdaptor)) {
                    throw new UnsupportedOperationException(new StringBuilder(24).append("Unknown extractor type: ").append(className).toString());
                }
                SourceKeyExtractorAdaptor sourceKeyExtractorAdaptor = (SourceKeyExtractorAdaptor)adaptor;
                SourceKeyExtractor sourceKeyExtractor3 = (SourceKeyExtractor)sourceKeyExtractorAdaptor.adaptUdf(className.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]));
                sourceKeyExtractor = sourceKeyExtractor3;
            }
            SourceKeyExtractor keyExtractorClass = sourceKeyExtractor;
            tuple32 = new Tuple3(keyExtractorClass.appendKeyColumns(sourceDF), keyExtractorClass.getKeyColumnNames(keyExtractorClass.getKeyColumnNames$default$1()), (Object)timeStampExpr);
        } else {
            Seq featureKeys = (Seq)ConfigLoaderUtils$.MODULE$.javaListToSeqWithDeepCopy(KeyListExtractor.getInstance().extractFromHocon(dataSourceNode.getKeyExpression())).map((Function1 & Serializable & scala.Serializable)k -> new StringBuilder(17).append("CAST (").append((String)k).append(" AS string)").toString(), Seq$.MODULE$.canBuildFrom());
            tuple32 = tuple3 = new Tuple3(sourceDF, (Object)featureKeys, (Object)timeStampExpr);
        }
        if (tuple3 == null) {
            throw new MatchError((Object)tuple3);
        }
        Dataset df = (Dataset)tuple3._1();
        Seq keyExtractor = (Seq)tuple3._2();
        String timestampExpr = (String)tuple3._3();
        Tuple3 tuple33 = new Tuple3((Object)df, (Object)keyExtractor, (Object)timestampExpr);
        Tuple3 tuple34 = tuple33;
        Dataset df2 = (Dataset)tuple34._1();
        Seq keyExtractor2 = (Seq)tuple34._2();
        String timestampExpr2 = (String)tuple34._3();
        return new DataframeAndColumnMetadata((Dataset<Row>)df2, (Seq<String>)keyExtractor2, (Option<String>)None$.MODULE$, (Option<DataSource>)new Some((Object)source), (Option<String>)new Some((Object)timestampExpr2));
    }

    /*
     * Unable to fully structure code
     */
    private DataframeAndColumnMetadata processTableNode(SparkSession ss, com.linkedin.feathr.compute.DataSource dataSourceNode, List<DataPathHandler> dataPathHandlers) {
        block9: {
            block5: {
                block7: {
                    block8: {
                        block6: {
                            Predef$.MODULE$.assert(dataSourceNode.hasConcreteKey());
                            Predef$.MODULE$.assert(((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter((java.util.List)dataSourceNode.getConcreteKey().getKey()).asScala()).nonEmpty());
                            path = dataSourceNode.getExternalSourceRef();
                            dataSource = DataSource$.MODULE$.apply(path, SourceFormatType$.MODULE$.FIXED_PATH(), DataSource$.MODULE$.apply$default$3(), DataSource$.MODULE$.apply$default$4(), DataSource$.MODULE$.apply$default$5());
                            x$1 = ss;
                            x$2 = dataSource;
                            x$3 = None$.MODULE$;
                            x$4 = None$.MODULE$;
                            x$5 = false;
                            x$6 = dataPathHandlers;
                            x$7 = DataSourceAccessor$.MODULE$.apply$default$6();
                            x$8 = DataSourceAccessor$.MODULE$.apply$default$7();
                            dataSourceAccessor = DataSourceAccessor$.MODULE$.apply(x$1, x$2, (Option<DateTimeInterval>)x$3, (Option<Class<?>>)x$4, x$5, x$7, x$8, x$6);
                            sourceDF = dataSourceAccessor.get();
                            v0 = dataSourceNode.getKeyExpressionType();
                            var21_16 = KeyExpressionType.UDF;
                            if (v0 != null ? v0.equals(var21_16) == false : var21_16 != null) break block5;
                            className = Class.forName(dataSourceNode.getKeyExpression());
                            var23_18 = className.newInstance();
                            if (!(var23_18 instanceof SourceKeyExtractor)) break block6;
                            var24_19 = (SourceKeyExtractor)var23_18;
                            updatedDf = var24_19.appendKeyColumns(sourceDF);
                            var5_21 = new Tuple2(updatedDf, var24_19.getKeyColumnNames(var24_19.getKeyColumnNames$default$1()));
                            break block7;
                        }
                        if (!(var23_18 instanceof AnchorExtractor)) break block8;
                        var5_21 = new Tuple2(sourceDF, (Object)Nil$.MODULE$);
                        break block7;
                    }
                    x = FeathrUdfPluginContext$.MODULE$.getRegisteredUdfAdaptor(className);
                    this.log().info((Object)new StringBuilder(20).append("x is ").append(x).append(" and x type is ").append(x.getClass()).toString());
                    var27_23 = false;
                    var28_24 = null;
                    var29_25 = FeathrUdfPluginContext$.MODULE$.getRegisteredUdfAdaptor(className);
                    if (!(var29_25 instanceof Some)) ** GOTO lbl-1000
                    var27_23 = true;
                    var28_24 = (Some)var29_25;
                    adaptor = (UdfAdaptor)var28_24.value();
                    if (adaptor instanceof SourceKeyExtractorAdaptor) {
                        var31_27 = (SourceKeyExtractorAdaptor)adaptor;
                        keyExtractor = (SourceKeyExtractor)var31_27.adaptUdf(className.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]));
                        updatedDf = keyExtractor.appendKeyColumns(sourceDF);
                        var6_30 = new Tuple2(updatedDf, keyExtractor.getKeyColumnNames(keyExtractor.getKeyColumnNames$default$1()));
                    } else if (var27_23 && (adaptor = (UdfAdaptor)var28_24.value()) instanceof AnchorExtractorAdaptor) {
                        var6_30 = new Tuple2(sourceDF, (Object)Nil$.MODULE$);
                    } else {
                        throw new UnsupportedOperationException(new StringBuilder(98).append("Unknown extractor type: ").append(className).append(" FeathrUdfPluginContext").append(".getRegisteredUdfAdaptor(className) is ").append(FeathrUdfPluginContext$.MODULE$.getRegisteredUdfAdaptor(className)).append("and type is ").append(x.get() instanceof AnchorExtractorAdaptor).toString());
                    }
                    var5_21 = var6_30;
                }
                v1 = var5_21;
                break block9;
            }
            featureKeys = ConfigLoaderUtils$.MODULE$.javaListToSeqWithDeepCopy(KeyListExtractor.getInstance().extractFromHocon(dataSourceNode.getKeyExpression()));
            v1 = var20_33 = new Tuple2(sourceDF, featureKeys);
        }
        if (var20_33 == null) {
            throw new MatchError((Object)var20_33);
        }
        df = (Dataset)var20_33._1();
        keyExtractor = (Seq)var20_33._2();
        var4_36 = new Tuple2((Object)df, (Object)keyExtractor);
        var19_37 = var4_36;
        df = (Dataset)var19_37._1();
        keyExtractor = (Seq)var19_37._2();
        x$9 = df;
        x$10 = keyExtractor;
        x$11 = new Some((Object)dataSource);
        x$12 = DataframeAndColumnMetadata$.MODULE$.apply$default$3();
        x$13 = DataframeAndColumnMetadata$.MODULE$.apply$default$5();
        return new DataframeAndColumnMetadata((Dataset<Row>)x$9, (Seq<String>)x$10, x$12, (Option<DataSource>)x$11, x$13);
    }

    private Map<String, Duration> getOptimizedDurationMap(Seq<AnyNode> nodes) {
        Seq allSWANodes = (Seq)nodes.filter((Function1 & Serializable & scala.Serializable)node -> BoxesRunTime.boxToBoolean((boolean)DataSourceNodeEvaluator$.$anonfun$getOptimizedDurationMap$1(node)));
        Map swaDurationMap = ((TraversableOnce)allSWANodes.map((Function1 & Serializable & scala.Serializable)node -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)node.getAggregation().getInput().getId()), (Object)Duration.parse((CharSequence)node.getAggregation().getFunction().getParameters().get((Object)"window_size"))), Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        Seq allEventSourceNodes = (Seq)nodes.filter((Function1 & Serializable & scala.Serializable)node -> BoxesRunTime.boxToBoolean((boolean)DataSourceNodeEvaluator$.$anonfun$getOptimizedDurationMap$3(node)));
        HashMap pathToDurationMap = HashMap$.MODULE$.empty();
        allEventSourceNodes.map((Function1 & Serializable & scala.Serializable)node -> {
            Duration duration;
            String sourcePath = node.getDataSource().getExternalSourceRef();
            return !pathToDurationMap.contains((Object)sourcePath) ? pathToDurationMap.put((Object)sourcePath, swaDurationMap.apply((Object)node.getDataSource().getId())) : ((duration = (Duration)pathToDurationMap.apply((Object)sourcePath)).toHours() < ((Duration)swaDurationMap.apply((Object)node.getDataSource().getId())).toHours() ? pathToDurationMap.put((Object)sourcePath, swaDurationMap.apply((Object)node.getDataSource().getId())) : BoxedUnit.UNIT);
        }, Seq$.MODULE$.canBuildFrom());
        return pathToDurationMap.toMap(Predef$.MODULE$.$conforms());
    }

    @Override
    public Dataset<Row> evaluate(AnyNode node, FCMGraphTraverser graphTraverser, Dataset<Row> contextDf, List<DataPathHandler> dataPathHandlers) {
        com.linkedin.feathr.compute.DataSource dataSource = node.getDataSource();
        Integer nodeId = node.getDataSource().getId();
        DataSourceType dataSourceType = dataSource.getSourceType();
        if (DataSourceType.CONTEXT.equals(dataSourceType)) {
            BoxedUnit boxedUnit;
            if (dataSource.hasConcreteKey()) {
                String key = dataSource.getKeyExpression();
                Dataset<Row> df = contextDf;
                graphTraverser.nodeIdToDataframeAndColumnMetadataMap().update((Object)BoxesRunTime.boxToInteger((int)Predef$.MODULE$.Integer2int(nodeId)), (Object)new DataframeAndColumnMetadata(df, (Seq<String>)((Seq)new .colon.colon((Object)key, (List)Nil$.MODULE$)), DataframeAndColumnMetadata$.MODULE$.apply$default$3(), DataframeAndColumnMetadata$.MODULE$.apply$default$4(), DataframeAndColumnMetadata$.MODULE$.apply$default$5()));
                boxedUnit = BoxedUnit.UNIT;
            } else {
                graphTraverser.nodeIdToDataframeAndColumnMetadataMap().update((Object)BoxesRunTime.boxToInteger((int)Predef$.MODULE$.Integer2int(nodeId)), (Object)this.processContextNode(contextDf, dataSource));
                boxedUnit = BoxedUnit.UNIT;
            }
            BoxedUnit boxedUnit2 = boxedUnit;
        } else if (DataSourceType.UPDATE.equals(dataSourceType)) {
            graphTraverser.nodeIdToDataframeAndColumnMetadataMap().update((Object)BoxesRunTime.boxToInteger((int)Predef$.MODULE$.Integer2int(nodeId)), (Object)this.processTableNode(graphTraverser.ss(), dataSource, dataPathHandlers));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else if (DataSourceType.EVENT.equals(dataSourceType)) {
            List dataLoaderHandlers = (List)dataPathHandlers.map((Function1 & Serializable & scala.Serializable)x$3 -> x$3.dataLoaderHandler(), List$.MODULE$.canBuildFrom());
            PathChecker pathChecker = PathChecker$.MODULE$.apply(graphTraverser.ss(), (List<DataLoaderHandler>)dataLoaderHandlers);
            TimeBasedHdfsPathAnalyzer pathAnalyzer = new TimeBasedHdfsPathAnalyzer(pathChecker, (List<DataLoaderHandler>)dataLoaderHandlers);
            PathInfo pathInfo = pathAnalyzer.analyze(node.getDataSource().getExternalSourceRef());
            Enumeration.Value value = pathInfo.dateTimeResolution();
            Enumeration.Value value2 = DateTimeResolution$.MODULE$.DAILY();
            DateTimeInterval adjustedObsTimeRange = !(value != null ? !value.equals(value2) : value2 != null) ? graphTraverser.timeConfigSettings().obsTimeRange().adjustWithDateTimeResolution(DateTimeResolution$.MODULE$.DAILY()) : graphTraverser.timeConfigSettings().obsTimeRange();
            Map<String, Duration> eventPathToDurationMap = this.getOptimizedDurationMap((Seq<AnyNode>)graphTraverser.nodes());
            Duration duration = (Duration)eventPathToDurationMap.apply((Object)node.getDataSource().getExternalSourceRef());
            if (graphTraverser.timeConfigSettings().timeConfigSettings().isEmpty() || ((JoinConfigSettings)graphTraverser.timeConfigSettings().timeConfigSettings().get()).joinTimeSetting().isEmpty()) {
                throw new FeathrConfigException(ErrorLabel.FEATHR_USER_ERROR, "joinTimeSettings section is not defined in join config, cannot perform window aggregation operation");
            }
            DateTimeInterval adjustedTimeRange = OfflineDateTimeUtils$.MODULE$.getFactDataTimeRange(adjustedObsTimeRange, duration, (Duration[])((Object[])new Duration[]{(Duration)((JoinTimeSetting)((JoinConfigSettings)graphTraverser.timeConfigSettings().timeConfigSettings().get()).joinTimeSetting().get()).simulateTimeDelay().getOrElse((Function0 & Serializable & scala.Serializable)() -> Duration.ZERO)}));
            graphTraverser.nodeIdToDataframeAndColumnMetadataMap().update((Object)BoxesRunTime.boxToInteger((int)Predef$.MODULE$.Integer2int(node.getDataSource().getId())), (Object)this.processEventNode(graphTraverser.ss(), node.getDataSource(), (Option<DateTimeInterval>)new Some((Object)adjustedTimeRange), dataPathHandlers));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            throw new MatchError((Object)dataSourceType);
        }
        return contextDf;
    }

    @Override
    public Dataset<Row> batchEvaluate(Seq<AnyNode> nodes, FCMGraphTraverser graphTraverser, Dataset<Row> contextDf, List<DataPathHandler> dataPathHandlers) {
        nodes.foreach((Function1 & Serializable & scala.Serializable)x$4 -> MODULE$.evaluate((AnyNode)x$4, graphTraverser, contextDf, dataPathHandlers));
        return contextDf;
    }

    public static final /* synthetic */ boolean $anonfun$getOptimizedDurationMap$1(AnyNode node) {
        return node.getAggregation() != null;
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public static final /* synthetic */ boolean $anonfun$getOptimizedDurationMap$3(AnyNode node) {
        if (!node.isDataSource()) return false;
        DataSourceType dataSourceType = node.getDataSource().getSourceType();
        DataSourceType dataSourceType2 = DataSourceType.EVENT;
        if (dataSourceType != null) {
            if (!dataSourceType.equals(dataSourceType2)) return false;
            return true;
        }
        if (dataSourceType2 == null) return true;
        return false;
    }

    private DataSourceNodeEvaluator$() {
        MODULE$ = this;
        this.log = Logger.getLogger(this.getClass());
    }
}

