/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.feathr.offline.generation;

import com.linkedin.feathr.common.configObj.generation.OutputProcessorConfig;
import com.linkedin.feathr.common.exception.ErrorLabel;
import com.linkedin.feathr.common.exception.FeathrConfigException;
import com.linkedin.feathr.offline.config.location.SimplePath;
import com.linkedin.feathr.offline.generation.FeatureGenerationPathName$;
import com.linkedin.feathr.offline.generation.IncrementalAggContext;
import com.linkedin.feathr.offline.generation.IncrementalAggSnapshotLoader;
import com.linkedin.feathr.offline.generation.IncrementalAggSnapshotLoader$;
import com.linkedin.feathr.offline.job.FeatureGenSpec;
import com.linkedin.feathr.offline.source.dataloader.BatchDataLoader;
import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler;
import com.linkedin.feathr.offline.util.IncrementalAggUtils$;
import com.linkedin.feathr.offline.util.datetime.OfflineDateTimeUtils$;
import com.typesafe.config.Config;
import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.time.LocalDateTime;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.PartialFunction;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenSeq;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.Buffer;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.ObjectRef;

public final class IncrementalAggSnapshotLoader$
implements IncrementalAggSnapshotLoader {
    public static IncrementalAggSnapshotLoader$ MODULE$;
    private final Logger logger;

    static {
        new IncrementalAggSnapshotLoader$();
    }

    @Override
    public IncrementalAggContext load(FeatureGenSpec featureGenSpec, List<DataLoaderHandler> dataLoaderHandlers) {
        return IncrementalAggSnapshotLoader.load$(this, featureGenSpec, dataLoaderHandlers);
    }

    private Logger logger() {
        return this.logger;
    }

    @Override
    public IncrementalAggContext load(FeatureGenSpec featureGenSpec, FileSystem fs, List<DataLoaderHandler> dataLoaderHandlers) {
        IncrementalAggContext incrementalAggContext;
        boolean isIncrementalAggEnabled = featureGenSpec.isEnableIncrementalAgg();
        if (!isIncrementalAggEnabled) {
            incrementalAggContext = new IncrementalAggContext(isIncrementalAggEnabled, (Option<Object>)None$.MODULE$, (Map<String, Dataset<Row>>)Predef$.MODULE$.Map().empty(), (Map<String, String>)Predef$.MODULE$.Map().empty());
        } else {
            ObjectRef resolvedDayGapBetweenPreAggAndEndTime;
            Seq<OutputProcessorConfig> outputProcessorConfigs = featureGenSpec.getOutputProcessorConfigs();
            Seq hdfsProcessorConfig = (Seq)outputProcessorConfigs.filter((Function1 & Serializable & scala.Serializable)outputConfig -> BoxesRunTime.boxToBoolean((boolean)IncrementalAggSnapshotLoader$.$anonfun$load$1(outputConfig)));
            Seq incrementalAggDirToFeatureAndDF = (Seq)hdfsProcessorConfig.collect((PartialFunction)new scala.Serializable(resolvedDayGapBetweenPreAggAndEndTime = ObjectRef.create((Object)None$.MODULE$), featureGenSpec, fs, dataLoaderHandlers){
                public static final long serialVersionUID = 0L;
                private final ObjectRef resolvedDayGapBetweenPreAggAndEndTime$1;
                private final FeatureGenSpec featureGenSpec$1;
                private final FileSystem fs$1;
                private final List dataLoaderHandlers$1;

                public final <A1 extends OutputProcessorConfig, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                    Object object;
                    A1 A1 = x1;
                    if (A1.getParams().hasPath(FeatureGenerationPathName$.MODULE$.STORE_NAME()) && A1.getParams().hasPath(FeatureGenerationPathName$.MODULE$.FEATURES())) {
                        Tuple2 tuple2;
                        Config params = A1.getParams();
                        String path = params.getString("path");
                        String basePath = new StringBuilder(1).append(path).append("/").append(params.getString(FeatureGenerationPathName$.MODULE$.STORE_NAME())).toString();
                        String preAggRootDir = FeatureGenerationPathName$.MODULE$.getDataPath(basePath, (Option<String>)None$.MODULE$);
                        this.resolvedDayGapBetweenPreAggAndEndTime$1.elem = IncrementalAggUtils$.MODULE$.getDaysGapBetweenLatestAggSnapshotAndEndTime(preAggRootDir, this.featureGenSpec$1.endTimeStr(), this.featureGenSpec$1.endTimeFormat());
                        if (this.fs$1.exists(new Path(basePath)) && ((Option)this.resolvedDayGapBetweenPreAggAndEndTime$1.elem).isDefined()) {
                            LocalDateTime endDate = OfflineDateTimeUtils$.MODULE$.createTimeFromString(this.featureGenSpec$1.endTimeStr(), this.featureGenSpec$1.endTimeFormat(), OfflineDateTimeUtils$.MODULE$.createTimeFromString$default$3()).toLocalDateTime();
                            String directory = (String)IncrementalAggUtils$.MODULE$.getLatestAggSnapshotDFPath(preAggRootDir, endDate).get();
                            SparkSession spark = SparkSession$.MODULE$.builder().getOrCreate();
                            Dataset<Row> preAggSnapshot = new BatchDataLoader(spark, new SimplePath(directory), (List<DataLoaderHandler>)this.dataLoaderHandlers$1).loadDataFrame();
                            Buffer features = (Buffer)JavaConverters$.MODULE$.asScalaBufferConverter(params.getStringList(FeatureGenerationPathName$.MODULE$.FEATURES())).asScala();
                            Object[] objectArray = Predef$.MODULE$.refArrayOps((Object[])preAggSnapshot.columns());
                            Buffer oldFeatures = (Buffer)features.filter((Function1 & Serializable & scala.Serializable)elem -> BoxesRunTime.boxToBoolean((boolean)anonfun.1.$anonfun$applyOrElse$1(objectArray, elem)));
                            tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)preAggRootDir), (Object)new Tuple2((Object)oldFeatures, (Object)new Some(preAggSnapshot)));
                        } else {
                            tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)""), (Object)new Tuple2((Object)Nil$.MODULE$, (Object)None$.MODULE$));
                        }
                        object = tuple2;
                    } else {
                        object = function1.apply(x1);
                    }
                    return (B1)object;
                }

                public final boolean isDefinedAt(OutputProcessorConfig x1) {
                    OutputProcessorConfig outputProcessorConfig = x1;
                    boolean bl = outputProcessorConfig.getParams().hasPath(FeatureGenerationPathName$.MODULE$.STORE_NAME()) && outputProcessorConfig.getParams().hasPath(FeatureGenerationPathName$.MODULE$.FEATURES());
                    return bl;
                }

                public static final /* synthetic */ boolean $anonfun$applyOrElse$1(Object[] eta$0$1$1, Object elem) {
                    return new ArrayOps.ofRef(eta$0$1$1).contains(elem);
                }
                {
                    this.resolvedDayGapBetweenPreAggAndEndTime$1 = resolvedDayGapBetweenPreAggAndEndTime$1;
                    this.featureGenSpec$1 = featureGenSpec$1;
                    this.fs$1 = fs$1;
                    this.dataLoaderHandlers$1 = dataLoaderHandlers$1;
                }

                private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                    return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$applyOrElse$1$adapted(java.lang.Object[] java.lang.Object )}, serializedLambda);
                }
            }, Seq$.MODULE$.canBuildFrom());
            if (incrementalAggDirToFeatureAndDF.isEmpty()) {
                throw new FeathrConfigException(ErrorLabel.FEATHR_USER_ERROR, new StringBuilder(109).append("In order to support incremental aggregation, please specify ").append(FeatureGenerationPathName$.MODULE$.STORE_NAME()).append(" and ").append(FeatureGenerationPathName$.MODULE$.FEATURES()).append(" in the HDFS output").append(" processors with FDS type").toString());
            }
            Map<String, String> preAggRootDirMap = this.getPreAggRootDirMap((Seq<Tuple2<String, Tuple2<Seq<String>, Option<Dataset<Row>>>>>)incrementalAggDirToFeatureAndDF);
            Map<String, Dataset<Row>> preAggSnapshotMap = this.getPreAggSnapshotMap((Seq<Tuple2<String, Tuple2<Seq<String>, Option<Dataset<Row>>>>>)incrementalAggDirToFeatureAndDF);
            incrementalAggContext = new IncrementalAggContext(isIncrementalAggEnabled, (Option<Object>)((Option)resolvedDayGapBetweenPreAggAndEndTime.elem), preAggSnapshotMap, preAggRootDirMap);
        }
        return incrementalAggContext;
    }

    public Map<String, String> getPreAggRootDirMap(Seq<Tuple2<String, Tuple2<Seq<String>, Option<Dataset<Row>>>>> aggDirToFeatureAndDF) {
        return ((TraversableOnce)aggDirToFeatureAndDF.flatMap((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2;
            String preAggDir;
            block3: {
                Tuple2 tuple22;
                block2: {
                    tuple22 = x0$1;
                    if (tuple22 == null) break block2;
                    preAggDir = (String)tuple22._1();
                    tuple2 = (Tuple2)tuple22._2();
                    if (tuple2 != null) break block3;
                }
                throw new MatchError((Object)tuple22);
            }
            Seq features = (Seq)tuple2._1();
            Seq seq = (Seq)features.map((Function1 & Serializable & scala.Serializable)featureName -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(featureName), (Object)preAggDir), Seq$.MODULE$.canBuildFrom());
            return seq;
        }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
    }

    public Map<String, Dataset<Row>> getPreAggSnapshotMap(Seq<Tuple2<String, Tuple2<Seq<String>, Option<Dataset<Row>>>>> aggDirToFeatureAndDF) {
        Map map;
        Seq featureNameToPreAggDF = (Seq)aggDirToFeatureAndDF.collect((PartialFunction)new scala.Serializable(){
            public static final long serialVersionUID = 0L;

            /*
             * Enabled aggressive block sorting
             */
            public final <A1 extends Tuple2<String, Tuple2<Seq<String>, Option<Dataset<Row>>>>, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                Object object;
                Tuple2 tuple2;
                A1 A1 = x1;
                if (A1 != null && (tuple2 = (Tuple2)A1._2()) != null) {
                    Seq features = (Seq)tuple2._1();
                    Option option = (Option)tuple2._2();
                    if (option instanceof Some) {
                        Some some = (Some)option;
                        Dataset preAggDF = (Dataset)some.value();
                        object = features.map((Function1 & Serializable & scala.Serializable)featureName -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(featureName), (Object)preAggDF), Seq$.MODULE$.canBuildFrom());
                        return (B1)object;
                    }
                }
                object = function1.apply(x1);
                return (B1)object;
            }

            public final boolean isDefinedAt(Tuple2<String, Tuple2<Seq<String>, Option<Dataset<Row>>>> x1) {
                Option option;
                Tuple2 tuple2;
                Tuple2<String, Tuple2<Seq<String>, Option<Dataset<Row>>>> tuple22 = x1;
                boolean bl = tuple22 != null && (tuple2 = (Tuple2)tuple22._2()) != null && (option = (Option)tuple2._2()) instanceof Some;
                return bl;
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$applyOrElse$2(org.apache.spark.sql.Dataset java.lang.String )}, serializedLambda);
            }
        }, Seq$.MODULE$.canBuildFrom());
        if (featureNameToPreAggDF.nonEmpty()) {
            map = ((TraversableOnce)featureNameToPreAggDF.reduce((Function2 & Serializable & scala.Serializable)(x$1, x$2) -> (Seq)x$1.union((GenSeq)x$2, Seq$.MODULE$.canBuildFrom()))).toMap(Predef$.MODULE$.$conforms());
        } else {
            this.logger().info("No preAgg dataset exist so far, i.e. first time running incremental agg (cold start)");
            map = Predef$.MODULE$.Map().empty();
        }
        return map;
    }

    public static final /* synthetic */ boolean $anonfun$load$1(OutputProcessorConfig outputConfig) {
        String string = outputConfig.getName();
        String string2 = "HDFS";
        return !(string != null ? !string.equals(string2) : string2 != null);
    }

    private IncrementalAggSnapshotLoader$() {
        MODULE$ = this;
        IncrementalAggSnapshotLoader.$init$(this);
        this.logger = LogManager.getLogger(this.getClass());
    }
}

