/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.prophecy;

import akka.Done$;
import akka.actor.ActorRef;
import akka.actor.ClassicActorSystemProvider;
import akka.actor.PoisonPill$;
import akka.actor.Props;
import akka.actor.Props$;
import akka.actor.ScalaActorRef;
import akka.actor.package$;
import akka.event.LoggingAdapter;
import akka.http.scaladsl.ConnectionContext;
import akka.http.scaladsl.Http$;
import akka.http.scaladsl.HttpExt;
import akka.http.scaladsl.model.Uri$;
import akka.http.scaladsl.model.ws.Message;
import akka.http.scaladsl.model.ws.WebSocketRequest;
import akka.http.scaladsl.model.ws.WebSocketRequest$;
import akka.http.scaladsl.model.ws.WebSocketUpgradeResponse;
import akka.http.scaladsl.settings.ClientConnectionSettings;
import akka.stream.Graph;
import akka.stream.Materializer$;
import akka.stream.OverflowStrategy$;
import akka.stream.scaladsl.Flow;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.Sink;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import akka.stream.scaladsl.SourceQueueWithComplete;
import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import java.util.UUID;
import org.apache.spark.sql.prophecy.NoopEventActor$;
import org.apache.spark.sql.prophecy.ProphecyEventActor;
import org.apache.spark.sql.prophecy.ProphecyEventSendingListener$;
import org.apache.spark.sql.prophecy.util.CommonUtils$;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.mutable.Map;
import scala.concurrent.Await$;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;

public final class ProphecyEventActor$
implements LazyLogging {
    public static ProphecyEventActor$ MODULE$;
    private final Map<String, ActorRef> mapFromSessionIdToWs;
    private final FiniteDuration ReapInterval;
    private transient Logger logger;
    private volatile transient boolean bitmap$trans$0;

    static {
        new ProphecyEventActor$();
    }

    private Logger logger$lzycompute() {
        ProphecyEventActor$ prophecyEventActor$ = this;
        synchronized (prophecyEventActor$) {
            if (!this.bitmap$trans$0) {
                this.logger = LazyLogging.logger$((LazyLogging)this);
                this.bitmap$trans$0 = true;
            }
        }
        return this.logger;
    }

    public Logger logger() {
        return !this.bitmap$trans$0 ? this.logger$lzycompute() : this.logger;
    }

    public Map<String, ActorRef> mapFromSessionIdToWs() {
        return this.mapFromSessionIdToWs;
    }

    public FiniteDuration ReapInterval() {
        return this.ReapInterval;
    }

    public Props props(String sessionId) {
        return Props$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> new ProphecyEventActor(sessionId), ClassTag$.MODULE$.apply(ProphecyEventActor.class));
    }

    public ActorRef apply(String url, String sessionId, boolean scheduled) {
        ActorRef actorRef;
        if (!scheduled) {
            ActorRef actorRef2;
            Option actorRef3;
            BoxedUnit boxedUnit;
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info("Creating event actor for session {} ", new Object[]{sessionId});
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
            Option option = actorRef3 = this.mapFromSessionIdToWs().get((Object)sessionId);
            if (option instanceof Some) {
                BoxedUnit boxedUnit2;
                Some some = (Some)option;
                ActorRef value = (ActorRef)some.value();
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("Returning existing actor {}", new Object[]{sessionId});
                    boxedUnit2 = BoxedUnit.UNIT;
                } else {
                    boxedUnit2 = BoxedUnit.UNIT;
                }
                actorRef2 = value;
            } else if (None$.MODULE$.equals(option)) {
                actorRef2 = this.createActorRef(url, sessionId);
            } else {
                throw new MatchError((Object)option);
            }
            ActorRef actor = actorRef2;
            this.mapFromSessionIdToWs().$plus$eq(Predef.ArrowAssoc$.MODULE$.$u2192$extension(Predef$.MODULE$.ArrowAssoc((Object)sessionId), (Object)actor));
            actorRef = actor;
        } else {
            BoxedUnit boxedUnit;
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info("Creating noop - event actor for session {} ", new Object[]{sessionId});
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
            actorRef = ProphecyEventSendingListener$.MODULE$.system().actorOf(NoopEventActor$.MODULE$.props());
        }
        return actorRef;
    }

    public boolean apply$default$3() {
        return false;
    }

    public void remove(String sessionId) {
        this.mapFromSessionIdToWs().$minus$eq((Object)sessionId);
    }

    public ActorRef createActorRef(String url, String sessionId) {
        BoxedUnit boxedUnit;
        SourceQueueWithComplete sourceQueueWithComplete;
        Future upgradeResp;
        ActorRef actor;
        block8: {
            Tuple2 tuple2;
            block7: {
                BoxedUnit boxedUnit2;
                String urlWithSession = new StringBuilder(1).append(url).append("/").append(sessionId).toString();
                Source source = Source$.MODULE$.queue(1024, OverflowStrategy$.MODULE$.backpressure());
                actor = ProphecyEventSendingListener$.MODULE$.system().actorOf(this.props(sessionId), new StringBuilder(1).append(sessionId).append("-").append(UUID.randomUUID().toString()).toString());
                Sink sink = Sink$.MODULE$.actorRef(actor, (Object)Done$.MODULE$);
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("Connecting to websocket at url {}", new Object[]{urlWithSession});
                    boxedUnit2 = BoxedUnit.UNIT;
                } else {
                    boxedUnit2 = BoxedUnit.UNIT;
                }
                HttpExt qual$1 = Http$.MODULE$.apply(ProphecyEventSendingListener$.MODULE$.system());
                WebSocketRequest x$1 = new WebSocketRequest(Uri$.MODULE$.apply(urlWithSession), WebSocketRequest$.MODULE$.apply$default$2(), WebSocketRequest$.MODULE$.apply$default$3());
                ConnectionContext x$2 = qual$1.webSocketClientFlow$default$2();
                Option x$3 = qual$1.webSocketClientFlow$default$3();
                ClientConnectionSettings x$4 = qual$1.webSocketClientFlow$default$4();
                LoggingAdapter x$5 = qual$1.webSocketClientFlow$default$5();
                Flow flow = qual$1.webSocketClientFlow(x$1, x$2, x$3, x$4, x$5);
                tuple2 = (Tuple2)source.viaMat((Graph)flow, Keep$.MODULE$.both()).toMat((Graph)sink, Keep$.MODULE$.left()).run(Materializer$.MODULE$.matFromSystem((ClassicActorSystemProvider)ProphecyEventSendingListener$.MODULE$.system()));
                if (tuple2 == null) break block7;
                SourceQueueWithComplete queue = (SourceQueueWithComplete)tuple2._1();
                upgradeResp = (Future)tuple2._2();
                if (queue == null) break block7;
                sourceQueueWithComplete = queue;
                if (upgradeResp != null) break block8;
            }
            throw new MatchError((Object)tuple2);
        }
        Future future = upgradeResp;
        Tuple2 tuple2 = new Tuple2((Object)sourceQueueWithComplete, (Object)future);
        Tuple2 tuple22 = tuple2;
        SourceQueueWithComplete queue = (SourceQueueWithComplete)tuple22._1();
        Future upgradeResp2 = (Future)tuple22._2();
        WebSocketUpgradeResponse ur = (WebSocketUpgradeResponse)Await$.MODULE$.result((Awaitable)upgradeResp2, (Duration)new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(2)).minute());
        ScalaActorRef qual$2 = package$.MODULE$.actorRef2Scala(actor);
        ProphecyEventActor.Initialize x$6 = new ProphecyEventActor.Initialize((SourceQueueWithComplete<Message>)queue);
        ActorRef x$7 = qual$2.$bang$default$2((Object)x$6);
        qual$2.$bang((Object)x$6, x$7);
        if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info("Websocket upgrade successful: {}", new Object[]{ur});
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
        return actor;
    }

    public void kill(String url, String sessionId) {
        ScalaActorRef qual$1 = package$.MODULE$.actorRef2Scala(this.apply(url, sessionId, this.apply$default$3()));
        PoisonPill$ x$1 = PoisonPill$.MODULE$;
        ActorRef x$2 = qual$1.$bang$default$2((Object)x$1);
        qual$1.$bang((Object)x$1, x$2);
    }

    private ProphecyEventActor$() {
        MODULE$ = this;
        LazyLogging.$init$((LazyLogging)this);
        this.mapFromSessionIdToWs = CommonUtils$.MODULE$.emptyMMap();
        this.ReapInterval = new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(30)).minutes();
    }
}

