/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.beam.fn.harness.control.AddHarnessIdInterceptor;
import org.apache.beam.fn.harness.control.BeamFnControlClient;
import org.apache.beam.fn.harness.control.FinalizeBundleHandler;
import org.apache.beam.fn.harness.control.HarnessMonitoringInfosInstructionHandler;
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient;
import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.fn.harness.status.BeamFnStatusClient;
import org.apache.beam.fn.harness.stream.HarnessStreamObserverFactories;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.JvmInitializers;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.Message;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.TextFormat;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FnHarness {
    private static final String HARNESS_ID = "HARNESS_ID";
    private static final String CONTROL_API_SERVICE_DESCRIPTOR = "CONTROL_API_SERVICE_DESCRIPTOR";
    private static final String LOGGING_API_SERVICE_DESCRIPTOR = "LOGGING_API_SERVICE_DESCRIPTOR";
    private static final String STATUS_API_SERVICE_DESCRIPTOR = "STATUS_API_SERVICE_DESCRIPTOR";
    private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
    private static final String RUNNER_CAPABILITIES = "RUNNER_CAPABILITIES";
    private static final Logger LOG = LoggerFactory.getLogger(FnHarness.class);

    private static Endpoints.ApiServiceDescriptor getApiServiceDescriptor(String descriptor) throws TextFormat.ParseException {
        Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder = Endpoints.ApiServiceDescriptor.newBuilder();
        TextFormat.merge(descriptor, (Message.Builder)apiServiceDescriptorBuilder);
        return apiServiceDescriptorBuilder.build();
    }

    public static void main(String[] args) throws Exception {
        FnHarness.main(System::getenv);
    }

    @VisibleForTesting
    public static void main(Function<String, String> environmentVarGetter) throws Exception {
        JvmInitializers.runOnStartup();
        System.out.format("SDK Fn Harness started%n", new Object[0]);
        System.out.format("Harness ID %s%n", environmentVarGetter.apply(HARNESS_ID));
        System.out.format("Logging location %s%n", environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));
        System.out.format("Control location %s%n", environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
        System.out.format("Status location %s%n", environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
        System.out.format("Pipeline options %s%n", environmentVarGetter.apply(PIPELINE_OPTIONS));
        String id = environmentVarGetter.apply(HARNESS_ID);
        PipelineOptions options = PipelineOptionsTranslation.fromJson(environmentVarGetter.apply(PIPELINE_OPTIONS));
        Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor = FnHarness.getApiServiceDescriptor(environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));
        Endpoints.ApiServiceDescriptor controlApiServiceDescriptor = FnHarness.getApiServiceDescriptor(environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
        Endpoints.ApiServiceDescriptor statusApiServiceDescriptor = environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null ? null : FnHarness.getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
        String runnerCapabilitesOrNull = environmentVarGetter.apply(RUNNER_CAPABILITIES);
        Set<String> runnerCapabilites = runnerCapabilitesOrNull == null ? Collections.emptySet() : ImmutableSet.copyOf(runnerCapabilitesOrNull.split("\\s+"));
        FnHarness.main(id, options, runnerCapabilites, loggingApiServiceDescriptor, controlApiServiceDescriptor, statusApiServiceDescriptor);
    }

    public static void main(String id, PipelineOptions options, Set<String> runnerCapabilites, Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor, Endpoints.ApiServiceDescriptor controlApiServiceDescriptor, @Nullable Endpoints.ApiServiceDescriptor statusApiServiceDescriptor) throws Exception {
        List<String> experiments = options.as(ExperimentalOptions.class).getExperiments();
        ManagedChannelFactory channelFactory = experiments != null && experiments.contains("beam_fn_api_epoll") ? ManagedChannelFactory.createEpoll() : ManagedChannelFactory.createDefault();
        OutboundObserverFactory outboundObserverFactory = HarnessStreamObserverFactories.fromOptions(options);
        channelFactory = channelFactory.withInterceptors(ImmutableList.of(AddHarnessIdInterceptor.create(id)));
        FnHarness.main(id, options, runnerCapabilites, loggingApiServiceDescriptor, controlApiServiceDescriptor, statusApiServiceDescriptor, channelFactory, outboundObserverFactory);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String id, PipelineOptions options, Set<String> runnerCapabilites, Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor, Endpoints.ApiServiceDescriptor controlApiServiceDescriptor, Endpoints.ApiServiceDescriptor statusApiServiceDescriptor, ManagedChannelFactory channelFactory, OutboundObserverFactory outboundObserverFactory) throws Exception {
        IdGenerator idGenerator = IdGenerators.decrementingLongs();
        ShortIdMap metricsShortIds = new ShortIdMap();
        ExecutorService executorService = options.as(GcsOptions.class).getExecutorService();
        try (BeamFnLoggingClient logging = new BeamFnLoggingClient(options, loggingApiServiceDescriptor, channelFactory::forDescriptor);){
            LOG.info("Fn Harness started");
            FileSystems.setDefaultPipelineOptions(options);
            EnumMap<BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction<BeamFnApi.InstructionRequest, BeamFnApi.InstructionResponse.Builder>> handlers = new EnumMap<BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction<BeamFnApi.InstructionRequest, BeamFnApi.InstructionResponse.Builder>>(BeamFnApi.InstructionRequest.RequestCase.class);
            ManagedChannel channel = channelFactory.forDescriptor(controlApiServiceDescriptor);
            BeamFnControlGrpc.BeamFnControlStub controlStub = BeamFnControlGrpc.newStub(channel);
            final BeamFnControlGrpc.BeamFnControlBlockingStub blockingControlStub = BeamFnControlGrpc.newBlockingStub(channel);
            BeamFnDataGrpcClient beamFnDataMultiplexer = new BeamFnDataGrpcClient(options, channelFactory::forDescriptor, outboundObserverFactory);
            BeamFnStateGrpcClientCache beamFnStateGrpcClientCache = new BeamFnStateGrpcClientCache(idGenerator, channelFactory::forDescriptor, outboundObserverFactory);
            FinalizeBundleHandler finalizeBundleHandler = new FinalizeBundleHandler(options.as(GcsOptions.class).getExecutorService());
            LoadingCache<String, BeamFnApi.ProcessBundleDescriptor> processBundleDescriptors = CacheBuilder.newBuilder().maximumSize(1000L).expireAfterAccess(10L, TimeUnit.MINUTES).build(new CacheLoader<String, BeamFnApi.ProcessBundleDescriptor>(){

                @Override
                public BeamFnApi.ProcessBundleDescriptor load(String id) {
                    return blockingControlStub.getProcessBundleDescriptor(BeamFnApi.GetProcessBundleDescriptorRequest.newBuilder().setProcessBundleDescriptorId(id).build());
                }
            });
            MetricsEnvironment.setProcessWideContainer(MetricsContainerImpl.createProcessWideContainer());
            ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(options, runnerCapabilites, processBundleDescriptors::getUnchecked, beamFnDataMultiplexer, beamFnStateGrpcClientCache, finalizeBundleHandler, metricsShortIds);
            BeamFnStatusClient beamFnStatusClient = null;
            if (statusApiServiceDescriptor != null) {
                beamFnStatusClient = new BeamFnStatusClient(statusApiServiceDescriptor, channelFactory::forDescriptor, processBundleHandler.getBundleProcessorCache(), options);
            }
            handlers.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER, request -> BeamFnApi.InstructionResponse.newBuilder().setRegister(BeamFnApi.RegisterResponse.getDefaultInstance()));
            handlers.put(BeamFnApi.InstructionRequest.RequestCase.FINALIZE_BUNDLE, finalizeBundleHandler::finalizeBundle);
            handlers.put(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE, processBundleHandler::processBundle);
            handlers.put(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE_PROGRESS, processBundleHandler::progress);
            handlers.put(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE_SPLIT, processBundleHandler::trySplit);
            handlers.put(BeamFnApi.InstructionRequest.RequestCase.MONITORING_INFOS, request -> BeamFnApi.InstructionResponse.newBuilder().setMonitoringInfos(BeamFnApi.MonitoringInfosMetadataResponse.newBuilder().putAllMonitoringInfo(StreamSupport.stream(request.getMonitoringInfos().getMonitoringInfoIdList().spliterator(), false).collect(Collectors.toMap(Function.identity(), metricsShortIds::get)))));
            HarnessMonitoringInfosInstructionHandler processWideHandler = new HarnessMonitoringInfosInstructionHandler(metricsShortIds);
            handlers.put(BeamFnApi.InstructionRequest.RequestCase.HARNESS_MONITORING_INFOS, processWideHandler::harnessMonitoringInfos);
            JvmInitializers.runBeforeProcessing(options);
            String samplingPeriodMills = ExperimentalOptions.getExperimentValue(options, "state_sampling_period_millis");
            if (samplingPeriodMills != null) {
                ExecutionStateSampler.setSamplingPeriod(Integer.parseInt(samplingPeriodMills));
            }
            ExecutionStateSampler.instance().start();
            LOG.info("Entering instruction processing loop");
            BeamFnControlClient control = new BeamFnControlClient((BeamFnControlGrpc.BeamFnControlStub)controlStub.withExecutor(MoreExecutors.directExecutor()), outboundObserverFactory, executorService, handlers);
            control.waitForTermination();
            if (beamFnStatusClient != null) {
                beamFnStatusClient.close();
            }
            processBundleHandler.shutdown();
        }
        finally {
            System.out.println("Shutting SDK harness down.");
            ExecutionStateSampler.instance().stop();
            executorService.shutdown();
        }
    }
}

