/*
 * Decompiled with CFR 0.152.
 */
package org.apache.submarine.server.rpc;

import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.submarine.client.cli.remote.RpcContext;
import org.apache.submarine.commons.rpc.ApplicationIdProto;
import org.apache.submarine.commons.rpc.ParameterProto;
import org.apache.submarine.commons.rpc.ParametersHolderProto;
import org.apache.submarine.commons.rpc.SubmarineServerProtocolGrpc;
import org.apache.submarine.commons.runtime.ClientContext;
import org.apache.submarine.commons.runtime.JobSubmitter;
import org.apache.submarine.commons.runtime.RuntimeFactory;
import org.apache.submarine.commons.runtime.exception.SubmarineException;
import org.apache.submarine.commons.runtime.param.Parameter;
import org.apache.submarine.commons.utils.SubmarineConfVars;
import org.apache.submarine.commons.utils.SubmarineConfiguration;
import org.apache.submarine.server.rpc.SubmarineRpcServerProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * gRPC server that exposes the Submarine server protocol.
 *
 * <p>The server wraps a single {@link Server} built from a {@link ServerBuilder} with one bound
 * service. By default that service is {@link SubmarineServerRpcService}, which turns incoming
 * job-submission requests into calls on a {@link JobSubmitter}.
 */
public class SubmarineRpcServer {
    private static final Logger LOG = LoggerFactory.getLogger(SubmarineRpcServer.class.getName());

    /** Port number this server was created for; used for logging in {@link #start()}. */
    protected int port;

    /** Underlying gRPC server built in the constructor. */
    protected Server server;

    /**
     * Creates a server on the given port using the default {@link SubmarineServerRpcService}.
     *
     * @param port port to listen on
     * @throws IOException declared for callers; nothing in this constructor throws it directly
     */
    public SubmarineRpcServer(int port) throws IOException {
        this(ServerBuilder.forPort(port), port);
    }

    /**
     * Creates a server from a caller-supplied builder using the default
     * {@link SubmarineServerRpcService}.
     *
     * @param serverBuilder builder the gRPC server is created from
     * @param port port number recorded on this instance
     */
    public SubmarineRpcServer(ServerBuilder<?> serverBuilder, int port) {
        this(serverBuilder, port, new SubmarineServerRpcService());
    }

    /**
     * Creates a server on the given port that serves the supplied service implementation.
     *
     * @param port port to listen on
     * @param service service implementation to bind
     */
    public SubmarineRpcServer(int port, SubmarineServerProtocolGrpc.SubmarineServerProtocolImplBase service) {
        this(ServerBuilder.forPort(port), port, service);
    }

    /**
     * Creates a server from a caller-supplied builder and service implementation.
     *
     * @param serverBuilder builder the gRPC server is created from
     * @param port port number recorded on this instance (the builder decides the actual binding)
     * @param service service implementation to bind
     */
    public SubmarineRpcServer(ServerBuilder<?> serverBuilder, int port, SubmarineServerProtocolGrpc.SubmarineServerProtocolImplBase service) {
        this.port = port;
        this.server = serverBuilder.addService(service).build();
    }

    /**
     * Starts the gRPC server and registers a JVM shutdown hook that calls {@link #stop()}.
     *
     * <p>A new hook is registered on every invocation of this method.
     *
     * @throws IOException if the underlying server fails to start
     */
    public void start() throws IOException {
        this.server.start();
        LOG.info("Server started, listening on {}", this.port);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                LOG.info("*** shutting down gRPC server since JVM is shutting down");
                SubmarineRpcServer.this.stop();
            }
        });
    }

    /** Initiates shutdown of the underlying server, if one exists. */
    public void stop() {
        if (this.server != null) {
            this.server.shutdown();
            LOG.info("*** server shut down");
        }
    }

    /**
     * Blocks the calling thread until the underlying server has terminated.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void blockUntilShutdown() throws InterruptedException {
        if (this.server != null) {
            this.server.awaitTermination();
        }
    }

    /**
     * Builds the {@link ClientContext} used to serve one request.
     *
     * <p>A fresh {@link YarnConfiguration} is installed, the job configuration carried by
     * {@code rpcContext} is merged into the context's Submarine configuration, and a
     * {@link RuntimeFactory} is resolved through a class loader over the submitter library
     * directory matching the configured runtime class.
     *
     * @param rpcContext request context carrying the per-job configuration entries
     * @return a client context with YARN configuration and runtime factory set
     */
    private static ClientContext getClientContext(RpcContext rpcContext) {
        Configuration conf = new YarnConfiguration();
        ClientContext clientContext = new ClientContext();
        clientContext.setYarnConfig(conf);
        SubmarineRpcServer.mergeSubmarineConfiguration(clientContext.getSubmarineConfig(), rpcContext);

        String runtimeClass = clientContext.getSubmarineConfig().getString(SubmarineConfVars.ConfVars.SUBMARINE_RUNTIME_CLASS);
        // The submitter jars live in different directories depending on the runtime in use.
        // Both paths are relative, so they resolve against the process working directory.
        String submitterLibDir = runtimeClass.contains("YarnServiceRuntimeFactory")
                ? "../lib/submitter/yarnservice"
                : "../lib/submitter/yarn";

        // NOTE(review): a new URLClassLoader is created on every call and is never closed here;
        // confirm whether it should be cached or released once the job has been submitted.
        ClassLoader classLoader = new URLClassLoader(SubmarineRpcServer.constructUrlsFromClasspath(submitterLibDir));
        RuntimeFactory runtimeFactory = RuntimeFactory.getRuntimeFactory(clientContext, classLoader);
        clientContext.setRuntimeFactory(runtimeFactory);
        return clientContext;
    }

    /**
     * Expands a classpath string into the URLs it refers to.
     *
     * <p>The string is split on {@link File#pathSeparator}. A trailing {@code "/*"} on an entry
     * is stripped. An entry that is a directory contributes one URL per direct child; any other
     * entry contributes its own URL. Entries that cannot be converted to a URL are logged and
     * skipped.
     *
     * @param classpath classpath entries separated by the platform path separator
     * @return the collected URLs, possibly empty
     */
    private static URL[] constructUrlsFromClasspath(String classpath) {
        ArrayList<URL> urls = new ArrayList<URL>();
        for (String path : classpath.split(File.pathSeparator)) {
            String location = path.endsWith("/*") ? path.substring(0, path.length() - 2) : path;
            File file = new File(location);
            try {
                if (!file.isDirectory()) {
                    urls.add(file.toURI().toURL());
                    continue;
                }
                // listFiles() returns null when the directory cannot be read.
                File[] items = file.listFiles();
                if (items != null) {
                    for (File item : items) {
                        urls.add(item.toURI().toURL());
                    }
                }
            } catch (MalformedURLException e) {
                LOG.error(e.getMessage(), e);
            }
        }
        return urls.toArray(new URL[0]);
    }

    /**
     * Copies every entry of the request's job configuration map into the given Submarine
     * configuration.
     *
     * @param submarineConfiguration configuration to update
     * @param rpcContext request context supplying the job configuration map
     */
    private static void mergeSubmarineConfiguration(SubmarineConfiguration submarineConfiguration, RpcContext rpcContext) {
        // Wildcard types avoid raw-type iteration; the casts match what updateConfiguration receives.
        Map<?, ?> jobConfig = rpcContext.getSubmarineJobConfigMap();
        for (Map.Entry<?, ?> entry : jobConfig.entrySet()) {
            submarineConfiguration.updateConfiguration((String) entry.getKey(), (String) entry.getValue());
        }
    }

    /**
     * Starts the RPC server and blocks until it terminates.
     *
     * @param args ignored
     * @throws Exception if the server fails to start or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        SubmarineRpcServer server = SubmarineRpcServer.startRpcServer();
        server.blockUntilShutdown();
    }

    /**
     * Creates and starts a server on the port configured under
     * {@code SubmarineConfVars.ConfVars.SUBMARINE_SERVER_RPC_PORT}.
     *
     * @return the started server
     * @throws IOException if the server fails to start
     */
    public static SubmarineRpcServer startRpcServer() throws IOException {
        SubmarineConfiguration submarineConfiguration = SubmarineConfiguration.getInstance();
        int rpcServerPort = submarineConfiguration.getInt(SubmarineConfVars.ConfVars.SUBMARINE_SERVER_RPC_PORT);
        SubmarineRpcServer server = new SubmarineRpcServer(rpcServerPort);
        server.start();
        return server;
    }

    /** Default service implementation bound by {@link SubmarineRpcServer}. */
    protected static class SubmarineServerRpcService
    extends SubmarineServerProtocolGrpc.SubmarineServerProtocolImplBase {
        protected SubmarineServerRpcService() {
        }

        /**
         * Submits the job described by {@code request} and replies with the resulting
         * application id.
         *
         * <p>Submission failures ({@link IOException}, {@link YarnException},
         * {@link SubmarineException}) are logged; the call is still completed normally with a
         * response built from a {@code null} application id.
         *
         * @param request job parameters received from the client
         * @param responseObserver observer that receives the single response and completion
         */
        @Override
        public void submitJob(ParameterProto request, StreamObserver<ApplicationIdProto> responseObserver) {
            LOG.info("Start to submit a job.");
            RpcContext rpcContext = SubmarineRpcServerProto.convertParameterProtoToRpcContext(request);
            Parameter parameter = SubmarineRpcServerProto.convertParameterProtoToParameter(request);
            ClientContext clientContext = SubmarineRpcServer.getClientContext(rpcContext);
            ApplicationId applicationId = null;
            try {
                applicationId = this.run(clientContext, parameter);
            } catch (IOException | YarnException | SubmarineException e) {
                LOG.error(e.getMessage(), e);
            }
            // NOTE(review): on failure applicationId is still null here. How the converter treats
            // null is not visible in this file; consider reporting the failure via onError instead.
            responseObserver.onNext(SubmarineRpcServerProto.convertApplicationIdToApplicationIdProto(applicationId));
            responseObserver.onCompleted();
        }

        /**
         * Replies with a fixed application id; the request is only logged at debug level.
         *
         * @param request parameters received from the client
         * @param responseObserver observer that receives the single response and completion
         */
        @Override
        public void testRpc(ParametersHolderProto request, StreamObserver<ApplicationIdProto> responseObserver) {
            responseObserver.onNext(this.checkFeature(request));
            responseObserver.onCompleted();
        }

        /**
         * Logs the request at debug level and returns a constant application id.
         *
         * @param request request to log
         * @return a proto whose application id is {@code "application_1_1"}
         */
        private ApplicationIdProto checkFeature(ParametersHolderProto request) {
            // Parameterized form avoids rendering the request when debug logging is disabled.
            LOG.debug("{}", request);
            return ApplicationIdProto.newBuilder().setApplicationId("application_1_1").build();
        }

        /**
         * Submits the job through the job submitter obtained from the context's runtime factory.
         *
         * @param clientContext context providing the runtime factory
         * @param parameter job parameters to submit
         * @return the id returned by the job submitter
         * @throws IOException propagated from the job submitter
         * @throws YarnException propagated from the job submitter
         * @throws SubmarineException propagated from the job submitter
         */
        protected ApplicationId run(ClientContext clientContext, Parameter parameter) throws IOException, YarnException, SubmarineException {
            JobSubmitter jobSubmitter = clientContext.getRuntimeFactory().getJobSubmitterInstance();
            return jobSubmitter.submitJob(parameter);
        }
    }
}

