/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.environment;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.server.GrpcFnServer;
import org.apache.beam.sdk.fn.server.ServerFactory;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ProtocolMessageEnum;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link EnvironmentFactory} for environments whose URN is the {@code EXTERNAL} standard
 * environment.
 *
 * <p>To create an environment, the factory sends a {@code StartWorkerRequest} to the worker-pool
 * endpoint named in the environment's {@code ExternalPayload}, then blocks until the
 * {@link ControlClientPool.Source} hands back an {@link InstructionRequestHandler} for the
 * requested worker id.
 */
public class ExternalEnvironmentFactory implements EnvironmentFactory {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(ExternalEnvironmentFactory.class);

    /** True when the {@code BEAM_WORKER_POOL_IN_DOCKER_VM} environment variable is present. */
    private static final @UnknownKeyFor @NonNull @Initialized boolean IS_WORKER_POOL_IN_DOCKER_VM = System.getenv().containsKey("BEAM_WORKER_POOL_IN_DOCKER_VM");

    private final @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized FnApiControlClientPoolService> controlServiceServer;
    private final @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized GrpcLoggingService> loggingServiceServer;
    private final @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized ArtifactRetrievalService> retrievalServiceServer;
    private final @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized StaticGrpcProvisionService> provisioningServiceServer;
    // Type-use annotations on a qualified nested type must sit on the simple name, not the qualifier.
    private final ControlClientPool.@UnknownKeyFor @NonNull @Initialized Source clientSource;

    /**
     * Creates a factory that advertises the given service endpoints to external workers.
     *
     * @param controlServiceServer server whose descriptor is sent as the control endpoint
     * @param loggingServiceServer server whose descriptor is sent as the logging endpoint
     * @param retrievalServiceServer server whose descriptor is sent as the artifact endpoint
     * @param provisioningServiceServer server whose descriptor is sent as the provision endpoint
     * @param clientSource source from which the handler of a started worker is taken
     * @param idGenerator accepted for signature compatibility; not used by this factory
     * @return a new factory instance
     */
    public static @UnknownKeyFor @NonNull @Initialized ExternalEnvironmentFactory create(
            @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized FnApiControlClientPoolService> controlServiceServer,
            @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized GrpcLoggingService> loggingServiceServer,
            @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized ArtifactRetrievalService> retrievalServiceServer,
            @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized StaticGrpcProvisionService> provisioningServiceServer,
            ControlClientPool.@UnknownKeyFor @NonNull @Initialized Source clientSource,
            @UnknownKeyFor @NonNull @Initialized IdGenerator idGenerator) {
        return new ExternalEnvironmentFactory(controlServiceServer, loggingServiceServer, retrievalServiceServer, provisioningServiceServer, clientSource);
    }

    private ExternalEnvironmentFactory(
            @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized FnApiControlClientPoolService> controlServiceServer,
            @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized GrpcLoggingService> loggingServiceServer,
            @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized ArtifactRetrievalService> retrievalServiceServer,
            @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized StaticGrpcProvisionService> provisioningServiceServer,
            ControlClientPool.@UnknownKeyFor @NonNull @Initialized Source clientSource) {
        this.controlServiceServer = controlServiceServer;
        this.loggingServiceServer = loggingServiceServer;
        this.retrievalServiceServer = retrievalServiceServer;
        this.provisioningServiceServer = provisioningServiceServer;
        this.clientSource = clientSource;
    }

    /**
     * Starts a worker in the external worker pool and returns a handle to it.
     *
     * <p>If anything fails after the channel to the worker pool has been opened (the start RPC
     * throws, the pool reports an error, or waiting for the worker is interrupted or fails), the
     * channel is shut down before the exception propagates.
     *
     * @param environment the environment to create; its URN must be the {@code EXTERNAL} URN
     * @param workerId the id under which the worker is requested and later looked up
     * @return a {@link RemoteEnvironment} whose {@code close()} stops the worker and closes the
     *     channel
     * @throws IllegalStateException if {@code environment} does not carry the {@code EXTERNAL} URN
     * @throws RuntimeException if the worker pool reports an error or the wait is interrupted
     * @throws Exception if the payload cannot be parsed or the client source fails
     */
    @Override
    public @UnknownKeyFor @NonNull @Initialized RemoteEnvironment createEnvironment(
            final RunnerApi.@UnknownKeyFor @NonNull @Initialized Environment environment,
            final @UnknownKeyFor @NonNull @Initialized String workerId) throws @UnknownKeyFor @NonNull @Initialized Exception {
        Preconditions.checkState(
                environment.getUrn().equals(BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.EXTERNAL)),
                "The passed environment does not contain an ExternalPayload.");
        RunnerApi.ExternalPayload externalPayload = RunnerApi.ExternalPayload.parseFrom(environment.getPayload());
        BeamFnApi.StartWorkerRequest startWorkerRequest = BeamFnApi.StartWorkerRequest.newBuilder()
                .setWorkerId(workerId)
                .setControlEndpoint(this.controlServiceServer.getApiServiceDescriptor())
                .setLoggingEndpoint(this.loggingServiceServer.getApiServiceDescriptor())
                .setArtifactEndpoint(this.retrievalServiceServer.getApiServiceDescriptor())
                .setProvisionEndpoint(this.provisioningServiceServer.getApiServiceDescriptor())
                .putAllParams(externalPayload.getParamsMap())
                .build();
        LOG.debug("Requesting worker ID {}", workerId);
        final ManagedChannel managedChannel = ManagedChannelFactory.createDefault().forDescriptor(externalPayload.getEndpoint());

        final InstructionRequestHandler instructionHandler;
        boolean started = false;
        try {
            BeamFnApi.StartWorkerResponse startWorkerResponse =
                    BeamFnExternalWorkerPoolGrpc.newBlockingStub(managedChannel).startWorker(startWorkerRequest);
            if (!startWorkerResponse.getError().isEmpty()) {
                throw new RuntimeException(startWorkerResponse.getError());
            }
            instructionHandler = this.awaitInstructionHandler(externalPayload, workerId);
            started = true;
        } finally {
            // On any startup failure nobody else holds the channel, so release it here.
            if (!started) {
                managedChannel.shutdownNow();
            }
        }

        return new RemoteEnvironment() {

            @Override
            public RunnerApi.@UnknownKeyFor @NonNull @Initialized Environment getEnvironment() {
                return environment;
            }

            @Override
            public @UnknownKeyFor @NonNull @Initialized InstructionRequestHandler getInstructionRequestHandler() {
                return instructionHandler;
            }

            /**
             * Closes the instruction handler, asks the worker pool to stop the worker, and then
             * shuts the channel down, waiting up to 10 seconds before forcing termination.
             *
             * @throws RuntimeException if the worker pool reports an error while stopping
             * @throws Exception if closing the handler fails or the shutdown wait is interrupted
             */
            @Override
            public void close() throws @UnknownKeyFor @NonNull @Initialized Exception {
                try {
                    instructionHandler.close();
                    BeamFnApi.StopWorkerRequest stopWorkerRequest = BeamFnApi.StopWorkerRequest.newBuilder().setWorkerId(workerId).build();
                    LOG.debug("Closing worker ID {}", workerId);
                    BeamFnApi.StopWorkerResponse stopWorkerResponse =
                            BeamFnExternalWorkerPoolGrpc.newBlockingStub(managedChannel).stopWorker(stopWorkerRequest);
                    if (!stopWorkerResponse.getError().isEmpty()) {
                        throw new RuntimeException(stopWorkerResponse.getError());
                    }
                } finally {
                    managedChannel.shutdown();
                    try {
                        managedChannel.awaitTermination(10L, TimeUnit.SECONDS);
                    } catch (InterruptedException interrupted) {
                        // Do not leave the channel half-open just because the wait was cut short.
                        managedChannel.shutdownNow();
                        throw interrupted;
                    }
                    if (!managedChannel.isTerminated()) {
                        managedChannel.shutdownNow();
                    }
                }
            }
        };
    }

    /**
     * Blocks until the client source yields a handler for {@code workerId}, polling in two-minute
     * intervals and logging each time an interval elapses without a result.
     *
     * <p>On interruption the thread's interrupt flag is restored and the interruption is rethrown
     * wrapped in a {@link RuntimeException}.
     */
    private InstructionRequestHandler awaitInstructionHandler(RunnerApi.ExternalPayload externalPayload, String workerId) throws Exception {
        InstructionRequestHandler handler = null;
        while (handler == null) {
            try {
                handler = this.clientSource.take(workerId, Duration.ofMinutes(2L));
            } catch (TimeoutException timeoutEx) {
                LOG.info("Still waiting for startup of environment from {} for worker id {}", externalPayload.getEndpoint().getUrl(), workerId);
            } catch (InterruptedException interruptEx) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(interruptEx);
            }
        }
        return handler;
    }

    /** {@link EnvironmentFactory.Provider} that produces {@link ExternalEnvironmentFactory} instances. */
    public static class Provider implements EnvironmentFactory.Provider {
        /**
         * Builds an {@link ExternalEnvironmentFactory} from the given servers, using the source
         * side of {@code clientPool} to obtain worker handlers.
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized EnvironmentFactory createEnvironmentFactory(
                @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized FnApiControlClientPoolService> controlServiceServer,
                @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized GrpcLoggingService> loggingServiceServer,
                @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized ArtifactRetrievalService> retrievalServiceServer,
                @UnknownKeyFor @NonNull @Initialized GrpcFnServer<@UnknownKeyFor @NonNull @Initialized StaticGrpcProvisionService> provisioningServiceServer,
                @UnknownKeyFor @NonNull @Initialized ControlClientPool clientPool,
                @UnknownKeyFor @NonNull @Initialized IdGenerator idGenerator) {
            return ExternalEnvironmentFactory.create(controlServiceServer, loggingServiceServer, retrievalServiceServer, provisioningServiceServer, clientPool.getSource(), idGenerator);
        }

        /**
         * Returns the Docker-on-Mac server factory when {@code BEAM_WORKER_POOL_IN_DOCKER_VM} is
         * set in the process environment, and the default server factory otherwise.
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized ServerFactory getServerFactory() {
            if (IS_WORKER_POOL_IN_DOCKER_VM) {
                return DockerEnvironmentFactory.DockerOnMac.getServerFactory();
            }
            return ServerFactory.createDefault();
        }
    }
}

