/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.GrpcFnServer;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.RemoteOutputReceiver;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.SdkHarnessClient;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.IdGenerator;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.IdGenerators;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.data.FnDataReceiver;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.collect.Iterables;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.WindowedValue;

/**
 * A {@link JobBundleFactory} that keeps at most one {@link RemoteEnvironment} per distinct
 * {@link RunnerApi.Environment} and at most one {@link StageBundleFactory} per
 * {@link ExecutableStage}.
 *
 * <p>Both caches are {@link ConcurrentMap}s populated lazily through {@code computeIfAbsent}, so
 * an environment is only created the first time a stage that needs it is requested via
 * {@link #forStage(ExecutableStage)}. Every environment created here is closed by
 * {@link #close()}.
 *
 * @deprecated this class is annotated {@code @Deprecated}; NOTE(review): the intended
 *     replacement is not named anywhere in this file.
 */
@Deprecated
public class SingleEnvironmentInstanceJobBundleFactory
implements JobBundleFactory {
    private final EnvironmentFactory environmentFactory;
    private final GrpcFnServer<GrpcDataService> dataService;
    private final GrpcFnServer<GrpcStateService> stateService;
    /** Cache of the bundle factory built for each stage. */
    private final ConcurrentMap<ExecutableStage, StageBundleFactory> stageBundleFactories = new ConcurrentHashMap<>();
    /** Cache of the remote environment created for each environment specification. */
    private final ConcurrentMap<RunnerApi.Environment, RemoteEnvironment> environments = new ConcurrentHashMap<>();
    /** Shared by every SDK harness client and process bundle descriptor created by this factory. */
    private final IdGenerator idGenerator = IdGenerators.incrementingLongs();

    /**
     * Creates a new factory.
     *
     * @param environmentFactory used to create a {@link RemoteEnvironment} the first time an
     *     environment is needed
     * @param data the data service server handed to the SDK harness clients and descriptors
     * @param state the state service server handed to the bundle processors
     * @return a new {@link JobBundleFactory} with empty caches
     */
    public static JobBundleFactory create(EnvironmentFactory environmentFactory, GrpcFnServer<GrpcDataService> data, GrpcFnServer<GrpcStateService> state) {
        return new SingleEnvironmentInstanceJobBundleFactory(environmentFactory, data, state);
    }

    private SingleEnvironmentInstanceJobBundleFactory(EnvironmentFactory environmentFactory, GrpcFnServer<GrpcDataService> dataService, GrpcFnServer<GrpcStateService> stateService) {
        this.environmentFactory = environmentFactory;
        this.dataService = dataService;
        this.stateService = stateService;
    }

    /**
     * Returns the {@link StageBundleFactory} for {@code executableStage}, building and caching it
     * on first use.
     *
     * @param executableStage the stage to obtain a bundle factory for
     * @return the cached or newly created bundle factory
     * @throws RuntimeException if the environment or the process bundle descriptor cannot be
     *     created; the underlying failure is the cause
     */
    @Override
    public StageBundleFactory forStage(ExecutableStage executableStage) {
        return this.stageBundleFactories.computeIfAbsent(executableStage, this::createBundleFactory);
    }

    /**
     * Builds the bundle factory for {@code stage}: obtains (or creates) the stage's remote
     * environment, wraps its instruction handler in an {@link SdkHarnessClient}, derives the
     * process bundle descriptor and asks the client for a matching bundle processor.
     */
    private StageBundleFactory createBundleFactory(ExecutableStage stage) {
        RemoteEnvironment remoteEnv = this.environments.computeIfAbsent(stage.getEnvironment(), this::createRemoteEnvironment);
        SdkHarnessClient sdkHarnessClient = SdkHarnessClient.usingFnApiClient(remoteEnv.getInstructionRequestHandler(), this.dataService.getService()).withIdGenerator(this.idGenerator);
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor;
        try {
            descriptor = ProcessBundleDescriptors.fromExecutableStage(this.idGenerator.getId(), stage, this.dataService.getApiServiceDescriptor());
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        SdkHarnessClient.BundleProcessor bundleProcessor = sdkHarnessClient.getProcessor(descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations(), this.stateService.getService());
        return new BundleProcessorStageBundleFactory(descriptor, bundleProcessor);
    }

    /**
     * Creates the remote environment for {@code environment}, converting any checked failure into
     * a {@link RuntimeException} so the method can be used as a {@code computeIfAbsent} mapping
     * function.
     */
    private RemoteEnvironment createRemoteEnvironment(RunnerApi.Environment environment) {
        try {
            return this.environmentFactory.createEnvironment(environment);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Wrapping would otherwise hide the interruption from callers further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Closes every remote environment created by this factory.
     *
     * <p>All environments are attempted even if some fail to close. The first failure is thrown
     * once the loop finishes, with any later failures attached to it as suppressed exceptions.
     *
     * @throws Exception the first exception raised while closing an environment
     */
    @Override
    public void close() throws Exception {
        Exception thrown = null;
        for (RemoteEnvironment remoteEnvironment : this.environments.values()) {
            try {
                remoteEnvironment.close();
            }
            catch (Exception e) {
                if (thrown == null) {
                    thrown = e;
                } else {
                    thrown.addSuppressed(e);
                }
            }
        }
        if (thrown != null) {
            throw thrown;
        }
    }

    /**
     * A {@link StageBundleFactory} that creates bundles from a fixed descriptor and
     * {@link SdkHarnessClient.BundleProcessor} pair.
     */
    private static class BundleProcessorStageBundleFactory
    implements StageBundleFactory {
        private final ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor;
        private final SdkHarnessClient.BundleProcessor processor;

        private BundleProcessorStageBundleFactory(ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor, SdkHarnessClient.BundleProcessor processor) {
            this.descriptor = descriptor;
            this.processor = processor;
        }

        /**
         * Starts a new bundle on the wrapped processor.
         *
         * <p>For each output target of the descriptor, a receiver is requested from
         * {@code outputReceiverFactory} and paired with that target's coder.
         *
         * @param outputReceiverFactory supplies a receiver for each output PCollection id
         * @param stateRequestHandler passed through to the processor unchanged
         * @param progressHandler passed through to the processor unchanged
         * @return the bundle returned by the processor
         */
        @Override
        public RemoteBundle getBundle(OutputReceiverFactory outputReceiverFactory, StateRequestHandler stateRequestHandler, BundleProgressHandler progressHandler) {
            Map<BeamFnApi.Target, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>();
            for (Map.Entry<BeamFnApi.Target, Coder<WindowedValue<?>>> targetCoder : this.descriptor.getOutputTargetCoders().entrySet()) {
                BeamFnApi.Target target = targetCoder.getKey();
                // The PCollection id is the single input of the transform the target refers to;
                // getOnlyElement fails if that transform does not have exactly one input.
                String bundleOutputPCollection = Iterables.getOnlyElement(this.descriptor.getProcessBundleDescriptor().getTransformsOrThrow(target.getPrimitiveTransformReference()).getInputsMap().values());
                FnDataReceiver<WindowedValue<?>> outputReceiver = outputReceiverFactory.create(bundleOutputPCollection);
                outputReceivers.put(target, RemoteOutputReceiver.of(targetCoder.getValue(), outputReceiver));
            }
            return this.processor.newBundle(outputReceivers, stateRequestHandler, progressHandler);
        }

        /** Returns the descriptor this factory was constructed with. */
        @Override
        public ProcessBundleDescriptors.ExecutableProcessBundleDescriptor getProcessBundleDescriptor() {
            return this.descriptor;
        }

        /** Does nothing; this factory holds no resources of its own to release. */
        @Override
        public void close() {
        }
    }
}

