/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.direct_java.runners.core;

import java.util.Map;
import org.apache.beam.repackaged.direct_java.runners.core.DoFnRunner;
import org.apache.beam.repackaged.direct_java.runners.core.LateDataUtils;
import org.apache.beam.repackaged.direct_java.runners.core.StateInternals;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespaces;
import org.apache.beam.repackaged.direct_java.runners.core.StateTags;
import org.apache.beam.repackaged.direct_java.runners.core.TimerInternals;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

/**
 * A {@link DoFnRunner} that wraps another runner and adds per-window bookkeeping for stateful
 * {@link DoFn DoFns}.
 *
 * <p>For every (element, window) pair it either drops the element when the window is late
 * (see {@link #isLate}) or registers a cleanup timer for the window through the supplied
 * {@link CleanupTimer} and forwards the element to the wrapped runner. When the cleanup timer
 * fires, the supplied {@link StateCleaner} is asked to clear the window's state.
 *
 * <p>Only {@link NonMergingWindowFn non-merging} window functions are accepted; the constructor
 * throws otherwise.
 *
 * @param <InputT> element type consumed by the wrapped runner
 * @param <OutputT> element type produced by the wrapped runner
 * @param <W> window type handled by the {@link StateCleaner}
 */
public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
implements DoFnRunner<InputT, OutputT> {
    /** Name of the counter incremented once for every element dropped as late. */
    public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "StatefulParDoDropped";
    private final DoFnRunner<InputT, OutputT> doFnRunner;
    private final WindowingStrategy<?, ?> windowingStrategy;
    private final Counter droppedDueToLateness = Metrics.counter(StatefulDoFnRunner.class, DROPPED_DUE_TO_LATENESS_COUNTER);
    private final CleanupTimer<InputT> cleanupTimer;
    private final StateCleaner<W> stateCleaner;

    /**
     * Creates a runner that decorates {@code doFnRunner}.
     *
     * @param doFnRunner the runner that actually invokes the user function
     * @param windowingStrategy strategy used to compute each window's garbage-collection time
     * @param cleanupTimer sets and recognizes the per-window cleanup timer and exposes the input watermark
     * @param stateCleaner clears the state of a window once its cleanup timer fires
     * @throws UnsupportedOperationException if the strategy's window function is not a {@link NonMergingWindowFn}
     */
    public StatefulDoFnRunner(DoFnRunner<InputT, OutputT> doFnRunner, WindowingStrategy<?, ?> windowingStrategy, CleanupTimer<InputT> cleanupTimer, StateCleaner<W> stateCleaner) {
        this.doFnRunner = doFnRunner;
        this.windowingStrategy = windowingStrategy;
        this.cleanupTimer = cleanupTimer;
        this.stateCleaner = stateCleaner;
        WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
        this.rejectMergingWindowFn(windowFn);
    }

    /** Throws {@link UnsupportedOperationException} unless {@code windowFn} is a {@link NonMergingWindowFn}. */
    private void rejectMergingWindowFn(WindowFn<?, ?> windowFn) {
        if (!(windowFn instanceof NonMergingWindowFn)) {
            throw new UnsupportedOperationException("MergingWindowFn is not supported for stateful DoFns, WindowFn is: " + windowFn);
        }
    }

    /** Returns the user function of the wrapped runner. */
    @Override
    public DoFn<InputT, OutputT> getFn() {
        return this.doFnRunner.getFn();
    }

    /** Delegates to the wrapped runner. */
    @Override
    public void startBundle() {
        this.doFnRunner.startBundle();
    }

    /**
     * Processes {@code input} once per window it belongs to.
     *
     * <p>For a late window the element is dropped: the lateness counter is incremented and a trace
     * message is emitted. Otherwise the cleanup timer is set for the window and the single-window
     * value is handed to the wrapped runner.
     *
     * @param input the element to process, possibly assigned to several windows
     */
    @Override
    public void processElement(WindowedValue<InputT> input) {
        for (WindowedValue<InputT> value : input.explodeWindows()) {
            // The first window of each exploded value is the one this iteration handles.
            BoundedWindow window = value.getWindows().iterator().next();
            if (this.isLate(window)) {
                this.droppedDueToLateness.inc();
                WindowTracing.debug("StatefulDoFnRunner.processElement: Dropping element at {}; window:{} since too far behind inputWatermark:{}", input.getTimestamp(), window, this.cleanupTimer.currentInputWatermarkTime());
                continue;
            }
            this.cleanupTimer.setForWindow(value.getValue(), window);
            this.doFnRunner.processElement(value);
        }
    }

    /**
     * Returns whether the garbage-collection time of {@code window}, as computed by
     * {@link LateDataUtils#garbageCollectionTime}, is strictly before the current input watermark.
     */
    private boolean isLate(BoundedWindow window) {
        Instant gcTime = LateDataUtils.garbageCollectionTime(window, this.windowingStrategy);
        Instant inputWM = this.cleanupTimer.currentInputWatermarkTime();
        return gcTime.isBefore(inputWM);
    }

    /**
     * Handles a fired timer.
     *
     * <p>If the {@link CleanupTimer} recognizes the timer as its own, the window's state is cleared
     * and the wrapped runner is not invoked. Otherwise a timer that is not in the event-time domain
     * and whose window is late is ignored (only traced, the dropped counter is not touched); every
     * other timer is forwarded to the wrapped runner unchanged.
     */
    @Override
    public void onTimer(String timerId, String timerFamilyId, BoundedWindow window, Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) {
        if (this.cleanupTimer.isForWindow(timerId, window, timestamp, timeDomain)) {
            // Unchecked: W erases to BoundedWindow, so this cast performs no runtime check.
            // NOTE(review): assumes windows reaching this runner are of type W - confirm with callers.
            @SuppressWarnings("unchecked")
            W expiredWindow = (W) window;
            this.stateCleaner.clearForWindow(expiredWindow);
        } else if (!timeDomain.equals(TimeDomain.EVENT_TIME) && this.isLate(window)) {
            WindowTracing.debug("StatefulDoFnRunner.onTimer: Ignoring processing-time timer at {}; window:{} since window is too far behind inputWatermark:{}", timestamp, window, this.cleanupTimer.currentInputWatermarkTime());
        } else {
            this.doFnRunner.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
        }
    }

    /** Delegates to the wrapped runner. */
    @Override
    public void finishBundle() {
        this.doFnRunner.finishBundle();
    }

    /**
     * A {@link StateCleaner} that clears every state cell declared in a {@link DoFn}'s signature,
     * looking each cell up in a {@link StateInternals} under the window's namespace.
     *
     * @param <W> window type whose state is cleared
     */
    public static class StateInternalsStateCleaner<W extends BoundedWindow>
    implements StateCleaner<W> {
        private final DoFn<?, ?> fn;
        private final DoFnSignature signature;
        private final StateInternals stateInternals;
        private final Coder<W> windowCoder;

        /**
         * @param fn the user function whose declared state is to be cleared
         * @param stateInternals the store the state cells are looked up in
         * @param windowCoder coder used to build the window's state namespace
         */
        public StateInternalsStateCleaner(DoFn<?, ?> fn, StateInternals stateInternals, Coder<W> windowCoder) {
            this.fn = fn;
            this.signature = DoFnSignatures.getSignature(fn.getClass());
            this.stateInternals = stateInternals;
            this.windowCoder = windowCoder;
        }

        /**
         * Clears, for {@code window}, every state declared by the function.
         *
         * @throws RuntimeException wrapping the {@link IllegalAccessException} if a state
         *     declaration field cannot be read reflectively
         */
        @Override
        public void clearForWindow(W window) {
            for (Map.Entry<String, DoFnSignature.StateDeclaration> entry : this.signature.stateDeclarations().entrySet()) {
                String stateId = entry.getKey();
                try {
                    // The spec is read reflectively from the declaring field on the function instance.
                    StateSpec<?> spec = (StateSpec<?>) entry.getValue().field().get(this.fn);
                    this.stateInternals.state(StateNamespaces.window(this.windowCoder, window), StateTags.tagForSpec(stateId, spec)).clear();
                }
                catch (IllegalAccessException e) {
                    throw new RuntimeException("Unable to read state declaration '" + stateId + "' of " + this.fn.getClass().getName(), e);
                }
            }
        }
    }

    /**
     * A {@link CleanupTimer} backed by a {@link TimerInternals}: it sets an event-time timer
     * {@link #GC_DELAY_MS} milliseconds after the window's garbage-collection time.
     *
     * @param <InputT> element type; the element itself is not used by this implementation
     */
    public static class TimeInternalsCleanupTimer<InputT>
    implements CleanupTimer<InputT> {
        /** Timer id under which the cleanup timer is registered. */
        public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId";
        /** Offset in milliseconds added to a window's garbage-collection time to get the timer's firing time. */
        public static final long GC_DELAY_MS = 1L;
        private final TimerInternals timerInternals;
        private final WindowingStrategy<?, ?> windowingStrategy;
        private final Coder<BoundedWindow> windowCoder;

        /**
         * @param timerInternals where the cleanup timer is set and the input watermark is read from
         * @param windowingStrategy supplies the window coder and the garbage-collection time
         */
        public TimeInternalsCleanupTimer(TimerInternals timerInternals, WindowingStrategy<?, ?> windowingStrategy) {
            this.windowingStrategy = windowingStrategy;
            WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
            // Unchecked: the coder's window type is widened to BoundedWindow.
            // NOTE(review): assumes only windows produced by this window fn are encoded - confirm.
            @SuppressWarnings("unchecked")
            Coder<BoundedWindow> coder = (Coder<BoundedWindow>) windowFn.windowCoder();
            this.windowCoder = coder;
            this.timerInternals = timerInternals;
        }

        /** Returns the current input watermark reported by the underlying {@link TimerInternals}. */
        @Override
        public Instant currentInputWatermarkTime() {
            return this.timerInternals.currentInputWatermarkTime();
        }

        /**
         * Sets the event-time cleanup timer for {@code window} in the window's state namespace,
         * with an empty timer family id and the window's max timestamp as output timestamp.
         *
         * @param input the element being processed (unused here)
         * @param window the window to schedule cleanup for
         */
        @Override
        public void setForWindow(InputT input, BoundedWindow window) {
            Instant firingTime = this.cleanupTimeFor(window);
            this.timerInternals.setTimer(StateNamespaces.window(this.windowCoder, window), GC_TIMER_ID, "", firingTime, window.maxTimestamp(), TimeDomain.EVENT_TIME);
        }

        /**
         * Returns whether the given timer is this class's cleanup timer for {@code window}: an
         * event-time timer with id {@link #GC_TIMER_ID} whose timestamp equals the window's
         * cleanup firing time.
         */
        @Override
        public boolean isForWindow(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
            boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
            Instant firingTime = this.cleanupTimeFor(window);
            return isEventTimer && GC_TIMER_ID.equals(timerId) && firingTime.equals(timestamp);
        }

        /** Computes the cleanup timer's firing time: the window's garbage-collection time plus {@link #GC_DELAY_MS}. */
        private Instant cleanupTimeFor(BoundedWindow window) {
            return LateDataUtils.garbageCollectionTime(window, this.windowingStrategy).plus(GC_DELAY_MS);
        }
    }

    /**
     * Clears the state associated with a window.
     *
     * @param <W> window type
     */
    public static interface StateCleaner<W extends BoundedWindow> {
        /** Clears the state held for {@code window}. */
        public void clearForWindow(W window);
    }

    /**
     * Sets and recognizes the per-window cleanup timer and exposes the current input watermark.
     *
     * @param <InputT> element type passed along when a timer is set
     */
    public static interface CleanupTimer<InputT> {
        /** Returns the current input watermark. */
        public Instant currentInputWatermarkTime();

        /** Sets the cleanup timer for {@code window}; {@code input} is the element being processed. */
        public void setForWindow(InputT input, BoundedWindow window);

        /** Returns whether the described timer is the cleanup timer for {@code window}. */
        public boolean isForWindow(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain);
    }
}

