/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_runners_direct_java.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.repackaged.beam_runners_direct_java.model.pipeline.v1.Endpoints;
import org.apache.beam.repackaged.beam_runners_direct_java.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.SyntheticComponents;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.SideInputReference;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.AutoValue_ProcessBundleDescriptors_ExecutableProcessBundleDescriptor;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.AutoValue_ProcessBundleDescriptors_MultimapSideInputSpec;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.AutoValue_ProcessBundleDescriptors_TargetEncoding;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.data.RemoteInputDestination;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.wire.LengthPrefixUnknownCoders;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.wire.WireCoders;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.data.RemoteGrpcPortRead;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.data.RemoteGrpcPortWrite;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.collect.ImmutableTable;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.collect.Iterables;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;

/**
 * Static helpers that build a {@link BeamFnApi.ProcessBundleDescriptor} for an
 * {@link ExecutableStage}, together with the runner-side bookkeeping (input destination,
 * output coders and multimap side input specs) returned as an
 * {@link ExecutableProcessBundleDescriptor}.
 */
public class ProcessBundleDescriptors {
    /**
     * Builds the descriptor for {@code stage}, registering {@code stateEndpoint} as the state
     * API service of the resulting bundle descriptor.
     *
     * @param id id assigned to the generated process bundle descriptor
     * @param stage stage whose components, transforms, input, outputs and side inputs are used
     * @param dataEndpoint endpoint set on every generated gRPC read/write port
     * @param stateEndpoint endpoint set as the state API service descriptor
     * @return the bundle descriptor plus input/output/side-input metadata
     * @throws IllegalStateException if any argument is {@code null}
     * @throws IOException propagated from wire coder instantiation
     */
    public static ExecutableProcessBundleDescriptor fromExecutableStage(String id, ExecutableStage stage, Endpoints.ApiServiceDescriptor dataEndpoint, Endpoints.ApiServiceDescriptor stateEndpoint) throws IOException {
        Preconditions.checkState(id != null, "id must be specified.");
        Preconditions.checkState(stage != null, "stage must be specified.");
        Preconditions.checkState(dataEndpoint != null, "dataEndpoint must be specified.");
        Preconditions.checkState(stateEndpoint != null, "stateEndpoint must be specified.");
        return ProcessBundleDescriptors.fromExecutableStageInternal(id, stage, dataEndpoint, stateEndpoint);
    }

    /**
     * Builds the descriptor for {@code stage} without setting a state API service descriptor.
     *
     * @param id id assigned to the generated process bundle descriptor
     * @param stage stage whose components, transforms, input, outputs and side inputs are used
     * @param dataEndpoint endpoint set on every generated gRPC read/write port
     * @return the bundle descriptor plus input/output/side-input metadata
     * @throws IllegalStateException if any argument is {@code null}
     * @throws IOException propagated from wire coder instantiation
     */
    public static ExecutableProcessBundleDescriptor fromExecutableStage(String id, ExecutableStage stage, Endpoints.ApiServiceDescriptor dataEndpoint) throws IOException {
        Preconditions.checkState(id != null, "id must be specified.");
        Preconditions.checkState(stage != null, "stage must be specified.");
        Preconditions.checkState(dataEndpoint != null, "dataEndpoint must be specified.");
        return ProcessBundleDescriptors.fromExecutableStageInternal(id, stage, dataEndpoint, null);
    }

    /**
     * Shared implementation of both {@code fromExecutableStage} overloads. Copies the stage's
     * coders, environments, PCollections, windowing strategies and transforms into a new bundle
     * descriptor, then adds one read transform for the stage input, one write transform per
     * stage output, and the side input coder rewrites.
     *
     * @param stateEndpoint state API endpoint, or {@code null} to leave it unset
     */
    private static ExecutableProcessBundleDescriptor fromExecutableStageInternal(String id, ExecutableStage stage, Endpoints.ApiServiceDescriptor dataEndpoint, @Nullable Endpoints.ApiServiceDescriptor stateEndpoint) throws IOException {
        RunnerApi.Components components = stage.getComponents();
        BeamFnApi.ProcessBundleDescriptor.Builder bundleDescriptorBuilder = BeamFnApi.ProcessBundleDescriptor.newBuilder()
                .setId(id)
                .putAllCoders(components.getCodersMap())
                .putAllEnvironments(components.getEnvironmentsMap())
                .putAllPcollections(components.getPcollectionsMap())
                .putAllWindowingStrategies(components.getWindowingStrategiesMap())
                .putAllTransforms(stage.getTransforms().stream().collect(Collectors.toMap(PipelineNode.PTransformNode::getId, PipelineNode.PTransformNode::getTransform)));
        if (stateEndpoint != null) {
            bundleDescriptorBuilder.setStateApiServiceDescriptor(stateEndpoint);
        }

        RemoteInputDestination<WindowedValue<?>> inputDestination = ProcessBundleDescriptors.addStageInput(stage.getInputPCollection(), components, dataEndpoint, bundleDescriptorBuilder);

        // LinkedHashMap keeps the targets in the order the stage reports its outputs.
        Map<BeamFnApi.Target, Coder<WindowedValue<?>>> outputTargetCoders = new LinkedHashMap<>();
        for (PipelineNode.PCollectionNode outputPCollection : stage.getOutputPCollections()) {
            TargetEncoding targetEncoding = ProcessBundleDescriptors.addStageOutput(components, dataEndpoint, bundleDescriptorBuilder, outputPCollection);
            outputTargetCoders.put(targetEncoding.getTarget(), targetEncoding.getCoder());
        }

        Map<String, Map<String, MultimapSideInputSpec>> multimapSideInputSpecs = ProcessBundleDescriptors.forMultimapSideInputs(stage, components, bundleDescriptorBuilder);
        return ExecutableProcessBundleDescriptor.of(bundleDescriptorBuilder.build(), inputDestination, outputTargetCoders, multimapSideInputSpecs);
    }

    /**
     * Adds a gRPC read transform for {@code inputPCollection} to the bundle descriptor under a
     * fresh {@code fn/read/...} id and returns the destination (wire coder plus target) that
     * addresses it.
     */
    private static RemoteInputDestination<WindowedValue<?>> addStageInput(PipelineNode.PCollectionNode inputPCollection, RunnerApi.Components components, Endpoints.ApiServiceDescriptor dataEndpoint, BeamFnApi.ProcessBundleDescriptor.Builder bundleDescriptorBuilder) throws IOException {
        String inputWireCoderId = ProcessBundleDescriptors.addWireCoder(inputPCollection, components, bundleDescriptorBuilder);
        // The element type is not known statically here, so go through the raw type.
        @SuppressWarnings({"unchecked", "rawtypes"})
        Coder<WindowedValue<?>> wireCoder = (Coder) WireCoders.instantiateRunnerWireCoder(inputPCollection, components);
        BeamFnApi.RemoteGrpcPort inputPort = BeamFnApi.RemoteGrpcPort.newBuilder()
                .setApiServiceDescriptor(dataEndpoint)
                .setCoderId(inputWireCoderId)
                .build();
        String inputId = SyntheticComponents.uniqueId(String.format("fn/read/%s", inputPCollection.getId()), bundleDescriptorBuilder::containsTransforms);
        RunnerApi.PTransform inputTransform = RemoteGrpcPortRead.readFromPort(inputPort, inputPCollection.getId()).toPTransform();
        bundleDescriptorBuilder.putTransforms(inputId, inputTransform);
        // The target name is the single output key of the generated read transform.
        BeamFnApi.Target inputTarget = BeamFnApi.Target.newBuilder()
                .setPrimitiveTransformReference(inputId)
                .setName(Iterables.getOnlyElement(inputTransform.getOutputsMap().keySet()))
                .build();
        return RemoteInputDestination.of(wireCoder, inputTarget);
    }

    /**
     * Adds a gRPC write transform for {@code outputPCollection} to the bundle descriptor under a
     * fresh {@code fn/write/...} id and returns the target of that transform paired with the
     * runner-side wire coder.
     */
    private static TargetEncoding addStageOutput(RunnerApi.Components components, Endpoints.ApiServiceDescriptor dataEndpoint, BeamFnApi.ProcessBundleDescriptor.Builder bundleDescriptorBuilder, PipelineNode.PCollectionNode outputPCollection) throws IOException {
        String outputWireCoderId = ProcessBundleDescriptors.addWireCoder(outputPCollection, components, bundleDescriptorBuilder);
        // The element type is not known statically here, so go through the raw type.
        @SuppressWarnings({"unchecked", "rawtypes"})
        Coder<WindowedValue<?>> wireCoder = (Coder) WireCoders.instantiateRunnerWireCoder(outputPCollection, components);
        BeamFnApi.RemoteGrpcPort outputPort = BeamFnApi.RemoteGrpcPort.newBuilder()
                .setApiServiceDescriptor(dataEndpoint)
                .setCoderId(outputWireCoderId)
                .build();
        RemoteGrpcPortWrite outputWrite = RemoteGrpcPortWrite.writeToPort(outputPCollection.getId(), outputPort);
        String outputId = SyntheticComponents.uniqueId(String.format("fn/write/%s", outputPCollection.getId()), bundleDescriptorBuilder::containsTransforms);
        RunnerApi.PTransform outputTransform = outputWrite.toPTransform();
        bundleDescriptorBuilder.putTransforms(outputId, outputTransform);
        // The target name is the single input key of the generated write transform.
        BeamFnApi.Target outputTarget = BeamFnApi.Target.newBuilder()
                .setPrimitiveTransformReference(outputId)
                .setName(Iterables.getOnlyElement(outputTransform.getInputsMap().keySet()))
                .build();
        return new AutoValue_ProcessBundleDescriptors_TargetEncoding(outputTarget, wireCoder);
    }

    /**
     * For every side input of {@code stage}: registers the coder produced by
     * {@link LengthPrefixUnknownCoders#forCoder} under a fresh {@code fn/side_input/...} id,
     * repoints the side input PCollection in the bundle descriptor at that coder, and records a
     * {@link MultimapSideInputSpec} built from the runner wire coder.
     *
     * @return specs keyed by transform id, then by side input local name
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static Map<String, Map<String, MultimapSideInputSpec>> forMultimapSideInputs(ExecutableStage stage, RunnerApi.Components components, BeamFnApi.ProcessBundleDescriptor.Builder bundleDescriptorBuilder) throws IOException {
        ImmutableTable.Builder<String, String, MultimapSideInputSpec> idsToSpec = ImmutableTable.builder();
        for (SideInputReference sideInputReference : stage.getSideInputs()) {
            String pCollectionId = sideInputReference.collection().getId();
            String originalCoderId = components.getPcollectionsOrThrow(pCollectionId).getCoderId();
            RunnerApi.MessageWithComponents lengthPrefixedSideInputCoder = LengthPrefixUnknownCoders.forCoder(originalCoderId, components, false);
            String lengthPrefixedSideInputCoderId = SyntheticComponents.uniqueId(String.format("fn/side_input/%s", originalCoderId), bundleDescriptorBuilder.getCodersMap().keySet()::contains);
            bundleDescriptorBuilder.putCoders(lengthPrefixedSideInputCoderId, lengthPrefixedSideInputCoder.getCoder());
            bundleDescriptorBuilder.putAllCoders(lengthPrefixedSideInputCoder.getComponents().getCodersMap());
            bundleDescriptorBuilder.putPcollections(pCollectionId, bundleDescriptorBuilder.getPcollectionsMap().get(pCollectionId).toBuilder().setCoderId(lengthPrefixedSideInputCoderId).build());

            // The casts assume the wire coder is a FullWindowedValueCoder over a KvCoder;
            // anything else fails here with a ClassCastException.
            WindowedValue.FullWindowedValueCoder coder = (WindowedValue.FullWindowedValueCoder) WireCoders.instantiateRunnerWireCoder(sideInputReference.collection(), components);
            KvCoder kvCoder = (KvCoder) coder.getValueCoder();
            String transformId = sideInputReference.transform().getId();
            String localName = sideInputReference.localName();
            idsToSpec.put(transformId, localName, MultimapSideInputSpec.of(transformId, localName, kvCoder.getKeyCoder(), kvCoder.getValueCoder(), coder.getWindowCoder()));
        }
        return idsToSpec.build().rowMap();
    }

    /**
     * Creates the SDK wire coder for {@code pCollection}, adds it and its component coders to
     * the bundle descriptor, and returns the fresh {@code fn/wire/...} id it was stored under.
     */
    private static String addWireCoder(PipelineNode.PCollectionNode pCollection, RunnerApi.Components components, BeamFnApi.ProcessBundleDescriptor.Builder bundleDescriptorBuilder) {
        RunnerApi.MessageWithComponents wireCoder = WireCoders.createSdkWireCoder(pCollection, components, bundleDescriptorBuilder::containsCoders);
        bundleDescriptorBuilder.putAllCoders(wireCoder.getComponents().getCodersMap());
        String wireCoderId = SyntheticComponents.uniqueId(String.format("fn/wire/%s", pCollection.getId()), bundleDescriptorBuilder::containsCoders);
        bundleDescriptorBuilder.putCoders(wireCoderId, wireCoder.getCoder());
        return wireCoderId;
    }

    /**
     * A process bundle descriptor bundled with the input destination, per-target output coders
     * and multimap side input specs that were generated alongside it.
     */
    @AutoValue
    public static abstract class ExecutableProcessBundleDescriptor {
        /**
         * Creates an instance holding unmodifiable copies of both maps, so later changes to the
         * maps passed in are not reflected in the returned value.
         */
        public static ExecutableProcessBundleDescriptor of(BeamFnApi.ProcessBundleDescriptor descriptor, RemoteInputDestination<WindowedValue<?>> inputDestination, Map<BeamFnApi.Target, Coder<WindowedValue<?>>> outputTargetCoders, Map<String, Map<String, MultimapSideInputSpec>> multimapSideInputSpecs) {
            ImmutableTable.Builder<String, String, MultimapSideInputSpec> copyOfMultimapSideInputSpecs = ImmutableTable.builder();
            for (Map.Entry<String, Map<String, MultimapSideInputSpec>> outer : multimapSideInputSpecs.entrySet()) {
                for (Map.Entry<String, MultimapSideInputSpec> inner : outer.getValue().entrySet()) {
                    copyOfMultimapSideInputSpecs.put(outer.getKey(), inner.getKey(), inner.getValue());
                }
            }
            // Copy before wrapping: unmodifiableMap alone is only a read-only view of the
            // caller's map. LinkedHashMap preserves the caller's iteration order.
            Map<BeamFnApi.Target, Coder<WindowedValue<?>>> copyOfOutputTargetCoders = Collections.unmodifiableMap(new LinkedHashMap<>(outputTargetCoders));
            return new AutoValue_ProcessBundleDescriptors_ExecutableProcessBundleDescriptor(descriptor, inputDestination, copyOfOutputTargetCoders, copyOfMultimapSideInputSpecs.build().rowMap());
        }

        /** Returns the generated process bundle descriptor. */
        public abstract BeamFnApi.ProcessBundleDescriptor getProcessBundleDescriptor();

        /** Returns the coder and target used to send the stage's input elements. */
        public abstract RemoteInputDestination<WindowedValue<?>> getRemoteInputDestination();

        /** Returns an unmodifiable map from each output target to its wire coder. */
        public abstract Map<BeamFnApi.Target, Coder<WindowedValue<?>>> getOutputTargetCoders();

        /** Returns the side input specs keyed by transform id, then by side input local name. */
        public abstract Map<String, Map<String, MultimapSideInputSpec>> getMultimapSideInputSpecs();
    }

    /**
     * Identifies one multimap side input (by transform id and side input id) and carries the
     * key, value and window coders associated with it.
     *
     * @param <K> key type
     * @param <V> value type
     * @param <W> window type
     */
    @AutoValue
    public static abstract class MultimapSideInputSpec<K, V, W extends BoundedWindow> {
        /** Creates a spec from the given ids and coders. */
        static <K, V, W extends BoundedWindow> MultimapSideInputSpec<K, V, W> of(String transformId, String sideInputId, Coder<K> keyCoder, Coder<V> valueCoder, Coder<W> windowCoder) {
            return new AutoValue_ProcessBundleDescriptors_MultimapSideInputSpec<K, V, W>(transformId, sideInputId, keyCoder, valueCoder, windowCoder);
        }

        /** Returns the id of the transform that consumes the side input. */
        public abstract String transformId();

        /** Returns the side input's id (its local name within the transform). */
        public abstract String sideInputId();

        /** Returns the coder for keys. */
        public abstract Coder<K> keyCoder();

        /** Returns the coder for values. */
        public abstract Coder<V> valueCoder();

        /** Returns the coder for windows. */
        public abstract Coder<W> windowCoder();
    }

    /** Pairs an output target with the coder used to decode elements arriving for it. */
    @AutoValue
    static abstract class TargetEncoding {
        TargetEncoding() {
        }

        /** Returns the target of the generated write transform. */
        abstract BeamFnApi.Target getTarget();

        /** Returns the runner-side wire coder for the target's elements. */
        abstract Coder<WindowedValue<?>> getCoder();
    }
}

