/*
 * Decompiled with CFR 0.152.
 */
package quix.python;

import com.google.common.io.Resources;
import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import com.zaxxer.nuprocess.NuProcess;
import com.zaxxer.nuprocess.NuProcessBuilder;
import com.zaxxer.nuprocess.NuProcessHandler;
import java.io.Serializable;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import monix.eval.Task;
import monix.eval.Task$;
import monix.execution.atomic.AtomicInt;
import monix.execution.atomic.AtomicInt$;
import monix.reactive.Observable;
import monix.reactive.Observable$;
import monix.reactive.OverflowStrategy;
import monix.reactive.observers.Subscriber;
import py4j.GatewayServer;
import quix.api.v1.execute.Batch;
import quix.api.v1.execute.Batch$;
import quix.api.v1.execute.BatchColumn$;
import quix.api.v2.execute.Builder;
import quix.api.v2.execute.Executor;
import quix.api.v2.execute.SubQuery;
import quix.python.ProcessEndSuccess;
import quix.python.ProcessStartFailure;
import quix.python.ProcessStderr;
import quix.python.ProcessStdout;
import quix.python.PythonBridge;
import quix.python.PythonConfig;
import quix.python.PythonExecutor$;
import quix.python.PythonMessage;
import quix.python.PythonProcessHandler;
import quix.python.PythonRunningProcess;
import quix.python.PythonRunningProcess$;
import quix.python.TabEnd;
import quix.python.TabFields;
import quix.python.TabRow;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\u0005\rd\u0001B\t\u0013\u0001]A\u0001B\r\u0001\u0003\u0002\u0003\u0006Ia\r\u0005\u0006o\u0001!\t\u0001\u000f\u0005\bw\u0001\u0001\r\u0011\"\u0001=\u0011\u001d9\u0005\u00011A\u0005\u0002!CaA\u0014\u0001!B\u0013i\u0004\"B(\u0001\t\u0003\u0001\u0006\"B\u0011\u0001\t\u0003\u0001\b\"B>\u0001\t\u0003a\bbBA\u0003\u0001\u0011\u0005\u0011q\u0001\u0005\b\u0003#\u0001A\u0011BA\n\u0011\u001d\t\u0019\u0003\u0001C\u0001\u0003KAq!!\u000f\u0001\t\u0003\tYdB\u0005\u0002DI\t\t\u0011#\u0001\u0002F\u0019A\u0011CEA\u0001\u0012\u0003\t9\u0005\u0003\u00048\u001d\u0011\u0005\u0011\u0011\n\u0005\n\u0003\u0017r\u0011\u0013!C\u0001\u0003\u001b\u0012a\u0002U=uQ>tW\t_3dkR|'O\u0003\u0002\u0014)\u00051\u0001/\u001f;i_:T\u0011!F\u0001\u0005cVL\u0007p\u0001\u0001\u0014\t\u0001Ab\u0004\u000b\t\u00033qi\u0011A\u0007\u0006\u00027\u0005)1oY1mC&\u0011QD\u0007\u0002\u0007\u0003:L(+\u001a4\u0011\u0005}1S\"\u0001\u0011\u000b\u0005\u0005\u0012\u0013aB3yK\u000e,H/\u001a\u0006\u0003G\u0011\n!A\u001e\u001a\u000b\u0005\u0015\"\u0012aA1qS&\u0011q\u0005\t\u0002\t\u000bb,7-\u001e;peB\u0011\u0011\u0006M\u0007\u0002U)\u00111\u0006L\u0001\rg\u000e\fG.\u00197pO\u001eLgn\u001a\u0006\u0003[9\n\u0001\u0002^=qKN\fg-\u001a\u0006\u0002_\u0005\u00191m\\7\n\u0005ER#a\u0003'bufdunZ4j]\u001e\faaY8oM&<\u0007C\u0001\u001b6\u001b\u0005\u0011\u0012B\u0001\u001c\u0013\u00051\u0001\u0016\u0010\u001e5p]\u000e{gNZ5h\u0003\u0019a\u0014N\\5u}Q\u0011\u0011H\u000f\t\u0003i\u0001AqA\r\u0002\u0011\u0002\u0003\u00071'\u0001\u0003q_J$X#A\u001f\u0011\u0005y*U\"A 
\u000b\u0005\u0001\u000b\u0015AB1u_6L7M\u0003\u0002C\u0007\u0006IQ\r_3dkRLwN\u001c\u0006\u0002\t\u0006)Qn\u001c8jq&\u0011ai\u0010\u0002\n\u0003R|W.[2J]R\f\u0001\u0002]8si~#S-\u001d\u000b\u0003\u00132\u0003\"!\u0007&\n\u0005-S\"\u0001B+oSRDq!\u0014\u0003\u0002\u0002\u0003\u0007Q(A\u0002yIE\nQ\u0001]8si\u0002\nAaY8qsR\u0019\u0011kV2\u0011\u0007I+\u0016*D\u0001T\u0015\t!6)\u0001\u0003fm\u0006d\u0017B\u0001,T\u0005\u0011!\u0016m]6\t\u000ba3\u0001\u0019A-\u0002\u0007\u0011L'\u000f\u0005\u0002[C6\t1L\u0003\u0002];\u0006!a-\u001b7f\u0015\tqv,A\u0002oS>T\u0011\u0001Y\u0001\u0005U\u00064\u0018-\u0003\u0002c7\n!\u0001+\u0019;i\u0011\u0015!g\u00011\u0001f\u0003!1\u0017\u000e\\3oC6,\u0007C\u00014n\u001d\t97\u000e\u0005\u0002i55\t\u0011N\u0003\u0002k-\u00051AH]8pizJ!\u0001\u001c\u000e\u0002\rA\u0013X\rZ3g\u0013\tqwN\u0001\u0004TiJLgn\u001a\u0006\u0003Yj!2!U9w\u0011\u0015\u0011x\u00011\u0001t\u0003\u0015\tX/\u001a:z!\tyB/\u0003\u0002vA\tA1+\u001e2Rk\u0016\u0014\u0018\u0010C\u0003x\u000f\u0001\u0007\u00010A\u0004ck&dG-\u001a:\u0011\u0005}I\u0018B\u0001>!\u0005\u001d\u0011U/\u001b7eKJ\f1\"\\1lKB\u0013xnY3tgR\u0019Q0a\u0001\u0011\u0007I+f\u0010\u0005\u00025\u007f&\u0019\u0011\u0011\u0001\n\u0003)AKH\u000f[8o%Vtg.\u001b8h!J|7-Z:t\u0011\u0015\u0011\b\u00021\u0001t\u00031\u0001(/\u001a9be\u00164\u0015\u000e\\3t)\u0019\tI!a\u0003\u0002\u0010A\u0019!+V-\t\r\u00055\u0011\u00021\u0001\u007f\u0003\u001d\u0001(o\\2fgNDQA]\u0005A\u0002M\f!cZ3oKJ\fG/Z+tKJ\u001c6M]5qiR1\u0011QCA\u0010\u0003C\u0001B!a\u0006\u0002\u001e5\u0011\u0011\u0011\u0004\u0006\u0004\u00037y\u0016\u0001\u00027b]\u001eL1A\\A\r\u0011\u0015A&\u00021\u0001Z\u0011\u0015\u0011(\u00021\u0001t\u00039\u0001(/\u001a9be\u0016<\u0015\r^3xCf$b!a\n\u00026\u0005]\u0002\u0003\u0002*V\u0003S\u0001B!a\u000b\u000225\u0011\u0011Q\u0006\u0006\u0003\u0003_\tA\u0001]=5U&!\u00111GA\u0017\u000559\u0015\r^3xCf\u001cVM\u001d<fe\"1\u0011QB\u0006A\u0002yDQA]\u0006A\u0002M\f1A];o)\u001d\t\u0016QHA 
\u0003\u0003Ba!!\u0004\r\u0001\u0004q\b\"\u0002:\r\u0001\u0004\u0019\b\"B<\r\u0001\u0004A\u0018A\u0004)zi\"|g.\u0012=fGV$xN\u001d\t\u0003i9\u0019\"A\u0004\r\u0015\u0005\u0005\u0015\u0013a\u0007\u0013mKN\u001c\u0018N\\5uI\u001d\u0014X-\u0019;fe\u0012\"WMZ1vYR$\u0013'\u0006\u0002\u0002P)\u001a1'!\u0015,\u0005\u0005M\u0003\u0003BA+\u0003?j!!a\u0016\u000b\t\u0005e\u00131L\u0001\nk:\u001c\u0007.Z2lK\u0012T1!!\u0018\u001b\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u0003C\n9FA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016\u0004")
public class PythonExecutor
implements Executor,
LazyLogging {
    // Executor settings passed to the constructor; read by prepareFiles and generateUserScript.
    private final PythonConfig config;
    // Counter that prepareGateway increments to choose the port of each new GatewayServer.
    private AtomicInt port;
    // Lazily created logger; assigned by logger$lzycompute() the first time logger() is called.
    private transient Logger logger;
    // True once `logger` has been assigned; logger() reads it outside the lock, hence volatile.
    private volatile transient boolean bitmap$trans$0;

    /**
     * Static forwarder for the default value of the constructor's first argument.
     *
     * <p>Delegates to the method of the same name on the {@code PythonExecutor$} singleton.
     *
     * @return whatever {@code PythonExecutor$.MODULE$} reports as the default {@link PythonConfig}
     */
    public static PythonConfig $lessinit$greater$default$1() {
        PythonExecutor$ companion = PythonExecutor$.MODULE$;
        return companion.$lessinit$greater$default$1();
    }

    /**
     * Slow path for the lazily initialised {@code logger} field.
     *
     * <p>Takes this instance's monitor and re-tests the initialisation flag inside it, so the
     * logger is created at most once even if several threads reach this method together.
     *
     * @return the (now initialised) logger
     */
    private Logger logger$lzycompute() {
        synchronized (this) {
            boolean alreadyInitialised = this.bitmap$trans$0;
            if (!alreadyInitialised) {
                this.logger = LazyLogging.logger$((LazyLogging)this);
                // Publish the flag only after the field has been assigned.
                this.bitmap$trans$0 = true;
            }
        }
        return this.logger;
    }

    /**
     * Returns this executor's logger, creating it on first access.
     *
     * @return the cached logger when the initialisation flag is set, otherwise the result of
     *         {@code logger$lzycompute()}
     */
    public Logger logger() {
        if (this.bitmap$trans$0) {
            // Fast path: already initialised, no locking required.
            return this.logger;
        }
        return this.logger$lzycompute();
    }

    /**
     * Accessor for the gateway port counter.
     *
     * @return the {@link AtomicInt} currently held in the {@code port} field
     */
    public AtomicInt port() {
        return port;
    }

    /**
     * Replaces the gateway port counter.
     *
     * @param x$1 the new counter to store in the {@code port} field
     */
    public void port_$eq(AtomicInt x$1) {
        port = x$1;
    }

    public Task<BoxedUnit> copy(Path dir, String filename) {
        return Task$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> Resources.toByteArray((URL)Resources.getResource((String)filename))).flatMap((Function1 & Serializable & scala.Serializable)bytes -> Task$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> Files.write(Paths.get(((Object)dir).toString(), filename), bytes, new OpenOption[0])).map((Function1 & Serializable & scala.Serializable)_ -> {
            PythonExecutor.$anonfun$copy$4(_);
            return BoxedUnit.UNIT;
        }));
    }

    /**
     * Runs a query in a freshly prepared Python process and releases the process afterwards.
     *
     * <p>The process is acquired with {@link #makeProcess(SubQuery)}, used by
     * {@link #run(PythonRunningProcess, SubQuery, Builder)}, and released by calling
     * {@code close()} on it; acquisition, use and release are tied together with
     * {@code Task.bracket}.
     *
     * @param query   the query to execute
     * @param builder sink that receives logs, errors and result batches
     * @return the bracketed task
     */
    public Task<BoxedUnit> execute(SubQuery query, Builder builder) {
        Task<PythonRunningProcess> acquire = this.makeProcess(query);
        return acquire.bracket(
            (Function1 & Serializable & scala.Serializable)running -> this.run((PythonRunningProcess)running, query, builder),
            (Function1 & Serializable & scala.Serializable)running -> ((PythonRunningProcess)running).close());
    }

    public Task<PythonRunningProcess> makeProcess(SubQuery query) {
        PythonRunningProcess process = new PythonRunningProcess(query.id(), PythonRunningProcess$.MODULE$.apply$default$2(), PythonRunningProcess$.MODULE$.apply$default$3(), PythonRunningProcess$.MODULE$.apply$default$4(), PythonRunningProcess$.MODULE$.apply$default$5());
        return this.prepareFiles(process, query).flatMap((Function1 & Serializable & scala.Serializable)_2 -> this.prepareGateway(process, query).map((Function1 & Serializable & scala.Serializable)_ -> process));
    }

    public Task<Path> prepareFiles(PythonRunningProcess process, SubQuery query) {
        String user = query.user().email();
        Path dir = Paths.get(this.config.userScriptsDir(), user);
        Path bin = Paths.get(((Object)dir).toString(), "bin");
        byte[] script = this.generateUserScript(dir, query).getBytes("UTF-8");
        return Task$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> Files.notExists(dir, new LinkOption[0]) ? Files.createDirectories(dir, new FileAttribute[0]) : BoxedUnit.UNIT).flatMap((Function1 & Serializable & scala.Serializable)_2 -> Task$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> Files.notExists(bin, new LinkOption[0]) ? Files.createDirectories(bin, new FileAttribute[0]) : BoxedUnit.UNIT).flatMap((Function1 & Serializable & scala.Serializable)_ -> Task$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> Files.createTempFile(dir, "script-", ".py", new FileAttribute[0])).flatMap((Function1 & Serializable & scala.Serializable)file -> Task$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> Files.write(file, script, new OpenOption[0])).flatMap((Function1 & Serializable & scala.Serializable)_ -> this.copy(dir, "quix.py").flatMap((Function1 & Serializable & scala.Serializable)_2 -> this.copy(dir, "packages.py").flatMap((Function1 & Serializable & scala.Serializable)_ -> this.copy(bin, "activator.py").flatMap((Function1 & Serializable & scala.Serializable)_2 -> Task$.MODULE$.apply((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> process.file_$eq((Option<Path>)new Some(file))).map((Function1 & Serializable & scala.Serializable)_ -> file))))))));
    }

    /**
     * Assembles the Python source that is written to the user's script file.
     *
     * <p>The result is the concatenation of four parts:
     * <ol>
     *   <li>an environment prelude that imports {@code Packages}, constructs it with the user
     *       directory plus the configured index URLs, and installs the configured packages;</li>
     *   <li>a fixed prelude that installs {@code py4j} if it cannot be imported and creates the
     *       {@code quix} object;</li>
     *   <li>{@code config.additionalCode()};</li>
     *   <li>the query text.</li>
     * </ol>
     *
     * <p>{@code dir} is built from the user's e-mail by the caller, so it is escaped before being
     * embedded in the single-quoted Python literal. Previously a quote, backslash (for example a
     * Windows path) or line break in it produced a syntactically broken script or let the value
     * escape the literal. Values without those characters produce exactly the same script as
     * before. The index URLs and package names come from configuration and are still embedded
     * verbatim.
     *
     * @param dir   the user's script directory
     * @param query the query whose text is appended at the end of the script
     * @return the complete Python script
     */
    private String generateUserScript(Path dir, SubQuery query) {
        String dirLiteral = PythonExecutor.escapePythonSingleQuoted(String.valueOf(dir));
        String envSetup = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(133)
            .append("\n         |from packages import Packages\n         |packages = Packages('")
            .append(dirLiteral)
            .append("', '")
            .append(this.config.indexUrl())
            .append("', '")
            .append(this.config.extraIndexUrl())
            .append("')\n         |packages.install(")
            .append(((TraversableOnce)this.config.packages().map((Function1 & Serializable & scala.Serializable)lib -> new StringBuilder(11).append('\'').append((String)lib).append('\'').toString(), Seq$.MODULE$.canBuildFrom())).mkString(", "))
            .append(")\n         |\n         |")
            .toString())).stripMargin();
        String quixSetup = new StringOps(Predef$.MODULE$.augmentString("\n         |try:\n         |  from py4j.java_gateway import JavaGateway\n         |except ImportError:\n         |  import sys\n         |  print(\"mandatory py4j package is missing, installing\", file = sys.stderr)\n         |  packages.install('py4j')\n         |\n         |from quix import Quix\n         |\n         |quix = Quix()\n         |\n         |")).stripMargin();
        return new StringBuilder(0).append(envSetup).append(quixSetup).append(this.config.additionalCode()).append(query.text()).toString();
    }

    /**
     * Escapes a value so it can sit between single quotes in generated Python source.
     *
     * <p>Backslashes are doubled first so that the escapes added afterwards are not themselves
     * re-escaped; then single quotes and line breaks are replaced by their backslash forms.
     *
     * @param raw the text to embed
     * @return {@code raw} with {@code \}, {@code '}, LF and CR escaped
     */
    private static String escapePythonSingleQuoted(String raw) {
        return raw
            .replace("\\", "\\\\")
            .replace("'", "\\'")
            .replace("\n", "\\n")
            .replace("\r", "\\r");
    }

    public Task<GatewayServer> prepareGateway(PythonRunningProcess process, SubQuery query) {
        return Task$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> new PythonBridge(query.id())).flatMap((Function1 & Serializable & scala.Serializable)bridge -> Task$.MODULE$.apply((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> process.bridge_$eq((Option<PythonBridge>)new Some(bridge))).flatMap((Function1 & Serializable & scala.Serializable)_ -> Task$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> {
            AtomicInt qual$1 = this.port();
            int x$1 = qual$1.incrementAndGet$default$1();
            return new GatewayServer(bridge, qual$1.incrementAndGet(x$1));
        }).flatMap((Function1 & Serializable & scala.Serializable)gatewayServer -> Task$.MODULE$.apply((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> process.gatewayServer_$eq((Option<GatewayServer>)new Some(gatewayServer))).map((Function1 & Serializable & scala.Serializable)_ -> gatewayServer))));
    }

    /**
     * Starts the prepared Python script and forwards everything it produces to {@code builder}.
     *
     * <p>Two observables are created and merged:
     * <ul>
     *   <li>{@code pythonProcessMessages} - on subscription it builds the command
     *       {@code python3 -W ignore <script file> <gateway port> <query id> <user e-mail>},
     *       attaches a {@link PythonProcessHandler} that feeds the subscriber, starts the gateway
     *       server stored on {@code process}, starts the OS process and stores it on
     *       {@code process};</li>
     *   <li>{@code quixInteropMessages} - on subscription it registers the subscriber with the
     *       bridge stored on {@code process}.</li>
     * </ul>
     * The merged stream is consumed up to and including the first message for which
     * {@code $anonfun$run$18} returns false (a {@code ProcessEndSuccess}, or any message seen
     * after the query was cancelled). Each message is mapped to a {@code builder} call and the
     * task completes with the last result.
     *
     * @param process holder providing the script file, gateway server and bridge
     * @param query   supplies the query id, user e-mail and cancellation flag
     * @param builder sink for errors, log lines and tabular results
     * @return a task that completes when the message stream ends
     * @throws IllegalStateException (when the stream is subscribed) if {@code process} has no
     *         script file or no gateway server
     */
    public Task<BoxedUnit> run(PythonRunningProcess process, SubQuery query, Builder builder) {
        // Messages coming from the python3 process itself (via PythonProcessHandler).
        Observable pythonProcessMessages = Observable$.MODULE$.create((OverflowStrategy.Synchronous)OverflowStrategy.Unbounded$.MODULE$, Observable$.MODULE$.create$default$2(), (Function1 & Serializable & scala.Serializable)sub -> {
            Task task = Task$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> new NuProcessBuilder(new String[]{"python3", "-W", "ignore", process.file().getOrElse((Function0 & Serializable & scala.Serializable)() -> {
                throw new IllegalStateException("No file to execute");
            }).toString(), Integer.toString(((GatewayServer)process.gatewayServer().getOrElse((Function0 & Serializable & scala.Serializable)() -> {
                throw new IllegalStateException("No running gateway");
            })).getPort()), query.id(), query.user().email()})).flatMap((Function1 & Serializable & scala.Serializable)pb -> Task$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> new PythonProcessHandler(query.id(), (Subscriber<PythonMessage>)sub)).flatMap((Function1 & Serializable & scala.Serializable)handler -> Task$.MODULE$.apply((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> pb.setProcessListener((NuProcessHandler)handler)).flatMap((Function1 & Serializable & scala.Serializable)_ -> Task$.MODULE$.apply((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> process.gatewayServer().foreach((Function1 & Serializable & scala.Serializable)x$2 -> {
                // The gateway server is started before the Python process is launched.
                x$2.start();
                return BoxedUnit.UNIT;
            })).flatMap((Function1 & Serializable & scala.Serializable)_2 -> Task$.MODULE$.apply((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> process.process_$eq((Option<NuProcess>)new Some((Object)pb.start()))).map((Function1 & Serializable & scala.Serializable)_ -> {
                PythonExecutor.$anonfun$run$14(_);
                return BoxedUnit.UNIT;
            })))));
            // Run the start-up sequence on the subscriber's scheduler.
            return task.runToFuture(sub.scheduler());
        });
        // Messages pushed by the Python side through the bridge.
        Observable quixInteropMessages = Observable$.MODULE$.create((OverflowStrategy.Synchronous)OverflowStrategy.Unbounded$.MODULE$, Observable$.MODULE$.create$default$2(), (Function1 & Serializable & scala.Serializable)sub -> Task$.MODULE$.apply((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> process.bridge().foreach((Function1 & Serializable & scala.Serializable)x$3 -> {
            x$3.register((Subscriber<PythonMessage>)sub);
            return BoxedUnit.UNIT;
        })).runToFuture(sub.scheduler()));
        // Merge both sources; the bridge observable is listed first.
        Observable qual$1 = Observable$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Observable[]{quixInteropMessages, pythonProcessMessages}));
        Predef.$less$colon$less x$1 = Predef$.MODULE$.$conforms();
        OverflowStrategy x$2 = qual$1.merge$default$2();
        return qual$1.merge(x$1, x$2).takeWhileInclusive((Function1 & Serializable & scala.Serializable)x0$1 -> BoxesRunTime.boxToBoolean((boolean)PythonExecutor.$anonfun$run$18(query, x0$1))).mapEval((Function1 & Serializable & scala.Serializable)x0$2 -> {
            Task task;
            PythonMessage pythonMessage = x0$2;
            // Dispatch on the concrete message type.
            if (pythonMessage instanceof ProcessStartFailure) {
                // Process could not be started: report the exception against the query id.
                ProcessStartFailure processStartFailure = (ProcessStartFailure)pythonMessage;
                Throwable exception = processStartFailure.t();
                task = builder.error(query.id(), exception);
            } else if (pythonMessage instanceof ProcessStdout) {
                // A stdout line is logged at level "INFO".
                ProcessStdout processStdout = (ProcessStdout)pythonMessage;
                String jobId = processStdout.jobId();
                String line = processStdout.line();
                task = builder.log(jobId, line, "INFO");
            } else if (pythonMessage instanceof ProcessStderr) {
                // A stderr line is logged at level "ERROR".
                ProcessStderr processStderr = (ProcessStderr)pythonMessage;
                String jobId = processStderr.jobId();
                String line = processStderr.line();
                task = builder.log(jobId, line, "ERROR");
            } else if (pythonMessage instanceof TabFields) {
                // Column header of a tab: start a sub-query named after the tab, then send a
                // batch that carries only the columns (other Batch arguments are defaults).
                TabFields tabFields = (TabFields)pythonMessage;
                String tabId = tabFields.tabId();
                Seq<String> fields = tabFields.fields();
                Option x$3 = Option$.MODULE$.apply(fields.map((Function1)BatchColumn$.MODULE$, Seq$.MODULE$.canBuildFrom()));
                Seq x$4 = Batch$.MODULE$.apply$default$1();
                Option x$5 = Batch$.MODULE$.apply$default$3();
                Map x$6 = Batch$.MODULE$.apply$default$4();
                task = builder.startSubQuery(tabId, tabId).$times$greater(builder.addSubQuery(tabId, new Batch(x$4, x$3, x$5, x$6)));
            } else if (pythonMessage instanceof TabRow) {
                // One data row: sent as a batch containing exactly that row.
                TabRow tabRow = (TabRow)pythonMessage;
                String tabId = tabRow.tabId();
                Seq<Object> row = tabRow.row();
                task = builder.addSubQuery(tabId, new Batch((Seq)new .colon.colon(row, (List)Nil$.MODULE$), Batch$.MODULE$.apply$default$2(), Batch$.MODULE$.apply$default$3(), Batch$.MODULE$.apply$default$4()));
            } else if (pythonMessage instanceof TabEnd) {
                // End of a tab: close the sub-query with an empty map.
                TabEnd tabEnd = (TabEnd)pythonMessage;
                String tabId = tabEnd.tabId();
                task = builder.endSubQuery(tabId, Predef$.MODULE$.Map().empty());
            } else {
                // Any other message is only logged (at info level, if enabled).
                task = Task$.MODULE$.apply((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                    // `boxedUnit` is assigned in both branches but never read (decompiler residue).
                    BoxedUnit boxedUnit;
                    if (this.logger().underlying().isInfoEnabled()) {
                        this.logger().underlying().info("method=run event=unknown-event query-id={} user={} event={}", new Object[]{query.id(), query.user().email(), pythonMessage});
                        boxedUnit = BoxedUnit.UNIT;
                    } else {
                        boxedUnit = BoxedUnit.UNIT;
                    }
                });
            }
            return task;
        }).lastL();
    }

    /**
     * Synthetic lambda body invoked from {@code copy}: intentionally does nothing, so the path
     * returned by the file write is discarded.
     */
    public static final /* synthetic */ void $anonfun$copy$4(Path _) {
    }

    /**
     * Synthetic lambda body invoked from {@code run} after the process has been stored:
     * intentionally does nothing.
     */
    public static final /* synthetic */ void $anonfun$run$14(BoxedUnit _) {
    }

    /**
     * Predicate used by {@code run} to decide whether the message stream should continue.
     *
     * @param query$4 the running query; its cancellation flag is consulted
     * @param x0$1    the message just received
     * @return {@code false} for a {@code ProcessEndSuccess} message; otherwise {@code true}
     *         exactly when the query has not been cancelled
     */
    public static final /* synthetic */ boolean $anonfun$run$18(SubQuery query$4, PythonMessage x0$1) {
        if (x0$1 instanceof ProcessEndSuccess) {
            // Normal termination: the cancellation flag is not consulted at all.
            return false;
        }
        boolean cancelled = BoxesRunTime.unboxToBoolean((Object)query$4.canceled().get());
        return !cancelled;
    }

    public PythonExecutor(PythonConfig config) {
        this.config = config;
        LazyLogging.$init$((LazyLogging)this);
        this.port = AtomicInt$.MODULE$.apply(25333);
    }
}

