/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control;

import com.google.auto.value.AutoValue;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.GrpcFnServer;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.ServerFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.AutoValue_DefaultJobBundleFactory_ServerInfo;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.MapControlClientPool;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.RemoteOutputReceiver;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.SdkHarnessClient;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.logging.Slf4jLogWriter;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.IdGenerator;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.IdGenerators;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.data.FnDataReceiver;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.function.ThrowingFunction;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link JobBundleFactory} that keeps one {@link WrappedSdkHarnessClient} per
 * {@link RunnerApi.Environment} in a loading cache and hands out {@link StageBundleFactory}
 * instances backed by those cached clients.
 *
 * <p>Cache entries are created on first use in {@link #forStage(ExecutableStage)} and are closed by
 * the cache's removal listener, which {@link #close()} triggers for every entry.
 */
@ThreadSafe
public class DefaultJobBundleFactory implements JobBundleFactory {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultJobBundleFactory.class);

    /** One harness client (plus the servers it talks to) per environment; closed on removal. */
    private final LoadingCache<RunnerApi.Environment, WrappedSdkHarnessClient> environmentCache;
    /** Environment factory providers, looked up by environment URN. */
    private final Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap;
    /** Executor handed to the data service; shut down in {@link #close()}. */
    private final ExecutorService executor;
    /** Pool through which the control service offers connected clients to environment factories. */
    private final MapControlClientPool clientPool;
    /** Source of ids for process bundle descriptors and environment factories. */
    private final IdGenerator stageIdGenerator;

    /**
     * Creates a factory that starts its own gRPC servers for each environment it loads.
     *
     * @param jobInfo job description used to build the provisioning service
     * @param environmentFactoryProviderMap environment factory providers keyed by environment URN
     * @return a new factory
     */
    public static DefaultJobBundleFactory create(JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap) {
        return new DefaultJobBundleFactory(jobInfo, environmentFactoryProviderMap);
    }

    /**
     * Builds a factory whose servers are allocated lazily, per environment, from the
     * {@link ServerFactory} supplied by the matching {@link EnvironmentFactory.Provider}.
     */
    DefaultJobBundleFactory(JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryMap) {
        this.environmentFactoryProviderMap = environmentFactoryMap;
        this.executor = Executors.newCachedThreadPool();
        this.clientPool = MapControlClientPool.create();
        this.stageIdGenerator = IdGenerators.incrementingLongs();
        this.environmentCache = createEnvironmentCache(serverFactory -> createServerInfo(jobInfo, serverFactory));
    }

    /**
     * Builds a factory around already-running servers. Every environment loaded by the cache
     * receives the same {@link ServerInfo}, regardless of the provider's {@link ServerFactory}.
     */
    @VisibleForTesting
    DefaultJobBundleFactory(Map<String, EnvironmentFactory.Provider> environmentFactoryMap, IdGenerator stageIdGenerator, GrpcFnServer<FnApiControlClientPoolService> controlServer, GrpcFnServer<GrpcLoggingService> loggingServer, GrpcFnServer<ArtifactRetrievalService> retrievalServer, GrpcFnServer<StaticGrpcProvisionService> provisioningServer, GrpcFnServer<GrpcDataService> dataServer, GrpcFnServer<GrpcStateService> stateServer) {
        this.environmentFactoryProviderMap = environmentFactoryMap;
        this.executor = Executors.newCachedThreadPool();
        this.clientPool = MapControlClientPool.create();
        this.stageIdGenerator = stageIdGenerator;
        ServerInfo serverInfo = new AutoValue_DefaultJobBundleFactory_ServerInfo.Builder()
                .setControlServer(controlServer)
                .setLoggingServer(loggingServer)
                .setRetrievalServer(retrievalServer)
                .setProvisioningServer(provisioningServer)
                .setDataServer(dataServer)
                .setStateServer(stateServer)
                .build();
        this.environmentCache = createEnvironmentCache(serverFactory -> serverInfo);
    }

    /**
     * Creates the environment cache.
     *
     * <p>Loading an entry resolves the provider for the environment's URN, obtains a
     * {@link ServerInfo} from {@code serverInfoCreator}, and starts the environment. Removing an
     * entry closes its client; failures while closing are logged, not propagated.
     *
     * @param serverInfoCreator produces the servers to use for a provider's {@link ServerFactory}
     */
    private LoadingCache<RunnerApi.Environment, WrappedSdkHarnessClient> createEnvironmentCache(final ThrowingFunction<ServerFactory, ServerInfo> serverInfoCreator) {
        return CacheBuilder.newBuilder()
                // The explicit type witness gives the lambda a typed notification, so no casts
                // (and no raw CacheLoader) are needed further down the chain.
                .<RunnerApi.Environment, WrappedSdkHarnessClient>removalListener(notification -> {
                    LOG.debug("Cleaning up for environment {}", notification.getKey().getUrn());
                    try {
                        notification.getValue().close();
                    }
                    catch (Exception e) {
                        LOG.warn(String.format("Error cleaning up environment %s", notification.getKey()), e);
                    }
                })
                .build(new CacheLoader<RunnerApi.Environment, WrappedSdkHarnessClient>() {

                    @Override
                    public WrappedSdkHarnessClient load(RunnerApi.Environment environment) throws Exception {
                        String urn = environment.getUrn();
                        // Fail with a descriptive message instead of a bare NPE on the next line.
                        EnvironmentFactory.Provider environmentFactoryProvider = Preconditions.checkNotNull(
                                environmentFactoryProviderMap.get(urn),
                                "No environment factory provider registered for environment URN %s",
                                urn);
                        ServerFactory serverFactory = environmentFactoryProvider.getServerFactory();
                        ServerInfo serverInfo = serverInfoCreator.apply(serverFactory);
                        // NOTE(review): if the two calls below throw, the servers in serverInfo are
                        // not closed here — confirm whether the caller is expected to clean up.
                        EnvironmentFactory environmentFactory = environmentFactoryProvider.createEnvironmentFactory(
                                serverInfo.getControlServer(),
                                serverInfo.getLoggingServer(),
                                serverInfo.getRetrievalServer(),
                                serverInfo.getProvisioningServer(),
                                clientPool,
                                stageIdGenerator);
                        return WrappedSdkHarnessClient.wrapping(environmentFactory.createEnvironment(environment), serverInfo);
                    }
                });
    }

    /**
     * Returns a bundle factory for {@code executableStage}, loading (or reusing) the cached client
     * for the stage's environment.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the process bundle descriptor
     *     cannot be built; cache loading failures surface as thrown by
     *     {@link LoadingCache#getUnchecked}
     */
    @Override
    public StageBundleFactory forStage(ExecutableStage executableStage) {
        WrappedSdkHarnessClient wrappedClient = environmentCache.getUnchecked(executableStage.getEnvironment());
        ServerInfo serverInfo = wrappedClient.getServerInfo();
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor;
        try {
            processBundleDescriptor = ProcessBundleDescriptors.fromExecutableStage(
                    stageIdGenerator.getId(),
                    executableStage,
                    serverInfo.getDataServer().getApiServiceDescriptor(),
                    serverInfo.getStateServer().getApiServiceDescriptor());
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return SimpleStageBundleFactory.create(wrappedClient, processBundleDescriptor, serverInfo.getStateServer());
    }

    /**
     * Evicts every cached environment (which closes it through the removal listener) and then
     * initiates shutdown of the executor. Does not wait for the executor to terminate.
     */
    @Override
    public void close() throws Exception {
        environmentCache.invalidateAll();
        environmentCache.cleanUp();
        executor.shutdown();
    }

    /**
     * Starts the control, logging, artifact retrieval, provisioning, data and state servers using
     * {@code serverFactory}.
     *
     * <p>If any server fails to start, the servers started so far are closed before the failure is
     * rethrown, so a partial start does not leak them.
     *
     * @throws IOException if a server cannot be allocated
     * @throws NullPointerException if {@code serverFactory} is null
     */
    private ServerInfo createServerInfo(JobInfo jobInfo, ServerFactory serverFactory) throws IOException {
        Preconditions.checkNotNull(serverFactory, "serverFactory can not be null");

        GrpcFnServer<FnApiControlClientPoolService> controlServer = null;
        GrpcFnServer<GrpcLoggingService> loggingServer = null;
        GrpcFnServer<ArtifactRetrievalService> retrievalServer = null;
        GrpcFnServer<StaticGrpcProvisionService> provisioningServer = null;
        GrpcFnServer<GrpcDataService> dataServer = null;
        GrpcFnServer<GrpcStateService> stateServer = null;
        try {
            controlServer = GrpcFnServer.allocatePortAndCreateFor(
                    FnApiControlClientPoolService.offeringClientsToPool(clientPool.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
                    serverFactory);
            loggingServer = GrpcFnServer.allocatePortAndCreateFor(GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory);
            retrievalServer = GrpcFnServer.allocatePortAndCreateFor(BeamFileSystemArtifactRetrievalService.create(), serverFactory);
            provisioningServer = GrpcFnServer.allocatePortAndCreateFor(StaticGrpcProvisionService.create(jobInfo.toProvisionInfo()), serverFactory);
            dataServer = GrpcFnServer.allocatePortAndCreateFor(GrpcDataService.create(executor, OutboundObserverFactory.serverDirect()), serverFactory);
            stateServer = GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory);
            return new AutoValue_DefaultJobBundleFactory_ServerInfo.Builder()
                    .setControlServer(controlServer)
                    .setLoggingServer(loggingServer)
                    .setRetrievalServer(retrievalServer)
                    .setProvisioningServer(provisioningServer)
                    .setDataServer(dataServer)
                    .setStateServer(stateServer)
                    .build();
        }
        catch (IOException | RuntimeException e) {
            // Release whatever was started, newest first, before reporting the failure.
            closeAfterFailure(e, stateServer, dataServer, provisioningServer, retrievalServer, loggingServer, controlServer);
            throw e;
        }
    }

    /**
     * Closes every non-null resource, attaching any close failure to {@code failure} as a
     * suppressed exception so the original error stays the one that is reported.
     */
    private static void closeAfterFailure(Throwable failure, AutoCloseable... resources) {
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue;
            }
            try {
                resource.close();
            }
            catch (Exception closeFailure) {
                failure.addSuppressed(closeFailure);
            }
        }
    }

    /** The set of gRPC servers that back a single SDK harness environment. */
    @AutoValue
    public static abstract class ServerInfo {
        abstract GrpcFnServer<FnApiControlClientPoolService> getControlServer();

        abstract GrpcFnServer<GrpcLoggingService> getLoggingServer();

        abstract GrpcFnServer<ArtifactRetrievalService> getRetrievalServer();

        abstract GrpcFnServer<StaticGrpcProvisionService> getProvisioningServer();

        abstract GrpcFnServer<GrpcDataService> getDataServer();

        abstract GrpcFnServer<GrpcStateService> getStateServer();

        abstract Builder toBuilder();

        /** Builder for {@link ServerInfo}. */
        @AutoValue.Builder
        static abstract class Builder {
            abstract Builder setControlServer(GrpcFnServer<FnApiControlClientPoolService> controlServer);

            abstract Builder setLoggingServer(GrpcFnServer<GrpcLoggingService> loggingServer);

            abstract Builder setRetrievalServer(GrpcFnServer<ArtifactRetrievalService> retrievalServer);

            abstract Builder setProvisioningServer(GrpcFnServer<StaticGrpcProvisionService> provisioningServer);

            abstract Builder setDataServer(GrpcFnServer<GrpcDataService> dataServer);

            abstract Builder setStateServer(GrpcFnServer<GrpcStateService> stateServer);

            abstract ServerInfo build();
        }
    }

    /**
     * Bundles an {@link SdkHarnessClient} with the {@link RemoteEnvironment} it talks to and the
     * servers that serve it, so that all of them can be released together.
     */
    protected static class WrappedSdkHarnessClient
    implements AutoCloseable {
        private final RemoteEnvironment environment;
        private final SdkHarnessClient client;
        private final ServerInfo serverInfo;

        /**
         * Creates a client over the environment's instruction handler and the data service from
         * {@code serverInfo}.
         */
        static WrappedSdkHarnessClient wrapping(RemoteEnvironment environment, ServerInfo serverInfo) {
            SdkHarnessClient client = SdkHarnessClient.usingFnApiClient(environment.getInstructionRequestHandler(), serverInfo.getDataServer().getService());
            return new WrappedSdkHarnessClient(environment, client, serverInfo);
        }

        private WrappedSdkHarnessClient(RemoteEnvironment environment, SdkHarnessClient client, ServerInfo serverInfo) {
            this.environment = environment;
            this.client = client;
            this.serverInfo = serverInfo;
        }

        SdkHarnessClient getClient() {
            return this.client;
        }

        ServerInfo getServerInfo() {
            return this.serverInfo;
        }

        /**
         * Closes the environment and then every server in {@link ServerInfo}.
         *
         * <p>All resources are attempted even if an earlier one fails; the first failure is thrown
         * and later ones are attached to it as suppressed exceptions.
         */
        @Override
        public void close() throws Exception {
            // try-with-resources closes in reverse declaration order, so the environment is
            // closed first, followed by provisioning, retrieval, logging, control, data, state.
            // Null resources are skipped by the construct itself.
            try (AutoCloseable stateServer = this.serverInfo.getStateServer();
                 AutoCloseable dataServer = this.serverInfo.getDataServer();
                 AutoCloseable controlServer = this.serverInfo.getControlServer();
                 AutoCloseable loggingServer = this.serverInfo.getLoggingServer();
                 AutoCloseable retrievalServer = this.serverInfo.getRetrievalServer();
                 AutoCloseable provisioningServer = this.serverInfo.getProvisioningServer();
                 AutoCloseable envCloser = this.environment) {
                // Intentionally empty: the block exists only to close the resources above.
            }
        }
    }

    /**
     * A {@link StageBundleFactory} that creates bundles through a single
     * {@link SdkHarnessClient.BundleProcessor} registered for one process bundle descriptor.
     */
    protected static class SimpleStageBundleFactory
    implements StageBundleFactory {
        private final SdkHarnessClient.BundleProcessor processor;
        private final ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor;
        // Written in the constructor and cleared in close(), never read in this class;
        // presumably held only to keep the cached client reachable — TODO confirm.
        @SuppressFBWarnings
        private WrappedSdkHarnessClient wrappedClient;

        /**
         * Obtains a processor for {@code processBundleDescriptor} from the wrapped client, using
         * the state service of {@code stateServer}, and wraps it in a new factory.
         */
        static SimpleStageBundleFactory create(WrappedSdkHarnessClient wrappedClient, ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor, GrpcFnServer<GrpcStateService> stateServer) {
            SdkHarnessClient.BundleProcessor processor = wrappedClient.getClient().getProcessor(processBundleDescriptor.getProcessBundleDescriptor(), processBundleDescriptor.getRemoteInputDestinations(), stateServer.getService());
            return new SimpleStageBundleFactory(processBundleDescriptor, processor, wrappedClient);
        }

        SimpleStageBundleFactory(ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor, SdkHarnessClient.BundleProcessor processor, WrappedSdkHarnessClient wrappedClient) {
            this.processBundleDescriptor = processBundleDescriptor;
            this.processor = processor;
            this.wrappedClient = wrappedClient;
        }

        /**
         * Starts a new bundle whose outputs are routed to receivers created by
         * {@code outputReceiverFactory}.
         *
         * <p>For each output target, the receiver is created for the single input PCollection of
         * the transform the target refers to.
         */
        @Override
        public RemoteBundle getBundle(OutputReceiverFactory outputReceiverFactory, StateRequestHandler stateRequestHandler, BundleProgressHandler progressHandler) throws Exception {
            ImmutableMap.Builder<BeamFnApi.Target, RemoteOutputReceiver<?>> outputReceivers = ImmutableMap.builder();
            for (Map.Entry<BeamFnApi.Target, Coder<WindowedValue<?>>> targetCoder : this.processBundleDescriptor.getOutputTargetCoders().entrySet()) {
                BeamFnApi.Target target = targetCoder.getKey();
                Coder<WindowedValue<?>> coder = targetCoder.getValue();
                // getOnlyElement throws unless the referenced transform has exactly one input.
                String bundleOutputPCollection = Iterables.getOnlyElement(
                        this.processBundleDescriptor
                                .getProcessBundleDescriptor()
                                .getTransformsOrThrow(target.getPrimitiveTransformReference())
                                .getInputsMap()
                                .values());
                FnDataReceiver<WindowedValue<?>> outputReceiver = outputReceiverFactory.create(bundleOutputPCollection);
                outputReceivers.put(target, RemoteOutputReceiver.of(coder, outputReceiver));
            }
            return this.processor.newBundle(outputReceivers.build(), stateRequestHandler, progressHandler);
        }

        @Override
        public ProcessBundleDescriptors.ExecutableProcessBundleDescriptor getProcessBundleDescriptor() {
            return this.processBundleDescriptor;
        }

        /** Drops this factory's reference to the wrapped client; the client itself is not closed. */
        @Override
        public void close() throws Exception {
            this.wrappedClient = null;
        }
    }
}

