/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.transforms;

import com.google.cloud.dataflow.sdk.annotations.Experimental;
import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Maps;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.DoFnReflector;
import com.google.cloud.dataflow.sdk.transforms.DoFnWithContext;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.DirectModeExecutionContext;
import com.google.cloud.dataflow.sdk.util.DirectSideInputReader;
import com.google.cloud.dataflow.sdk.util.DoFnRunner;
import com.google.cloud.dataflow.sdk.util.DoFnRunnerBase;
import com.google.cloud.dataflow.sdk.util.DoFnRunners;
import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
import com.google.cloud.dataflow.sdk.util.MutationDetector;
import com.google.cloud.dataflow.sdk.util.MutationDetectors;
import com.google.cloud.dataflow.sdk.util.PTuple;
import com.google.cloud.dataflow.sdk.util.SerializableUtils;
import com.google.cloud.dataflow.sdk.util.SideInputReader;
import com.google.cloud.dataflow.sdk.util.StringUtils;
import com.google.cloud.dataflow.sdk.util.UserCodeException;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.cloud.dataflow.sdk.values.TupleTagList;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
import com.google.cloud.dataflow.sdk.values.TypedPValue;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;

public class ParDo {
    /**
     * Starts building a {@code ParDo} that carries the given transform name.
     *
     * @param name the name to give the transform
     * @return a fresh {@link Unbound} on which {@code named(name)} has been applied
     */
    public static Unbound named(String name) {
        Unbound blank = new Unbound();
        return blank.named(name);
    }

    /**
     * Starts building a {@code ParDo} that has the given side inputs.
     *
     * @param sideInputs the views to attach, passed as varargs
     * @return a fresh {@link Unbound} on which {@code withSideInputs(sideInputs)} has been applied
     */
    public static Unbound withSideInputs(PCollectionView<?> ... sideInputs) {
        Unbound blank = new Unbound();
        return blank.withSideInputs(sideInputs);
    }

    /**
     * Starts building a {@code ParDo} that has the given side inputs.
     *
     * @param sideInputs the views to attach
     * @return a fresh {@link Unbound} on which {@code withSideInputs(sideInputs)} has been applied
     */
    public static Unbound withSideInputs(Iterable<? extends PCollectionView<?>> sideInputs) {
        Unbound blank = new Unbound();
        return blank.withSideInputs(sideInputs);
    }

    /**
     * Starts building a multi-output {@code ParDo} with the given output tags.
     *
     * @param mainOutputTag the tag of the main output
     * @param sideOutputTags the tags of the side outputs
     * @return an {@link UnboundMulti} created from a fresh {@link Unbound} with those tags
     */
    public static <OutputT> UnboundMulti<OutputT> withOutputTags(TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags) {
        Unbound blank = new Unbound();
        return blank.withOutputTags(mainOutputTag, sideOutputTags);
    }

    /**
     * Creates a single-output {@code ParDo} that runs the given {@link DoFn}.
     *
     * <p>The runtime class of {@code fn} is recorded alongside it as the function class.
     *
     * @param fn the function to run
     * @return a {@link Bound} transform for {@code fn}
     */
    public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
        Class<?> runtimeFnClass = fn.getClass();
        return ParDo.of(fn, runtimeFnClass);
    }

    /**
     * Creates a single-output {@code ParDo} for {@code fn}, recording {@code fnClass} as the
     * class to report for it.
     *
     * @param fn the function to run
     * @param fnClass the class passed on to the {@link Bound} together with {@code fn}
     * @return a {@link Bound} built from a fresh {@link Unbound}
     */
    private static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn, Class<?> fnClass) {
        Unbound blank = new Unbound();
        return blank.of(fn, fnClass);
    }

    /**
     * Converts a {@link DoFnWithContext} into a {@link DoFn} using the {@link DoFnReflector}
     * obtained for the function's runtime class.
     *
     * @param fn the context-style function to convert
     * @return the {@link DoFn} produced by {@code DoFnReflector.toDoFn}
     */
    private static <InputT, OutputT> DoFn<InputT, OutputT> adapt(DoFnWithContext<InputT, OutputT> fn) {
        DoFnReflector reflector = DoFnReflector.of(fn.getClass());
        return reflector.toDoFn(fn);
    }

    /**
     * Creates a single-output {@code ParDo} that runs the given {@link DoFnWithContext}.
     *
     * <p>The function is first adapted to a {@link DoFn}; the class recorded for it is the
     * runtime class of the original {@code fn}, not of the adapter.
     *
     * @param fn the context-style function to run
     * @return a {@link Bound} transform wrapping the adapted function
     */
    @Experimental
    public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFnWithContext<InputT, OutputT> fn) {
        DoFn<InputT, OutputT> adapted = ParDo.adapt(fn);
        return ParDo.of(adapted, fn.getClass());
    }

    /**
     * Evaluates a single-output {@link Bound} transform in the {@link DirectPipelineRunner}.
     *
     * <p>The transform's output collection is wrapped in a one-entry {@link PCollectionTuple}
     * under a fixed {@code "out"} tag so that the shared {@code evaluateHelper} can be used; the
     * values recorded under that tag are then stored as the output collection's contents.
     *
     * @param transform the transform being evaluated
     * @param context the runner's evaluation context
     */
    private static <InputT, OutputT> void evaluateSingleHelper(Bound<InputT, OutputT> transform, DirectPipelineRunner.EvaluationContext context) {
        TupleTag singleOutputTag = new TupleTag("out");
        DirectModeExecutionContext directContext = DirectModeExecutionContext.create();
        PCollectionTuple wrappedOutput = PCollectionTuple.of(singleOutputTag, (PCollection)context.getOutput(transform));

        ParDo.evaluateHelper(
                ((Bound)transform).fn,
                context.getStepName(transform),
                (PCollection)context.getInput(transform),
                ((Bound)transform).sideInputs,
                singleOutputTag,
                Collections.emptyList(),
                wrappedOutput,
                context,
                directContext);

        // Publish what the function emitted under the stand-in tag as the transform's output.
        context.setPCollectionValuesWithMetadata(
                (PCollection)context.getOutput(transform),
                directContext.getOutput(singleOutputTag));
    }

    /**
     * Evaluates a multi-output {@link BoundMulti} transform in the {@link DirectPipelineRunner}.
     *
     * <p>After the function has run, every collection in the transform's output tuple is given
     * its values: the main output tag reads from {@code getOutput}, every other tag from
     * {@code getSideOutput}.
     *
     * @param transform the transform being evaluated
     * @param context the runner's evaluation context
     */
    private static <InputT, OutputT> void evaluateMultiHelper(BoundMulti<InputT, OutputT> transform, DirectPipelineRunner.EvaluationContext context) {
        DirectModeExecutionContext directContext = DirectModeExecutionContext.create();

        ParDo.evaluateHelper(
                ((BoundMulti)transform).fn,
                context.getStepName(transform),
                (PCollection)context.getInput(transform),
                ((BoundMulti)transform).sideInputs,
                ((BoundMulti)transform).mainOutputTag,
                ((BoundMulti)transform).sideOutputTags.getAll(),
                (PCollectionTuple)context.getOutput(transform),
                context,
                directContext);

        for (Map.Entry<TupleTag<?>, PCollection<?>> taggedOutput : ((PCollectionTuple)context.getOutput(transform)).getAll().entrySet()) {
            TupleTag<?> outputTag = taggedOutput.getKey();
            PCollection<?> collection = taggedOutput.getValue();
            // The main output is recognised by reference identity with the transform's own tag.
            context.setPCollectionValuesWithMetadata(
                    collection,
                    outputTag == ((BoundMulti)transform).mainOutputTag
                            ? directContext.getOutput(outputTag)
                            : directContext.getSideOutput(outputTag));
        }
    }

    /**
     * Runs {@code doFn} over every element of {@code input} as a single bundle, checking both
     * the inputs and the emitted outputs for mutation.
     *
     * @param doFn the function to run; the instance actually executed is whatever
     *     {@code context.ensureSerializable} returns for it
     * @param stepName the step name, used for both name arguments when obtaining the step context
     * @param input the collection whose elements are processed
     * @param sideInputs the views made readable to the function
     * @param mainOutputTag the tag of the main output
     * @param sideOutputTags the tags of the side outputs
     * @param outputs the output collections; their coders are used when checking outputs
     * @param context the runner's evaluation context
     * @param executionContext the direct-mode execution context that receives the current key
     * @throws IllegalMutationException if an input value changed across {@code processElement},
     *     or if the output manager finds a changed output
     */
    private static <InputT, OutputT, ActualInputT extends InputT> void evaluateHelper(DoFn<InputT, OutputT> doFn, String stepName, PCollection<ActualInputT> input, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, PCollectionTuple outputs, DirectPipelineRunner.EvaluationContext context, DirectModeExecutionContext executionContext) {
        // NOTE(review): presumably ensureSerializable returns a serialization round-tripped
        // copy of doFn -- confirm against DirectPipelineRunner.
        DoFn<InputT, OutputT> fn = context.ensureSerializable(doFn);
        SideInputReader sideInputReader = ParDo.makeSideInputReader(context, sideInputs);
        // Wrap the real output manager so each emitted value is watched for later mutation.
        ImmutabilityCheckingOutputManager outputManager = new ImmutabilityCheckingOutputManager(fn.getClass().getSimpleName(), new DoFnRunnerBase.ListOutputManager(), outputs);
        DoFnRunner<ActualInputT, OutputT> fnRunner = DoFnRunners.createDefault(context.getPipelineOptions(), fn, sideInputReader, outputManager, mainOutputTag, sideOutputTags, executionContext.getOrCreateStepContext(stepName, stepName, null), context.getAddCounterMutator(), input.getWindowingStrategy());
        fnRunner.startBundle();
        for (DirectPipelineRunner.ValueWithMetadata<ActualInputT> elem : context.getPCollectionValuesWithMetadata(input)) {
            // The execution context's key comes from the element itself when it is a KV,
            // otherwise from the key recorded in the element's metadata.
            if (elem.getValue() instanceof KV) {
                KV kvElem = (KV)elem.getValue();
                executionContext.setKey(kvElem.getKey());
            } else {
                executionContext.setKey(elem.getKey());
            }
            try {
                // The detector is created before processElement and verified right after it,
                // so only changes made during that call are checked here.
                MutationDetector inputMutationDetector = MutationDetectors.forValueWithCoder(elem.getWindowedValue().getValue(), input.getCoder());
                WindowedValue<ActualInputT> windowedElem = elem.getWindowedValue();
                fnRunner.processElement(windowedElem);
                inputMutationDetector.verifyUnmodified();
            }
            catch (CoderException e) {
                throw UserCodeException.wrap(e);
            }
            catch (IllegalMutationException exn) {
                // Rethrow with a message naming the DoFn; the original exception is kept as the cause.
                throw new IllegalMutationException(String.format("DoFn %s mutated input value %s of class %s (new value was %s). Input values must not be mutated in any way.", fn.getClass().getSimpleName(), exn.getSavedValue(), exn.getSavedValue().getClass(), exn.getNewValue()), exn.getSavedValue(), exn.getNewValue(), exn);
            }
        }
        fnRunner.finishBundle();
        // Outputs are otherwise only re-checked when a newer output for the same tag arrives,
        // so the last output per tag needs this final check after the bundle finishes.
        outputManager.verifyLatestOutputsUnmodified();
    }

    /**
     * Builds a {@link SideInputReader} over the given views.
     *
     * <p>Each view's internal tag is paired with the value the evaluation context holds for that
     * view, and the resulting {@link PTuple} backs a {@link DirectSideInputReader}.
     *
     * @param context the evaluation context from which view values are fetched
     * @param sideInputs the views to expose
     * @return a reader over the collected view values
     */
    private static SideInputReader makeSideInputReader(DirectPipelineRunner.EvaluationContext context, List<PCollectionView<?>> sideInputs) {
        PTuple collected = PTuple.empty();
        for (PCollectionView<?> sideInput : sideInputs) {
            // PTuple.and returns the extended tuple, so the accumulator is reassigned each time.
            collected = collected.and(sideInput.getTagInternal(), context.getPCollectionView(sideInput));
        }
        return DirectSideInputReader.of(collected);
    }

    /**
     * Adds the display data shared by both {@code ParDo} transform kinds: the function's own
     * display data, followed by an {@code "fn"} item labelled "Transform Function" that holds
     * {@code fnClass}.
     *
     * @param builder the builder to populate
     * @param fn the function whose display data is included
     * @param fnClass the class reported as the transform function
     */
    private static void populateDisplayData(DisplayData.Builder builder, DoFn<?, ?> fn, Class<?> fnClass) {
        DisplayData.Builder withFnData = builder.include(fn);
        withFnData.add(DisplayData.item("fn", fnClass).withLabel("Transform Function"));
    }

    // Registers the default DirectPipelineRunner evaluators for the single-output and
    // multi-output ParDo transforms when this class is initialized.
    static {
        DirectPipelineRunner.TransformEvaluator<Bound> singleOutputEvaluator = new DirectPipelineRunner.TransformEvaluator<Bound>(){

            @Override
            public void evaluate(Bound transform, DirectPipelineRunner.EvaluationContext context) {
                ParDo.evaluateSingleHelper(transform, context);
            }
        };
        DirectPipelineRunner.registerDefaultTransformEvaluator(Bound.class, singleOutputEvaluator);

        DirectPipelineRunner.TransformEvaluator<BoundMulti> multiOutputEvaluator = new DirectPipelineRunner.TransformEvaluator<BoundMulti>(){

            @Override
            public void evaluate(BoundMulti transform, DirectPipelineRunner.EvaluationContext context) {
                ParDo.evaluateMultiHelper(transform, context);
            }
        };
        DirectPipelineRunner.registerDefaultTransformEvaluator(BoundMulti.class, multiOutputEvaluator);
    }

    /**
     * An {@link DoFnRunners.OutputManager} wrapper that watches emitted values for mutation.
     *
     * <p>For every tag present in {@code outputs}, a {@link MutationDetector} is kept for the
     * most recently emitted value. When a newer value arrives for the same tag, the previous
     * detector is verified and replaced. Whatever detectors remain are verified by
     * {@link #verifyLatestOutputsUnmodified()} (also invoked by {@link #close()}). Values for
     * tags not present in {@code outputs} are forwarded without any check.
     */
    private static class ImmutabilityCheckingOutputManager<InputT>
    implements DoFnRunners.OutputManager,
    AutoCloseable {
        private final String doFnName;
        private final DoFnRunners.OutputManager underlyingOutputManager;
        private final PCollectionTuple outputs;
        private final ConcurrentMap<TupleTag<?>, MutationDetector> mutationDetectorForTag = Maps.newConcurrentMap();

        /**
         * @param doFnName the name used for the DoFn in error messages
         * @param underlyingOutputManager the manager that actually receives every output
         * @param outputs the output collections, consulted for tag membership and coders
         */
        public ImmutabilityCheckingOutputManager(String doFnName, DoFnRunners.OutputManager underlyingOutputManager, PCollectionTuple outputs) {
            this.doFnName = doFnName;
            this.underlyingOutputManager = underlyingOutputManager;
            this.outputs = outputs;
        }

        /**
         * Records a detector for {@code output} when its tag is a known output, verifying the
         * detector it replaces, and then forwards the value to the underlying manager.
         */
        @Override
        public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
            if (this.outputs.has(tag)) {
                this.replaceDetector(tag, output);
            }
            this.underlyingOutputManager.output(tag, output);
        }

        /** Verifies the detector held for every tag that has one. */
        public void verifyLatestOutputsUnmodified() {
            for (MutationDetector latest : this.mutationDetectorForTag.values()) {
                this.verifyOutputUnmodified(latest);
            }
        }

        /**
         * Installs a detector for {@code output} under {@code tag} and verifies the one that
         * was there before, if any. A {@link CoderException} is rethrown wrapped by
         * {@link UserCodeException#wrap}.
         */
        private <T> void replaceDetector(TupleTag<T> tag, WindowedValue<T> output) {
            try {
                MutationDetector latest = MutationDetectors.forValueWithCoder(output.getValue(), this.outputs.get(tag).getCoder());
                MutationDetector previous = this.mutationDetectorForTag.put(tag, latest);
                this.verifyOutputUnmodified(previous);
            }
            catch (CoderException e) {
                throw UserCodeException.wrap(e);
            }
        }

        /**
         * Verifies a single detector; a {@code null} detector is ignored. A detected mutation is
         * rethrown with a message naming the DoFn and the original exception as the cause.
         */
        private void verifyOutputUnmodified(@Nullable MutationDetector detector) {
            if (detector != null) {
                try {
                    detector.verifyUnmodified();
                }
                catch (IllegalMutationException exn) {
                    throw new IllegalMutationException(String.format("DoFn %s mutated value %s after it was output (new value was %s). Values must not be mutated in any way after being output.", this.doFnName, exn.getSavedValue(), exn.getNewValue()), exn.getSavedValue(), exn.getNewValue(), exn);
                }
            }
        }

        /** Performs the same check as {@link #verifyLatestOutputsUnmodified()}. */
        @Override
        public void close() {
            this.verifyLatestOutputsUnmodified();
        }
    }

    /**
     * A {@link PTransform} that runs a {@link DoFn} over a {@link PCollection} and produces a
     * {@link PCollectionTuple} holding one main output and any number of side outputs.
     *
     * <p>Instances are immutable: {@code named} and {@code withSideInputs} return new
     * transforms rather than modifying this one.
     *
     * @param <InputT> the element type accepted by the function
     * @param <OutputT> the element type of the main output
     */
    public static class BoundMulti<InputT, OutputT>
    extends PTransform<PCollection<? extends InputT>, PCollectionTuple> {
        private final List<PCollectionView<?>> sideInputs;
        private final TupleTag<OutputT> mainOutputTag;
        private final TupleTagList sideOutputTags;
        private final DoFn<InputT, OutputT> fn;
        private final Class<?> fnClass;

        /**
         * @param name the transform name
         * @param sideInputs the views readable by the function
         * @param mainOutputTag the tag of the main output
         * @param sideOutputTags the tags of the side outputs
         * @param fn the function to run; the result of {@code SerializableUtils.clone(fn)} is stored
         * @param fnClass the class reported for the function in display data
         */
        BoundMulti(String name, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags, DoFn<InputT, OutputT> fn, Class<?> fnClass) {
            super(name);
            this.sideInputs = sideInputs;
            this.mainOutputTag = mainOutputTag;
            this.sideOutputTags = sideOutputTags;
            this.fn = SerializableUtils.clone(fn);
            this.fnClass = fnClass;
        }

        /**
         * Returns a copy of this transform with the given name.
         *
         * @param name the new transform name
         */
        public BoundMulti<InputT, OutputT> named(String name) {
            return new BoundMulti<InputT, OutputT>(name, this.sideInputs, this.mainOutputTag, this.sideOutputTags, this.fn, this.fnClass);
        }

        /**
         * Returns a copy of this transform with the given side inputs appended to the existing ones.
         *
         * @param sideInputs the views to append, passed as varargs
         */
        public BoundMulti<InputT, OutputT> withSideInputs(PCollectionView<?> ... sideInputs) {
            return this.withSideInputs(Arrays.asList(sideInputs));
        }

        /**
         * Returns a copy of this transform with the given side inputs appended to the existing ones.
         *
         * @param sideInputs the views to append
         */
        public BoundMulti<InputT, OutputT> withSideInputs(Iterable<? extends PCollectionView<?>> sideInputs) {
            // A typed builder avoids the raw type and the cast through Object.
            ImmutableList<PCollectionView<?>> combined = ImmutableList.<PCollectionView<?>>builder()
                    .addAll(this.sideInputs)
                    .addAll(sideInputs)
                    .build();
            return new BoundMulti<InputT, OutputT>(this.name, combined, this.mainOutputTag, this.sideOutputTags, this.fn, this.fnClass);
        }

        /**
         * Creates the output tuple for this transform: one collection per tag (main tag first),
         * sharing the input's pipeline, windowing strategy and boundedness. The main output's
         * type descriptor is taken from the function.
         */
        @Override
        public PCollectionTuple apply(PCollection<? extends InputT> input) {
            PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(input.getPipeline(), TupleTagList.of(this.mainOutputTag).and(this.sideOutputTags.getAll()), input.getWindowingStrategy(), input.isBounded());
            outputs.get(this.mainOutputTag).setTypeDescriptorInternal((TypeDescriptor)this.fn.getOutputTypeDescriptor());
            return outputs;
        }

        /**
         * Not supported for multi-output transforms.
         *
         * @throws RuntimeException always
         */
        @Override
        protected Coder<OutputT> getDefaultOutputCoder() {
            throw new RuntimeException("internal error: shouldn't be calling this on a multi-output ParDo");
        }

        /**
         * Asks the pipeline's coder registry for a default coder for {@code output}, supplying
         * the function's input type descriptor and the input's coder as context.
         *
         * @throws CannotProvideCoderException if the registry cannot provide a coder
         */
        @Override
        public <T> Coder<T> getDefaultOutputCoder(PCollection<? extends InputT> input, TypedPValue<T> output) throws CannotProvideCoderException {
            Coder<? extends InputT> inputCoder = input.getCoder();
            return input.getPipeline().getCoderRegistry().getDefaultCoder(output.getTypeDescriptor(), this.fn.getInputTypeDescriptor(), inputCoder);
        }

        /**
         * Returns {@code "AnonymousParMultiDo"} when the function's class is anonymous, and
         * {@code "ParMultiDo(<approximate simple name>)"} otherwise.
         */
        @Override
        protected String getKindString() {
            // The anonymity test must look at the same class that is used for the name, i.e.
            // the one reported by DoFnReflector, exactly as Bound.getKindString does. Testing
            // fn.getClass() instead can disagree with it when fn is an adapter created for a
            // DoFnWithContext (presumably getDoFnClass then reports the wrapped class).
            Class<?> clazz = DoFnReflector.getDoFnClass(this.fn);
            if (clazz.isAnonymousClass()) {
                return "AnonymousParMultiDo";
            }
            return String.format("ParMultiDo(%s)", StringUtils.approximateSimpleName(clazz));
        }

        /** Adds the superclass display data, then the function's display data and class. */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            ParDo.populateDisplayData(builder, this.fn, this.fnClass);
        }

        /** Returns the function stored by this transform. */
        public DoFn<InputT, OutputT> getFn() {
            return this.fn;
        }

        /** Returns the tag of the main output. */
        public TupleTag<OutputT> getMainOutputTag() {
            return this.mainOutputTag;
        }

        /** Returns the tags of the side outputs. */
        public TupleTagList getSideOutputTags() {
            return this.sideOutputTags;
        }

        /** Returns the side inputs of this transform. */
        public List<PCollectionView<?>> getSideInputs() {
            return this.sideInputs;
        }
    }

    /**
     * A builder for a multi-output {@code ParDo} that has its name, side inputs and output tags
     * but no function yet. Supplying a function through one of the {@code of} methods yields a
     * {@link BoundMulti}.
     *
     * <p>Instances are immutable: every configuration method returns a new builder.
     *
     * @param <OutputT> the element type of the main output
     */
    public static class UnboundMulti<OutputT> {
        private final String name;
        private final List<PCollectionView<?>> sideInputs;
        private final TupleTag<OutputT> mainOutputTag;
        private final TupleTagList sideOutputTags;

        /**
         * @param name the transform name
         * @param sideInputs the views readable by the function
         * @param mainOutputTag the tag of the main output
         * @param sideOutputTags the tags of the side outputs
         */
        UnboundMulti(String name, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags) {
            this.name = name;
            this.sideInputs = sideInputs;
            this.mainOutputTag = mainOutputTag;
            this.sideOutputTags = sideOutputTags;
        }

        /**
         * Returns a copy of this builder with the given name.
         *
         * @param name the new transform name
         */
        public UnboundMulti<OutputT> named(String name) {
            return new UnboundMulti<OutputT>(name, this.sideInputs, this.mainOutputTag, this.sideOutputTags);
        }

        /**
         * Returns a copy of this builder with the given side inputs appended to the existing ones.
         *
         * @param sideInputs the views to append, passed as varargs
         */
        public UnboundMulti<OutputT> withSideInputs(PCollectionView<?> ... sideInputs) {
            List<PCollectionView<?>> asList = Arrays.asList(sideInputs);
            return this.withSideInputs(asList);
        }

        /**
         * Returns a copy of this builder with the given side inputs appended to the existing ones.
         *
         * @param sideInputs the views to append
         */
        public UnboundMulti<OutputT> withSideInputs(Iterable<? extends PCollectionView<?>> sideInputs) {
            ImmutableList<PCollectionView<?>> combined = ImmutableList.<PCollectionView<?>>builder()
                    .addAll(this.sideInputs)
                    .addAll(sideInputs)
                    .build();
            return new UnboundMulti<OutputT>(this.name, combined, this.mainOutputTag, this.sideOutputTags);
        }

        /**
         * Binds the given function, recording its runtime class as the function class.
         *
         * @param fn the function to run
         */
        public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
            Class<?> runtimeFnClass = fn.getClass();
            return this.of(fn, runtimeFnClass);
        }

        /**
         * Binds the given function, recording {@code fnClass} as the function class.
         *
         * @param fn the function to run
         * @param fnClass the class passed on to the {@link BoundMulti} together with {@code fn}
         */
        public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn, Class<?> fnClass) {
            return new BoundMulti<InputT, OutputT>(this.name, this.sideInputs, this.mainOutputTag, this.sideOutputTags, fn, fnClass);
        }

        /**
         * Binds the given {@link DoFnWithContext} after adapting it to a {@link DoFn}; the class
         * recorded is the runtime class of the original {@code fn}.
         *
         * @param fn the context-style function to run
         */
        public <InputT> BoundMulti<InputT, OutputT> of(DoFnWithContext<InputT, OutputT> fn) {
            DoFn<InputT, OutputT> adapted = ParDo.adapt(fn);
            return this.of(adapted, fn.getClass());
        }
    }

    /**
     * A {@link PTransform} that runs a {@link DoFn} over a {@link PCollection} and produces a
     * single output {@link PCollection}.
     *
     * <p>Instances are immutable: {@code named}, {@code withSideInputs} and
     * {@code withOutputTags} return new transforms rather than modifying this one.
     *
     * @param <InputT> the element type accepted by the function
     * @param <OutputT> the element type of the output
     */
    public static class Bound<InputT, OutputT>
    extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
        private final List<PCollectionView<?>> sideInputs;
        private final DoFn<InputT, OutputT> fn;
        private final Class<?> fnClass;

        /**
         * @param name the transform name
         * @param sideInputs the views readable by the function
         * @param fn the function to run; the result of {@code SerializableUtils.clone(fn)} is stored
         * @param fnClass the class reported for the function in display data
         */
        Bound(String name, List<PCollectionView<?>> sideInputs, DoFn<InputT, OutputT> fn, Class<?> fnClass) {
            super(name);
            this.sideInputs = sideInputs;
            this.fn = SerializableUtils.clone(fn);
            this.fnClass = fnClass;
        }

        /**
         * Returns a copy of this transform with the given name.
         *
         * @param name the new transform name
         */
        public Bound<InputT, OutputT> named(String name) {
            return new Bound<InputT, OutputT>(name, this.sideInputs, this.fn, this.fnClass);
        }

        /**
         * Returns a copy of this transform with the given side inputs appended to the existing ones.
         *
         * @param sideInputs the views to append, passed as varargs
         */
        public Bound<InputT, OutputT> withSideInputs(PCollectionView<?> ... sideInputs) {
            List<PCollectionView<?>> asList = Arrays.asList(sideInputs);
            return this.withSideInputs(asList);
        }

        /**
         * Returns a copy of this transform with the given side inputs appended to the existing ones.
         *
         * @param sideInputs the views to append
         */
        public Bound<InputT, OutputT> withSideInputs(Iterable<? extends PCollectionView<?>> sideInputs) {
            ImmutableList<PCollectionView<?>> combined = ImmutableList.<PCollectionView<?>>builder()
                    .addAll(this.sideInputs)
                    .addAll(sideInputs)
                    .build();
            return new Bound<InputT, OutputT>(this.name, combined, this.fn, this.fnClass);
        }

        /**
         * Converts this transform into a multi-output one with the given tags, keeping the
         * name, side inputs, function and function class.
         *
         * @param mainOutputTag the tag of the main output
         * @param sideOutputTags the tags of the side outputs
         */
        public BoundMulti<InputT, OutputT> withOutputTags(TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags) {
            return new BoundMulti<InputT, OutputT>(this.name, this.sideInputs, mainOutputTag, sideOutputTags, this.fn, this.fnClass);
        }

        /**
         * Creates the output collection for this transform. It shares the input's pipeline,
         * windowing strategy and boundedness, and takes its type descriptor from the function.
         */
        @Override
        public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
            PCollection<OutputT> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
            return output.setTypeDescriptorInternal((TypeDescriptor)this.fn.getOutputTypeDescriptor());
        }

        /**
         * Asks the pipeline's coder registry for a default coder for the function's output type,
         * supplying the function's input type descriptor and the input's coder as context.
         *
         * @throws CannotProvideCoderException if the registry cannot provide a coder
         */
        @Override
        protected Coder<OutputT> getDefaultOutputCoder(PCollection<? extends InputT> input) throws CannotProvideCoderException {
            return input.getPipeline().getCoderRegistry().getDefaultCoder(
                    this.fn.getOutputTypeDescriptor(),
                    this.fn.getInputTypeDescriptor(),
                    input.getCoder());
        }

        /**
         * Returns {@code "AnonymousParDo"} when the class reported by
         * {@code DoFnReflector.getDoFnClass} is anonymous, and
         * {@code "ParDo(<approximate simple name>)"} otherwise.
         */
        @Override
        protected String getKindString() {
            Class<?> doFnClass = DoFnReflector.getDoFnClass(this.fn);
            return doFnClass.isAnonymousClass()
                    ? "AnonymousParDo"
                    : String.format("ParDo(%s)", StringUtils.approximateSimpleName(doFnClass));
        }

        /** Adds the superclass display data, then the function's display data and class. */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            ParDo.populateDisplayData(builder, this.fn, this.fnClass);
        }

        /** Returns the function stored by this transform. */
        public DoFn<InputT, OutputT> getFn() {
            return this.fn;
        }

        /** Returns the side inputs of this transform. */
        public List<PCollectionView<?>> getSideInputs() {
            return this.sideInputs;
        }
    }

    /**
     * A builder for a {@code ParDo} that has a name and side inputs but no function yet.
     * Supplying a function through one of the {@code of} methods yields a {@link Bound};
     * supplying output tags yields an {@link UnboundMulti}.
     *
     * <p>Instances are immutable: every configuration method returns a new builder.
     */
    public static class Unbound {
        private final String name;
        private final List<PCollectionView<?>> sideInputs;

        /** Creates a builder with a {@code null} name and an empty side-input list. */
        Unbound() {
            this(null, ImmutableList.<PCollectionView<?>>of());
        }

        /**
         * @param name the transform name, possibly {@code null}
         * @param sideInputs the views readable by the function
         */
        Unbound(String name, List<PCollectionView<?>> sideInputs) {
            this.name = name;
            this.sideInputs = sideInputs;
        }

        /**
         * Returns a copy of this builder with the given name.
         *
         * @param name the new transform name
         */
        public Unbound named(String name) {
            return new Unbound(name, this.sideInputs);
        }

        /**
         * Returns a copy of this builder with the given side inputs appended to the existing ones.
         *
         * @param sideInputs the views to append, passed as varargs
         */
        public Unbound withSideInputs(PCollectionView<?> ... sideInputs) {
            List<PCollectionView<?>> asList = Arrays.asList(sideInputs);
            return this.withSideInputs(asList);
        }

        /**
         * Returns a copy of this builder with the given side inputs appended to the existing ones.
         *
         * @param sideInputs the views to append
         */
        public Unbound withSideInputs(Iterable<? extends PCollectionView<?>> sideInputs) {
            ImmutableList<PCollectionView<?>> combined = ImmutableList.<PCollectionView<?>>builder()
                    .addAll(this.sideInputs)
                    .addAll(sideInputs)
                    .build();
            return new Unbound(this.name, combined);
        }

        /**
         * Switches to a multi-output builder with the given tags, keeping the name and side inputs.
         *
         * @param mainOutputTag the tag of the main output
         * @param sideOutputTags the tags of the side outputs
         */
        public <OutputT> UnboundMulti<OutputT> withOutputTags(TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags) {
            return new UnboundMulti<OutputT>(this.name, this.sideInputs, mainOutputTag, sideOutputTags);
        }

        /**
         * Binds the given function, recording its runtime class as the function class.
         *
         * @param fn the function to run
         */
        public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
            Class<?> runtimeFnClass = fn.getClass();
            return this.of(fn, runtimeFnClass);
        }

        /**
         * Binds the given function, recording {@code fnClass} as the function class.
         *
         * @param fn the function to run
         * @param fnClass the class passed on to the {@link Bound} together with {@code fn}
         */
        private <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn, Class<?> fnClass) {
            return new Bound<InputT, OutputT>(this.name, this.sideInputs, fn, fnClass);
        }

        /**
         * Binds the given {@link DoFnWithContext} after adapting it to a {@link DoFn}; the class
         * recorded is the runtime class of the original {@code fn}.
         *
         * @param fn the context-style function to run
         */
        public <InputT, OutputT> Bound<InputT, OutputT> of(DoFnWithContext<InputT, OutputT> fn) {
            DoFn<InputT, OutputT> adapted = ParDo.adapt(fn);
            return this.of(adapted, fn.getClass());
        }
    }
}

