/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.UnconsumedReads;
import org.apache.beam.runners.core.construction.WriteFilesTranslation;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineTranslator;
import org.apache.beam.runners.flink.FlinkStreamingTransformTranslators;
import org.apache.beam.runners.flink.FlinkStreamingTranslationContext;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FlinkStreamingPipelineTranslator
extends FlinkPipelineTranslator {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkStreamingPipelineTranslator.class);
    private final FlinkStreamingTranslationContext streamingContext;
    private int depth = 0;

    public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment env, PipelineOptions options) {
        this.streamingContext = new FlinkStreamingTranslationContext(env, options);
    }

    @Override
    public void translate(Pipeline pipeline) {
        UnconsumedReads.ensureAllReadsConsumed((Pipeline)pipeline);
        super.translate(pipeline);
    }

    public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
        StreamTransformTranslator<?> translator;
        LOG.info("{} enterCompositeTransform- {}", (Object)FlinkStreamingPipelineTranslator.genSpaces(this.depth), (Object)node.getFullName());
        ++this.depth;
        PTransform transform = node.getTransform();
        if (transform != null && (translator = FlinkStreamingTransformTranslators.getTranslator(transform)) != null && this.applyCanTranslate(transform, node, translator)) {
            this.applyStreamingTransform(transform, node, translator);
            LOG.info("{} translated- {}", (Object)FlinkStreamingPipelineTranslator.genSpaces(this.depth), (Object)node.getFullName());
            return Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
        }
        return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
    }

    public void leaveCompositeTransform(TransformHierarchy.Node node) {
        --this.depth;
        LOG.info("{} leaveCompositeTransform- {}", (Object)FlinkStreamingPipelineTranslator.genSpaces(this.depth), (Object)node.getFullName());
    }

    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
        LOG.info("{} visitPrimitiveTransform- {}", (Object)FlinkStreamingPipelineTranslator.genSpaces(this.depth), (Object)node.getFullName());
        PTransform transform = node.getTransform();
        StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
        if (translator == null || !this.applyCanTranslate(transform, node, translator)) {
            String transformUrn = PTransformTranslation.urnForTransform((PTransform)transform);
            LOG.info(transformUrn);
            throw new UnsupportedOperationException("The transform " + transformUrn + " is currently not supported.");
        }
        this.applyStreamingTransform(transform, node, translator);
    }

    public void visitValue(PValue value, TransformHierarchy.Node producer) {
    }

    private <T extends PTransform<?, ?>> void applyStreamingTransform(PTransform<?, ?> transform, TransformHierarchy.Node node, StreamTransformTranslator<?> translator) {
        PTransform<?, ?> typedTransform = transform;
        StreamTransformTranslator<?> typedTranslator = translator;
        this.streamingContext.setCurrentTransform(node.toAppliedPTransform(this.getPipeline()));
        typedTranslator.translateNode(typedTransform, this.streamingContext);
    }

    private <T extends PTransform<?, ?>> boolean applyCanTranslate(PTransform<?, ?> transform, TransformHierarchy.Node node, StreamTransformTranslator<?> translator) {
        PTransform<?, ?> typedTransform = transform;
        StreamTransformTranslator<?> typedTranslator = translator;
        this.streamingContext.setCurrentTransform(node.toAppliedPTransform(this.getPipeline()));
        return typedTranslator.canTranslate(typedTransform, this.streamingContext);
    }

    @VisibleForTesting
    static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT>
    implements PTransformOverrideFactory<PCollection<UserT>, WriteFilesResult<DestinationT>, WriteFiles<UserT, DestinationT, OutputT>> {
        FlinkPipelineOptions options;

        StreamingShardedWriteFactory(PipelineOptions options) {
            this.options = (FlinkPipelineOptions)options.as(FlinkPipelineOptions.class);
        }

        public PTransformOverrideFactory.PTransformReplacement<PCollection<UserT>, WriteFilesResult<DestinationT>> getReplacementTransform(AppliedPTransform<PCollection<UserT>, WriteFilesResult<DestinationT>, WriteFiles<UserT, DestinationT, OutputT>> transform) {
            Integer jobParallelism = this.options.getParallelism();
            Preconditions.checkArgument((jobParallelism > 0 ? 1 : 0) != 0, (String)"Parallelism of a job should be greater than 0. Currently set: %s", (Object[])new Object[]{jobParallelism});
            int numShards = jobParallelism * 2;
            try {
                List sideInputs = WriteFilesTranslation.getDynamicDestinationSideInputs(transform);
                FileBasedSink sink = WriteFilesTranslation.getSink(transform);
                WriteFiles replacement = WriteFiles.to((FileBasedSink)sink).withSideInputs(sideInputs);
                if (WriteFilesTranslation.isWindowedWrites(transform)) {
                    replacement = replacement.withWindowedWrites();
                }
                return PTransformOverrideFactory.PTransformReplacement.of((PInput)PTransformReplacements.getSingletonMainInput(transform), (PTransform)replacement.withNumShards(numShards));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, WriteFilesResult<DestinationT> newOutput) {
            return Collections.emptyMap();
        }
    }

    static abstract class StreamTransformTranslator<T extends PTransform> {
        StreamTransformTranslator() {
        }

        abstract void translateNode(T var1, FlinkStreamingTranslationContext var2);

        boolean canTranslate(T transform, FlinkStreamingTranslationContext context) {
            return true;
        }
    }
}

