/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.connect.service;

import java.io.Serializable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.spark.SparkEnv$;
import org.apache.spark.SparkSQLException;
import org.apache.spark.connect.proto.ExecutePlanRequest;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.connect.config.Connect$;
import org.apache.spark.sql.connect.service.ExecuteHolder;
import org.apache.spark.sql.connect.service.ExecuteInfo;
import org.apache.spark.sql.connect.service.ExecuteKey;
import org.apache.spark.sql.connect.service.SessionHolder;
import org.apache.spark.sql.connect.service.SparkConnectService$;
import org.slf4j.Logger;
import org.sparkproject.connect.guava.cache.Cache;
import org.sparkproject.connect.guava.cache.CacheBuilder;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable$;
import scala.collection.JavaConverters$;
import scala.collection.MapLike;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.HashMap;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Either;
import scala.util.Left;
import scala.util.control.NonFatal$;

@ScalaSignature(bytes="\u0006\u0001\u0005%d!B\f\u0019\u0001i!\u0003\"B\u0019\u0001\t\u0003\u0019\u0004b\u0002\u001c\u0001\u0005\u0004%Ia\u000e\u0005\u0007\r\u0002\u0001\u000b\u0011\u0002\u001d\t\u000f\u001d\u0003!\u0019!C\u0005\u0011\"1\u0011\u000b\u0001Q\u0001\n%CqA\u0015\u0001C\u0002\u0013%1\u000b\u0003\u0004d\u0001\u0001\u0006I\u0001\u0016\u0005\bI\u0002\u0001\r\u0011\"\u0003f\u0011\u001da\u0007\u00011A\u0005\n5Daa\u001d\u0001!B\u00131\u0007b\u0002;\u0001\u0001\u0004%I!\u001e\u0005\t\u007f\u0002\u0001\r\u0011\"\u0003\u0002\u0002!9\u0011Q\u0001\u0001!B\u00131\b\u0002CA\u0004\u0001\u0011\u0005!$!\u0003\t\u0011\u0005u\u0001\u0001\"\u0001\u001b\u0003?A\u0001\"!\n\u0001\t\u0003Q\u0012q\u0005\u0005\t\u0003[\u0001A\u0011\u0001\u000e\u00020!9\u0011Q\u0007\u0001\u0005\u0002\u0005]\u0002bBA,\u0001\u0011\u0005\u0011\u0011\f\u0005\t\u00037\u0002A\u0011\u0001\r\u0002^!9\u0011q\f\u0001\u0005\n\u0005u\u0003\u0002CA1\u0001\u0011\u0005!$a\u0019\u00039M\u0003\u0018M]6D_:tWm\u0019;Fq\u0016\u001cW\u000f^5p]6\u000bg.Y4fe*\u0011\u0011DG\u0001\bg\u0016\u0014h/[2f\u0015\tYB$A\u0004d_:tWm\u0019;\u000b\u0005uq\u0012aA:rY*\u0011q\u0004I\u0001\u0006gB\f'o\u001b\u0006\u0003C\t\na!\u00199bG\",'\"A\u0012\u0002\u0007=\u0014xmE\u0002\u0001K-\u0002\"AJ\u0015\u000e\u0003\u001dR\u0011\u0001K\u0001\u0006g\u000e\fG.Y\u0005\u0003U\u001d\u0012a!\u00118z%\u00164\u0007C\u0001\u00170\u001b\u0005i#B\u0001\u0018\u001f\u0003!Ig\u000e^3s]\u0006d\u0017B\u0001\u0019.\u0005\u001daunZ4j]\u001e\fa\u0001P5oSRt4\u0001\u0001\u000b\u0002iA\u0011Q\u0007A\u0007\u00021\u0005QQ\r_3dkRLwN\\:\u0016\u0003a\u0002B!\u000f A\u00076\t!H\u0003\u0002<y\u00059Q.\u001e;bE2,'BA\u001f(\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0003\u007fi\u0012q\u0001S1tQ6\u000b\u0007\u000f\u0005\u00026\u0003&\u0011!\t\u0007\u0002\u000b\u000bb,7-\u001e;f\u0017\u0016L\bCA\u001bE\u0013\t)\u0005DA\u0007Fq\u0016\u001cW\u000f^3I_2$WM]\u0001\fKb,7-\u001e;j_:\u001c\b%\u0001\bfq\u0016\u001cW\u000f^5p]NdunY6\u0016\u0003%\u0003\"AS(\u000e\u0003-S!\u0001T'\u0002\t1\fgn\u001a\u0006\u0002\u001d\u0006!!.\u0019<b\u0013\t\u00016J\u0001\u0004PE*,7\r^\u0001\u0010Kb,7-\u001e;j_:\u001cHj\\2lA\u0005\u0019\u0012MY1oI>tW\r\u001a+p[\n\u001cHo\u001c8fgV\tA\u000b\u0005\u0003V=\u0002\u0003W\"\u0001,\u000b\u0005]C\u0016!B2bG\",'BA-[\u0003\u0019\u0019w.\\7p]*\u00111\fX\u0001\u0007O>|w\r\\3\u000b\u0003u\u000b1aY8n\u0013\tyfKA\u0003DC\u000eDW\r\u0005\u00026C&\u0011!\r\u0007\u0002\f\u000bb,7-\u001e;f\u0013:4w.\u0001\u000bbE\u0006tGm\u001c8fIR{WNY:u_:,7\u000fI\u0001\u0012Y\u0006\u001cH/\u0012=fGV$\u0018n\u001c8US6,W#\u00014\u0011\u0007\u0019:\u0017.\u0003\u0002iO\t1q\n\u001d;j_:\u0004\"A\n6\n\u0005-<#\u0001\u0002'p]\u001e\fQ\u0003\\1ti\u0016CXmY;uS>tG+[7f?\u0012*\u0017\u000f\u0006\u0002ocB\u0011ae\\\u0005\u0003a\u001e\u0012A!\u00168ji\"9!/CA\u0001\u0002\u00041\u0017a\u0001=%c\u0005\u0011B.Y:u\u000bb,7-\u001e;j_:$\u0016.\\3!\u0003E\u00198\r[3ek2,G-\u0012=fGV$xN]\u000b\u0002mB\u0019aeZ<\u0011\u0005alX\"A=\u000b\u0005i\\\u0018AC2p]\u000e,(O]3oi*\u0011A0T\u0001\u0005kRLG.\u0003\u0002\u007fs\nA2k\u00195fIVdW\rZ#yK\u000e,Ho\u001c:TKJ4\u0018nY3\u0002+M\u001c\u0007.\u001a3vY\u0016$W\t_3dkR|'o\u0018\u0013fcR\u0019a.a\u0001\t\u000fId\u0011\u0011!a\u0001m\u0006\u00112o\u00195fIVdW\rZ#yK\u000e,Ho\u001c:!\u0003M\u0019'/Z1uK\u0016CXmY;uK\"{G\u000eZ3s)\r\u0019\u00151\u0002\u0005\b\u0003\u001bq\u0001\u0019AA\b\u0003\u001d\u0011X-];fgR\u0004B!!\u0005\u0002\u001a5\u0011\u00111\u0003\u0006\u0005\u0003+\t9\"A\u0003qe>$xN\u0003\u0002\u001c=%!\u00111DA\n\u0005I)\u00050Z2vi\u0016\u0004F.\u00198SKF,Xm\u001d;\u0002'I,Wn\u001c<f\u000bb,7-\u001e;f\u0011>dG-\u001a:\u0015\u00079\f\t\u0003\u0003\u0004\u0002$=\u0001\r\u0001Q\u0001\u0004W\u0016L\u0018\u0001E4fi\u0016CXmY;uK\"{G\u000eZ3s)\u0011\tI#a\u000b\u0011\u0007\u0019:7\t\u0003\u0004\u0002$A\u0001\r\u0001Q\u0001\u0016O\u0016$\u0018IY1oI>tW\r\u001a+p[\n\u001cHo\u001c8f)\u0011\t\t$a\r\u0011\u0007\u0019:\u0007\r\u0003\u0004\u0002$E\u0001\r\u0001Q\u0001\u0015Y&\u001cH/Q2uSZ,W\t_3dkRLwN\\:\u0016\u0005\u0005e\u0002cBA\u001e\u0003\u0017J\u0017\u0011\u000b\b\u0005\u0003{\t9E\u0004\u0003\u0002@\u0005\u0015SBAA!\u0015\r\t\u0019EM\u0001\u0007yI|w\u000e\u001e \n\u0003!J1!!\u0013(\u0003\u001d\u0001\u0018mY6bO\u0016LA!!\u0014\u0002P\t1Q)\u001b;iKJT1!!\u0013(!\u0015\tY$a\u0015a\u0013\u0011\t)&a\u0014\u0003\u0007M+\u0017/A\fmSN$\u0018IY1oI>tW\rZ#yK\u000e,H/[8ogV\u0011\u0011\u0011K\u0001\tg\",H\u000fZ8x]R\ta.\u0001\ftG\",G-\u001e7f!\u0016\u0014\u0018n\u001c3jG\u000eCWmY6t\u0003M\u0001XM]5pI&\u001cW*Y5oi\u0016t\u0017M\\2f)\rq\u0017Q\r\u0005\u0007\u0003O2\u0002\u0019A5\u0002\u000fQLW.Z8vi\u0002")
public class SparkConnectExecutionManager
implements Logging {
    private final HashMap<ExecuteKey, ExecuteHolder> executions;
    private final Object executionsLock;
    private final Cache<ExecuteKey, ExecuteInfo> abandonedTombstones;
    private Option<Object> lastExecutionTime;
    private Option<ScheduledExecutorService> scheduledExecutor;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    private HashMap<ExecuteKey, ExecuteHolder> executions() {
        return this.executions;
    }

    private Object executionsLock() {
        return this.executionsLock;
    }

    private Cache<ExecuteKey, ExecuteInfo> abandonedTombstones() {
        return this.abandonedTombstones;
    }

    private Option<Object> lastExecutionTime() {
        return this.lastExecutionTime;
    }

    private void lastExecutionTime_$eq(Option<Object> x$1) {
        this.lastExecutionTime = x$1;
    }

    private Option<ScheduledExecutorService> scheduledExecutor() {
        return this.scheduledExecutor;
    }

    private void scheduledExecutor_$eq(Option<ScheduledExecutorService> x$1) {
        this.scheduledExecutor = x$1;
    }

    public ExecuteHolder createExecuteHolder(ExecutePlanRequest request) {
        SessionHolder sessionHolder = SparkConnectService$.MODULE$.getOrCreateIsolatedSession(request.getUserContext().getUserId(), request.getSessionId());
        ExecuteHolder executeHolder = new ExecuteHolder(request, sessionHolder);
        Object object = this.executionsLock();
        synchronized (object) {
            if (this.executions().get((Object)executeHolder.key()).isDefined()) {
                if (this.getAbandonedTombstone(executeHolder.key()).isDefined()) {
                    throw new SparkSQLException("INVALID_HANDLE.OPERATION_ABANDONED", (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"handle"), (Object)executeHolder.operationId())})));
                }
                throw new SparkSQLException("INVALID_HANDLE.OPERATION_ALREADY_EXISTS", (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"handle"), (Object)executeHolder.operationId())})));
            }
            sessionHolder.addExecuteHolder(executeHolder);
            this.executions().put((Object)executeHolder.key(), (Object)executeHolder);
            this.lastExecutionTime_$eq((Option<Object>)None$.MODULE$);
            this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(26).append("ExecuteHolder ").append(executeHolder.key()).append(" is created.").toString());
        }
        this.schedulePeriodicChecks();
        return executeHolder;
    }

    public void removeExecuteHolder(ExecuteKey key) {
        None$ executeHolder = None$.MODULE$;
        Object object = this.executionsLock();
        synchronized (object) {
            executeHolder = this.executions().remove((Object)key);
            executeHolder.foreach((Function1 & Serializable & scala.Serializable)e -> {
                SparkConnectExecutionManager.$anonfun$removeExecuteHolder$1(e);
                return BoxedUnit.UNIT;
            });
            if (this.executions().isEmpty()) {
                this.lastExecutionTime_$eq((Option<Object>)new Some((Object)BoxesRunTime.boxToLong((long)System.currentTimeMillis())));
            }
            this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(26).append("ExecuteHolder ").append(key).append(" is removed.").toString());
        }
        executeHolder.foreach((Function1 & Serializable & scala.Serializable)x$1 -> {
            x$1.close();
            return BoxedUnit.UNIT;
        });
    }

    public Option<ExecuteHolder> getExecuteHolder(ExecuteKey key) {
        Option option;
        Object object = this.executionsLock();
        synchronized (object) {
            option = this.executions().get((Object)key);
        }
        return option;
    }

    public Option<ExecuteInfo> getAbandonedTombstone(ExecuteKey key) {
        return Option$.MODULE$.apply((Object)this.abandonedTombstones().getIfPresent(key));
    }

    public Either<Object, Seq<ExecuteInfo>> listActiveExecutions() {
        Left left;
        Object object = this.executionsLock();
        synchronized (object) {
            left = this.executions().isEmpty() ? package$.MODULE$.Left().apply(this.lastExecutionTime().get()) : package$.MODULE$.Right().apply((Object)((TraversableOnce)this.executions().values().map((Function1 & Serializable & scala.Serializable)x$2 -> x$2.getExecuteInfo(), Iterable$.MODULE$.canBuildFrom())).toBuffer().toSeq());
        }
        return left;
    }

    public Seq<ExecuteInfo> listAbandonedExecutions() {
        return ((MapLike)JavaConverters$.MODULE$.mapAsScalaConcurrentMapConverter(this.abandonedTombstones().asMap()).asScala()).values().toBuffer().toSeq();
    }

    public void shutdown() {
        Object object = this.executionsLock();
        synchronized (object) {
            this.scheduledExecutor().foreach((Function1 & Serializable & scala.Serializable)executor -> BoxesRunTime.boxToBoolean((boolean)SparkConnectExecutionManager.$anonfun$shutdown$1(executor)));
            this.scheduledExecutor_$eq((Option<ScheduledExecutorService>)None$.MODULE$);
        }
    }

    private void schedulePeriodicChecks() {
        Object object = this.executionsLock();
        synchronized (object) {
            long interval = BoxesRunTime.unboxToLong((Object)SparkEnv$.MODULE$.get().conf().get(Connect$.MODULE$.CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL()));
            long timeout = BoxesRunTime.unboxToLong((Object)SparkEnv$.MODULE$.get().conf().get(Connect$.MODULE$.CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT()));
            Option<ScheduledExecutorService> option = this.scheduledExecutor();
            if (option instanceof Some) {
            } else if (None$.MODULE$.equals(option)) {
                this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(61).append("Starting thread for cleanup of abandoned executions every ").append(interval).append(" ms").toString());
                this.scheduledExecutor_$eq((Option<ScheduledExecutorService>)new Some((Object)Executors.newSingleThreadScheduledExecutor()));
                ((ScheduledExecutorService)this.scheduledExecutor().get()).scheduleAtFixedRate(() -> {
                    try {
                        this.periodicMaintenance(timeout);
                    }
                    catch (Throwable throwable) {
                        Throwable throwable2 = throwable;
                        Option option = NonFatal$.MODULE$.unapply(throwable2);
                        if (!option.isEmpty()) {
                            Throwable ex = (Throwable)option.get();
                            this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Unexpected exception in periodic task", ex);
                        }
                        throw throwable;
                    }
                }, interval, interval, TimeUnit.MILLISECONDS);
            } else {
                throw new MatchError(option);
            }
        }
    }

    public void periodicMaintenance(long timeout) {
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Started periodic run of SparkConnectExecutionManager maintenance.");
        ArrayBuffer toRemove = new ArrayBuffer();
        Object object = this.executionsLock();
        synchronized (object) {
            long nowMs = System.currentTimeMillis();
            this.executions().values().foreach((Function1 & Serializable & scala.Serializable)executeHolder -> {
                Option<Object> option = executeHolder.lastAttachedRpcTime();
                if (option instanceof Some) {
                    Some some = (Some)option;
                    long detached = BoxesRunTime.unboxToLong((Object)some.value());
                    if (detached + timeout < nowMs) {
                        return toRemove.$plus$eq(executeHolder);
                    }
                    return BoxedUnit.UNIT;
                }
                return BoxedUnit.UNIT;
            });
        }
        if (!toRemove.isEmpty()) {
            toRemove.foreach((Function1 & Serializable & scala.Serializable)executeHolder -> {
                SparkConnectExecutionManager.$anonfun$periodicMaintenance$3(this, executeHolder);
                return BoxedUnit.UNIT;
            });
        }
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Finished periodic run of SparkConnectExecutionManager maintenance.");
    }

    public static final /* synthetic */ void $anonfun$removeExecuteHolder$1(ExecuteHolder e) {
        e.sessionHolder().removeExecuteHolder(e.operationId());
    }

    public static final /* synthetic */ boolean $anonfun$shutdown$1(ScheduledExecutorService executor) {
        executor.shutdown();
        return executor.awaitTermination(1L, TimeUnit.MINUTES);
    }

    public static final /* synthetic */ void $anonfun$periodicMaintenance$3(SparkConnectExecutionManager $this, ExecuteHolder executeHolder) {
        ExecuteInfo info = executeHolder.getExecuteInfo();
        $this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(68).append("Found execution ").append(info).append(" that was abandoned and expired and will be removed.").toString());
        $this.removeExecuteHolder(executeHolder.key());
        $this.abandonedTombstones().put(executeHolder.key(), info);
    }

    public SparkConnectExecutionManager() {
        Logging.$init$((Logging)this);
        this.executions = new HashMap();
        this.executionsLock = new Object();
        this.abandonedTombstones = CacheBuilder.newBuilder().maximumSize(BoxesRunTime.unboxToInt((Object)SparkEnv$.MODULE$.get().conf().get(Connect$.MODULE$.CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE()))).build();
        this.lastExecutionTime = new Some((Object)BoxesRunTime.boxToLong((long)System.currentTimeMillis()));
        this.scheduledExecutor = None$.MODULE$;
    }
}

