/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.functions;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
import org.apache.beam.runners.flink.translation.functions.FlinkNoOpStepContext;
import org.apache.beam.runners.flink.translation.functions.FlinkSideInputReader;
import org.apache.beam.runners.flink.translation.utils.FlinkClassloading;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.joda.time.Instant;

public class FlinkDoFnFunction<InputT, OutputT>
extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<OutputT>> {
    private final SerializablePipelineOptions serializedOptions;
    private final DoFn<InputT, OutputT> doFn;
    private final String stepName;
    private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
    private final WindowingStrategy<?, ?> windowingStrategy;
    private final Map<TupleTag<?>, Integer> outputMap;
    private final TupleTag<OutputT> mainOutputTag;
    private final Coder<InputT> inputCoder;
    private final Map<TupleTag<?>, Coder<?>> outputCoderMap;
    private final DoFnSchemaInformation doFnSchemaInformation;
    private transient DoFnInvoker<InputT, OutputT> doFnInvoker;

    public FlinkDoFnFunction(DoFn<InputT, OutputT> doFn, String stepName, WindowingStrategy<?, ?> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, PipelineOptions options, Map<TupleTag<?>, Integer> outputMap, TupleTag<OutputT> mainOutputTag, Coder<InputT> inputCoder, Map<TupleTag<?>, Coder<?>> outputCoderMap, DoFnSchemaInformation doFnSchemaInformation) {
        this.doFn = doFn;
        this.stepName = stepName;
        this.sideInputs = sideInputs;
        this.serializedOptions = new SerializablePipelineOptions(options);
        this.windowingStrategy = windowingStrategy;
        this.outputMap = outputMap;
        this.mainOutputTag = mainOutputTag;
        this.inputCoder = inputCoder;
        this.outputCoderMap = outputCoderMap;
        this.doFnSchemaInformation = doFnSchemaInformation;
    }

    public void mapPartition(Iterable<WindowedValue<InputT>> values, Collector<WindowedValue<OutputT>> out) throws Exception {
        RuntimeContext runtimeContext = this.getRuntimeContext();
        Object outputManager = this.outputMap.size() == 1 ? new DoFnOutputManager(out) : new MultiDoFnOutputManager(out, this.outputMap);
        ArrayList additionalOutputTags = Lists.newArrayList(this.outputMap.keySet());
        DoFnRunnerWithMetricsUpdate doFnRunner = DoFnRunners.simpleRunner((PipelineOptions)this.serializedOptions.get(), this.doFn, (SideInputReader)new FlinkSideInputReader(this.sideInputs, runtimeContext), (DoFnRunners.OutputManager)outputManager, this.mainOutputTag, (List)additionalOutputTags, (StepContext)new FlinkNoOpStepContext(), this.inputCoder, this.outputCoderMap, this.windowingStrategy, (DoFnSchemaInformation)this.doFnSchemaInformation);
        if (((FlinkPipelineOptions)this.serializedOptions.get().as(FlinkPipelineOptions.class)).getEnableMetrics().booleanValue()) {
            doFnRunner = new DoFnRunnerWithMetricsUpdate(this.stepName, doFnRunner, this.getRuntimeContext());
        }
        doFnRunner.startBundle();
        for (WindowedValue<InputT> value : values) {
            doFnRunner.processElement(value);
        }
        doFnRunner.finishBundle();
    }

    public void open(Configuration parameters) throws Exception {
        this.doFnInvoker = DoFnInvokers.tryInvokeSetupFor(this.doFn);
    }

    public void close() throws Exception {
        try {
            Optional.ofNullable(this.doFnInvoker).ifPresent(DoFnInvoker::invokeTeardown);
        }
        finally {
            FlinkClassloading.deleteStaticCaches();
        }
    }

    static class MultiDoFnOutputManager
    implements DoFnRunners.OutputManager {
        private Collector<WindowedValue<RawUnionValue>> collector;
        private Map<TupleTag<?>, Integer> outputMap;

        MultiDoFnOutputManager(Collector<WindowedValue<RawUnionValue>> collector, Map<TupleTag<?>, Integer> outputMap) {
            this.collector = collector;
            this.outputMap = outputMap;
        }

        public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
            this.collector.collect((Object)WindowedValue.of((Object)new RawUnionValue(this.outputMap.get(tag).intValue(), output.getValue()), (Instant)output.getTimestamp(), (Collection)output.getWindows(), (PaneInfo)output.getPane()));
        }
    }

    static class DoFnOutputManager
    implements DoFnRunners.OutputManager {
        private Collector collector;

        DoFnOutputManager(Collector collector) {
            this.collector = collector;
        }

        public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
            this.collector.collect((Object)WindowedValue.of((Object)new RawUnionValue(0, output.getValue()), (Instant)output.getTimestamp(), (Collection)output.getWindows(), (PaneInfo)output.getPane()));
        }
    }
}

