/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.beam.fn.harness.FnHarness;
import org.apache.beam.repackaged.beam_runners_direct_java.model.pipeline.v1.Endpoints;
import org.apache.beam.repackaged.beam_runners_direct_java.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.GrpcFnServer;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.stream.StreamObserverFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.test.InProcessManagedChannelFactory;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.base.Preconditions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link EnvironmentFactory} that runs the SDK harness ({@link FnHarness}) inside the current
 * JVM on a dedicated thread, rather than in an external process or container.
 *
 * <p>The harness is pointed at the supplied logging and control servers and talks to them through
 * an {@link InProcessManagedChannelFactory}.
 */
public class InProcessEnvironmentFactory
implements EnvironmentFactory {
    private static final Logger LOG = LoggerFactory.getLogger(InProcessEnvironmentFactory.class);
    private final PipelineOptions options;
    private final GrpcFnServer<GrpcLoggingService> loggingServer;
    private final GrpcFnServer<FnApiControlClientPoolService> controlServer;
    private final ControlClientPool.Source clientSource;

    /**
     * Creates a factory whose environments run an in-process harness against the given servers.
     *
     * @param options pipeline options handed to the harness
     * @param loggingServer logging server the harness connects to; its API service descriptor must
     *     be non-null
     * @param controlServer control server the harness connects to; its API service descriptor must
     *     be non-null
     * @param clientSource source from which the control client of the started harness is taken
     * @return a new factory
     * @throws IllegalArgumentException if either server has a null API service descriptor
     */
    public static EnvironmentFactory create(PipelineOptions options, GrpcFnServer<GrpcLoggingService> loggingServer, GrpcFnServer<FnApiControlClientPoolService> controlServer, ControlClientPool.Source clientSource) {
        return new InProcessEnvironmentFactory(options, loggingServer, controlServer, clientSource);
    }

    private InProcessEnvironmentFactory(PipelineOptions options, GrpcFnServer<GrpcLoggingService> loggingServer, GrpcFnServer<FnApiControlClientPoolService> controlServer, ControlClientPool.Source clientSource) {
        this.options = options;
        this.loggingServer = loggingServer;
        this.controlServer = controlServer;
        Preconditions.checkArgument(loggingServer.getApiServiceDescriptor() != null, "Logging Server cannot have a null %s", Endpoints.ApiServiceDescriptor.class.getSimpleName());
        Preconditions.checkArgument(controlServer.getApiServiceDescriptor() != null, "Control Server cannot have a null %s", Endpoints.ApiServiceDescriptor.class.getSimpleName());
        this.clientSource = clientSource;
    }

    /**
     * Starts an in-process {@link FnHarness} on its own thread and waits up to one minute for its
     * control client to become available.
     *
     * <p>The worker thread is released once the harness finishes, whether it succeeded or failed.
     * If no control client can be obtained, the harness is interrupted and the executor is shut
     * down before the failure is rethrown, so a failed call leaves no running harness behind.
     *
     * @param container the environment description attached to the returned environment
     * @return a {@link RemoteEnvironment} backed by the control client of the started harness
     * @throws Exception if taking the control client fails; when the harness had already
     *     terminated with an error, that error is attached as a suppressed exception
     */
    @Override
    public RemoteEnvironment createEnvironment(RunnerApi.Environment container) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> fnHarness = executor.submit(() -> {
            try {
                FnHarness.main(this.options, this.loggingServer.getApiServiceDescriptor(), this.controlServer.getApiServiceDescriptor(), InProcessManagedChannelFactory.create(), StreamObserverFactory.direct());
            }
            catch (NoClassDefFoundError e) {
                LOG.error("{} while executing an in-process FnHarness. To use the {}, the 'org.apache.beam:beam-sdks-java-harness' artifact and its dependencies must be on the classpath", NoClassDefFoundError.class.getSimpleName(), InProcessEnvironmentFactory.class.getSimpleName(), e);
                throw e;
            }
            // Returning a value makes this lambda a Callable, so any checked exception thrown by
            // the harness is captured by the Future instead of being a compile error.
            return null;
        });
        // The executor has a single worker, so this task is queued behind the harness and only
        // runs once the harness has finished. Its job is to release the worker thread.
        executor.submit(() -> {
            try {
                fnHarness.get();
            }
            catch (Throwable t) {
                // NOTE(review): logged at debug because the harness may end with an error as part
                // of ordinary teardown -- confirm before raising the level.
                LOG.debug("In-process FnHarness terminated with an error", t);
            }
            finally {
                // Previously the executor was only shut down on failure, leaking the worker
                // thread whenever the harness returned normally.
                executor.shutdown();
            }
        });
        InstructionRequestHandler handler;
        try {
            handler = this.clientSource.take("", Duration.ofMinutes(1L));
        }
        catch (Exception e) {
            InProcessEnvironmentFactory.attachHarnessFailure(fnHarness, e);
            // No environment is returned on this path, so nobody else could stop the harness.
            executor.shutdownNow();
            throw e;
        }
        return RemoteEnvironment.forHandler(container, handler);
    }

    /** Adds the harness's own failure, if it has already terminated with one, to {@code failure} as a suppressed exception. */
    private static void attachHarnessFailure(Future<?> fnHarness, Exception failure) {
        if (!fnHarness.isDone()) {
            return;
        }
        try {
            // Does not block: the future is already done.
            fnHarness.get();
        }
        catch (Exception harnessFailure) {
            failure.addSuppressed(harnessFailure);
        }
    }
}

