/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.ExecutableStageTranslation;
import org.apache.beam.runners.core.construction.NativeTransforms;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
import org.apache.beam.runners.flink.FlinkExecutionEnvironments;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkPortablePipelineTranslator;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContextFactory;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.types.KvKeySelector;
import org.apache.beam.runners.flink.translation.utils.FlinkPortableRunnerUtils;
import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils;
import org.apache.beam.runners.fnexecution.wire.WireCoders;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.join.UnionCoder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.GroupCombineOperator;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.Grouping;
import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.checkerframework.checker.nullness.qual.Nullable;

public class FlinkBatchPortablePipelineTranslator
implements FlinkPortablePipelineTranslator<BatchTranslationContext> {
    private final Map<String, PTransformTranslator> urnToTransformTranslator;

    /**
     * Builds the translation context used for one batch job.
     *
     * <p>The Flink {@link ExecutionEnvironment} is obtained from
     * {@link FlinkExecutionEnvironments#createBatchExecutionEnvironment} using the pipeline
     * options, the files to stage and the (possibly {@code null}) configuration directory, and is
     * then bundled with the job info and options into a {@link BatchTranslationContext}.
     *
     * @param jobInfo job information stored on the returned context
     * @param pipelineOptions options used to create the environment and stored on the context
     * @param confDir optional Flink configuration directory; may be {@code null}
     * @param filesToStage files handed to the environment factory
     * @return a fresh context wrapping the newly created environment
     */
    @Override
    public BatchTranslationContext createTranslationContext(JobInfo jobInfo, FlinkPipelineOptions pipelineOptions, @Nullable String confDir, List<String> filesToStage) {
        return new BatchTranslationContext(
                jobInfo,
                pipelineOptions,
                FlinkExecutionEnvironments.createBatchExecutionEnvironment(pipelineOptions, filesToStage, confDir));
    }

    /**
     * Creates a translator that knows how to translate the flatten, group-by-key, impulse,
     * executable-stage and reshuffle URNs registered below.
     *
     * <p>The builder is explicitly typed as {@code <String, PTransformTranslator>}: the method
     * references need {@link PTransformTranslator} as their target type, which a raw builder
     * (whose {@code put} takes {@code Object}) cannot provide.
     *
     * @return a new translator backed by an immutable URN-to-translator map
     */
    public static FlinkBatchPortablePipelineTranslator createTranslator() {
        ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
        translatorMap.put("beam:transform:flatten:v1", FlinkBatchPortablePipelineTranslator::translateFlatten);
        translatorMap.put("beam:transform:group_by_key:v1", FlinkBatchPortablePipelineTranslator::translateGroupByKey);
        translatorMap.put("beam:transform:impulse:v1", FlinkBatchPortablePipelineTranslator::translateImpulse);
        translatorMap.put("beam:runner:executable_stage:v1", FlinkBatchPortablePipelineTranslator::translateExecutableStage);
        translatorMap.put("beam:transform:reshuffle:v1", FlinkBatchPortablePipelineTranslator::translateReshuffle);
        return new FlinkBatchPortablePipelineTranslator(translatorMap.build());
    }

    /**
     * Creates a translator that dispatches on the given URN-to-translator map.
     *
     * <p>The map is stored as-is (no copy is made); {@link #createTranslator()} passes an
     * immutable map.
     *
     * @param urnToTransformTranslator translators keyed by transform URN
     */
    private FlinkBatchPortablePipelineTranslator(Map<String, PTransformTranslator> urnToTransformTranslator) {
        this.urnToTransformTranslator = urnToTransformTranslator;
    }

    /**
     * Returns the transform URNs this translator has a registered translator for.
     *
     * @return the key set of the URN-to-translator map (a view of that map, not a copy)
     */
    @Override
    public Set<String> knownUrns() {
        return this.urnToTransformTranslator.keySet();
    }

    /**
     * Translates every transform of the pipeline into Flink DataSet operations.
     *
     * <p>Transforms are visited in the topological order reported by {@link QueryablePipeline}.
     * Each one is handed to the translator registered for its URN; an unregistered URN is routed
     * to {@code urnNotFound}, which throws {@link IllegalArgumentException}. Afterwards every
     * DataSet that was registered but never requested gets a discarding sink.
     *
     * @param context the translation context that collects the produced DataSets
     * @param pipeline the pipeline proto to translate
     * @return the same {@code context}, which also acts as the executor
     */
    @Override
    public FlinkPortablePipelineTranslator.Executor translate(BatchTranslationContext context, RunnerApi.Pipeline pipeline) {
        QueryablePipeline queryable = QueryablePipeline.forTransforms(pipeline.getRootTransformIdsList(), pipeline.getComponents());
        for (PipelineNode.PTransformNode node : queryable.getTopologicallyOrderedTransforms()) {
            String urn = node.getTransform().getSpec().getUrn();
            PTransformTranslator translator = this.urnToTransformTranslator.getOrDefault(urn, FlinkBatchPortablePipelineTranslator::urnNotFound);
            translator.translate(node, pipeline, context);
        }
        // Attach a sink to each DataSet nobody asked for, so it is still part of the plan.
        for (DataSet<?> unconsumed : context.getDanglingDataSets()) {
            unconsumed.output(new DiscardingOutputFormat<>()).name("DiscardingOutput");
        }
        return context;
    }

    /**
     * Translates a reshuffle by rebalancing the single input DataSet and registering the result
     * under the transform's single output PCollection id.
     *
     * @param transform the reshuffle node; must have exactly one input and one output
     * @param pipeline the enclosing pipeline (not used by this translation)
     * @param context the context providing the input DataSet and receiving the output
     */
    private static <K, V> void translateReshuffle(PipelineNode.PTransformNode transform, RunnerApi.Pipeline pipeline, BatchTranslationContext context) {
        RunnerApi.PTransform proto = transform.getTransform();
        String inputId = Iterables.getOnlyElement(proto.getInputsMap().values());
        DataSet<?> input = context.getDataSetOrThrow(inputId);
        String outputId = Iterables.getOnlyElement(proto.getOutputsMap().values());
        context.addDataSet(outputId, input.rebalance());
    }

    /**
     * Translates an executable stage into one Flink operator that emits {@link RawUnionValue}s
     * tagged by output, followed by one pruning operator per output PCollection.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Build a {@link UnionCoder} from the wire coders of all outputs, ordered by union tag.
     *   <li>Parse the {@link RunnerApi.ExecutableStagePayload} and create the
     *       {@link FlinkExecutableStageFunction}.
     *   <li>If the stage declares user state or timers, group the input by key (optionally sorted
     *       by timestamp) and run the function as a group-reduce; otherwise run it as a
     *       map-partition.
     *   <li>Register each side input as a broadcast set named after its PCollection id.
     *   <li>Split the tagged result into one DataSet per output via {@code pruneOutput}, or attach
     *       a discarding sink when the stage has no outputs.
     * </ol>
     *
     * @param transform the executable-stage node
     * @param pipeline the enclosing pipeline, used for its components
     * @param context the context providing input DataSets and receiving the outputs
     * @throws RuntimeException if an output wire coder cannot be instantiated or the stage payload
     *     cannot be parsed
     * @throws IllegalStateException if the stage is stateful but its element coder is not a
     *     {@link KvCoder}
     */
    // Raw types are kept where the generic signatures of external helpers are not visible here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static <InputT> void translateExecutableStage(PipelineNode.PTransformNode transform, RunnerApi.Pipeline pipeline, BatchTranslationContext context) {
        RunnerApi.Components components = pipeline.getComponents();
        Map<String, String> outputs = transform.getTransform().getOutputsMap();
        // PCollection id -> union tag.
        BiMap<String, Integer> outputMap = PipelineTranslatorUtils.createOutputMap(outputs.values());
        List<Coder<?>> unionCoders = Lists.newArrayList();
        Map<String, Coder<WindowedValue<?>>> outputCoders = Maps.newHashMap();
        // Sorting the inverse map (tag -> id) makes the coder list follow ascending tag order.
        for (String collectionId : new TreeMap<>(outputMap.inverse()).values()) {
            PipelineNode.PCollectionNode collectionNode = PipelineNode.pCollection(collectionId, components.getPcollectionsOrThrow(collectionId));
            Coder<WindowedValue<?>> coder;
            try {
                coder = (Coder) WireCoders.instantiateRunnerWireCoder(collectionNode, components);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            outputCoders.put(collectionId, coder);
            unionCoders.add(coder);
        }
        UnionCoder unionCoder = UnionCoder.of(unionCoders);
        CoderTypeInformation<RawUnionValue> typeInformation = new CoderTypeInformation<>(unionCoder, context.getPipelineOptions());

        RunnerApi.ExecutableStagePayload stagePayload;
        try {
            stagePayload = RunnerApi.ExecutableStagePayload.parseFrom(transform.getTransform().getSpec().getPayload());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        String inputPCollectionId = stagePayload.getInput();
        Coder<WindowedValue<InputT>> windowedInputCoder = PipelineTranslatorUtils.instantiateCoder(inputPCollectionId, components);
        DataSet<WindowedValue<InputT>> inputDataSet = context.getDataSetOrThrow(inputPCollectionId);
        FlinkExecutableStageFunction<InputT> function = new FlinkExecutableStageFunction<>(
                transform.getTransform().getUniqueName(),
                context.getPipelineOptions(),
                stagePayload,
                context.getJobInfo(),
                outputMap,
                FlinkExecutableStageContextFactory.getInstance(),
                PipelineTranslatorUtils.getWindowingStrategy(inputPCollectionId, components).getWindowFn().windowCoder(),
                windowedInputCoder);
        String operatorName = ExecutableStageTranslation.generateNameFromStagePayload(stagePayload);

        // Common supertype of GroupReduceOperator and MapPartitionOperator; written fully
        // qualified because this file does not import it.
        org.apache.flink.api.java.operators.SingleInputUdfOperator taggedDataset;
        if (stagePayload.getUserStatesCount() > 0 || stagePayload.getTimersCount() > 0) {
            Coder valueCoder = ((WindowedValue.FullWindowedValueCoder) windowedInputCoder).getValueCoder();
            // The key coder is needed to group on, so stateful stages require KV input.
            if (!(valueCoder instanceof KvCoder)) {
                throw new IllegalStateException(String.format(Locale.ENGLISH, "The element coder for stateful DoFn '%s' must be KvCoder but is: %s", inputPCollectionId, valueCoder.getClass().getSimpleName()));
            }
            Coder keyCoder = ((KvCoder) valueCoder).getKeyCoder();
            UnsortedGrouping<WindowedValue<InputT>> unsortedInput = inputDataSet.groupBy(new KvKeySelector(keyCoder));
            // sortGroup returns a SortedGrouping, so the variable uses the shared Grouping supertype.
            Grouping<WindowedValue<InputT>> groupedInput = unsortedInput;
            boolean requiresTimeSortedInput = FlinkPortableRunnerUtils.requiresTimeSortedInput(stagePayload, false);
            if (requiresTimeSortedInput) {
                groupedInput = unsortedInput.sortGroup(WindowedValue::getTimestamp, Order.ASCENDING);
            }
            taggedDataset = new GroupReduceOperator<>(groupedInput, typeInformation, function, operatorName);
        } else {
            taggedDataset = new MapPartitionOperator<>(inputDataSet, typeInformation, function, operatorName);
        }

        for (RunnerApi.ExecutableStagePayload.SideInputId sideInputId : stagePayload.getSideInputsList()) {
            String collectionId = stagePayload.getComponents().getTransformsOrThrow(sideInputId.getTransformId()).getInputsOrThrow(sideInputId.getLocalName());
            // The broadcast set is registered under the PCollection id, not the local name.
            taggedDataset.withBroadcastSet(context.getDataSetOrThrow(collectionId), collectionId);
        }
        for (String collectionId : outputs.values()) {
            FlinkBatchPortablePipelineTranslator.pruneOutput(taggedDataset, context, outputMap.get(collectionId), outputCoders.get(collectionId), transform.getTransform().getUniqueName(), collectionId);
        }
        if (outputs.isEmpty()) {
            // With no outputs nothing would consume the operator, so give it a sink here.
            taggedDataset.output(new DiscardingOutputFormat<>()).name("DiscardingOutput");
        }
    }

    /**
     * Translates a flatten by unioning all input DataSets and registering the result under the
     * transform's single output PCollection id.
     *
     * <p>With no inputs, a one-element dummy source is fed into a flat-map that never emits, which
     * yields an empty DataSet typed with a {@link VoidCoder}-based windowed-value coder. In both
     * cases the result is passed through an always-true filter named {@code "UnionFixFilter"}.
     *
     * @param transform the flatten node; must have exactly one output
     * @param pipeline the enclosing pipeline (not used by this translation)
     * @param context the context providing the input DataSets and receiving the output
     */
    @SuppressWarnings("unchecked")
    private static <T> void translateFlatten(PipelineNode.PTransformNode transform, RunnerApi.Pipeline pipeline, BatchTranslationContext context) {
        Map<String, String> allInputs = transform.getTransform().getInputsMap();
        DataSet<WindowedValue<T>> result = null;
        if (allInputs.isEmpty()) {
            // The element is swallowed by the flat-map, so the resulting DataSet is empty.
            DataSource<String> dummySource = context.getExecutionEnvironment().fromElements("dummy");
            result = dummySource
                    .<WindowedValue<T>>flatMap((s, collector) -> {
                        // Intentionally emits nothing.
                    })
                    .returns(new CoderTypeInformation<>(
                            WindowedValue.getFullCoder((Coder<T>) VoidCoder.of(), GlobalWindow.Coder.INSTANCE),
                            context.getPipelineOptions()));
        } else {
            for (String pCollectionId : allInputs.values()) {
                DataSet<WindowedValue<T>> current = context.getDataSetOrThrow(pCollectionId);
                result = (result == null) ? current : result.union(current);
            }
        }
        // NOTE(review): presumably a workaround for a Flink union issue, going by the operator
        // name — confirm before removing.
        result = result.filter(tWindowedValue -> true).name("UnionFixFilter");
        context.addDataSet(Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values()), result);
    }

    /**
     * Translates a group-by-key as a two-phase combine that concatenates values per key.
     *
     * <p>The input is grouped by key and partially combined with a {@code Concatenate} combine
     * function ({@link GroupCombineOperator}), then grouped by key again and fully reduced
     * ({@link GroupReduceOperator}). Both phases use the same type information, built from the
     * key coder, the accumulator (list) coder and the window coder of the input's windowing
     * strategy. The result is registered under the transform's single output PCollection id.
     *
     * @param transform the group-by-key node; must have exactly one input and one output
     * @param pipeline the enclosing pipeline, used for its components
     * @param context the context providing the input DataSet and receiving the output
     * @throws IllegalStateException if the windowing strategy proto cannot be rehydrated
     * @throws RuntimeException if the input wire coder cannot be instantiated
     */
    // Types stay raw here: the generic signatures of the reduce functions are not visible.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static <K, V> void translateGroupByKey(PipelineNode.PTransformNode transform, RunnerApi.Pipeline pipeline, BatchTranslationContext context) {
        RunnerApi.Components components = pipeline.getComponents();
        RunnerApi.PTransform transformProto = transform.getTransform();

        String inputId = Iterables.getOnlyElement(transformProto.getInputsMap().values());
        RunnerApi.PCollection inputProto = components.getPcollectionsOrThrow(inputId);
        PipelineNode.PCollectionNode inputNode = PipelineNode.pCollection(inputId, inputProto);
        DataSet input = context.getDataSetOrThrow(inputId);

        RunnerApi.WindowingStrategy strategyProto = components.getWindowingStrategiesOrThrow(inputProto.getWindowingStrategyId());
        RehydratedComponents rehydrated = RehydratedComponents.forComponents(components);
        final WindowingStrategy windowingStrategy;
        try {
            windowingStrategy = WindowingStrategyTranslation.fromProto(strategyProto, rehydrated);
        } catch (InvalidProtocolBufferException e) {
            throw new IllegalStateException(String.format("Unable to hydrate GroupByKey windowing strategy %s.", strategyProto), e);
        }

        final WindowedValue.WindowedValueCoder wireCoder;
        try {
            wireCoder = (WindowedValue.WindowedValueCoder) WireCoders.instantiateRunnerWireCoder(inputNode, components);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        KvCoder kvCoder = (KvCoder) wireCoder.getValueCoder();
        Coder keyCoder = kvCoder.getKeyCoder();

        Concatenate concatenate = new Concatenate();
        Coder listCoder = concatenate.getAccumulatorCoder(CoderRegistry.createDefault(), kvCoder.getValueCoder());
        WindowedValue.FullWindowedValueCoder groupedCoder = WindowedValue.getFullCoder(KvCoder.of(keyCoder, listCoder), windowingStrategy.getWindowFn().windowCoder());
        CoderTypeInformation groupedType = new CoderTypeInformation(groupedCoder, context.getPipelineOptions());

        // Phase one: per-key partial combine.
        UnsortedGrouping byKey = input.groupBy(new KvKeySelector(keyCoder));
        FlinkPartialReduceFunction partialReduce = new FlinkPartialReduceFunction(concatenate, windowingStrategy, Collections.emptyMap(), context.getPipelineOptions());
        FlinkReduceFunction finalReduce = new FlinkReduceFunction(concatenate, windowingStrategy, Collections.emptyMap(), context.getPipelineOptions());
        GroupCombineOperator combined = new GroupCombineOperator((Grouping) byKey, groupedType, partialReduce, "GroupCombine: " + transformProto.getUniqueName());

        // Phase two: regroup the partial results by key and reduce them.
        UnsortedGrouping combinedByKey = combined.groupBy(new KvKeySelector(keyCoder));
        GroupReduceOperator grouped = new GroupReduceOperator((Grouping) combinedByKey, groupedType, finalReduce, transformProto.getUniqueName());

        String outputId = Iterables.getOnlyElement(transformProto.getOutputsMap().values());
        context.addDataSet(outputId, grouped);
    }

    /**
     * Translates an impulse into a Flink {@link DataSource} backed by
     * {@link ImpulseInputFormat} and registers it under the transform's single output
     * PCollection id.
     *
     * <p>Elements are typed as byte arrays in the global window; the source is created with the
     * transform's unique name and then renamed to {@code "Impulse"}.
     *
     * @param transform the impulse node; must have exactly one output
     * @param pipeline the enclosing pipeline (not used by this translation)
     * @param context the context providing the environment and receiving the output
     */
    private static void translateImpulse(PipelineNode.PTransformNode transform, RunnerApi.Pipeline pipeline, BatchTranslationContext context) {
        WindowedValue.FullWindowedValueCoder<byte[]> impulseCoder = WindowedValue.getFullCoder(ByteArrayCoder.of(), GlobalWindow.Coder.INSTANCE);
        CoderTypeInformation<WindowedValue<byte[]>> impulseType = new CoderTypeInformation<>(impulseCoder, context.getPipelineOptions());
        DataSource<WindowedValue<byte[]>> impulse = new DataSource<>(
                context.getExecutionEnvironment(),
                new ImpulseInputFormat(),
                impulseType,
                transform.getTransform().getUniqueName())
                .name("Impulse");
        String outputId = Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values());
        context.addDataSet(outputId, impulse);
    }

    /**
     * Fallback translator for transforms whose URN has no registered translator.
     *
     * @param transform the node that could not be translated
     * @param pipeline the enclosing pipeline (not used)
     * @param context the translation context (not used)
     * @throws IllegalArgumentException always, naming the URN and the transform id
     */
    private static void urnNotFound(PipelineNode.PTransformNode transform, RunnerApi.Pipeline pipeline, BatchTranslationContext context) {
        String urn = transform.getTransform().getSpec().getUrn();
        String transformId = transform.getId();
        String message = String.format("Unknown type of URN %s for PTransform with id %s.", urn, transformId);
        throw new IllegalArgumentException(message);
    }

    /**
     * Derives the DataSet for one output of an executable stage from the tagged union DataSet.
     *
     * <p>A {@link FlinkExecutableStagePruningFunction} configured with {@code unionTag} is applied
     * as a flat-map named {@code ExtractOutput[<tag>]}, and the result is registered under
     * {@code collectionId}.
     *
     * @param taggedDataset the stage result holding values of all outputs
     * @param context the context receiving the pruned DataSet
     * @param unionTag the union tag identifying the output
     * @param outputCoder the coder of the output's elements
     * @param transformName name of the stage transform (currently unused)
     * @param collectionId the PCollection id to register the pruned DataSet under
     */
    private static void pruneOutput(DataSet<RawUnionValue> taggedDataset, BatchTranslationContext context, int unionTag, Coder<WindowedValue<?>> outputCoder, String transformName, String collectionId) {
        FlinkPipelineOptions options = context.getPipelineOptions();
        CoderTypeInformation<WindowedValue<?>> prunedType = new CoderTypeInformation<>(outputCoder, options);
        FlinkExecutableStagePruningFunction selector = new FlinkExecutableStagePruningFunction(unionTag, options);
        String operatorName = String.format("ExtractOutput[%s]", unionTag);
        FlatMapOperator<RawUnionValue, WindowedValue<?>> pruned = new FlatMapOperator<>(taggedDataset, prunedType, selector, operatorName);
        context.addDataSet(collectionId, pruned);
    }

    private static class Concatenate<T>
    extends Combine.CombineFn<T, List<T>, List<T>> {
        private Concatenate() {
        }

        public List<T> createAccumulator() {
            return new ArrayList();
        }

        public List<T> addInput(List<T> accumulator, T input) {
            accumulator.add(input);
            return accumulator;
        }

        public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
            Object result = this.createAccumulator();
            for (List<T> accumulator : accumulators) {
                result.addAll(accumulator);
            }
            return result;
        }

        public List<T> extractOutput(List<T> accumulator) {
            return accumulator;
        }

        public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
            return ListCoder.of(inputCoder);
        }

        public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
            return ListCoder.of(inputCoder);
        }
    }

    public static class IsFlinkNativeTransform
    implements NativeTransforms.IsNativeTransform {
        public boolean test(RunnerApi.PTransform pTransform) {
            return "beam:transform:reshuffle:v1".equals(PTransformTranslation.urnForTransformOrNull((RunnerApi.PTransform)pTransform));
        }
    }

    @FunctionalInterface
    private static interface PTransformTranslator {
        public void translate(PipelineNode.PTransformNode var1, RunnerApi.Pipeline var2, BatchTranslationContext var3);
    }

    /**
     * Batch translation context and executor.
     *
     * <p>Holds the job info, pipeline options and Flink {@link ExecutionEnvironment}, and maps
     * PCollection ids to the DataSets produced during translation. It also tracks which DataSets
     * have been registered but not yet requested ("dangling"), so the translator can attach sinks
     * to them.
     */
    public static class BatchTranslationContext
    implements FlinkPortablePipelineTranslator.TranslationContext,
    FlinkPortablePipelineTranslator.Executor {
        private final JobInfo jobInfo;
        private final FlinkPipelineOptions options;
        private final ExecutionEnvironment executionEnvironment;
        /** All DataSets registered so far, keyed by PCollection id. */
        private final Map<String, DataSet<?>> dataSets;
        /** Ids of DataSets that were added but never requested via {@link #getDataSetOrThrow}. */
        private final Set<String> danglingDataSets;

        private BatchTranslationContext(JobInfo jobInfo, FlinkPipelineOptions options, ExecutionEnvironment executionEnvironment) {
            this.jobInfo = jobInfo;
            this.options = options;
            this.executionEnvironment = executionEnvironment;
            this.dataSets = new HashMap<>();
            this.danglingDataSets = new HashSet<>();
        }

        /** Returns the job info this context was created with. */
        @Override
        public JobInfo getJobInfo() {
            return this.jobInfo;
        }

        /** Returns the pipeline options this context was created with. */
        @Override
        public FlinkPipelineOptions getPipelineOptions() {
            return this.options;
        }

        /**
         * Executes the translated job on the wrapped environment.
         *
         * @param jobName the name passed to {@link ExecutionEnvironment#execute(String)}
         * @return the result reported by the environment
         * @throws Exception whatever the environment's {@code execute} throws
         */
        @Override
        public JobExecutionResult execute(String jobName) throws Exception {
            return this.getExecutionEnvironment().execute(jobName);
        }

        /** Returns the Flink execution environment used for this translation. */
        public ExecutionEnvironment getExecutionEnvironment() {
            return this.executionEnvironment;
        }

        /**
         * Registers the DataSet produced for a PCollection and marks it as dangling.
         *
         * @param pCollectionId the PCollection id to register under
         * @param dataSet the DataSet to register
         * @throws IllegalArgumentException if a DataSet is already registered for the id
         */
        public <T> void addDataSet(String pCollectionId, DataSet<T> dataSet) {
            Preconditions.checkArgument(!this.dataSets.containsKey(pCollectionId), "DataSet for id %s is already registered.", pCollectionId);
            this.dataSets.put(pCollectionId, dataSet);
            this.danglingDataSets.add(pCollectionId);
        }

        /**
         * Looks up the DataSet registered for a PCollection and marks it as consumed.
         *
         * <p>The element type {@code T} is chosen by the caller and is not checked.
         *
         * @param pCollectionId the PCollection id to look up
         * @return the registered DataSet
         * @throws IllegalArgumentException if no DataSet is registered for the id
         */
        @SuppressWarnings("unchecked")
        public <T> DataSet<T> getDataSetOrThrow(String pCollectionId) {
            // The map stores DataSet<?>; the caller-chosen element type cannot be verified here.
            DataSet<T> dataSet = (DataSet<T>) this.dataSets.get(pCollectionId);
            if (dataSet == null) {
                throw new IllegalArgumentException(String.format("Unknown dataset for id %s.", pCollectionId));
            }
            // Requesting a DataSet is treated as consuming it.
            this.danglingDataSets.remove(pCollectionId);
            return dataSet;
        }

        /**
         * Returns the DataSets that were registered but never requested.
         *
         * @return a newly built collection; its order follows the iteration order of the
         *     underlying hash set and is therefore unspecified
         */
        public Collection<DataSet<?>> getDanglingDataSets() {
            return this.danglingDataSets.stream().map(id -> this.dataSets.get(id)).collect(Collectors.toList());
        }
    }
}

