/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.direct_java.runners.fnexecution.control;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.direct_java.runners.core.construction.BeamUrns;
import org.apache.beam.repackaged.direct_java.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.repackaged.direct_java.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.GrpcFnServer;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.ServerFactory;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.artifact.AbstractArtifactRetrievalService;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.artifact.ClassLoaderArtifactRetrievalService;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.AutoValue_DefaultJobBundleFactory_ServerInfo;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.MapControlClientPool;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.RemoteOutputReceiver;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.SdkHarnessClient;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.environment.DockerEnvironmentFactory;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.environment.EmbeddedEnvironmentFactory;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.environment.ExternalEnvironmentFactory;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.environment.ProcessEnvironmentFactory;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.logging.Slf4jLogWriter;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.repackaged.direct_java.sdk.fn.IdGenerator;
import org.apache.beam.repackaged.direct_java.sdk.fn.IdGenerators;
import org.apache.beam.repackaged.direct_java.sdk.fn.data.FnDataReceiver;
import org.apache.beam.repackaged.direct_java.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ProtocolMessageEnum;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link JobBundleFactory} that creates {@link StageBundleFactory} instances backed by a fixed
 * set of caches mapping an environment to a client for that environment.
 *
 * <p>Each cache entry is created on demand by the environment factory provider registered for the
 * environment's URN, and is reference counted so that it is closed only once the cache has dropped
 * it and no bundle is still using it.
 */
@ThreadSafe
public class DefaultJobBundleFactory
implements JobBundleFactory {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultJobBundleFactory.class);
    // Shared by all factory instances; supplies the per-instance factoryId below.
    private static final IdGenerator factoryIdGenerator = IdGenerators.incrementingLongs();
    // Used as the prefix of the stage ids generated by the two-argument constructor.
    private final String factoryId = factoryIdGenerator.getId();
    // One cache per environment client slot; the number of slots comes from getMaxEnvironmentClients.
    private final ImmutableList<LoadingCache<RunnerApi.Environment, WrappedSdkHarnessClient>> environmentCaches;
    // Counts created stage bundle factories; used to pick a cache index round-robin.
    private final AtomicInteger stageBundleFactoryCount = new AtomicInteger();
    // Environment URN -> provider used to build the environment factory for that URN.
    private final Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap;
    // Passed to the data service created in createServerInfo; shut down in close().
    private final ExecutorService executor;
    private final MapControlClientPool clientPool;
    private final IdGenerator stageIdGenerator;
    // When greater than zero, cache entries expire this many milliseconds after being written.
    private final int environmentExpirationMillis;
    // The semaphore and deque below are only used when loadBalanceBundles is true: a bundle takes
    // a cache from availableCaches for its lifetime and returns it when the bundle is closed.
    private final Semaphore availableCachesSemaphore;
    private final LinkedBlockingDeque<LoadingCache<RunnerApi.Environment, WrappedSdkHarnessClient>> availableCaches;
    private final boolean loadBalanceBundles;

    /**
     * Creates a factory for {@code jobInfo} with the default set of environment factory providers:
     * the DOCKER, PROCESS and EXTERNAL standard environments plus the {@code "EMBEDDED"} URN.
     *
     * @param jobInfo the job whose pipeline options configure the providers and the factory
     * @return a new factory
     */
    public static DefaultJobBundleFactory create(JobInfo jobInfo) {
        PipelineOptions options = PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
        ImmutableMap.Builder<String, EnvironmentFactory.Provider> providers = ImmutableMap.builder();
        providers.put(BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER), new DockerEnvironmentFactory.Provider(options));
        providers.put(BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.PROCESS), new ProcessEnvironmentFactory.Provider(options));
        providers.put(BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.EXTERNAL), new ExternalEnvironmentFactory.Provider());
        providers.put("EMBEDDED", new EmbeddedEnvironmentFactory.Provider(options));
        return new DefaultJobBundleFactory(jobInfo, providers.build());
    }

    /**
     * Creates a factory for {@code jobInfo} that resolves environments through the supplied
     * providers instead of the default ones.
     *
     * @param jobInfo the job whose pipeline options configure the factory
     * @param environmentFactoryProviderMap environment URN to provider mapping
     * @return a new factory
     */
    public static DefaultJobBundleFactory create(JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap) {
        DefaultJobBundleFactory factory = new DefaultJobBundleFactory(jobInfo, environmentFactoryProviderMap);
        return factory;
    }

    DefaultJobBundleFactory(JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryMap) {
        IdGenerator stageIdSuffixGenerator = IdGenerators.incrementingLongs();
        this.environmentFactoryProviderMap = environmentFactoryMap;
        this.executor = Executors.newCachedThreadPool();
        this.clientPool = MapControlClientPool.create();
        this.stageIdGenerator = () -> this.factoryId + "-" + stageIdSuffixGenerator.getId();
        this.environmentExpirationMillis = DefaultJobBundleFactory.getEnvironmentExpirationMillis(jobInfo);
        this.loadBalanceBundles = DefaultJobBundleFactory.shouldLoadBalanceBundles(jobInfo);
        this.environmentCaches = this.createEnvironmentCaches((ThrowingFunction<ServerFactory, ServerInfo>)((ThrowingFunction)serverFactory -> this.createServerInfo(jobInfo, (ServerFactory)serverFactory)), DefaultJobBundleFactory.getMaxEnvironmentClients(jobInfo));
        this.availableCachesSemaphore = new Semaphore(this.environmentCaches.size(), true);
        this.availableCaches = new LinkedBlockingDeque<LoadingCache<RunnerApi.Environment, WrappedSdkHarnessClient>>((Collection<LoadingCache<RunnerApi.Environment, WrappedSdkHarnessClient>>)this.environmentCaches);
    }

    @VisibleForTesting
    DefaultJobBundleFactory(JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryMap, IdGenerator stageIdGenerator, ServerInfo serverInfo) {
        this.environmentFactoryProviderMap = environmentFactoryMap;
        this.executor = Executors.newCachedThreadPool();
        this.clientPool = MapControlClientPool.create();
        this.stageIdGenerator = stageIdGenerator;
        this.environmentExpirationMillis = DefaultJobBundleFactory.getEnvironmentExpirationMillis(jobInfo);
        this.loadBalanceBundles = DefaultJobBundleFactory.shouldLoadBalanceBundles(jobInfo);
        this.environmentCaches = this.createEnvironmentCaches((ThrowingFunction<ServerFactory, ServerInfo>)((ThrowingFunction)serverFactory -> serverInfo), DefaultJobBundleFactory.getMaxEnvironmentClients(jobInfo));
        this.availableCachesSemaphore = new Semaphore(this.environmentCaches.size(), true);
        this.availableCaches = new LinkedBlockingDeque<LoadingCache<RunnerApi.Environment, WrappedSdkHarnessClient>>((Collection<LoadingCache<RunnerApi.Environment, WrappedSdkHarnessClient>>)this.environmentCaches);
    }

    /**
     * Builds {@code count} independent caches that create an environment client on first lookup.
     *
     * <p>When an entry is removed from a cache, the cache's own reference to the client is
     * released via {@code unref()}; the client closes itself once no references remain. If
     * {@code environmentExpirationMillis} is positive, entries expire that long after being written.
     *
     * @param serverInfoCreator creates (or supplies) the servers for a given server factory
     * @param count number of caches to create
     * @return the caches, in creation order
     */
    private ImmutableList<LoadingCache<RunnerApi.Environment, WrappedSdkHarnessClient>> createEnvironmentCaches(final ThrowingFunction<ServerFactory, ServerInfo> serverInfoCreator, int count) {
        // Explicit type arguments keep the builder fully typed without naming the notification type.
        CacheBuilder<RunnerApi.Environment, WrappedSdkHarnessClient> builder = CacheBuilder.newBuilder().<RunnerApi.Environment, WrappedSdkHarnessClient>removalListener(notification -> {
            int refCount = notification.getValue().unref();
            LOG.debug("Removed environment {} with {} remaining bundle references.", notification.getKey(), refCount);
        });
        if (this.environmentExpirationMillis > 0) {
            builder = builder.expireAfterWrite((long)this.environmentExpirationMillis, TimeUnit.MILLISECONDS);
        }
        // The loader holds no per-cache state, so a single instance serves every cache.
        CacheLoader<RunnerApi.Environment, WrappedSdkHarnessClient> loader = new CacheLoader<RunnerApi.Environment, WrappedSdkHarnessClient>(){

            @Override
            public WrappedSdkHarnessClient load(RunnerApi.Environment environment) throws Exception {
                // Fail with a descriptive message instead of a bare NPE when the URN is unknown.
                EnvironmentFactory.Provider environmentFactoryProvider = Preconditions.checkNotNull(DefaultJobBundleFactory.this.environmentFactoryProviderMap.get(environment.getUrn()), "No environment factory provider registered for environment URN %s", environment.getUrn());
                ServerFactory serverFactory = environmentFactoryProvider.getServerFactory();
                ServerInfo serverInfo = serverInfoCreator.apply(serverFactory);
                EnvironmentFactory environmentFactory = environmentFactoryProvider.createEnvironmentFactory(serverInfo.getControlServer(), serverInfo.getLoggingServer(), serverInfo.getRetrievalServer(), serverInfo.getProvisioningServer(), DefaultJobBundleFactory.this.clientPool, DefaultJobBundleFactory.this.stageIdGenerator);
                return WrappedSdkHarnessClient.wrapping(environmentFactory.createEnvironment(environment), serverInfo);
            }
        };
        ImmutableList.Builder<LoadingCache<RunnerApi.Environment, WrappedSdkHarnessClient>> caches = ImmutableList.builder();
        for (int i = 0; i < count; ++i) {
            LoadingCache<RunnerApi.Environment, WrappedSdkHarnessClient> cache = builder.build(loader);
            caches.add(cache);
        }
        return caches.build();
    }

    /**
     * Reads the environment expiration, in milliseconds, from the job's portable pipeline options.
     *
     * @param jobInfo the job whose pipeline options are consulted
     * @return the configured expiration in milliseconds
     */
    private static int getEnvironmentExpirationMillis(JobInfo jobInfo) {
        return PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions())
            .as(PortablePipelineOptions.class)
            .getEnvironmentExpirationMillis();
    }

    /**
     * Resolves how many environment caches to create from the {@code sdk_worker_parallelism}
     * option.
     *
     * <p>A missing value counts as 1. A value of 0 means "derive from the machine": one less than
     * the number of available processors, but at least 1.
     *
     * @param jobInfo the job whose pipeline options are consulted
     * @return a count of at least 1
     * @throws IllegalArgumentException if the configured value is negative
     */
    private static int getMaxEnvironmentClients(JobInfo jobInfo) {
        PortablePipelineOptions options = PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions()).as(PortablePipelineOptions.class);
        int configured = MoreObjects.firstNonNull(options.getSdkWorkerParallelism(), 1);
        Preconditions.checkArgument(configured >= 0, "sdk_worker_parallelism must be >= 0");
        if (configured != 0) {
            return configured;
        }
        return Math.max(Runtime.getRuntime().availableProcessors() - 1, 1);
    }

    /**
     * Reports whether bundle load balancing is enabled in the job's portable pipeline options.
     *
     * <p>When it is enabled, the {@code state_cache_size} experiment must be absent or 0.
     *
     * @param jobInfo the job whose pipeline options are consulted
     * @return {@code true} if bundles should be load balanced across environment caches
     * @throws IllegalArgumentException if load balancing is enabled and {@code state_cache_size}
     *     is not 0 (including when it is not a valid integer)
     */
    private static boolean shouldLoadBalanceBundles(JobInfo jobInfo) {
        PipelineOptions options = PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
        if (!options.as(PortablePipelineOptions.class).getLoadBalanceBundles()) {
            return false;
        }
        String rawStateCacheSize = MoreObjects.firstNonNull(ExperimentalOptions.getExperimentValue(options, "state_cache_size"), "0");
        int stateCacheSize = Integer.parseInt(rawStateCacheSize);
        Preconditions.checkArgument(stateCacheSize == 0, "%s must be 0 when using bundle load balancing", "state_cache_size");
        return true;
    }

    /**
     * Creates a bundle factory for the given stage. Constructing it eagerly obtains and prepares a
     * client for the stage's environment.
     *
     * @param executableStage the stage to create bundles for
     * @return a new stage bundle factory
     */
    @Override
    public StageBundleFactory forStage(ExecutableStage executableStage) {
        StageBundleFactory stageBundleFactory = new SimpleStageBundleFactory(executableStage);
        return stageBundleFactory;
    }

    /**
     * Invalidates every entry of every environment cache, runs the caches' pending maintenance,
     * and then shuts down the executor.
     */
    @Override
    public void close() throws Exception {
        this.environmentCaches.forEach(cache -> {
            cache.invalidateAll();
            cache.cleanUp();
        });
        this.executor.shutdown();
    }

    /**
     * Creates one remote output receiver per remote output coder of the descriptor.
     *
     * <p>Each receiver is keyed by its output transform id and obtained from
     * {@code outputReceiverFactory} for the single input PCollection of that transform.
     *
     * @param processBundleDescriptor descriptor supplying the remote output coders and transforms
     * @param outputReceiverFactory creates the receiver for a PCollection id
     * @return a builder holding the receivers (not yet built)
     */
    private static ImmutableMap.Builder<String, RemoteOutputReceiver<?>> getOutputReceivers(ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor, OutputReceiverFactory outputReceiverFactory) {
        ImmutableMap.Builder<String, RemoteOutputReceiver<?>> receiversByTransform = ImmutableMap.builder();
        processBundleDescriptor.getRemoteOutputCoders().forEach((outputTransform, coder) -> {
            // The output transform is expected to have exactly one input; getOnlyElement enforces it.
            String pCollectionId = Iterables.getOnlyElement(processBundleDescriptor.getProcessBundleDescriptor().getTransformsOrThrow(outputTransform).getInputsMap().values());
            receiversByTransform.put(outputTransform, RemoteOutputReceiver.of(coder, outputReceiverFactory.create(pCollectionId)));
        });
        return receiversByTransform;
    }

    /**
     * Builds the process bundle descriptor for {@code executableStage} against the client's data
     * and state servers and obtains a bundle processor for it from the client.
     *
     * @param wrappedClient the environment client to prepare the stage on
     * @param executableStage the stage to prepare
     * @return the client together with its descriptor and processor
     * @throws RuntimeException if the descriptor cannot be created
     */
    private PreparedClient prepare(WrappedSdkHarnessClient wrappedClient, ExecutableStage executableStage) {
        String stageId = this.stageIdGenerator.getId();
        ServerInfo servers = wrappedClient.getServerInfo();
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor;
        try {
            descriptor = ProcessBundleDescriptors.fromExecutableStage(stageId, executableStage, servers.getDataServer().getApiServiceDescriptor(), servers.getStateServer().getApiServiceDescriptor());
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to create ProcessBundleDescriptor.", e);
        }
        PreparedClient prepared = new PreparedClient();
        prepared.wrappedClient = wrappedClient;
        prepared.processBundleDescriptor = descriptor;
        prepared.processor = wrappedClient.getClient().getProcessor(descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations(), servers.getStateServer().getService());
        return prepared;
    }

    /**
     * Starts the control, logging, artifact retrieval, provisioning, data and state servers on
     * ports allocated through {@code serverFactory} and bundles them into a {@link ServerInfo}.
     *
     * <p>The artifact retrieval service is class-loader based when the job's retrieval service
     * type is {@code CLASSLOADER}, and Beam-file-system based otherwise.
     *
     * <p>NOTE(review): servers that were already started are not closed if a later one fails to
     * start — confirm whether callers rely on that.
     *
     * @param jobInfo the job supplying pipeline options and provisioning info
     * @param serverFactory factory used to allocate every server; must not be null
     * @return the started servers
     * @throws IOException if a server cannot be created
     */
    private ServerInfo createServerInfo(JobInfo jobInfo, ServerFactory serverFactory) throws IOException {
        Preconditions.checkNotNull(serverFactory, "serverFactory can not be null");
        PortablePipelineOptions portableOptions = PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions()).as(PortablePipelineOptions.class);
        final AbstractArtifactRetrievalService artifactRetrievalService;
        if (portableOptions.getRetrievalServiceType() == PortablePipelineOptions.RetrievalServiceType.CLASSLOADER) {
            artifactRetrievalService = new ClassLoaderArtifactRetrievalService();
        } else {
            artifactRetrievalService = BeamFileSystemArtifactRetrievalService.create();
        }
        // Servers are started in this fixed order: control, logging, retrieval, provisioning, data, state.
        GrpcFnServer<FnApiControlClientPoolService> control = GrpcFnServer.allocatePortAndCreateFor(FnApiControlClientPoolService.offeringClientsToPool(this.clientPool.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()), serverFactory);
        GrpcFnServer<GrpcLoggingService> logging = GrpcFnServer.allocatePortAndCreateFor(GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory);
        GrpcFnServer<ArtifactRetrievalService> retrieval = GrpcFnServer.allocatePortAndCreateFor(artifactRetrievalService, serverFactory);
        GrpcFnServer<StaticGrpcProvisionService> provisioning = GrpcFnServer.allocatePortAndCreateFor(StaticGrpcProvisionService.create(jobInfo.toProvisionInfo()), serverFactory);
        GrpcFnServer<GrpcDataService> data = GrpcFnServer.allocatePortAndCreateFor(GrpcDataService.create((PipelineOptions)portableOptions, this.executor, OutboundObserverFactory.serverDirect()), serverFactory);
        GrpcFnServer<GrpcStateService> state = GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory);
        return new AutoValue_DefaultJobBundleFactory_ServerInfo.Builder()
            .setControlServer(control)
            .setLoggingServer(logging)
            .setRetrievalServer(retrieval)
            .setProvisioningServer(provisioning)
            .setDataServer(data)
            .setStateServer(state)
            .build();
    }

    /** Value type grouping the gRPC servers that back a single environment. */
    @AutoValue
    public static abstract class ServerInfo {
        /** Returns the control server. */
        abstract GrpcFnServer<FnApiControlClientPoolService> getControlServer();

        /** Returns the logging server. */
        abstract GrpcFnServer<GrpcLoggingService> getLoggingServer();

        /** Returns the artifact retrieval server. */
        abstract GrpcFnServer<ArtifactRetrievalService> getRetrievalServer();

        /** Returns the provisioning server. */
        abstract GrpcFnServer<StaticGrpcProvisionService> getProvisioningServer();

        /** Returns the data server. */
        abstract GrpcFnServer<GrpcDataService> getDataServer();

        /** Returns the state server. */
        abstract GrpcFnServer<GrpcStateService> getStateServer();

        /** Returns a builder pre-populated with this instance's servers. */
        abstract Builder toBuilder();

        /** Builder for {@link ServerInfo}. */
        @AutoValue.Builder
        static abstract class Builder {
            abstract Builder setControlServer(GrpcFnServer<FnApiControlClientPoolService> controlServer);

            abstract Builder setLoggingServer(GrpcFnServer<GrpcLoggingService> loggingServer);

            abstract Builder setRetrievalServer(GrpcFnServer<ArtifactRetrievalService> retrievalServer);

            abstract Builder setProvisioningServer(GrpcFnServer<StaticGrpcProvisionService> provisioningServer);

            abstract Builder setDataServer(GrpcFnServer<GrpcDataService> dataServer);

            abstract Builder setStateServer(GrpcFnServer<GrpcStateService> stateServer);

            abstract ServerInfo build();
        }
    }

    /**
     * Reference-counted holder of a remote environment, the client talking to it, and the servers
     * backing it.
     *
     * <p>The count starts at one when the holder is created. When it drops to zero the environment
     * and all of its servers are closed.
     */
    protected static class WrappedSdkHarnessClient {
        private final RemoteEnvironment environment;
        private final SdkHarnessClient client;
        private final ServerInfo serverInfo;
        private final AtomicInteger bundleRefCount = new AtomicInteger();

        /**
         * Creates a client for {@code environment} that uses the data server of {@code serverInfo}
         * and wraps it together with the environment and servers.
         */
        static WrappedSdkHarnessClient wrapping(RemoteEnvironment environment, ServerInfo serverInfo) {
            SdkHarnessClient client = SdkHarnessClient.usingFnApiClient(environment.getInstructionRequestHandler(), serverInfo.getDataServer().getService());
            return new WrappedSdkHarnessClient(environment, client, serverInfo);
        }

        private WrappedSdkHarnessClient(RemoteEnvironment environment, SdkHarnessClient client, ServerInfo serverInfo) {
            this.environment = environment;
            this.client = client;
            this.serverInfo = serverInfo;
            // Initial reference; released by the cache removal listener.
            this.ref();
        }

        SdkHarnessClient getClient() {
            return this.client;
        }

        ServerInfo getServerInfo() {
            return this.serverInfo;
        }

        /**
         * Closes the environment and every server, logging (not rethrowing) any failure.
         *
         * <p>All resources are declared in the try-with-resources header so that each one is
         * closed even if another one fails; later failures are attached to the first as suppressed
         * exceptions. Resources are closed in reverse declaration order, so the provisioning
         * server is closed first and the environment last.
         */
        public void close() {
            // Do not add statements before this try: an early exception would skip the cleanup.
            try (RemoteEnvironment envCloser = this.environment;
                 GrpcFnServer<GrpcStateService> stateServer = this.serverInfo.getStateServer();
                 GrpcFnServer<GrpcDataService> dataServer = this.serverInfo.getDataServer();
                 GrpcFnServer<FnApiControlClientPoolService> controlServer = this.serverInfo.getControlServer();
                 GrpcFnServer<GrpcLoggingService> loggingServer = this.serverInfo.getLoggingServer();
                 GrpcFnServer<ArtifactRetrievalService> retrievalServer = this.serverInfo.getRetrievalServer();
                 GrpcFnServer<StaticGrpcProvisionService> provisioningServer = this.serverInfo.getProvisioningServer()) {
                // Intentionally empty: the header above does all the work.
            }
            catch (Exception e) {
                LOG.warn("Error cleaning up servers {}", (Object)this.environment.getEnvironment(), (Object)e);
            }
        }

        /** Adds a reference and returns the new count. */
        private int ref() {
            return this.bundleRefCount.incrementAndGet();
        }

        /** Drops a reference, closing everything when the count reaches zero; returns the new count. */
        private int unref() {
            int count = this.bundleRefCount.decrementAndGet();
            if (count == 0) {
                LOG.info("Closing environment {}", (Object)this.environment.getEnvironment());
                this.close();
            }
            return count;
        }
    }

    private class SimpleStageBundleFactory
    implements StageBundleFactory {
        private final ExecutableStage executableStage;
        private final int environmentIndex;
        private final HashMap<WrappedSdkHarnessClient, PreparedClient> preparedClients = new HashMap();
        private PreparedClient currentClient;

        private SimpleStageBundleFactory(ExecutableStage executableStage) {
            this.executableStage = executableStage;
            this.environmentIndex = DefaultJobBundleFactory.this.stageBundleFactoryCount.getAndIncrement() % DefaultJobBundleFactory.this.environmentCaches.size();
            WrappedSdkHarnessClient client = (WrappedSdkHarnessClient)((LoadingCache)DefaultJobBundleFactory.this.environmentCaches.get(this.environmentIndex)).getUnchecked((Object)executableStage.getEnvironment());
            this.currentClient = DefaultJobBundleFactory.this.prepare(client, executableStage);
            this.preparedClients.put(client, this.currentClient);
        }

        @Override
        public RemoteBundle getBundle(OutputReceiverFactory outputReceiverFactory, StateRequestHandler stateRequestHandler, BundleProgressHandler progressHandler) throws Exception {
            WrappedSdkHarnessClient client;
            LoadingCache currentCache;
            if (DefaultJobBundleFactory.this.environmentExpirationMillis == 0 && !DefaultJobBundleFactory.this.loadBalanceBundles) {
                return this.currentClient.processor.newBundle((Map<String, RemoteOutputReceiver<?>>)DefaultJobBundleFactory.getOutputReceivers(this.currentClient.processBundleDescriptor, outputReceiverFactory).build(), stateRequestHandler, progressHandler);
            }
            if (DefaultJobBundleFactory.this.loadBalanceBundles) {
                DefaultJobBundleFactory.this.availableCachesSemaphore.acquire();
                currentCache = (LoadingCache)DefaultJobBundleFactory.this.availableCaches.take();
                client = (WrappedSdkHarnessClient)currentCache.getUnchecked((Object)this.executableStage.getEnvironment());
                client.ref();
                this.currentClient = this.preparedClients.get(client);
                if (this.currentClient == null) {
                    this.currentClient = DefaultJobBundleFactory.this.prepare(client, this.executableStage);
                    this.preparedClients.put(client, this.currentClient);
                    this.preparedClients.keySet().removeIf(c -> ((WrappedSdkHarnessClient)c).bundleRefCount.get() == 0);
                }
            } else {
                currentCache = (LoadingCache)DefaultJobBundleFactory.this.environmentCaches.get(this.environmentIndex);
                client = (WrappedSdkHarnessClient)currentCache.getUnchecked((Object)this.executableStage.getEnvironment());
                client.ref();
                if (this.currentClient.wrappedClient != client) {
                    this.preparedClients.clear();
                    this.currentClient = DefaultJobBundleFactory.this.prepare(client, this.executableStage);
                    this.preparedClients.put(client, this.currentClient);
                }
            }
            final SdkHarnessClient.BundleProcessor.ActiveBundle bundle = this.currentClient.processor.newBundle((Map<String, RemoteOutputReceiver<?>>)DefaultJobBundleFactory.getOutputReceivers(this.currentClient.processBundleDescriptor, outputReceiverFactory).build(), stateRequestHandler, progressHandler);
            return new RemoteBundle(){

                @Override
                public String getId() {
                    return bundle.getId();
                }

                @Override
                public Map<String, FnDataReceiver> getInputReceivers() {
                    return bundle.getInputReceivers();
                }

                @Override
                public void split(double fractionOfRemainder) {
                    bundle.split(fractionOfRemainder);
                }

                @Override
                public void close() throws Exception {
                    bundle.close();
                    SimpleStageBundleFactory.this.currentClient.wrappedClient.unref();
                    if (DefaultJobBundleFactory.this.loadBalanceBundles) {
                        DefaultJobBundleFactory.this.availableCaches.offer(currentCache);
                        DefaultJobBundleFactory.this.availableCachesSemaphore.release();
                    }
                }
            };
        }

        @Override
        public ProcessBundleDescriptors.ExecutableProcessBundleDescriptor getProcessBundleDescriptor() {
            return this.currentClient.processBundleDescriptor;
        }

        @Override
        public void close() throws Exception {
            this.preparedClients.clear();
        }
    }

    /**
     * Mutable holder pairing an environment client with the process bundle descriptor and bundle
     * processor that were created for one stage on that client. Populated by {@code prepare}.
     */
    private static class PreparedClient {
        /** The environment client the stage was prepared on. */
        private WrappedSdkHarnessClient wrappedClient;
        /** The descriptor created for the stage against that client's servers. */
        private ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor;
        /** The processor used to start bundles for the descriptor. */
        private SdkHarnessClient.BundleProcessor processor;
    }
}

