/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.gateway;

import io.camunda.zeebe.gateway.EndpointManager;
import io.camunda.zeebe.gateway.GatewayGrpcService;
import io.camunda.zeebe.gateway.Loggers;
import io.camunda.zeebe.gateway.health.GatewayHealthManager;
import io.camunda.zeebe.gateway.health.Status;
import io.camunda.zeebe.gateway.health.impl.GatewayHealthManagerImpl;
import io.camunda.zeebe.gateway.impl.broker.BrokerClient;
import io.camunda.zeebe.gateway.impl.configuration.AuthenticationCfg;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.gateway.impl.configuration.NetworkCfg;
import io.camunda.zeebe.gateway.impl.configuration.SecurityCfg;
import io.camunda.zeebe.gateway.impl.configuration.ThreadsCfg;
import io.camunda.zeebe.gateway.impl.job.ActivateJobsHandler;
import io.camunda.zeebe.gateway.impl.job.LongPollingActivateJobsHandler;
import io.camunda.zeebe.gateway.impl.job.RoundRobinActivateJobsHandler;
import io.camunda.zeebe.gateway.interceptors.impl.ContextInjectingInterceptor;
import io.camunda.zeebe.gateway.interceptors.impl.DecoratedInterceptor;
import io.camunda.zeebe.gateway.interceptors.impl.IdentityInterceptor;
import io.camunda.zeebe.gateway.interceptors.impl.InterceptorRepository;
import io.camunda.zeebe.gateway.query.impl.QueryApiImpl;
import io.camunda.zeebe.protocol.impl.stream.job.JobActivationProperties;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.transport.stream.api.ClientStreamer;
import io.camunda.zeebe.util.CloseableSilently;
import io.camunda.zeebe.util.error.FatalErrorHandler;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.netty.NettyServerBuilder;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import me.dinowernli.grpc.prometheus.Configuration;
import me.dinowernli.grpc.prometheus.MonitoringServerInterceptor;
import org.slf4j.Logger;

public final class Gateway
implements CloseableSilently {
    private static final Logger LOG = Loggers.GATEWAY_LOGGER;
    private static final MonitoringServerInterceptor MONITORING_SERVER_INTERCEPTOR = MonitoringServerInterceptor.create((Configuration)Configuration.allMetrics());
    private final GatewayCfg gatewayCfg;
    private final ActorSchedulingService actorSchedulingService;
    private final GatewayHealthManager healthManager;
    private final ClientStreamer<JobActivationProperties> jobStreamer;
    private Server server;
    private ExecutorService grpcExecutor;
    private final BrokerClient brokerClient;

    public Gateway(GatewayCfg gatewayCfg, BrokerClient brokerClient, ActorSchedulingService actorSchedulingService, ClientStreamer<JobActivationProperties> jobStreamer) {
        this.gatewayCfg = gatewayCfg;
        this.brokerClient = brokerClient;
        this.actorSchedulingService = actorSchedulingService;
        this.jobStreamer = jobStreamer;
        this.healthManager = new GatewayHealthManagerImpl();
    }

    public GatewayCfg getGatewayCfg() {
        return this.gatewayCfg;
    }

    public Status getStatus() {
        return this.healthManager.getStatus();
    }

    public BrokerClient getBrokerClient() {
        return this.brokerClient;
    }

    public ActorFuture<Gateway> start() {
        CompletableActorFuture resultFuture = new CompletableActorFuture();
        this.healthManager.setStatus(Status.STARTING);
        ((CompletableFuture)((CompletableFuture)((CompletableFuture)this.createAndStartActivateJobsHandler(this.brokerClient).thenApply(this::createServer)).thenAccept(this::startServer)).thenApply(ok -> this)).whenComplete((BiConsumer)resultFuture);
        return resultFuture;
    }

    private void startServer(Server server) {
        this.server = server;
        try {
            this.server.start();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        this.healthManager.setStatus(Status.RUNNING);
    }

    private Server createServer(ActivateJobsHandler activateJobsHandler) {
        NettyServerBuilder serverBuilder = this.applyNetworkConfig(this.gatewayCfg.getNetwork());
        this.applyExecutorConfiguration(serverBuilder);
        this.applySecurityConfiguration((ServerBuilder<?>)serverBuilder);
        EndpointManager endpointManager = new EndpointManager(this.brokerClient, activateJobsHandler, this.jobStreamer, this.grpcExecutor);
        GatewayGrpcService gatewayGrpcService = new GatewayGrpcService(endpointManager);
        return this.buildServer((ServerBuilder<?>)serverBuilder, (BindableService)gatewayGrpcService);
    }

    private void applyExecutorConfiguration(NettyServerBuilder builder) {
        ThreadsCfg config = this.gatewayCfg.getThreads();
        this.grpcExecutor = new ForkJoinPool(config.getGrpcMinThreads(), new NamedForkJoinPoolThreadFactory(), FatalErrorHandler.uncaughtExceptionHandler((Logger)LOG), true, 0, config.getGrpcMaxThreads(), 1, pool -> false, 1L, TimeUnit.MINUTES);
        builder.executor((Executor)this.grpcExecutor);
    }

    private void applySecurityConfiguration(ServerBuilder<?> serverBuilder) {
        SecurityCfg securityCfg = this.gatewayCfg.getSecurity();
        if (securityCfg.isEnabled()) {
            this.setSecurityConfig(serverBuilder, securityCfg);
        }
    }

    private Server buildServer(ServerBuilder<?> serverBuilder, BindableService interceptorService) {
        return serverBuilder.addService(this.applyInterceptors(interceptorService)).addService(ServerInterceptors.intercept((BindableService)this.healthManager.getHealthService(), (ServerInterceptor[])new ServerInterceptor[]{MONITORING_SERVER_INTERCEPTOR})).build();
    }

    private NettyServerBuilder applyNetworkConfig(NetworkCfg cfg) {
        Duration minKeepAliveInterval = cfg.getMinKeepAliveInterval();
        if (minKeepAliveInterval.isNegative() || minKeepAliveInterval.isZero()) {
            throw new IllegalArgumentException("Minimum keep alive interval must be positive.");
        }
        int maxMessageSize = (int)cfg.getMaxMessageSize().toBytes();
        if (maxMessageSize <= 0) {
            throw new IllegalArgumentException("maxMessageSize must be positive");
        }
        return NettyServerBuilder.forAddress((SocketAddress)new InetSocketAddress(cfg.getHost(), cfg.getPort())).maxInboundMessageSize(maxMessageSize).permitKeepAliveTime(minKeepAliveInterval.toMillis(), TimeUnit.MILLISECONDS).permitKeepAliveWithoutCalls(false);
    }

    private void setSecurityConfig(ServerBuilder<?> serverBuilder, SecurityCfg security) {
        File certificateChainPath = security.getCertificateChainPath();
        File privateKeyPath = security.getPrivateKeyPath();
        if (certificateChainPath == null) {
            throw new IllegalArgumentException("Expected to find a valid path to a certificate chain but none was found. Edit the gateway configuration file to provide one or to disable TLS.");
        }
        if (privateKeyPath == null) {
            throw new IllegalArgumentException("Expected to find a valid path to a private key but none was found. Edit the gateway configuration file to provide one or to disable TLS.");
        }
        if (!certificateChainPath.exists()) {
            throw new IllegalArgumentException(String.format("Expected to find a certificate chain file at the provided location '%s' but none was found.", certificateChainPath));
        }
        if (!privateKeyPath.exists()) {
            throw new IllegalArgumentException(String.format("Expected to find a private key file at the provided location '%s' but none was found.", privateKeyPath));
        }
        serverBuilder.useTransportSecurity(certificateChainPath, privateKeyPath);
    }

    public void close() {
        this.healthManager.setStatus(Status.SHUTDOWN);
        if (this.server != null && !this.server.isShutdown()) {
            this.server.shutdownNow();
            try {
                this.server.awaitTermination();
            }
            catch (InterruptedException e) {
                LOG.warn("Failed to await termination of gRPC server", (Throwable)e);
                Thread.currentThread().interrupt();
            }
            finally {
                this.server = null;
            }
        }
        if (this.grpcExecutor != null) {
            this.grpcExecutor.shutdownNow();
            try {
                this.grpcExecutor.awaitTermination(10L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            finally {
                this.grpcExecutor = null;
            }
        }
    }

    private CompletableFuture<ActivateJobsHandler> createAndStartActivateJobsHandler(BrokerClient brokerClient) {
        ActivateJobsHandler handler = this.buildActivateJobsHandler(brokerClient);
        return this.submitActorToActivateJobs(handler);
    }

    private CompletableFuture<ActivateJobsHandler> submitActorToActivateJobs(ActivateJobsHandler handler) {
        CompletableFuture<ActivateJobsHandler> future = new CompletableFuture<ActivateJobsHandler>();
        Actor actor = Actor.newActor().name("ActivateJobsHandler").actorStartedHandler(handler.andThen(t -> future.complete(handler))).build();
        this.actorSchedulingService.submitActor(actor);
        return future;
    }

    private ActivateJobsHandler buildActivateJobsHandler(BrokerClient brokerClient) {
        if (this.gatewayCfg.getLongPolling().isEnabled()) {
            return this.buildLongPollingHandler(brokerClient);
        }
        return new RoundRobinActivateJobsHandler(brokerClient);
    }

    private LongPollingActivateJobsHandler buildLongPollingHandler(BrokerClient brokerClient) {
        return LongPollingActivateJobsHandler.newBuilder().setBrokerClient(brokerClient).build();
    }

    private ServerServiceDefinition applyInterceptors(BindableService service) {
        InterceptorRepository repository = new InterceptorRepository().load(this.gatewayCfg.getInterceptors());
        QueryApiImpl queryApi = new QueryApiImpl(this.brokerClient);
        List interceptors = repository.instantiate().map(DecoratedInterceptor::decorate).collect(Collectors.toList());
        Collections.reverse(interceptors);
        interceptors.add(new ContextInjectingInterceptor(queryApi));
        interceptors.add(MONITORING_SERVER_INTERCEPTOR);
        if (AuthenticationCfg.AuthMode.IDENTITY == this.gatewayCfg.getSecurity().getAuthentication().getMode()) {
            interceptors.add(new IdentityInterceptor(this.gatewayCfg.getSecurity().getAuthentication().getIdentity()));
        }
        return ServerInterceptors.intercept((BindableService)service, interceptors);
    }

    private static final class NamedForkJoinPoolThreadFactory
    implements ForkJoinPool.ForkJoinWorkerThreadFactory {
        private NamedForkJoinPoolThreadFactory() {
        }

        @Override
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName("grpc-executor-" + worker.getPoolIndex());
            return worker;
        }
    }
}

