/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.core;

import java.util.ArrayList;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Instant;

/**
 * A {@link DoFnRunner} decorator that removes late elements from each {@link KeyedWorkItem}
 * before handing the work item to the wrapped runner.
 *
 * <p>Only {@link #processElement} alters its input; every other method delegates to the wrapped
 * runner unchanged.
 *
 * @param <K> key type of the work items
 * @param <InputT> type of the elements carried inside each work item
 * @param <OutputT> value type of the {@link KV} outputs produced by the wrapped runner
 * @param <W> window type; not referenced by the code in this class
 */
public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWindow>
implements DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> {
    /** Name of the counter incremented once per (element, window) pair that is dropped. */
    public static final String DROPPED_DUE_TO_LATENESS = "droppedDueToLateness";

    /** The runner that receives the filtered work items and all other calls. */
    private final DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner;
    /** Decides which (element, window) pairs are dropped. */
    private final LateDataFilter lateDataFilter;

    /**
     * Creates a runner that filters late data in front of {@code doFnRunner}.
     *
     * @param doFnRunner the runner to delegate to
     * @param windowingStrategy strategy passed to {@link LateDataUtils#garbageCollectionTime} to
     *     compute when a window expires
     * @param timerInternals source of the current input and output watermarks
     */
    public LateDataDroppingDoFnRunner(DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner, WindowingStrategy<?, ?> windowingStrategy, TimerInternals timerInternals) {
        this.doFnRunner = doFnRunner;
        this.lateDataFilter = new LateDataFilter(windowingStrategy, timerInternals);
    }

    /** Returns the {@link DoFn} of the wrapped runner. */
    @Override
    public DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getFn() {
        return this.doFnRunner.getFn();
    }

    /** Delegates to the wrapped runner. */
    @Override
    public void startBundle() {
        this.doFnRunner.startBundle();
    }

    /**
     * Rebuilds the incoming work item with the same key and timers but with only the elements
     * that pass {@link LateDataFilter#filter}, then forwards it to the wrapped runner.
     *
     * @param elem the windowed value wrapping the work item to filter
     */
    @Override
    public void processElement(WindowedValue<KeyedWorkItem<K, InputT>> elem) {
        KeyedWorkItem<K, InputT> workItem = elem.getValue();
        // Fully parameterized locals (instead of raw Iterable / KeyedWorkItem) keep the calls
        // below free of unchecked conversions, so the compiler verifies the element type.
        Iterable<WindowedValue<InputT>> nonLateElements = this.lateDataFilter.filter(workItem.key(), workItem.elementsIterable());
        KeyedWorkItem<K, InputT> filteredWorkItem = KeyedWorkItems.workItem(workItem.key(), workItem.timersIterable(), nonLateElements);
        this.doFnRunner.processElement(elem.withValue(filteredWorkItem));
    }

    /** Delegates to the wrapped runner with all arguments unchanged. */
    @Override
    public <KeyT> void onTimer(String timerId, String timerFamilyId, KeyT key, BoundedWindow window, Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) {
        this.doFnRunner.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
    }

    /** Delegates to the wrapped runner. */
    @Override
    public void finishBundle() {
        this.doFnRunner.finishBundle();
    }

    /** Delegates to the wrapped runner with all arguments unchanged. */
    @Override
    public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
        this.doFnRunner.onWindowExpiration(window, timestamp, key);
    }

    /**
     * Splits elements into one value per window and drops the values whose window has expired
     * relative to the current input watermark.
     */
    @VisibleForTesting
    static class LateDataFilter {
        private final WindowingStrategy<?, ?> windowingStrategy;
        private final TimerInternals timerInternals;
        private final Counter droppedDueToLateness;

        /**
         * @param windowingStrategy strategy used to compute each window's garbage collection time
         * @param timerInternals source of the current input and output watermarks
         */
        public LateDataFilter(WindowingStrategy<?, ?> windowingStrategy, TimerInternals timerInternals) {
            this.windowingStrategy = windowingStrategy;
            this.timerInternals = timerInternals;
            this.droppedDueToLateness = Metrics.counter(LateDataDroppingDoFnRunner.class, LateDataDroppingDoFnRunner.DROPPED_DUE_TO_LATENESS);
        }

        /**
         * Returns the non-late portion of {@code elements}.
         *
         * <p>Every input element is expanded into one single-window value per window it belongs
         * to. A value is kept unless its window is droppable according to
         * {@link #canDropDueToExpiredWindow}; each dropped value increments the
         * {@link LateDataDroppingDoFnRunner#DROPPED_DUE_TO_LATENESS} counter and is reported
         * through {@link WindowTracing#debug}.
         *
         * @param key key of the work item; used only in the trace message
         * @param elements the elements to filter
         * @return a newly built collection of the retained single-window values, in input order
         */
        public <K, InputT> Iterable<WindowedValue<InputT>> filter(K key, Iterable<WindowedValue<InputT>> elements) {
            ArrayList<WindowedValue<InputT>> nonLateElements = new ArrayList<WindowedValue<InputT>>();
            for (WindowedValue<InputT> element : elements) {
                for (BoundedWindow window : element.getWindows()) {
                    if (this.canDropDueToExpiredWindow(window)) {
                        // Too late for this particular window; other windows of the same
                        // element are still evaluated independently.
                        this.droppedDueToLateness.inc();
                        WindowTracing.debug("{}: Dropping element at {} for key:{}; window:{} since too far behind inputWatermark:{}; outputWatermark:{}", LateDataFilter.class.getSimpleName(), element.getTimestamp(), key, window, this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime());
                        continue;
                    }
                    nonLateElements.add(WindowedValue.of(element.getValue(), element.getTimestamp(), window, element.getPane()));
                }
            }
            return nonLateElements;
        }

        /**
         * Returns {@code true} when the garbage collection time that
         * {@link LateDataUtils#garbageCollectionTime} computes for {@code window} is before the
         * current input watermark.
         */
        private boolean canDropDueToExpiredWindow(BoundedWindow window) {
            Instant inputWM = this.timerInternals.currentInputWatermarkTime();
            return LateDataUtils.garbageCollectionTime(window, this.windowingStrategy).isBefore(inputWM);
        }
    }
}

