/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation;

import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.AggregatorFactory;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.aggregators.SparkAggregators;
import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
import org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics;
import org.apache.beam.runners.spark.translation.SparkProcessContext;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.runners.spark.util.SparkSideInputReader;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.function.FlatMapFunction;

/**
 * Spark {@link FlatMapFunction} that pushes an iterator of windowed input elements through a
 * Beam {@link DoFn} and returns the windowed values that the function emitted.
 *
 * <p>Every invocation of {@link #call(Iterator)} builds its own runner and its own output
 * buffer; the fields of this object only hold the configuration supplied at construction.
 *
 * @param <InputT> element type consumed by the wrapped {@link DoFn}
 * @param <OutputT> element type produced by the wrapped {@link DoFn}
 */
public class DoFnFunction<InputT, OutputT>
        implements FlatMapFunction<Iterator<WindowedValue<InputT>>, WindowedValue<OutputT>> {
    private final Accumulator<NamedAggregators> aggregatorsAccum;
    private final Accumulator<SparkMetricsContainer> metricsAccum;
    private final String stepName;
    private final DoFn<InputT, OutputT> doFn;
    private final SparkRuntimeContext runtimeContext;
    private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
    private final WindowingStrategy<?, ?> windowingStrategy;

    /**
     * Stores the collaborators needed to run {@code doFn}; no work is done here.
     *
     * @param aggregatorsAccum accumulator handed to the {@link SparkAggregators.Factory}
     * @param metricsAccum accumulator handed to the {@link DoFnRunnerWithMetrics} wrapper
     * @param stepName step name handed to the {@link DoFnRunnerWithMetrics} wrapper
     * @param doFn the function to execute
     * @param runtimeContext source of the pipeline options; also handed to the aggregator factory
     * @param sideInputs side inputs, keyed by tag, wrapped in a {@link SparkSideInputReader}
     * @param windowingStrategy windowing strategy handed to the runner
     */
    public DoFnFunction(Accumulator<NamedAggregators> aggregatorsAccum, Accumulator<SparkMetricsContainer> metricsAccum, String stepName, DoFn<InputT, OutputT> doFn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs, WindowingStrategy<?, ?> windowingStrategy) {
        this.aggregatorsAccum = aggregatorsAccum;
        this.metricsAccum = metricsAccum;
        this.stepName = stepName;
        this.doFn = doFn;
        this.runtimeContext = runtimeContext;
        this.sideInputs = sideInputs;
        this.windowingStrategy = windowingStrategy;
    }

    /**
     * Runs the wrapped {@link DoFn} over {@code iter} and returns what it emitted.
     *
     * <p>A simple runner is created around the function, decorated with
     * {@link DoFnRunnerWithMetrics}, and the actual iteration is delegated to
     * {@link SparkProcessContext#processPartition}.
     *
     * @param iter the windowed input elements to process
     * @return the windowed output values produced for {@code iter}
     * @throws Exception whatever the delegated processing throws
     */
    @Override
    public Iterable<WindowedValue<OutputT>> call(Iterator<WindowedValue<InputT>> iter) throws Exception {
        DoFnOutputManager outputManager = new DoFnOutputManager();

        DoFnRunner<InputT, OutputT> doFnRunner =
                DoFnRunners.simpleRunner(
                        this.runtimeContext.getPipelineOptions(),
                        this.doFn,
                        new SparkSideInputReader(this.sideInputs),
                        outputManager,
                        // Anonymous subclass so the tag is created with its type argument spelled out.
                        new TupleTag<OutputT>() {},
                        // Explicit type witness: no additional output tags are registered.
                        Collections.<TupleTag<?>>emptyList(),
                        new SparkProcessContext.NoOpStepContext(),
                        new SparkAggregators.Factory(this.runtimeContext, this.aggregatorsAccum),
                        this.windowingStrategy);

        DoFnRunner<InputT, OutputT> doFnRunnerWithMetrics =
                new DoFnRunnerWithMetrics<>(this.stepName, doFnRunner, this.metricsAccum);

        return new SparkProcessContext<>(this.doFn, doFnRunnerWithMetrics, outputManager)
                .processPartition(iter);
    }

    /**
     * Output sink handed to the runner: buffers every emitted value in a list that
     * {@link SparkProcessContext} can iterate over and clear.
     */
    private class DoFnOutputManager
            implements SparkProcessContext.SparkOutputManager<WindowedValue<OutputT>> {
        private final List<WindowedValue<OutputT>> outputs = new LinkedList<>();

        /** Discards everything buffered so far. */
        @Override
        public void clear() {
            this.outputs.clear();
        }

        /** Returns an iterator over the currently buffered values, in emission order. */
        @Override
        public Iterator<WindowedValue<OutputT>> iterator() {
            return this.outputs.iterator();
        }

        /**
         * Appends {@code output} to the buffer; {@code tag} is not consulted.
         *
         * @param tag the tag the value was emitted under (ignored)
         * @param output the emitted value
         */
        @Override
        @SuppressWarnings("unchecked")
        public synchronized <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
            // Unchecked and not verified at runtime. The runner in call() is built with a single
            // TupleTag<OutputT> and an empty list of further tags, so presumably only OutputT
            // values ever reach this method -- confirm if more tags are ever registered.
            this.outputs.add((WindowedValue<OutputT>) output);
        }
    }
}

