/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.repackaged.runners.core;

import java.util.Collection;
import java.util.Collections;
import org.apache.beam.runners.direct.repackaged.runners.core.DoFnRunner;
import org.apache.beam.runners.direct.repackaged.runners.core.KeyedWorkItem;
import org.apache.beam.runners.direct.repackaged.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.direct.repackaged.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.runners.direct.repackaged.runners.core.StateNamespaces;
import org.apache.beam.runners.direct.repackaged.runners.core.TimerInternals;
import org.apache.beam.runners.direct.repackaged.runners.core.java.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.direct.repackaged.runners.core.java.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Instant;

public class ProcessFnRunner<InputT, OutputT, RestrictionT>
implements PushbackSideInputDoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> {
    private final DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> underlying;
    private final Collection<PCollectionView<?>> views;
    private final ReadyCheckingSideInputReader sideInputReader;

    ProcessFnRunner(DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> underlying, Collection<PCollectionView<?>> views, ReadyCheckingSideInputReader sideInputReader) {
        this.underlying = underlying;
        this.views = views;
        this.sideInputReader = sideInputReader;
    }

    @Override
    public void startBundle() {
        this.underlying.startBundle();
    }

    @Override
    public Iterable<WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>> processElementInReadyWindows(WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>> windowedKWI) {
        ProcessFnRunner.checkTrivialOuterWindows(windowedKWI);
        BoundedWindow window = ProcessFnRunner.getUnderlyingWindow((KeyedWorkItem)windowedKWI.getValue());
        if (!this.isReady(window)) {
            return Collections.singletonList(windowedKWI);
        }
        this.underlying.processElement(windowedKWI);
        return Collections.emptyList();
    }

    @Override
    public void finishBundle() {
        this.underlying.finishBundle();
    }

    @Override
    public void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
        throw new UnsupportedOperationException("User timers unsupported in ProcessFn");
    }

    private static <T> void checkTrivialOuterWindows(WindowedValue<KeyedWorkItem<String, T>> windowedKWI) {
        Collection outerWindows = windowedKWI.getWindows();
        if (!outerWindows.isEmpty()) {
            Preconditions.checkArgument(outerWindows.size() == 1, "The KeyedWorkItem itself must not be in multiple windows, but was in: %s", (Object)outerWindows);
            BoundedWindow onlyWindow = (BoundedWindow)Iterables.getOnlyElement(outerWindows);
            Preconditions.checkArgument(onlyWindow instanceof GlobalWindow, "KeyedWorkItem must be in the Global window, but was in: %s", (Object)onlyWindow);
        }
    }

    private static <T> BoundedWindow getUnderlyingWindow(KeyedWorkItem<String, T> kwi) {
        if (Iterables.isEmpty(kwi.elementsIterable())) {
            TimerInternals.TimerData timer = Iterables.getOnlyElement(kwi.timersIterable());
            return ((StateNamespaces.WindowNamespace)timer.getNamespace()).getWindow();
        }
        WindowedValue<T> value = Iterables.getOnlyElement(kwi.elementsIterable());
        return (BoundedWindow)Iterables.getOnlyElement(value.getWindows());
    }

    private boolean isReady(BoundedWindow mainInputWindow) {
        for (PCollectionView<?> view : this.views) {
            BoundedWindow sideInputWindow;
            if (this.sideInputReader.isReady(view, sideInputWindow = view.getWindowMappingFn().getSideInputWindow(mainInputWindow))) continue;
            return false;
        }
        return true;
    }
}

