/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.jobsubmission;

import java.io.IOException;
import java.nio.file.Paths;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
import org.apache.beam.runners.jobsubmission.InMemoryJobService;
import org.apache.beam.runners.jobsubmission.JobInvoker;
import org.apache.beam.sdk.expansion.service.ExpansionServer;
import org.apache.beam.sdk.expansion.service.ExpansionService;
import org.apache.beam.sdk.function.ThrowingConsumer;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.kohsuke.args4j.Option;
import org.kohsuke.args4j.spi.ExplicitBooleanOptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class JobServerDriver
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(JobServerDriver.class);
    @VisibleForTesting
    public ServerConfiguration configuration;
    private final ServerFactory jobServerFactory;
    private final ServerFactory artifactServerFactory;
    private final JobInvokerFactory jobInvokerFactory;
    private volatile GrpcFnServer<InMemoryJobService> jobServer;
    private volatile GrpcFnServer<ArtifactStagingService> artifactStagingServer;
    private volatile ExpansionServer expansionServer;

    protected InMemoryJobService createJobService() throws IOException {
        this.artifactStagingServer = this.createArtifactStagingService();
        this.expansionServer = this.createExpansionService();
        JobInvoker invoker = this.jobInvokerFactory.create();
        return InMemoryJobService.create(this.artifactStagingServer, this::createSessionToken, (ThrowingConsumer<Exception, String>)((ThrowingConsumer)stagingSessionToken -> {
            if (this.configuration.cleanArtifactsPerJob) {
                ((ArtifactStagingService)this.artifactStagingServer.getService()).removeStagedArtifacts(stagingSessionToken);
            }
        }), invoker, this.configuration.getMaxInvocationHistory());
    }

    protected static ServerFactory createJobServerFactory(ServerConfiguration configuration) {
        return ServerFactory.createWithPortSupplier(() -> configuration.port);
    }

    protected static ServerFactory createArtifactServerFactory(ServerConfiguration configuration) {
        return ServerFactory.createWithPortSupplier(() -> configuration.artifactPort);
    }

    protected JobServerDriver(ServerConfiguration configuration, ServerFactory jobServerFactory, ServerFactory artifactServerFactory, JobInvokerFactory jobInvokerFactory) {
        this.configuration = configuration;
        this.jobServerFactory = jobServerFactory;
        this.artifactServerFactory = artifactServerFactory;
        this.jobInvokerFactory = jobInvokerFactory;
    }

    public String getJobServerUrl() {
        return this.jobServer != null ? this.jobServer.getApiServiceDescriptor().getUrl() : null;
    }

    public String start() throws IOException {
        this.jobServer = this.createJobServer();
        return this.jobServer.getApiServiceDescriptor().getUrl();
    }

    @Override
    public void run() {
        try {
            this.jobServer = this.createJobServer();
            LOG.info("Job server now running, terminate with Ctrl+C");
            this.jobServer.getServer().awaitTermination();
        }
        catch (InterruptedException e) {
            LOG.warn("Job server interrupted", (Throwable)e);
        }
        catch (Exception e) {
            LOG.warn("Exception during job server creation", (Throwable)e);
        }
        finally {
            this.stop();
        }
    }

    public synchronized void stop() {
        if (this.jobServer != null) {
            try {
                this.jobServer.close();
                LOG.info("JobServer stopped on {}", (Object)this.jobServer.getApiServiceDescriptor().getUrl());
                this.jobServer = null;
            }
            catch (Exception e) {
                LOG.error("Error while closing the jobServer.", (Throwable)e);
            }
        }
        if (this.artifactStagingServer != null) {
            try {
                this.artifactStagingServer.close();
                LOG.info("ArtifactStagingServer stopped on {}", (Object)this.artifactStagingServer.getApiServiceDescriptor().getUrl());
                this.artifactStagingServer = null;
            }
            catch (Exception e) {
                LOG.error("Error while closing the artifactStagingServer.", (Throwable)e);
            }
        }
        if (this.expansionServer != null) {
            try {
                this.expansionServer.close();
                LOG.info("Expansion stopped on {}:{}", (Object)this.expansionServer.getHost(), (Object)this.expansionServer.getPort());
                this.expansionServer = null;
            }
            catch (Exception e) {
                LOG.error("Error while closing the Expansion Service.", (Throwable)e);
            }
        }
    }

    protected String createSessionToken(String session) {
        return session;
    }

    private GrpcFnServer<InMemoryJobService> createJobServer() throws IOException {
        GrpcFnServer jobServiceGrpcFnServer;
        InMemoryJobService service = this.createJobService();
        if (this.configuration.port == 0) {
            jobServiceGrpcFnServer = GrpcFnServer.allocatePortAndCreateFor((FnService)service, (ServerFactory)this.jobServerFactory);
        } else {
            Endpoints.ApiServiceDescriptor descriptor = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(this.configuration.host + ":" + this.configuration.port).build();
            jobServiceGrpcFnServer = GrpcFnServer.create((FnService)service, (Endpoints.ApiServiceDescriptor)descriptor, (ServerFactory)this.jobServerFactory);
        }
        LOG.info("JobService started on {}", (Object)jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl());
        return jobServiceGrpcFnServer;
    }

    private GrpcFnServer<ArtifactStagingService> createArtifactStagingService() throws IOException {
        GrpcFnServer server;
        ArtifactStagingService service = new ArtifactStagingService(ArtifactStagingService.beamFilesystemArtifactDestinationProvider((String)this.configuration.artifactStagingPath));
        if (this.configuration.artifactPort == 0) {
            server = GrpcFnServer.allocatePortAndCreateFor((FnService)service, (ServerFactory)this.artifactServerFactory);
        } else {
            Endpoints.ApiServiceDescriptor descriptor = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(this.configuration.host + ":" + this.configuration.artifactPort).build();
            server = GrpcFnServer.create((FnService)service, (Endpoints.ApiServiceDescriptor)descriptor, (ServerFactory)this.artifactServerFactory);
        }
        LOG.info("ArtifactStagingService started on {}", (Object)server.getApiServiceDescriptor().getUrl());
        return server;
    }

    private ExpansionServer createExpansionService() throws IOException {
        ExpansionServer expansionServer = ExpansionServer.create((ExpansionService)new ExpansionService(), (String)this.configuration.host, (int)this.configuration.expansionPort);
        LOG.info("Java ExpansionService started on {}:{}", (Object)expansionServer.getHost(), (Object)expansionServer.getPort());
        return expansionServer;
    }

    public static class ServerConfiguration {
        @Option(name="--job-host", usage="The job server host name")
        private String host = "localhost";
        @Option(name="--job-port", usage="The job service port. 0 to use a dynamic port. (Default: 8099)")
        private int port = 8099;
        @Option(name="--artifact-port", usage="The artifact service port. 0 to use a dynamic port. (Default: 8098)")
        private int artifactPort = 8098;
        @Option(name="--expansion-port", usage="The Java expansion service port. 0 to use a dynamic port. (Default: 8097)")
        private int expansionPort = 8097;
        @Option(name="--artifacts-dir", usage="The location to store staged artifact files")
        private String artifactStagingPath = Paths.get(System.getProperty("java.io.tmpdir"), "beam-artifact-staging").toString();
        @Option(name="--clean-artifacts-per-job", usage="When true, remove each job's staged artifacts when it completes", handler=ExplicitBooleanOptionHandler.class)
        private boolean cleanArtifactsPerJob = true;
        @Option(name="--history-size", usage="The maximum number of completed jobs to keep.")
        private int maxInvocationHistory = 10;

        public String getHost() {
            return this.host;
        }

        public int getPort() {
            return this.port;
        }

        public int getArtifactPort() {
            return this.artifactPort;
        }

        public int getExpansionPort() {
            return this.expansionPort;
        }

        public String getArtifactStagingPath() {
            return this.artifactStagingPath;
        }

        public boolean isCleanArtifactsPerJob() {
            return this.cleanArtifactsPerJob;
        }

        public int getMaxInvocationHistory() {
            return this.maxInvocationHistory;
        }
    }

    public static interface JobInvokerFactory {
        public JobInvoker create();
    }
}

