/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.service.AutoService;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.construction.ExecutableStageTranslation;
import org.apache.beam.runners.core.construction.NativeTransforms;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.RunnerPCollectionView;
import org.apache.beam.runners.core.construction.TestStreamTranslation;
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
import org.apache.beam.runners.flink.FlinkExecutionEnvironments;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkPortablePipelineTranslator;
import org.apache.beam.runners.flink.FlinkStreamingTransformTranslators;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
import org.apache.beam.runners.flink.translation.functions.ImpulseSourceFunction;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.StreamingImpulseSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestStreamSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.join.UnionCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.HashMultiset;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class FlinkStreamingPortablePipelineTranslator
implements FlinkPortablePipelineTranslator<StreamingTranslationContext> {
    /**
     * URN of the Flink-specific streaming impulse transform. It is registered against
     * {@code translateStreamingImpulse} in the constructor and is the URN that
     * {@code IsFlinkNativeTransform} tests for.
     */
    @Deprecated
    private static final String STREAMING_IMPULSE_TRANSFORM_URN = "flink:transform:streaming_impulse:v1";
    /** Dispatch table from transform URN to the translator handling it; built once in the constructor. */
    private final Map<String, PTransformTranslator<StreamingTranslationContext>> urnToTransformTranslator;

    /**
     * Creates the translation context for one job.
     *
     * <p>A Flink {@link StreamExecutionEnvironment} is created from the pipeline options, the
     * files to stage and the configuration directory, and wrapped together with the job info
     * and options in a new {@link StreamingTranslationContext}.
     *
     * @param jobInfo information about the job being translated
     * @param pipelineOptions Flink pipeline options used to configure the environment
     * @param confDir configuration directory handed to the environment factory
     * @param filesToStage files handed to the environment factory
     * @return a fresh translation context
     */
    @Override
    public StreamingTranslationContext createTranslationContext(JobInfo jobInfo, FlinkPipelineOptions pipelineOptions, String confDir, List<String> filesToStage) {
        return new StreamingTranslationContext(
            jobInfo,
            pipelineOptions,
            FlinkExecutionEnvironments.createStreamExecutionEnvironment(pipelineOptions, filesToStage, confDir));
    }

    /**
     * Builds the immutable URN-to-translator dispatch table.
     *
     * <p>The builder is fully parameterised so that each method reference has a functional
     * interface ({@code PTransformTranslator}) as its target type; with a raw builder and
     * {@code Object}-cast keys the method references have no target type at all.
     */
    FlinkStreamingPortablePipelineTranslator() {
        ImmutableMap.Builder<String, PTransformTranslator<StreamingTranslationContext>> translatorMap = ImmutableMap.builder();
        translatorMap.put(PTransformTranslation.FLATTEN_TRANSFORM_URN, this::translateFlatten);
        translatorMap.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, this::translateGroupByKey);
        translatorMap.put(PTransformTranslation.IMPULSE_TRANSFORM_URN, this::translateImpulse);
        translatorMap.put("beam:runner:executable_stage:v1", this::translateExecutableStage);
        translatorMap.put(PTransformTranslation.RESHUFFLE_URN, this::translateReshuffle);
        translatorMap.put(STREAMING_IMPULSE_TRANSFORM_URN, this::translateStreamingImpulse);
        translatorMap.put(PTransformTranslation.READ_TRANSFORM_URN, this::translateRead);
        translatorMap.put(PTransformTranslation.TEST_STREAM_TRANSFORM_URN, this::translateTestStream);
        this.urnToTransformTranslator = translatorMap.build();
    }

    /**
     * Returns the URNs this translator advertises: every URN in the dispatch table except
     * {@code PTransformTranslation.READ_TRANSFORM_URN}.
     *
     * @return a view over the registered URNs minus the read URN
     */
    @Override
    public Set<String> knownUrns() {
        Set<String> registeredUrns = this.urnToTransformTranslator.keySet();
        Set<String> excludedUrns = ImmutableSet.of(PTransformTranslation.READ_TRANSFORM_URN);
        return Sets.difference(registeredUrns, excludedUrns);
    }

    /**
     * Translates every transform of the pipeline, in topological order, by dispatching on the
     * transform's URN. Transforms whose URN has no registered translator are routed to
     * {@code urnNotFound}, which throws.
     *
     * @param context translation context that accumulates the Flink job
     * @param pipeline the portable pipeline to translate
     * @return the same {@code context} instance, returned as the executor
     */
    @Override
    public FlinkPortablePipelineTranslator.Executor translate(StreamingTranslationContext context, RunnerApi.Pipeline pipeline) {
        QueryablePipeline queryable = QueryablePipeline.forTransforms(pipeline.getRootTransformIdsList(), pipeline.getComponents());
        for (PipelineNode.PTransformNode node : queryable.getTopologicallyOrderedTransforms()) {
            String urn = node.getTransform().getSpec().getUrn();
            PTransformTranslator<StreamingTranslationContext> translator = this.urnToTransformTranslator.getOrDefault(urn, this::urnNotFound);
            translator.translate(node.getId(), pipeline, context);
        }
        return context;
    }

    /**
     * Fallback translator used when no translator is registered for a transform's URN.
     *
     * @param id id of the offending transform
     * @param pipeline pipeline the transform is looked up in
     * @param context unused
     * @throws IllegalArgumentException always, naming the URN and the transform id
     */
    private void urnNotFound(String id, RunnerApi.Pipeline pipeline, FlinkPortablePipelineTranslator.TranslationContext context) {
        String urn = pipeline.getComponents().getTransformsOrThrow(id).getSpec().getUrn();
        String message = String.format("Unknown type of URN %s for PTransform with id %s.", urn, id);
        throw new IllegalArgumentException(message);
    }

    /**
     * Translates a reshuffle: the single input stream is rebalanced and registered under the
     * transform's single output PCollection id.
     */
    private <K, V> void translateReshuffle(String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
        RunnerApi.PTransform reshuffle = pipeline.getComponents().getTransformsOrThrow(id);
        String inputId = (String)Iterables.getOnlyElement(reshuffle.getInputsMap().values());
        DataStream upstream = context.getDataStreamOrThrow(inputId);
        String outputId = (String)Iterables.getOnlyElement(reshuffle.getOutputsMap().values());
        context.addDataStream(outputId, upstream.rebalance());
    }

    /**
     * Translates a flatten.
     *
     * <p>With no inputs, an impulse source is added whose elements are all dropped by an empty
     * flat-map, so the registered output stream never emits anything. Otherwise all input
     * streams are unioned and the union is registered as the output.
     */
    private <T> void translateFlatten(String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
        RunnerApi.PTransform flatten = pipeline.getComponents().getTransformsOrThrow(id);
        Map<String, String> inputs = flatten.getInputsMap();

        if (inputs.isEmpty()) {
            boolean keepSourceAlive = !context.getPipelineOptions().isShutdownSourcesOnFinalWatermark();
            DataStreamSource placeholderSource = context.getExecutionEnvironment().addSource((SourceFunction)new ImpulseSourceFunction(keepSourceAlive));
            CoderTypeInformation emptyTypeInfo = new CoderTypeInformation(WindowedValue.getFullCoder((Coder)VoidCoder.of(), (Coder)GlobalWindow.Coder.INSTANCE));
            // The flat-map body is intentionally empty: nothing is ever forwarded.
            SingleOutputStreamOperator emptyStream = placeholderSource.flatMap((FlatMapFunction & Serializable)(s, collector) -> {}).returns(emptyTypeInfo);
            context.addDataStream((String)Iterables.getOnlyElement(flatten.getOutputsMap().values()), emptyStream);
            return;
        }

        // First pass: count how often each stream occurs among the inputs.
        HashMultiset<DataStream<T>> occurrences = HashMultiset.create();
        for (String inputId : inputs.values()) {
            DataStream<T> stream = context.getDataStreamOrThrow(inputId);
            occurrences.add(stream, 1);
        }

        // Second pass: union the inputs. A stream that occurs more than once is first routed
        // through an identity flat-map so every occurrence becomes its own stream
        // (presumably because a stream cannot be unioned with itself - confirm against Flink).
        DataStream<T> merged = null;
        for (String inputId : inputs.values()) {
            DataStream<T> stream = context.getDataStreamOrThrow(inputId);
            if (occurrences.count(stream) > 1) {
                stream = stream.flatMap(new FlatMapFunction<T, T>(){
                    private static final long serialVersionUID = 1L;

                    public void flatMap(T t, Collector<T> collector) {
                        collector.collect(t);
                    }
                });
            }
            if (merged == null) {
                merged = stream;
            } else {
                merged = merged.union(new DataStream[]{stream});
            }
        }
        context.addDataStream((String)Iterables.getOnlyElement(flatten.getOutputsMap().values()), merged);
    }

    /**
     * Translates a GroupByKey: rehydrates the windowing strategy of the input PCollection,
     * delegates the actual grouping to {@code addGBK}, assigns the transform's unique name as
     * operator uid and registers the result under the single output PCollection id.
     *
     * @throws IllegalStateException if the windowing strategy proto cannot be rehydrated
     */
    private <K, V> void translateGroupByKey(String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
        RunnerApi.Components components = pipeline.getComponents();
        RunnerApi.PTransform groupByKey = components.getTransformsOrThrow(id);
        String inputId = (String)Iterables.getOnlyElement(groupByKey.getInputsMap().values());
        RehydratedComponents rehydrated = RehydratedComponents.forComponents((RunnerApi.Components)components);

        String strategyId = components.getPcollectionsOrThrow(inputId).getWindowingStrategyId();
        RunnerApi.WindowingStrategy strategyProto = components.getWindowingStrategiesOrThrow(strategyId);
        WindowingStrategy strategy;
        try {
            strategy = WindowingStrategyTranslation.fromProto((RunnerApi.WindowingStrategy)strategyProto, (RehydratedComponents)rehydrated);
        }
        catch (InvalidProtocolBufferException e) {
            throw new IllegalStateException(String.format("Unable to hydrate GroupByKey windowing strategy %s.", strategyProto), e);
        }

        WindowedValue.WindowedValueCoder inputCoder = (WindowedValue.WindowedValueCoder)PipelineTranslatorUtils.instantiateCoder((String)inputId, (RunnerApi.Components)components);
        DataStream upstream = context.getDataStreamOrThrow(inputId);
        String uniqueName = groupByKey.getUniqueName();
        SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<V>>>> grouped = this.addGBK(upstream, strategy, inputCoder, uniqueName, context);
        grouped.uid(groupByKey.getUniqueName());
        context.addDataStream((String)Iterables.getOnlyElement(groupByKey.getOutputsMap().values()), grouped);
    }

    /**
     * Appends a group-by-key stage to {@code inputDataStream}.
     *
     * <p>Elements are first converted to keyed work items, the resulting stream is keyed with
     * a {@link WorkItemKeySelector} built from the key coder, and a {@link WindowDoFnOperator}
     * running a buffering {@link SystemReduceFn} produces the grouped output.
     *
     * @param inputDataStream stream of windowed key/value pairs
     * @param windowingStrategy windowing strategy handed to the operator; also supplies the window coder
     * @param windowedInputCoder coder of the input elements; its value coder must be a {@link KvCoder}
     * @param operatorName name given to the Flink operator
     * @param context supplies the pipeline options
     * @return the operator emitting one iterable of values per key and window
     */
    private <K, V> SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<V>>>> addGBK(DataStream<WindowedValue<KV<K, V>>> inputDataStream, WindowingStrategy<?, ?> windowingStrategy, WindowedValue.WindowedValueCoder<KV<K, V>> windowedInputCoder, String operatorName, StreamingTranslationContext context) {
        KvCoder kvCoder = (KvCoder)windowedInputCoder.getValueCoder();
        Coder keyCoder = kvCoder.getKeyCoder();
        Coder valueCoder = kvCoder.getValueCoder();

        // Input side: wrap each element in a keyed work item and key the stream.
        SingletonKeyedWorkItemCoder workItemCoder = SingletonKeyedWorkItemCoder.of(keyCoder, valueCoder, (Coder<? extends BoundedWindow>)windowingStrategy.getWindowFn().windowCoder());
        WindowedValue.FullWindowedValueCoder windowedWorkItemCoder = WindowedValue.getFullCoder(workItemCoder, (Coder)windowingStrategy.getWindowFn().windowCoder());
        CoderTypeInformation workItemTypeInfo = new CoderTypeInformation(windowedWorkItemCoder);
        SingleOutputStreamOperator workItems = inputDataStream.flatMap(new FlinkStreamingTransformTranslators.ToKeyedWorkItem()).returns(workItemTypeInfo).name("ToKeyedWorkItem");
        WorkItemKeySelector keySelector = new WorkItemKeySelector(keyCoder);
        KeyedStream keyedWorkItems = workItems.keyBy(keySelector);

        // Output side: values of one key/window are buffered into an iterable.
        SystemReduceFn reduceFn = SystemReduceFn.buffering((Coder)valueCoder);
        IterableCoder groupedValuesCoder = IterableCoder.of((Coder)valueCoder);
        WindowedValue.FullWindowedValueCoder outputCoder = WindowedValue.getFullCoder((Coder)KvCoder.of((Coder)keyCoder, (Coder)groupedValuesCoder), (Coder)windowingStrategy.getWindowFn().windowCoder());
        CoderTypeInformation outputTypeInfo = new CoderTypeInformation(outputCoder);

        TupleTag mainTag = new TupleTag("main output");
        WindowDoFnOperator groupingOperator = new WindowDoFnOperator(reduceFn, operatorName, windowedWorkItemCoder, mainTag, Collections.emptyList(), new DoFnOperator.MultiOutputOutputManagerFactory(mainTag, outputCoder), windowingStrategy, new HashMap(), (Collection<PCollectionView<?>>)Collections.emptyList(), context.getPipelineOptions(), keyCoder, keySelector);
        return keyedWorkItems.transform(operatorName, outputTypeInfo, groupingOperator);
    }

    /**
     * Translates a read transform by parsing its {@code ReadPayload} and delegating to the
     * bounded or unbounded read translator, depending on the payload's boundedness. The
     * resulting stream is registered under the single output PCollection id.
     *
     * @throws RuntimeException if the payload cannot be parsed
     */
    private <T> void translateRead(String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
        RunnerApi.PTransform read = pipeline.getComponents().getTransformsOrThrow(id);
        String outputId = (String)Iterables.getOnlyElement(read.getOutputsMap().values());

        RunnerApi.ReadPayload payload;
        try {
            payload = RunnerApi.ReadPayload.parseFrom((ByteString)read.getSpec().getPayload());
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to parse ReadPayload from transform", e);
        }

        ReadSourceTranslator translator;
        if (payload.getIsBounded() == RunnerApi.IsBounded.Enum.BOUNDED) {
            translator = new ReadBoundedTranslator();
        } else {
            translator = new ReadUnboundedTranslator();
        }
        DataStream sourceStream = translator.translateRead(read.getUniqueName(), outputId, payload, pipeline, context.getPipelineOptions(), context.getExecutionEnvironment());
        context.addDataStream(outputId, sourceStream);
    }

    /**
     * Translates an impulse: adds an {@link ImpulseSourceFunction} source named "Impulse",
     * typed as globally-windowed byte arrays, and registers it under the single output
     * PCollection id. The source is kept alive unless the pipeline options ask for sources to
     * be shut down on the final watermark.
     */
    private void translateImpulse(String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
        RunnerApi.PTransform impulse = pipeline.getComponents().getTransformsOrThrow(id);
        CoderTypeInformation impulseTypeInfo = new CoderTypeInformation(WindowedValue.getFullCoder((Coder)ByteArrayCoder.of(), (Coder)GlobalWindow.Coder.INSTANCE));
        boolean keepSourceAlive = !context.getPipelineOptions().isShutdownSourcesOnFinalWatermark();
        ImpulseSourceFunction impulseFunction = new ImpulseSourceFunction(keepSourceAlive);
        SingleOutputStreamOperator impulseStream = context.getExecutionEnvironment().addSource((SourceFunction)impulseFunction, "Impulse").returns(impulseTypeInfo);
        String outputId = (String)Iterables.getOnlyElement(impulse.getOutputsMap().values());
        context.addDataStream(outputId, impulseStream);
    }

    /**
     * Translates the Flink-native streaming impulse transform.
     *
     * <p>The transform payload is parsed as JSON; {@code interval_ms} (default 100) and
     * {@code message_count} (default 0) are passed to a {@link StreamingImpulseSource}, which
     * is registered under the single output PCollection id.
     *
     * @throws RuntimeException if the payload is not readable as JSON
     */
    private void translateStreamingImpulse(String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
        RunnerApi.PTransform impulse = pipeline.getComponents().getTransformsOrThrow(id);
        CoderTypeInformation impulseTypeInfo = new CoderTypeInformation(WindowedValue.getFullCoder((Coder)ByteArrayCoder.of(), (Coder)GlobalWindow.Coder.INSTANCE));

        ObjectMapper jsonMapper = new ObjectMapper();
        JsonNode settings;
        try {
            settings = jsonMapper.readTree(impulse.getSpec().getPayload().toByteArray());
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to parse configuration for streaming impulse", e);
        }
        int intervalMillis = settings.path("interval_ms").asInt(100);
        int messageCount = settings.path("message_count").asInt(0);

        String sourceName = StreamingImpulseSource.class.getSimpleName();
        SingleOutputStreamOperator impulseStream = context.getExecutionEnvironment().addSource((SourceFunction)new StreamingImpulseSource(intervalMillis, messageCount), sourceName).returns(impulseTypeInfo);
        context.addDataStream((String)Iterables.getOnlyElement(impulse.getOutputsMap().values()), impulseStream);
    }

    /**
     * Translates an executable stage into an {@link ExecutableStageDoFnOperator}.
     *
     * <p>Steps: parse the stage payload, prepare side inputs (if any), build the per-output
     * tag/coder/id tables, key the input stream when the stage uses user state or timers,
     * attach the operator (one-input, or two-input when side inputs exist) and register the
     * main output and all additional outputs in the context.
     *
     * <p>The input stream is held as a plain {@link DataStream}; it is only a
     * {@link KeyedStream} after {@code keyBy} in the stateful case, and is cast to one solely
     * where the keyed-stream accessors are needed.
     *
     * @param id id of the executable stage transform
     * @param pipeline pipeline containing the transform
     * @param context translation context supplying input streams and receiving output streams
     * @throws RuntimeException if the stage payload cannot be parsed
     * @throws IllegalStateException if the stage is stateful but its input element coder is not a {@link KvCoder}
     */
    private <InputT, OutputT> void translateExecutableStage(String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
        RunnerApi.Components components = pipeline.getComponents();
        RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
        Map<String, String> outputs = transform.getOutputsMap();

        RunnerApi.ExecutableStagePayload stagePayload;
        try {
            stagePayload = RunnerApi.ExecutableStagePayload.parseFrom((ByteString)transform.getSpec().getPayload());
        }
        catch (IOException e) {
            throw new RuntimeException(String.format("Failed to parse ExecutableStagePayload of transform %s", id), e);
        }
        String inputPCollectionId = stagePayload.getInput();
        TransformedSideInputs transformedSideInputs = stagePayload.getSideInputsCount() > 0 ? this.transformSideInputs(stagePayload, components, context) : new TransformedSideInputs(Collections.emptyMap(), null);

        Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags = Maps.newLinkedHashMap();
        Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders = Maps.newLinkedHashMap();
        // The first output key is designated the main output; null when the stage has no outputs.
        TupleTag<OutputT> mainOutputTag = outputs.isEmpty() ? null : new TupleTag<OutputT>(outputs.keySet().iterator().next());
        BiMap<String, Integer> outputIndexMap = PipelineTranslatorUtils.createOutputMap(outputs.keySet());
        Map<String, Coder<WindowedValue<?>>> outputCoders = Maps.newHashMap();
        Map<TupleTag<?>, Integer> tagsToIds = Maps.newHashMap();
        Map<String, TupleTag<?>> collectionIdToTupleTag = Maps.newHashMap();
        // Iterate output names in sorted order so the tables are filled deterministically.
        for (String localOutputName : new TreeMap<String, Integer>(outputIndexMap).keySet()) {
            String collectionId = outputs.get(localOutputName);
            Coder windowCoder = PipelineTranslatorUtils.instantiateCoder((String)collectionId, (RunnerApi.Components)components);
            outputCoders.put(localOutputName, windowCoder);
            TupleTag<?> tupleTag = new TupleTag<Object>(localOutputName);
            CoderTypeInformation typeInformation = new CoderTypeInformation(windowCoder);
            tagsToOutputTags.put(tupleTag, new OutputTag(localOutputName, typeInformation));
            tagsToCoders.put(tupleTag, windowCoder);
            tagsToIds.put(tupleTag, outputIndexMap.get(localOutputName));
            collectionIdToTupleTag.put(collectionId, tupleTag);
        }

        // A plain DataStream: the context hands out DataStreams, not KeyedStreams.
        DataStream inputDataStream = context.getDataStreamOrThrow(inputPCollectionId);
        CoderTypeInformation outputTypeInformation = !outputs.isEmpty() ? new CoderTypeInformation((Coder)outputCoders.get(mainOutputTag.getId())) : null;
        // tagsToCoders is empty whenever mainOutputTag is null, so the dereference below is safe.
        List<TupleTag<?>> additionalOutputTags = Lists.newArrayList();
        for (TupleTag<?> tupleTag : tagsToCoders.keySet()) {
            if (mainOutputTag.getId().equals(tupleTag.getId())) continue;
            additionalOutputTags.add(tupleTag);
        }

        Coder windowedInputCoder = PipelineTranslatorUtils.instantiateCoder((String)inputPCollectionId, (RunnerApi.Components)components);
        boolean stateful = stagePayload.getUserStatesCount() > 0 || stagePayload.getTimersCount() > 0;
        Coder keyCoder = null;
        KvToByteBufferKeySelector keySelector = null;
        if (stateful) {
            // Stateful stages need keyed input, which requires KV elements.
            Coder valueCoder = ((WindowedValue.FullWindowedValueCoder)windowedInputCoder).getValueCoder();
            if (!(valueCoder instanceof KvCoder)) {
                throw new IllegalStateException(String.format(Locale.ENGLISH, "The element coder for stateful DoFn '%s' must be KvCoder but is: %s", inputPCollectionId, valueCoder.getClass().getSimpleName()));
            }
            keyCoder = ((KvCoder)valueCoder).getKeyCoder();
            // Unwrap a length-prefixed key coder to the coder it wraps.
            if (keyCoder instanceof LengthPrefixCoder) {
                keyCoder = ((LengthPrefixCoder)keyCoder).getValueCoder();
            }
            keySelector = new KvToByteBufferKeySelector(keyCoder);
            inputDataStream = inputDataStream.keyBy(keySelector);
        }

        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(mainOutputTag, tagsToOutputTags, tagsToCoders, tagsToIds);
        ExecutableStageDoFnOperator doFnOperator = new ExecutableStageDoFnOperator(transform.getUniqueName(), windowedInputCoder, null, Collections.emptyMap(), mainOutputTag, additionalOutputTags, outputManagerFactory, transformedSideInputs.unionTagToView, new ArrayList(transformedSideInputs.unionTagToView.values()), FlinkStreamingPortablePipelineTranslator.getSideInputIdToPCollectionViewMap(stagePayload, components), context.getPipelineOptions(), stagePayload, context.getJobInfo(), FlinkExecutableStageContext.factory(context.getPipelineOptions()), collectionIdToTupleTag, PipelineTranslatorUtils.getWindowingStrategy((String)inputPCollectionId, (RunnerApi.Components)components), keyCoder, keySelector);
        String operatorName = ExecutableStageTranslation.generateNameFromStagePayload((RunnerApi.ExecutableStagePayload)stagePayload);

        SingleOutputStreamOperator outputStream;
        if (transformedSideInputs.unionTagToView.isEmpty()) {
            outputStream = inputDataStream.transform(operatorName, outputTypeInformation, doFnOperator);
        } else {
            DataStream sideInputStream = transformedSideInputs.unionedSideInputs.broadcast();
            if (stateful) {
                // Keyed main input plus un-keyed side input: the two-input transformation is
                // assembled by hand, with key type/selector set for the first input only.
                KeyedStream keyedInputStream = (KeyedStream)inputDataStream;
                TwoInputTransformation rawFlinkTransform = new TwoInputTransformation(keyedInputStream.getTransformation(), sideInputStream.getTransformation(), transform.getUniqueName(), doFnOperator, outputTypeInformation, keyedInputStream.getParallelism());
                rawFlinkTransform.setStateKeyType(keyedInputStream.getKeyType());
                rawFlinkTransform.setStateKeySelectors(keyedInputStream.getKeySelector(), null);
                outputStream = new SingleOutputStreamOperator(keyedInputStream.getExecutionEnvironment(), (StreamTransformation)rawFlinkTransform){};
            } else {
                outputStream = inputDataStream.connect(sideInputStream).transform(operatorName, outputTypeInformation, doFnOperator);
            }
        }
        outputStream.uid(transform.getUniqueName());

        if (mainOutputTag != null) {
            context.addDataStream(outputs.get(mainOutputTag.getId()), outputStream);
        }
        for (TupleTag<?> tupleTag : additionalOutputTags) {
            context.addDataStream(outputs.get(tupleTag.getId()), outputStream.getSideOutput((OutputTag)tagsToOutputTags.get(tupleTag)));
        }
    }

    private <T> void translateTestStream(String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
        RunnerApi.Components components = pipeline.getComponents();
        SerializableFunction & Serializable testStreamDecoder = (SerializableFunction & Serializable)bytes -> {
            try {
                RunnerApi.TestStreamPayload testStreamPayload = RunnerApi.TestStreamPayload.parseFrom((byte[])bytes);
                TestStream testStream = TestStreamTranslation.testStreamFromProtoPayload((RunnerApi.TestStreamPayload)testStreamPayload, (RehydratedComponents)RehydratedComponents.forComponents((RunnerApi.Components)components));
                return testStream;
            }
            catch (Exception e) {
                throw new RuntimeException("Can't decode TestStream payload.", e);
            }
        };
        RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
        String outputPCollectionId = (String)Iterables.getOnlyElement(transform.getOutputsMap().values());
        Coder coder = PipelineTranslatorUtils.instantiateCoder((String)outputPCollectionId, (RunnerApi.Components)components);
        DataStreamSource source = context.getExecutionEnvironment().addSource(new TestStreamSource(testStreamDecoder, transform.getSpec().getPayload().toByteArray()), new CoderTypeInformation(coder));
        context.addDataStream(outputPCollectionId, source);
    }

    /**
     * Builds one {@link RunnerPCollectionView} per side input of the stage, in the order the
     * side inputs appear in the payload.
     *
     * <p>Each view uses a multimap view function, the rehydrated windowing strategy of the
     * side-input PCollection, and that PCollection's windowed coder with its value coder
     * wrapped in an {@link IterableCoder}.
     *
     * @param stagePayload payload listing the side inputs
     * @param components pipeline components used to resolve transforms, PCollections and strategies
     * @return insertion-ordered map from side input id to its view
     * @throws IllegalStateException if a windowing strategy proto cannot be rehydrated
     */
    private static LinkedHashMap<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> getSideInputIdToPCollectionViewMap(RunnerApi.ExecutableStagePayload stagePayload, RunnerApi.Components components) {
        RehydratedComponents rehydrated = RehydratedComponents.forComponents((RunnerApi.Components)components);
        LinkedHashMap<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> views = new LinkedHashMap<>();
        // A single view function instance is shared by all views.
        PCollectionViews.MultimapViewFn sharedViewFn = new PCollectionViews.MultimapViewFn();

        for (RunnerApi.ExecutableStagePayload.SideInputId sideInputId : stagePayload.getSideInputsList()) {
            String localName = sideInputId.getLocalName();
            String pCollectionId = components.getTransformsOrThrow(sideInputId.getTransformId()).getInputsOrThrow(sideInputId.getLocalName());
            String strategyId = components.getPcollectionsOrThrow(pCollectionId).getWindowingStrategyId();
            RunnerApi.WindowingStrategy strategyProto = components.getWindowingStrategiesOrThrow(strategyId);

            WindowingStrategy strategy;
            try {
                strategy = WindowingStrategyTranslation.fromProto((RunnerApi.WindowingStrategy)strategyProto, (RehydratedComponents)rehydrated);
            }
            catch (InvalidProtocolBufferException e) {
                throw new IllegalStateException(String.format("Unable to hydrate side input windowing strategy %s.", strategyProto), e);
            }

            WindowedValue.WindowedValueCoder elementCoder = (WindowedValue.WindowedValueCoder)PipelineTranslatorUtils.instantiateCoder((String)pCollectionId, (RunnerApi.Components)components);
            Coder viewCoder = elementCoder.withValueCoder((Coder)IterableCoder.of((Coder)elementCoder.getValueCoder()));
            PCollectionView<?> view = (PCollectionView<?>)new RunnerPCollectionView(null, new TupleTag(localName), (ViewFn)sharedViewFn, strategy.getWindowFn().getDefaultWindowMappingFn(), strategy, viewCoder);
            views.put(sideInputId, view);
        }
        return views;
    }

    /**
     * Prepares the side inputs of an executable stage.
     *
     * <p>First pass: every side input gets an integer union tag and two coders are derived
     * from its stream's coder - a {@code KV<Void, V>} coder used for grouping and a
     * {@code KV<Void, Iterable<V>>} coder that becomes one component of the union coder.
     * Second pass: each side-input stream is re-keyed under a void key, grouped with
     * {@code addGBK}, mapped to {@link RawUnionValue}s carrying its union tag, and unioned
     * with the other side inputs.
     *
     * <p>The running union is held as a {@code DataStream<RawUnionValue>} because
     * {@code union} yields a {@link DataStream}, not a {@link SingleOutputStreamOperator}.
     *
     * @param stagePayload payload listing the side inputs
     * @param components pipeline components used to resolve side-input PCollections
     * @param context translation context supplying the side-input streams
     * @return the tag-to-view mapping together with the unioned side-input stream
     * @throws IllegalStateException if a side-input stream's type information is not a {@link CoderTypeInformation}
     */
    private TransformedSideInputs transformSideInputs(RunnerApi.ExecutableStagePayload stagePayload, RunnerApi.Components components, StreamingTranslationContext context) {
        LinkedHashMap<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputs = FlinkStreamingPortablePipelineTranslator.getSideInputIdToPCollectionViewMap(stagePayload, components);
        // NOTE(review): tags are derived from local names only; two side inputs sharing a
        // local name would presumably collide in this map - confirm whether that can happen.
        Map<TupleTag<?>, Integer> tagToIntMapping = new HashMap<>();
        Map<Integer, PCollectionView<?>> intToViewMapping = new HashMap<>();
        List<WindowedValue.WindowedValueCoder> kvCoders = new ArrayList<>();
        List<Coder<?>> viewCoders = new ArrayList<>();

        int count = 0;
        for (Map.Entry<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInput : sideInputs.entrySet()) {
            TupleTag<?> tag = sideInput.getValue().getTagInternal();
            intToViewMapping.put(count, sideInput.getValue());
            tagToIntMapping.put(tag, count);
            ++count;
            String collectionId = components.getTransformsOrThrow(sideInput.getKey().getTransformId()).getInputsOrThrow(sideInput.getKey().getLocalName());
            DataStream sideInputStream = context.getDataStreamOrThrow(collectionId);
            TypeInformation tpe = sideInputStream.getType();
            if (!(tpe instanceof CoderTypeInformation)) {
                throw new IllegalStateException("Input Stream TypeInformation is no CoderTypeInformation.");
            }
            WindowedValue.WindowedValueCoder coder = (WindowedValue.WindowedValueCoder)((CoderTypeInformation)tpe).getCoder();
            KvCoder kvCoder = KvCoder.of((Coder)VoidCoder.of(), (Coder)coder.getValueCoder());
            kvCoders.add(coder.withValueCoder((Coder)kvCoder));
            WindowedValue.WindowedValueCoder viewCoder = coder.withValueCoder((Coder)KvCoder.of((Coder)VoidCoder.of(), (Coder)IterableCoder.of((Coder)coder.getValueCoder())));
            viewCoders.add(viewCoder);
        }

        // The union coder needs all view coders, hence the separate second pass.
        UnionCoder unionCoder = UnionCoder.of(viewCoders);
        CoderTypeInformation unionTypeInformation = new CoderTypeInformation(unionCoder);

        DataStream<RawUnionValue> sideInputUnion = null;
        for (Map.Entry<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInput : sideInputs.entrySet()) {
            TupleTag<?> tag = sideInput.getValue().getTagInternal();
            int intTag = tagToIntMapping.get(tag);
            RunnerApi.PTransform pTransform = components.getTransformsOrThrow(sideInput.getKey().getTransformId());
            String collectionId = pTransform.getInputsOrThrow(sideInput.getKey().getLocalName());
            DataStream sideInputStream = context.getDataStreamOrThrow(collectionId);
            String viewName = sideInput.getKey().getTransformId() + "-" + sideInput.getKey().getLocalName();
            WindowedValue.WindowedValueCoder kvCoder = kvCoders.get(intTag);
            SingleOutputStreamOperator keyedSideInputStream = sideInputStream.map(new ToVoidKeyValue());
            SingleOutputStreamOperator viewStream = this.addGBK((DataStream)keyedSideInputStream, (WindowingStrategy<?, ?>)sideInput.getValue().getWindowingStrategyInternal(), (WindowedValue.WindowedValueCoder)kvCoder, viewName, context);
            viewStream.uid(pTransform.getUniqueName() + "-" + sideInput.getKey().getLocalName());
            DataStream<RawUnionValue> unionValueStream = viewStream.map(new FlinkStreamingTransformTranslators.ToRawUnion(intTag)).returns(unionTypeInformation);
            sideInputUnion = sideInputUnion == null ? unionValueStream : sideInputUnion.union(new DataStream[]{unionValueStream});
        }
        return new TransformedSideInputs(intToViewMapping, sideInputUnion);
    }

    private static class ToVoidKeyValue<T>
    implements MapFunction<WindowedValue<T>, WindowedValue<KV<Void, T>>> {
        private ToVoidKeyValue() {
        }

        public WindowedValue<KV<Void, T>> map(WindowedValue<T> value) {
            return value.withValue((Object)KV.of(null, (Object)value.getValue()));
        }
    }

    /**
     * Simple holder pairing a unioned side-input stream with the lookup from integer union tag
     * to the {@link PCollectionView} that tag stands for.
     */
    private static class TransformedSideInputs {
        /** Side-input views keyed by their integer union tag. */
        final Map<Integer, PCollectionView<?>> unionTagToView;
        /**
         * Stream of {@link RawUnionValue}s for the side inputs.
         * NOTE(review): the visible caller can pass {@code null} here when it never enters its
         * side-input loop -- confirm whether consumers must handle that.
         */
        final DataStream<RawUnionValue> unionedSideInputs;

        /**
         * Stores both references as given; no copies are made.
         *
         * @param viewsByTag views indexed by union tag
         * @param sideInputUnion stream carrying the unioned side-input values
         */
        TransformedSideInputs(Map<Integer, PCollectionView<?>> viewsByTag, DataStream<RawUnionValue> sideInputUnion) {
            unionedSideInputs = sideInputUnion;
            unionTagToView = viewsByTag;
        }
    }

    @AutoService(value=NativeTransforms.IsNativeTransform.class)
    public static class IsFlinkNativeTransform
    implements NativeTransforms.IsNativeTransform {
        public boolean test(RunnerApi.PTransform pTransform) {
            return FlinkStreamingPortablePipelineTranslator.STREAMING_IMPULSE_TRANSFORM_URN.equals(PTransformTranslation.urnForTransformOrNull((RunnerApi.PTransform)pTransform));
        }
    }

    private static class ReadUnboundedTranslator<T>
    implements ReadSourceTranslator<T> {
        private ReadUnboundedTranslator() {
        }

        @Override
        public DataStream<WindowedValue<T>> translateRead(String transformName, String outputCollectionId, RunnerApi.ReadPayload payload, RunnerApi.Pipeline pipeline, PipelineOptions pipelineOptions, StreamExecutionEnvironment env) {
            SingleOutputStreamOperator source;
            UnboundedSource unboundedSource;
            Coder windowCoder = PipelineTranslatorUtils.instantiateCoder((String)outputCollectionId, (RunnerApi.Components)pipeline.getComponents());
            CoderTypeInformation outputTypeInfo = new CoderTypeInformation(windowCoder);
            WindowingStrategy windowStrategy = PipelineTranslatorUtils.getWindowingStrategy((String)outputCollectionId, (RunnerApi.Components)pipeline.getComponents());
            CoderTypeInformation withIdTypeInfo = new CoderTypeInformation(WindowedValue.getFullCoder((Coder)ValueWithRecordId.ValueWithRecordIdCoder.of((Coder)((WindowedValue.WindowedValueCoder)windowCoder).getValueCoder()), (Coder)windowStrategy.getWindowFn().windowCoder()));
            try {
                unboundedSource = ReadTranslation.unboundedSourceFromProto((RunnerApi.ReadPayload)payload);
            }
            catch (IOException e) {
                throw new RuntimeException("Failed to extract UnboundedSource from payload", e);
            }
            try {
                int parallelism = env.getMaxParallelism() > 0 ? env.getMaxParallelism() : env.getParallelism();
                UnboundedSourceWrapper sourceWrapper = new UnboundedSourceWrapper(transformName, pipelineOptions, unboundedSource, parallelism);
                SingleOutputStreamOperator nonDedupSource = env.addSource(sourceWrapper).name(transformName).uid(transformName).returns(withIdTypeInfo);
                source = unboundedSource.requiresDeduping() ? nonDedupSource.keyBy(new FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector()).transform("deduping", outputTypeInfo, new DedupingOperator()).uid(String.format("%s/__deduplicated__", transformName)) : nonDedupSource.flatMap(new FlinkStreamingTransformTranslators.StripIdsMap()).returns(outputTypeInfo);
            }
            catch (Exception e) {
                throw new RuntimeException("Error while translating UnboundedSource: " + unboundedSource, e);
            }
            return source;
        }
    }

    /**
     * {@link ReadSourceTranslator} for payloads that decode to a bounded source.
     *
     * <p>The bounded source is adapted with {@code BoundedToUnboundedSourceAdapter}, wrapped in
     * an {@code UnboundedSourceWrapper}, and then in
     * {@code UnboundedSourceWrapperNoValueWithRecordId} before being added to the Flink
     * environment.
     *
     * @param <T> element type of the produced stream
     */
    private static class ReadBoundedTranslator<T>
    implements ReadSourceTranslator<T> {
        private ReadBoundedTranslator() {
        }

        /**
         * Builds the Flink stream for a bounded read.
         *
         * @param transformName used as both the name and the uid of the source operator
         * @param outputCollectionId id used to look up the output coder
         * @param payload read payload from which the bounded source is decoded
         * @param pipeline pipeline whose components are consulted for the coder
         * @param pipelineOptions options handed to the source wrapper
         * @param env Flink environment the source is added to
         * @return the translated stream
         * @throws RuntimeException if the source cannot be decoded or translation fails
         */
        @Override
        public DataStream<WindowedValue<T>> translateRead(String transformName, String outputCollectionId, RunnerApi.ReadPayload payload, RunnerApi.Pipeline pipeline, PipelineOptions pipelineOptions, StreamExecutionEnvironment env) {
            UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter convertedSource;
            Coder windowCoder = PipelineTranslatorUtils.instantiateCoder((String)outputCollectionId, (RunnerApi.Components)pipeline.getComponents());
            CoderTypeInformation outputTypeInfo = new CoderTypeInformation(windowCoder);
            try {
                convertedSource = new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(ReadTranslation.boundedSourceFromProto((RunnerApi.ReadPayload)payload));
            }
            catch (IOException e) {
                // The payload is decoded as a bounded source here, so report it as such.
                throw new RuntimeException("Failed to extract BoundedSource from payload", e);
            }
            try {
                // Prefer the max parallelism when it is set to a positive value.
                int parallelism = env.getMaxParallelism() > 0 ? env.getMaxParallelism() : env.getParallelism();
                FlinkStreamingTransformTranslators.UnboundedSourceWrapperNoValueWithRecordId sourceWrapper = new FlinkStreamingTransformTranslators.UnboundedSourceWrapperNoValueWithRecordId(new UnboundedSourceWrapper(transformName, pipelineOptions, convertedSource, parallelism));
                return env.addSource(sourceWrapper).name(transformName).uid(transformName).returns(outputTypeInfo);
            }
            catch (Exception e) {
                throw new RuntimeException("Error while translating BoundedSource: " + convertedSource, e);
            }
        }
    }

    /**
     * Strategy for turning a read payload into a Flink {@link DataStream} of windowed values.
     *
     * @param <T> element type carried by the produced stream
     */
    interface ReadSourceTranslator<T> {
        /**
         * Translates one read transform into a stream registered on {@code env}.
         *
         * @param transformName name of the transform being translated
         * @param outputCollectionId id of the collection the read produces
         * @param payload read payload describing the source
         * @param pipeline pipeline the transform belongs to
         * @param pipelineOptions options made available to the translated source
         * @param env Flink environment the source is added to
         * @return the stream produced by the read
         */
        DataStream<WindowedValue<T>> translateRead(String transformName, String outputCollectionId, RunnerApi.ReadPayload payload, RunnerApi.Pipeline pipeline, PipelineOptions pipelineOptions, StreamExecutionEnvironment env);
    }

    static interface PTransformTranslator<T> {
        public void translate(String var1, RunnerApi.Pipeline var2, T var3);
    }

    /**
     * Translation context for streaming pipelines that also acts as the executor.
     *
     * <p>It holds the job info, the pipeline options and the Flink
     * {@link StreamExecutionEnvironment}, and keeps a registry of the data streams produced so
     * far, keyed by PCollection id.
     */
    public static class StreamingTranslationContext
    implements FlinkPortablePipelineTranslator.TranslationContext,
    FlinkPortablePipelineTranslator.Executor {
        private final JobInfo jobInfo;
        private final FlinkPipelineOptions options;
        private final StreamExecutionEnvironment executionEnvironment;
        /** Registered streams by PCollection id; element types are erased to a wildcard. */
        private final Map<String, DataStream<?>> dataStreams;

        private StreamingTranslationContext(JobInfo jobInfo, FlinkPipelineOptions options, StreamExecutionEnvironment executionEnvironment) {
            this.jobInfo = jobInfo;
            this.options = options;
            this.executionEnvironment = executionEnvironment;
            this.dataStreams = new HashMap<>();
        }

        /** Returns the job info this context was created with. */
        @Override
        public JobInfo getJobInfo() {
            return this.jobInfo;
        }

        /** Returns the pipeline options this context was created with. */
        @Override
        public FlinkPipelineOptions getPipelineOptions() {
            return this.options;
        }

        /**
         * Executes the job on the underlying Flink environment.
         *
         * @param jobName name passed to the environment's {@code execute} call
         * @return the result reported by the environment
         * @throws Exception whatever the environment's {@code execute} call throws
         */
        @Override
        public JobExecutionResult execute(String jobName) throws Exception {
            return this.getExecutionEnvironment().execute(jobName);
        }

        /** Returns the Flink environment this context was created with. */
        public StreamExecutionEnvironment getExecutionEnvironment() {
            return this.executionEnvironment;
        }

        /**
         * Registers {@code dataStream} under {@code pCollectionId}, replacing any stream
         * previously stored for that id.
         *
         * @param pCollectionId id of the collection the stream represents
         * @param dataStream stream to register
         * @param <T> element type of the stream
         */
        public <T> void addDataStream(String pCollectionId, DataStream<T> dataStream) {
            this.dataStreams.put(pCollectionId, dataStream);
        }

        /**
         * Looks up the stream registered for {@code pCollectionId}.
         *
         * @param pCollectionId id of the collection to look up
         * @param <T> element type the caller expects; it is not checked at runtime
         * @return the registered stream
         * @throws IllegalArgumentException if no stream is registered under that id
         */
        @SuppressWarnings("unchecked")
        public <T> DataStream<T> getDataStreamOrThrow(String pCollectionId) {
            DataStream<?> dataStream = this.dataStreams.get(pCollectionId);
            if (dataStream == null) {
                throw new IllegalArgumentException(String.format("Unknown datastream for id %s.", pCollectionId));
            }
            // The registry stores wildcard-typed streams, so the element type must be
            // reinstated with an unchecked cast; the caller vouches for T.
            return (DataStream<T>) dataStream;
        }
    }
}

