/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.repackaged.runners.core;

import java.io.Serializable;
import org.apache.beam.runners.direct.repackaged.javax.annotation.Nullable;
import org.apache.beam.runners.direct.repackaged.runners.core.LateDataUtils;
import org.apache.beam.runners.direct.repackaged.runners.core.MergingStateAccessor;
import org.apache.beam.runners.direct.repackaged.runners.core.ReduceFn;
import org.apache.beam.runners.direct.repackaged.runners.core.StateMerging;
import org.apache.beam.runners.direct.repackaged.runners.core.StateTag;
import org.apache.beam.runners.direct.repackaged.runners.core.StateTags;
import org.apache.beam.runners.direct.repackaged.runners.core.TimerInternals;
import org.apache.beam.runners.direct.repackaged.runners.core.java.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.runners.direct.repackaged.runners.core.java.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

/**
 * Adds, merges, reads and clears the watermark holds kept in the per-window state of a
 * {@link ReduceFn}.
 *
 * <p>Two state cells are used:
 *
 * <ul>
 *   <li>the <em>element hold</em> (state id {@code "hold"}), combined with the windowing strategy's
 *       {@link TimestampCombiner}, and
 *   <li>the <em>extra hold</em> ({@link #EXTRA_HOLD_TAG}, state id {@code "extra"}), combined with
 *       {@link TimestampCombiner#EARLIEST}; this class stores the garbage collection hold there.
 * </ul>
 *
 * @param <W> the type of window the holds are kept for
 */
class WatermarkHold<W extends BoundedWindow> implements Serializable {
    /** State tag of the extra hold; written by the garbage collection hold logic of this class. */
    @VisibleForTesting
    public static final StateTag<WatermarkHoldState> EXTRA_HOLD_TAG = StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal("extra", TimestampCombiner.EARLIEST));

    private final TimerInternals timerInternals;
    private final WindowingStrategy<?, W> windowingStrategy;
    /** State tag of the element hold, derived from the strategy's timestamp combiner. */
    private final StateTag<WatermarkHoldState> elementHoldTag;

    /**
     * Returns the system state tag (state id {@code "hold"}) for an element hold that is combined
     * with the given {@code timestampCombiner}.
     *
     * @param timestampCombiner the combiner the hold state should use
     * @return the state tag for the element hold
     */
    public static <W extends BoundedWindow> StateTag<WatermarkHoldState> watermarkHoldTagForTimestampCombiner(TimestampCombiner timestampCombiner) {
        return StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal("hold", timestampCombiner));
    }

    /**
     * Creates a hold manager.
     *
     * @param timerInternals source of the current input and output watermarks
     * @param windowingStrategy strategy supplying the timestamp combiner, window function and
     *     closing behavior consulted by this class
     */
    public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) {
        this.timerInternals = timerInternals;
        this.windowingStrategy = windowingStrategy;
        this.elementHoldTag = watermarkHoldTagForTimestampCombiner(windowingStrategy.getTimestampCombiner());
    }

    /**
     * Adds a hold for the element being processed: first an element hold, and if that is rejected
     * as too late, a garbage collection hold (with {@code paneIsEmpty == false}).
     *
     * @param context the processing context of the element
     * @return the timestamp of the hold that was added, or {@code null} if neither hold was added
     */
    @Nullable
    public Instant addHolds(ReduceFn<?, ?, ?, W>.ProcessValueContext context) {
        Instant elementHold = addElementHold(context);
        if (elementHold != null) {
            return elementHold;
        }
        return addGarbageCollectionHold(context, false);
    }

    /**
     * Maps an element timestamp to its hold timestamp by passing it through the window function's
     * {@code getOutputTime} and then the timestamp combiner's {@code assign}.
     *
     * @param timestamp the element timestamp
     * @param window the window the element belongs to
     * @return the shifted timestamp
     * @throws IllegalStateException (via {@code Preconditions.checkState}) if the shifted timestamp
     *     is earlier than {@code timestamp}, or if it is past the window's maximum timestamp while
     *     {@code timestamp} itself was not
     */
    private Instant shift(Instant timestamp, W window) {
        Instant outputTime = windowingStrategy.getWindowFn().getOutputTime(timestamp, window);
        Instant shifted = windowingStrategy.getTimestampCombiner().assign(window, outputTime);
        Preconditions.checkState(
            !shifted.isBefore(timestamp),
            "TimestampCombiner moved element from %s to earlier time %s for window %s",
            BoundedWindow.formatTimestamp(timestamp),
            BoundedWindow.formatTimestamp(shifted),
            window);
        // A timestamp already beyond the end of the window is exempt from the upper-bound check.
        Preconditions.checkState(
            timestamp.isAfter(window.maxTimestamp()) || !shifted.isAfter(window.maxTimestamp()),
            "TimestampCombiner moved element from %s to %s which is beyond end of window %s",
            timestamp,
            shifted,
            window);
        return shifted;
    }

    /**
     * Attempts to add an element hold at the shifted element timestamp.
     *
     * <p>No hold is added when the shifted timestamp is before the (non-null) output watermark, or
     * when the window's maximum timestamp is before the input watermark. The outcome is traced in
     * every case.
     *
     * @param context the processing context of the element
     * @return the hold timestamp that was added, or {@code null} if the element was too late
     * @throws IllegalStateException if the hold would be after
     *     {@link BoundedWindow#TIMESTAMP_MAX_VALUE}
     */
    @Nullable
    private Instant addElementHold(ReduceFn<?, ?, ?, W>.ProcessValueContext context) {
        Instant elementHold = shift(context.timestamp(), context.window());
        Instant outputWM = timerInternals.currentOutputWatermarkTime();
        // NOTE(review): unlike outputWM, inputWM is used without a null check - presumably the
        // input watermark is always available; confirm against TimerInternals.
        Instant inputWM = timerInternals.currentInputWatermarkTime();

        final String which;
        final boolean tooLate;
        if (outputWM != null && elementHold.isBefore(outputWM)) {
            which = "too late to effect output watermark";
            tooLate = true;
        } else if (context.window().maxTimestamp().isBefore(inputWM)) {
            which = "too late for end-of-window timer";
            tooLate = true;
        } else {
            which = "on time";
            tooLate = false;
            Preconditions.checkState(
                !elementHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
                "Element hold %s is beyond end-of-time",
                elementHold);
            context.state().access(elementHoldTag).add(elementHold);
        }
        WindowTracing.trace(
            "WatermarkHold.addHolds: element hold at {} is {} for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
            elementHold, which, context.key(), context.window(), inputWM, outputWM);
        return tooLate ? null : elementHold;
    }

    /**
     * Attempts to add a hold at the window's garbage collection time (as computed by
     * {@link LateDataUtils#garbageCollectionTime}) to the extra hold state.
     *
     * <p>No hold is added when the garbage collection time is before the input watermark, or when
     * the pane is empty and the closing behavior is {@code FIRE_IF_NON_EMPTY}.
     *
     * @param context the context of the window being processed
     * @param paneIsEmpty whether the current pane holds no elements
     * @return the hold timestamp that was added, or {@code null} if no hold was added
     * @throws IllegalStateException if the computed hold violates the sanity checks below
     */
    @Nullable
    private Instant addGarbageCollectionHold(ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
        Instant outputWM = timerInternals.currentOutputWatermarkTime();
        Instant inputWM = timerInternals.currentInputWatermarkTime();
        Instant gcHold = LateDataUtils.garbageCollectionTime(context.window(), windowingStrategy);

        if (gcHold.isBefore(inputWM)) {
            WindowTracing.trace(
                "{}.addGarbageCollectionHold: gc hold would be before the input watermark for key:{}; window: {}; inputWatermark: {}; outputWatermark: {}",
                getClass().getSimpleName(), context.key(), context.window(), inputWM, outputWM);
            return null;
        }

        if (paneIsEmpty && context.windowingStrategy().getClosingBehavior() == Window.ClosingBehavior.FIRE_IF_NON_EMPTY) {
            WindowTracing.trace(
                "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary since empty pane and FIRE_IF_NON_EMPTY for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
                gcHold, context.key(), context.window(), inputWM, outputWM);
            return null;
        }

        // Clamp a hold at or beyond TIMESTAMP_MAX_VALUE to one millisecond before it.
        if (!gcHold.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            gcHold = BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1L));
        }

        Preconditions.checkState(
            !gcHold.isBefore(inputWM),
            "Garbage collection hold %s cannot be before input watermark %s",
            gcHold,
            inputWM);
        Preconditions.checkState(
            !gcHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
            "Garbage collection hold %s is beyond end-of-time",
            gcHold);
        context.state().access(EXTRA_HOLD_TAG).add(gcHold);
        WindowTracing.trace(
            "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is on time for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
            gcHold, context.key(), context.window(), inputWM, outputWM);
        return gcHold;
    }

    /**
     * Requests a prefetch of the element hold state ahead of {@link #onMerge}.
     *
     * @param state accessor over the state of the windows taking part in the merge
     */
    public void prefetchOnMerge(MergingStateAccessor<?, W> state) {
        StateMerging.prefetchWatermarks(state, elementHoldTag);
    }

    /**
     * Updates the holds after windows have been merged: the element holds are merged into the
     * result window, the extra holds are cleared, and a garbage collection hold is re-added for
     * the result window (with {@code paneIsEmpty == false}).
     *
     * @param context the merge context; {@code context.window()} is the merge result
     */
    public void onMerge(ReduceFn<?, ?, ?, W>.OnMergeContext context) {
        WindowTracing.debug(
            "WatermarkHold.onMerge: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
            context.key(),
            context.window(),
            timerInternals.currentInputWatermarkTime(),
            timerInternals.currentOutputWatermarkTime());
        StateMerging.mergeWatermarks(context.state(), elementHoldTag, context.window());
        StateMerging.clear(context.state(), EXTRA_HOLD_TAG);
        addGarbageCollectionHold(context, false);
    }

    /**
     * Requests a prefetch of both hold states ahead of {@link #extractAndRelease}.
     *
     * @param context the context of the window whose holds will be read
     */
    public void prefetchExtract(ReduceFn<?, ?, ?, W>.Context context) {
        context.state().access(elementHoldTag).readLater();
        context.state().access(EXTRA_HOLD_TAG).readLater();
    }

    /**
     * Returns a deferred operation that, when {@code read()}, yields the current hold, clears both
     * hold states, and - unless {@code isFinished} - re-adds a garbage collection hold for an
     * empty pane.
     *
     * <p>The reported old hold is the earlier of the element and extra holds; if neither is set,
     * or the earlier one is after the window's maximum timestamp, the window's maximum timestamp
     * is reported instead.
     *
     * @param context the context of the window whose holds are released
     * @param isFinished if {@code true}, no new garbage collection hold is added
     * @return a {@link ReadableState} whose {@code read()} performs the extraction; note that
     *     {@code read()} mutates state, so each call clears and possibly re-adds holds
     */
    public ReadableState<OldAndNewHolds> extractAndRelease(final ReduceFn<?, ?, ?, W>.Context context, final boolean isFinished) {
        WindowTracing.debug(
            "WatermarkHold.extractAndRelease: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
            context.key(),
            context.window(),
            timerInternals.currentInputWatermarkTime(),
            timerInternals.currentOutputWatermarkTime());
        final WatermarkHoldState elementHoldState = context.state().access(elementHoldTag);
        final WatermarkHoldState extraHoldState = context.state().access(EXTRA_HOLD_TAG);
        return new ReadableState<OldAndNewHolds>() {

            @Override
            public ReadableState<OldAndNewHolds> readLater() {
                elementHoldState.readLater();
                extraHoldState.readLater();
                return this;
            }

            @Override
            public OldAndNewHolds read() {
                Instant oldHold = earlierOf(elementHoldState.read(), extraHoldState.read());
                if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) {
                    WindowTracing.debug(
                        "WatermarkHold.extractAndRelease.read: clipping from {} to end of window for key:{}; window:{}",
                        oldHold, context.key(), context.window());
                    oldHold = context.window().maxTimestamp();
                }
                WindowTracing.debug(
                    "WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}",
                    context.key(), context.window());
                elementHoldState.clear();
                extraHoldState.clear();

                Instant newHold = isFinished ? null : WatermarkHold.this.addGarbageCollectionHold(context, true);
                return new OldAndNewHolds(oldHold, newHold);
            }
        };
    }

    /**
     * Returns the earlier of two possibly-absent instants; an absent ({@code null}) instant loses
     * to a present one, and {@code second} is returned on a tie.
     */
    @Nullable
    private static Instant earlierOf(@Nullable Instant first, @Nullable Instant second) {
        if (first == null) {
            return second;
        }
        if (second == null) {
            return first;
        }
        return first.isBefore(second) ? first : second;
    }

    /**
     * Clears both the element hold and the extra hold of the context's window.
     *
     * @param context the context of the window whose holds are cleared
     */
    public void clearHolds(ReduceFn<?, ?, ?, W>.Context context) {
        WindowTracing.debug(
            "WatermarkHold.clearHolds: For key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
            context.key(),
            context.window(),
            timerInternals.currentInputWatermarkTime(),
            timerInternals.currentOutputWatermarkTime());
        context.state().access(elementHoldTag).clear();
        context.state().access(EXTRA_HOLD_TAG).clear();
    }

    /**
     * Reads the current element hold of the context's window; the extra hold is not consulted.
     *
     * @param context the context of the window to read
     * @return the value read from the element hold state, possibly {@code null}
     */
    @Nullable
    public Instant getDataCurrent(ReduceFn<?, ?, ?, W>.Context context) {
        return context.state().access(elementHoldTag).read();
    }

    /** Result of {@link #extractAndRelease}: the hold that was released and the one re-added. */
    public static class OldAndNewHolds {
        /** The hold in effect before the release, clipped as described on extractAndRelease. */
        public final Instant oldHold;
        /** The garbage collection hold added after the release, or {@code null} if none. */
        @Nullable
        public final Instant newHold;

        public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) {
            this.oldHold = oldHold;
            this.newHold = newHold;
        }
    }
}

