/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.ProcessFnRunner;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.runners.flink.translation.utils.FlinkClassloading;
import org.apache.beam.runners.flink.translation.utils.NoopLock;
import org.apache.beam.runners.flink.translation.wrappers.streaming.KeyedPushedBackElementsHandler;
import org.apache.beam.runners.flink.translation.wrappers.streaming.NonKeyedPushedBackElementsHandler;
import org.apache.beam.runners.flink.translation.wrappers.streaming.PushedBackElementsHandler;
import org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.BufferingDoFnRunner;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.joda.time.Instant;

public class DoFnOperator<InputT, OutputT>
extends AbstractStreamOperator<WindowedValue<OutputT>>
implements OneInputStreamOperator<WindowedValue<InputT>, WindowedValue<OutputT>>,
TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, WindowedValue<OutputT>>,
Triggerable<Object, TimerInternals.TimerData> {
    protected DoFn<InputT, OutputT> doFn;
    protected final SerializablePipelineOptions serializedOptions;
    protected final TupleTag<OutputT> mainOutputTag;
    protected final List<TupleTag<?>> additionalOutputTags;
    protected final Collection<PCollectionView<?>> sideInputs;
    protected final Map<Integer, PCollectionView<?>> sideInputTagMapping;
    protected final WindowingStrategy<?, ?> windowingStrategy;
    protected final OutputManagerFactory<OutputT> outputManagerFactory;
    protected transient DoFnRunner<InputT, OutputT> doFnRunner;
    protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner;
    protected transient BufferingDoFnRunner<InputT, OutputT> bufferingDoFnRunner;
    protected transient SideInputHandler sideInputHandler;
    protected transient SideInputReader sideInputReader;
    protected transient BufferedOutputManager<OutputT> outputManager;
    private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
    protected transient long currentInputWatermark;
    protected transient long currentSideInputWatermark;
    protected transient long currentOutputWatermark;
    protected transient FlinkStateInternals<?> keyedStateInternals;
    protected final String stepName;
    private final Coder<WindowedValue<InputT>> windowedInputCoder;
    private final Coder<InputT> inputCoder;
    private final Map<TupleTag<?>, Coder<?>> outputCoders;
    protected final Coder<?> keyCoder;
    final KeySelector<WindowedValue<InputT>, ?> keySelector;
    private final TimerInternals.TimerDataCoder timerCoder;
    private final long maxBundleSize;
    private final long maxBundleTimeMills;
    private final DoFnSchemaInformation doFnSchemaInformation;
    private final boolean requiresStableInput;
    protected transient InternalTimerService<TimerInternals.TimerData> timerService;
    protected transient FlinkTimerInternals timerInternals;
    private transient long pushedBackWatermark;
    private transient PushedBackElementsHandler<WindowedValue<InputT>> pushedBackElementsHandler;
    private transient AtomicBoolean bundleStarted;
    private transient long elementCount;
    private transient ScheduledFuture<?> checkFinishBundleTimer;
    private transient long lastFinishBundleTime;
    private transient Runnable bundleFinishedCallback;

    public DoFnOperator(DoFn<InputT, OutputT> doFn, String stepName, Coder<WindowedValue<InputT>> inputWindowedCoder, Coder<InputT> inputCoder, Map<TupleTag<?>, Coder<?>> outputCoders, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, OutputManagerFactory<OutputT> outputManagerFactory, WindowingStrategy<?, ?> windowingStrategy, Map<Integer, PCollectionView<?>> sideInputTagMapping, Collection<PCollectionView<?>> sideInputs, PipelineOptions options, Coder<?> keyCoder, KeySelector<WindowedValue<InputT>, ?> keySelector, DoFnSchemaInformation doFnSchemaInformation) {
        this.doFn = doFn;
        this.stepName = stepName;
        this.windowedInputCoder = inputWindowedCoder;
        this.inputCoder = inputCoder;
        this.outputCoders = outputCoders;
        this.mainOutputTag = mainOutputTag;
        this.additionalOutputTags = additionalOutputTags;
        this.sideInputTagMapping = sideInputTagMapping;
        this.sideInputs = sideInputs;
        this.serializedOptions = new SerializablePipelineOptions(options);
        this.windowingStrategy = windowingStrategy;
        this.outputManagerFactory = outputManagerFactory;
        this.setChainingStrategy(ChainingStrategy.ALWAYS);
        this.keyCoder = keyCoder;
        this.keySelector = keySelector;
        this.timerCoder = TimerInternals.TimerDataCoder.of((Coder)windowingStrategy.getWindowFn().windowCoder());
        FlinkPipelineOptions flinkOptions = (FlinkPipelineOptions)options.as(FlinkPipelineOptions.class);
        this.maxBundleSize = flinkOptions.getMaxBundleSize();
        org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument((this.maxBundleSize > 0L ? 1 : 0) != 0, (Object)"Bundle size must be at least 1");
        this.maxBundleTimeMills = flinkOptions.getMaxBundleTimeMills();
        org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument((this.maxBundleTimeMills > 0L ? 1 : 0) != 0, (Object)"Bundle time must be at least 1");
        this.doFnSchemaInformation = doFnSchemaInformation;
        boolean bl = this.requiresStableInput = doFn != null && DoFnSignatures.getSignature(doFn.getClass()).processElement().requiresStableInput();
        if (this.requiresStableInput) {
            org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState((flinkOptions.getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE ? 1 : 0) != 0, (Object)"Checkpointing mode is not set to exactly once but @RequiresStableInput is used.");
            org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState((flinkOptions.getCheckpointingInterval() > 0L ? 1 : 0) != 0, (Object)"No checkpointing configured but pipeline uses @RequiresStableInput");
            LOG.warn("Enabling stable input for transform {}. Will only process elements at most every {} milliseconds.", (Object)stepName, (Object)(flinkOptions.getCheckpointingInterval() + Math.max(0L, flinkOptions.getMinPauseBetweenCheckpoints())));
        }
    }

    protected DoFn<InputT, OutputT> getDoFn() {
        return this.doFn;
    }

    protected DoFnRunner<InputT, OutputT> createWrappingDoFnRunner(DoFnRunner<InputT, OutputT> wrappedRunner) {
        if (this.keyCoder != null) {
            StatefulDoFnRunner.TimeInternalsCleanupTimer cleanupTimer = new StatefulDoFnRunner.TimeInternalsCleanupTimer((TimerInternals)this.timerInternals, this.windowingStrategy);
            Coder windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
            StatefulDoFnRunner.StateInternalsStateCleaner stateCleaner = new StatefulDoFnRunner.StateInternalsStateCleaner(this.doFn, this.keyedStateInternals, windowCoder);
            return DoFnRunners.defaultStatefulDoFnRunner(this.doFn, wrappedRunner, this.windowingStrategy, (StatefulDoFnRunner.CleanupTimer)cleanupTimer, (StatefulDoFnRunner.StateCleaner)stateCleaner);
        }
        return this.doFnRunner;
    }

    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<WindowedValue<OutputT>>> output) {
        FlinkPipelineOptions options = (FlinkPipelineOptions)this.serializedOptions.get().as(FlinkPipelineOptions.class);
        FileSystems.setDefaultPipelineOptions((PipelineOptions)options);
        super.setup(containingTask, config, output);
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        ListStateDescriptor pushedBackStateDescriptor = new ListStateDescriptor("pushed-back-elements", new CoderTypeSerializer<WindowedValue<InputT>>(this.windowedInputCoder));
        if (this.keySelector != null) {
            this.pushedBackElementsHandler = KeyedPushedBackElementsHandler.create(this.keySelector, this.getKeyedStateBackend(), pushedBackStateDescriptor);
        } else {
            ListState listState = this.getOperatorStateBackend().getListState(pushedBackStateDescriptor);
            this.pushedBackElementsHandler = NonKeyedPushedBackElementsHandler.create(listState);
        }
        this.setCurrentInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
        this.setCurrentSideInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
        this.setCurrentOutputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
        this.sideInputReader = NullSideInputReader.of(this.sideInputs);
        if (!this.sideInputs.isEmpty()) {
            FlinkBroadcastStateInternals sideInputStateInternals = new FlinkBroadcastStateInternals(this.getContainingTask().getIndexInSubtaskGroup(), this.getOperatorStateBackend());
            this.sideInputHandler = new SideInputHandler(this.sideInputs, sideInputStateInternals);
            this.sideInputReader = this.sideInputHandler;
            Stream<WindowedValue<InputT>> pushedBack = this.pushedBackElementsHandler.getElements();
            long min = pushedBack.map(v -> v.getTimestamp().getMillis()).reduce(Long.MAX_VALUE, Math::min);
            this.setPushedBackWatermark(min);
        } else {
            this.setPushedBackWatermark(Long.MAX_VALUE);
        }
        if (this.keyCoder != null) {
            this.keyedStateInternals = new FlinkStateInternals((KeyedStateBackend<ByteBuffer>)this.getKeyedStateBackend(), this.keyCoder);
            if (this.timerService == null) {
                this.timerService = this.getInternalTimerService("beam-timer", new CoderTypeSerializer(this.timerCoder), this);
            }
            this.timerInternals = new FlinkTimerInternals();
        }
        this.outputManager = this.outputManagerFactory.create(this.output, this.getLockToAcquireForStateAccessDuringBundles(), this.getOperatorStateBackend(), this.getKeyedStateBackend(), this.keySelector);
    }

    protected Lock getLockToAcquireForStateAccessDuringBundles() {
        return NoopLock.get();
    }

    public void open() throws Exception {
        this.doFn = this.getDoFn();
        this.doFnInvoker = DoFnInvokers.invokerFor(this.doFn);
        this.doFnInvoker.invokeSetup();
        FlinkPipelineOptions options = (FlinkPipelineOptions)this.serializedOptions.get().as(FlinkPipelineOptions.class);
        this.doFnRunner = DoFnRunners.simpleRunner((PipelineOptions)options, this.doFn, (SideInputReader)this.sideInputReader, this.outputManager, this.mainOutputTag, this.additionalOutputTags, (StepContext)new FlinkStepContext(), this.inputCoder, this.outputCoders, this.windowingStrategy, (DoFnSchemaInformation)this.doFnSchemaInformation);
        if (this.requiresStableInput) {
            this.bufferingDoFnRunner = BufferingDoFnRunner.create(this.doFnRunner, "stable-input-buffer", this.windowedInputCoder, this.windowingStrategy.getWindowFn().windowCoder(), this.getOperatorStateBackend(), (KeyedStateBackend<Object>)this.getKeyedStateBackend());
            this.doFnRunner = this.bufferingDoFnRunner;
        }
        this.doFnRunner = this.createWrappingDoFnRunner(this.doFnRunner);
        if (options.getEnableMetrics().booleanValue()) {
            this.doFnRunner = new DoFnRunnerWithMetricsUpdate<InputT, OutputT>(this.stepName, this.doFnRunner, (RuntimeContext)this.getRuntimeContext());
        }
        this.bundleStarted = new AtomicBoolean(false);
        this.elementCount = 0L;
        this.lastFinishBundleTime = this.getProcessingTimeService().getCurrentProcessingTime();
        long bundleCheckPeriod = Math.max(this.maxBundleTimeMills / 2L, 1L);
        this.checkFinishBundleTimer = this.getProcessingTimeService().scheduleAtFixedRate(timestamp -> this.checkInvokeFinishBundleByTime(), bundleCheckPeriod, bundleCheckPeriod);
        this.pushbackDoFnRunner = this.doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn ? new ProcessFnRunner(this.doFnRunner, this.sideInputs, (ReadyCheckingSideInputReader)this.sideInputHandler) : SimplePushbackSideInputDoFnRunner.create(this.doFnRunner, this.sideInputs, (ReadyCheckingSideInputReader)this.sideInputHandler);
    }

    public void dispose() throws Exception {
        try {
            Optional.ofNullable(this.checkFinishBundleTimer).ifPresent(timer -> timer.cancel(true));
            FlinkClassloading.deleteStaticCaches();
            Optional.ofNullable(this.doFnInvoker).ifPresent(DoFnInvoker::invokeTeardown);
        }
        finally {
            super.dispose();
        }
    }

    public void close() throws Exception {
        List pushedBackElements;
        try {
            while (this.numProcessingTimeTimers() > 0) {
                this.getContainingTask().getCheckpointLock().wait(100L);
            }
            if (this.numProcessingTimeTimers() > 0) {
                throw new RuntimeException("There are still processing-time timers left, this indicates a bug");
            }
            this.processWatermark(new Watermark(Long.MAX_VALUE));
            this.invokeFinishBundle();
            if (this.currentOutputWatermark < Long.MAX_VALUE) {
                if (this.keyedStateInternals == null) {
                    throw new RuntimeException("Current watermark is still " + this.currentOutputWatermark + ".");
                }
                throw new RuntimeException("There are still watermark holds. Watermark held at " + this.keyedStateInternals.watermarkHold().getMillis() + ".");
            }
        }
        finally {
            super.close();
        }
        if (!this.sideInputs.isEmpty() && (pushedBackElements = this.pushedBackElementsHandler.getElements().collect(Collectors.toList())).size() > 0) {
            String pushedBackString = Joiner.on((String)",").join(pushedBackElements);
            throw new RuntimeException("Leftover pushed-back data: " + pushedBackString + ". This indicates a bug.");
        }
    }

    protected long getPushbackWatermarkHold() {
        return this.pushedBackWatermark;
    }

    protected void setPushedBackWatermark(long watermark) {
        this.pushedBackWatermark = watermark;
    }

    protected void setBundleFinishedCallback(Runnable callback) {
        this.bundleFinishedCallback = callback;
    }

    public final void processElement(StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
        this.checkInvokeStartBundle();
        this.doFnRunner.processElement((WindowedValue)streamRecord.getValue());
        this.checkInvokeFinishBundleByCount();
    }

    public final void processElement1(StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
        this.checkInvokeStartBundle();
        Iterable justPushedBack = this.pushbackDoFnRunner.processElementInReadyWindows((WindowedValue)streamRecord.getValue());
        long min = this.pushedBackWatermark;
        for (WindowedValue pushedBackValue : justPushedBack) {
            min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
            this.pushedBackElementsHandler.pushBack(pushedBackValue);
        }
        this.setPushedBackWatermark(min);
        this.checkInvokeFinishBundleByCount();
    }

    protected void addSideInputValue(StreamRecord<RawUnionValue> streamRecord) {
        WindowedValue value = (WindowedValue)((RawUnionValue)streamRecord.getValue()).getValue();
        PCollectionView<?> sideInput = this.sideInputTagMapping.get(((RawUnionValue)streamRecord.getValue()).getUnionTag());
        this.sideInputHandler.addSideInputValue(sideInput, value);
    }

    public final void processElement2(StreamRecord<RawUnionValue> streamRecord) throws Exception {
        this.invokeFinishBundle();
        this.checkInvokeStartBundle();
        this.addSideInputValue(streamRecord);
        ArrayList newPushedBack = new ArrayList();
        Iterator it = this.pushedBackElementsHandler.getElements().iterator();
        while (it.hasNext()) {
            WindowedValue element = (WindowedValue)it.next();
            this.setKeyContextElement1(new StreamRecord((Object)element));
            Iterable justPushedBack = this.pushbackDoFnRunner.processElementInReadyWindows(element);
            Iterables.addAll(newPushedBack, (Iterable)justPushedBack);
        }
        this.pushedBackElementsHandler.clear();
        long min = Long.MAX_VALUE;
        for (WindowedValue pushedBackValue : newPushedBack) {
            min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
            this.pushedBackElementsHandler.pushBack(pushedBackValue);
        }
        this.setPushedBackWatermark(min);
        this.checkInvokeFinishBundleByCount();
        this.processWatermark1(new Watermark(this.currentInputWatermark));
    }

    public void processWatermark(Watermark mark) throws Exception {
        this.processWatermark1(mark);
    }

    public void processWatermark1(Watermark mark) throws Exception {
        this.checkInvokeStartBundle();
        if (this.currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
            this.emitAllPushedBackData();
        }
        this.setCurrentInputWatermark(mark.getTimestamp());
        if (this.keyCoder == null) {
            long potentialOutputWatermark = Math.min(this.getPushbackWatermarkHold(), this.currentInputWatermark);
            if (potentialOutputWatermark > this.currentOutputWatermark) {
                this.setCurrentOutputWatermark(potentialOutputWatermark);
                this.emitWatermark(this.currentOutputWatermark);
            }
        } else {
            long pushedBackInputWatermark = Math.min(this.getPushbackWatermarkHold(), mark.getTimestamp());
            this.timeServiceManager.advanceWatermark(new Watermark(pushedBackInputWatermark));
            Instant watermarkHold = this.keyedStateInternals.watermarkHold();
            long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), this.getPushbackWatermarkHold());
            long potentialOutputWatermark = Math.min(pushedBackInputWatermark, combinedWatermarkHold);
            if (potentialOutputWatermark > this.currentOutputWatermark) {
                this.setCurrentOutputWatermark(potentialOutputWatermark);
                this.emitWatermark(this.currentOutputWatermark);
            }
        }
    }

    private void emitWatermark(long watermark) {
        if (watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
            this.invokeFinishBundle();
        }
        this.output.emitWatermark(new Watermark(watermark));
    }

    public void processWatermark2(Watermark mark) throws Exception {
        this.checkInvokeStartBundle();
        this.setCurrentSideInputWatermark(mark.getTimestamp());
        if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
            this.emitAllPushedBackData();
            this.processWatermark1(new Watermark(this.currentInputWatermark));
        }
    }

    private void emitAllPushedBackData() throws Exception {
        Iterator it = this.pushedBackElementsHandler.getElements().iterator();
        while (it.hasNext()) {
            WindowedValue element = (WindowedValue)it.next();
            this.setKeyContextElement1(new StreamRecord((Object)element));
            this.doFnRunner.processElement(element);
        }
        this.pushedBackElementsHandler.clear();
        this.setPushedBackWatermark(Long.MAX_VALUE);
    }

    private void checkInvokeStartBundle() {
        if (this.bundleStarted.compareAndSet(false, true)) {
            this.outputManager.flushBuffer();
            this.pushbackDoFnRunner.startBundle();
        }
    }

    private void checkInvokeFinishBundleByCount() {
        ++this.elementCount;
        if (this.elementCount >= this.maxBundleSize) {
            this.invokeFinishBundle();
        }
    }

    private void checkInvokeFinishBundleByTime() {
        long now = this.getProcessingTimeService().getCurrentProcessingTime();
        if (now - this.lastFinishBundleTime >= this.maxBundleTimeMills) {
            this.invokeFinishBundle();
        }
    }

    protected final void invokeFinishBundle() {
        if (this.bundleStarted.compareAndSet(true, false)) {
            this.pushbackDoFnRunner.finishBundle();
            this.elementCount = 0L;
            this.lastFinishBundleTime = this.getProcessingTimeService().getCurrentProcessingTime();
            if (this.bundleFinishedCallback != null) {
                this.bundleFinishedCallback.run();
                this.bundleFinishedCallback = null;
            }
        }
    }

    public final void snapshotState(StateSnapshotContext context) throws Exception {
        if (this.requiresStableInput) {
            this.bufferingDoFnRunner.checkpoint(context.getCheckpointId());
        }
        this.outputManager.openBuffer();
        while (this.bundleStarted.get()) {
            this.invokeFinishBundle();
        }
        this.outputManager.closeBuffer();
        super.snapshotState(context);
    }

    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        if (this.requiresStableInput) {
            this.bufferingDoFnRunner.checkpointCompleted(checkpointId);
        }
    }

    public void onEventTime(InternalTimer<Object, TimerInternals.TimerData> timer) throws Exception {
        this.fireTimer(timer);
    }

    public void onProcessingTime(InternalTimer<Object, TimerInternals.TimerData> timer) throws Exception {
        this.checkInvokeStartBundle();
        this.fireTimer(timer);
    }

    public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
        TimerInternals.TimerData timerData = (TimerInternals.TimerData)timer.getNamespace();
        StateNamespace namespace = timerData.getNamespace();
        Preconditions.checkArgument((boolean)(namespace instanceof StateNamespaces.WindowNamespace));
        BoundedWindow window = ((StateNamespaces.WindowNamespace)namespace).getWindow();
        this.timerInternals.cleanupPendingTimer((TimerInternals.TimerData)timer.getNamespace());
        this.pushbackDoFnRunner.onTimer(timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
    }

    private void setCurrentInputWatermark(long currentInputWatermark) {
        this.currentInputWatermark = currentInputWatermark;
    }

    private void setCurrentSideInputWatermark(long currentInputWatermark) {
        this.currentSideInputWatermark = currentInputWatermark;
    }

    private void setCurrentOutputWatermark(long currentOutputWatermark) {
        this.currentOutputWatermark = currentOutputWatermark;
    }

    class FlinkTimerInternals
    implements TimerInternals {
        final MapState<String, TimerInternals.TimerData> pendingTimersById;

        private FlinkTimerInternals() {
            MapStateDescriptor pendingTimersByIdStateDescriptor = new MapStateDescriptor("pending-timers", (TypeSerializer)new StringSerializer(), new CoderTypeSerializer(DoFnOperator.this.timerCoder));
            this.pendingTimersById = DoFnOperator.this.getKeyedStateStore().getMapState(pendingTimersByIdStateDescriptor);
        }

        public void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
            this.setTimer(TimerInternals.TimerData.of((String)timerId, (StateNamespace)namespace, (Instant)target, (TimeDomain)timeDomain));
        }

        @Deprecated
        public void setTimer(TimerInternals.TimerData timer) {
            try {
                String contextTimerId = this.getContextTimerId(timer.getTimerId(), timer.getNamespace());
                this.cancelPendingTimerById(contextTimerId);
                this.registerTimer(timer, contextTimerId);
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to set timer", e);
            }
        }

        private void registerTimer(TimerInternals.TimerData timer, String contextTimerId) throws Exception {
            long time = timer.getTimestamp().getMillis();
            switch (timer.getDomain()) {
                case EVENT_TIME: {
                    DoFnOperator.this.timerService.registerEventTimeTimer((Object)timer, this.adjustTimestampForFlink(time));
                    break;
                }
                case PROCESSING_TIME: 
                case SYNCHRONIZED_PROCESSING_TIME: {
                    DoFnOperator.this.timerService.registerProcessingTimeTimer((Object)timer, this.adjustTimestampForFlink(time));
                    break;
                }
                default: {
                    throw new UnsupportedOperationException("Unsupported time domain: " + timer.getDomain());
                }
            }
            this.pendingTimersById.put((Object)contextTimerId, (Object)timer);
        }

        private void cancelPendingTimerById(String contextTimerId) throws Exception {
            TimerInternals.TimerData oldTimer = (TimerInternals.TimerData)this.pendingTimersById.get((Object)contextTimerId);
            if (oldTimer != null) {
                this.deleteTimer(oldTimer);
            }
        }

        void cleanupPendingTimer(TimerInternals.TimerData timer) {
            try {
                this.pendingTimersById.remove((Object)this.getContextTimerId(timer.getTimerId(), timer.getNamespace()));
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to cleanup state with pending timers", e);
            }
        }

        private String getContextTimerId(String timerId, StateNamespace namespace) {
            return timerId + namespace.stringKey();
        }

        @Deprecated
        public void deleteTimer(StateNamespace namespace, String timerId) {
            throw new UnsupportedOperationException("Canceling of a timer by ID is not yet supported.");
        }

        public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
            try {
                this.cancelPendingTimerById(this.getContextTimerId(timerId, namespace));
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to cancel timer", e);
            }
        }

        @Deprecated
        public void deleteTimer(TimerInternals.TimerData timerKey) {
            this.cleanupPendingTimer(timerKey);
            long time = timerKey.getTimestamp().getMillis();
            switch (timerKey.getDomain()) {
                case EVENT_TIME: {
                    DoFnOperator.this.timerService.deleteEventTimeTimer((Object)timerKey, this.adjustTimestampForFlink(time));
                    break;
                }
                case PROCESSING_TIME: 
                case SYNCHRONIZED_PROCESSING_TIME: {
                    DoFnOperator.this.timerService.deleteProcessingTimeTimer((Object)timerKey, this.adjustTimestampForFlink(time));
                    break;
                }
                default: {
                    throw new UnsupportedOperationException("Unsupported time domain: " + timerKey.getDomain());
                }
            }
        }

        public Instant currentProcessingTime() {
            return new Instant(DoFnOperator.this.timerService.currentProcessingTime());
        }

        @Nullable
        public Instant currentSynchronizedProcessingTime() {
            return new Instant(DoFnOperator.this.timerService.currentProcessingTime());
        }

        public Instant currentInputWatermarkTime() {
            return new Instant(Math.min(DoFnOperator.this.currentInputWatermark, DoFnOperator.this.getPushbackWatermarkHold()));
        }

        @Nullable
        public Instant currentOutputWatermarkTime() {
            return new Instant(DoFnOperator.this.currentOutputWatermark);
        }

        private long adjustTimestampForFlink(long beamTimerTimestamp) {
            if (beamTimerTimestamp == Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
            return beamTimerTimestamp + 1L;
        }
    }

    protected class FlinkStepContext
    implements StepContext {
        protected FlinkStepContext() {
        }

        public StateInternals stateInternals() {
            return DoFnOperator.this.keyedStateInternals;
        }

        public TimerInternals timerInternals() {
            return DoFnOperator.this.timerInternals;
        }
    }

    public static class MultiOutputOutputManagerFactory<OutputT>
    implements OutputManagerFactory<OutputT> {
        private TupleTag<OutputT> mainTag;
        private Map<TupleTag<?>, Integer> tagsToIds;
        private Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags;
        private Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders;

        public MultiOutputOutputManagerFactory(TupleTag<OutputT> mainTag, Coder<WindowedValue<OutputT>> mainCoder) {
            this(mainTag, new HashMap(), (Map<TupleTag<?>, Coder<WindowedValue<?>>>)ImmutableMap.builder().put(mainTag, mainCoder).build(), (Map<TupleTag<?>, Integer>)ImmutableMap.builder().put(mainTag, (Object)0).build());
        }

        public MultiOutputOutputManagerFactory(TupleTag<OutputT> mainTag, Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags, Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders, Map<TupleTag<?>, Integer> tagsToIds) {
            this.mainTag = mainTag;
            this.tagsToOutputTags = tagsToOutputTags;
            this.tagsToCoders = tagsToCoders;
            this.tagsToIds = tagsToIds;
        }

        @Override
        public BufferedOutputManager<OutputT> create(Output<StreamRecord<WindowedValue<OutputT>>> output, Lock bufferLock, OperatorStateBackend operatorStateBackend, @Nullable KeyedStateBackend keyedStateBackend, @Nullable KeySelector keySelector) throws Exception {
            NonKeyedPushedBackElementsHandler<KV<Integer, WindowedValue<?>>> pushedBackElementsHandler;
            org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull(output);
            org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull((Object)bufferLock);
            org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull((Object)operatorStateBackend);
            org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState((keyedStateBackend == null == (keySelector == null) ? 1 : 0) != 0, (Object)"Either both KeyedStatebackend and Keyselector are provided or none.");
            TaggedKvCoder taggedKvCoder = this.buildTaggedKvCoder();
            ListStateDescriptor taggedOutputPushbackStateDescriptor = new ListStateDescriptor("bundle-buffer-tag", new CoderTypeSerializer(taggedKvCoder));
            if (keyedStateBackend != null) {
                KeySelector & Serializable taggedValueKeySelector = (KeySelector & Serializable)value -> keySelector.getKey(value.getValue());
                pushedBackElementsHandler = KeyedPushedBackElementsHandler.create(taggedValueKeySelector, keyedStateBackend, taggedOutputPushbackStateDescriptor);
            } else {
                ListState listState = operatorStateBackend.getListState(taggedOutputPushbackStateDescriptor);
                pushedBackElementsHandler = NonKeyedPushedBackElementsHandler.create(listState);
            }
            return new BufferedOutputManager<OutputT>(output, this.mainTag, this.tagsToOutputTags, this.tagsToIds, bufferLock, pushedBackElementsHandler);
        }

        private TaggedKvCoder buildTaggedKvCoder() {
            ImmutableMap.Builder idsToCodersBuilder = ImmutableMap.builder();
            for (Map.Entry<TupleTag<?>, Integer> entry : this.tagsToIds.entrySet()) {
                idsToCodersBuilder.put((Object)entry.getValue(), this.tagsToCoders.get(entry.getKey()));
            }
            return new TaggedKvCoder((Map<Integer, Coder<WindowedValue<?>>>)idsToCodersBuilder.build());
        }
    }

    private static class TaggedKvCoder
    extends StructuredCoder<KV<Integer, WindowedValue<?>>> {
        private Map<Integer, Coder<WindowedValue<?>>> idsToCoders;

        TaggedKvCoder(Map<Integer, Coder<WindowedValue<?>>> idsToCoders) {
            this.idsToCoders = idsToCoders;
        }

        public void encode(KV<Integer, WindowedValue<?>> kv, OutputStream out) throws IOException {
            Coder<WindowedValue<?>> coder = this.idsToCoders.get(kv.getKey());
            VarIntCoder.of().encode((Integer)kv.getKey(), out);
            coder.encode((Object)((WindowedValue)kv.getValue()), out);
        }

        public KV<Integer, WindowedValue<?>> decode(InputStream in) throws IOException {
            Integer id = VarIntCoder.of().decode(in);
            Coder<WindowedValue<?>> coder = this.idsToCoders.get(id);
            WindowedValue value = (WindowedValue)coder.decode(in);
            return KV.of((Object)id, (Object)value);
        }

        public List<? extends Coder<?>> getCoderArguments() {
            return new ArrayList(this.idsToCoders.values());
        }

        public void verifyDeterministic() throws Coder.NonDeterministicException {
            for (Coder<WindowedValue<?>> coder : this.idsToCoders.values()) {
                TaggedKvCoder.verifyDeterministic((Coder)this, (String)"Coder must be deterministic", (Coder[])new Coder[]{coder});
            }
        }
    }

    public static class BufferedOutputManager<OutputT>
    implements DoFnRunners.OutputManager {
        private final TupleTag<OutputT> mainTag;
        private final Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags;
        private final Map<TupleTag<?>, Integer> tagsToIds;
        private final Lock bufferLock;
        private Map<Integer, TupleTag<?>> idsToTags;
        @VisibleForTesting
        final PushedBackElementsHandler<KV<Integer, WindowedValue<?>>> pushedBackElementsHandler;
        protected final Output<StreamRecord<WindowedValue<OutputT>>> output;
        private boolean openBuffer = false;

        BufferedOutputManager(Output<StreamRecord<WindowedValue<OutputT>>> output, TupleTag<OutputT> mainTag, Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags, Map<TupleTag<?>, Integer> tagsToIds, Lock bufferLock, PushedBackElementsHandler<KV<Integer, WindowedValue<?>>> pushedBackElementsHandler) {
            this.output = output;
            this.mainTag = mainTag;
            this.tagsToOutputTags = tagsToOutputTags;
            this.tagsToIds = tagsToIds;
            this.bufferLock = bufferLock;
            this.idsToTags = new HashMap();
            for (Map.Entry<TupleTag<?>, Integer> entry : tagsToIds.entrySet()) {
                this.idsToTags.put(entry.getValue(), entry.getKey());
            }
            this.pushedBackElementsHandler = pushedBackElementsHandler;
        }

        void openBuffer() {
            this.openBuffer = true;
        }

        void closeBuffer() {
            this.openBuffer = false;
        }

        public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
            if (!this.openBuffer) {
                this.emit(tag, value);
            } else {
                this.buffer(KV.of((Object)this.tagsToIds.get(tag), value));
            }
        }

        private void buffer(KV<Integer, WindowedValue<?>> taggedValue) {
            try {
                this.bufferLock.lock();
                this.pushedBackElementsHandler.pushBack(taggedValue);
            }
            catch (Exception e) {
                throw new RuntimeException("Couldn't pushback element.", e);
            }
            finally {
                this.bufferLock.unlock();
            }
        }

        void flushBuffer() {
            try {
                this.pushedBackElementsHandler.getElements().forEach(element -> this.emit(this.idsToTags.get(element.getKey()), (WindowedValue)element.getValue()));
                this.pushedBackElementsHandler.clear();
            }
            catch (Exception e) {
                throw new RuntimeException("Couldn't flush pushed back elements.", e);
            }
        }

        private <T> void emit(TupleTag<T> tag, WindowedValue<T> value) {
            if (tag.equals(this.mainTag)) {
                WindowedValue<T> castValue = value;
                this.output.collect((Object)new StreamRecord(castValue));
            } else {
                OutputTag<WindowedValue<?>> outputTag = this.tagsToOutputTags.get(tag);
                this.output.collect(outputTag, new StreamRecord(value));
            }
        }
    }

    static interface OutputManagerFactory<OutputT>
    extends Serializable {
        public BufferedOutputManager<OutputT> create(Output<StreamRecord<WindowedValue<OutputT>>> var1, Lock var2, @Nullable OperatorStateBackend var3, @Nullable KeyedStateBackend var4, @Nullable KeySelector var5) throws Exception;
    }
}

