/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.fn.harness.MapFnRunners;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.joda.time.Instant;

/**
 * Re-windows elements by applying a {@link WindowFn} to each incoming {@link WindowedValue}.
 *
 * <p>The value, timestamp and pane of each input are carried over unchanged; only the set of
 * windows is replaced by whatever the wrapped {@link WindowFn} assigns.
 *
 * @param <T> type of the element values being re-windowed
 * @param <W> type of window produced by the wrapped {@link WindowFn}
 */
class AssignWindowsRunner<T, W extends BoundedWindow> {
    /** URN of the transform handled by this runner; shared by the factory check and the registrar. */
    private static final String ASSIGN_WINDOWS_URN = "beam:transform:window_into:v1";

    private final WindowFn<T, W> windowFn;

    /**
     * Creates a runner around the given window function.
     *
     * @param windowFn window function applied to every element; may be declared over a supertype
     *     of {@code T}
     * @return a runner that assigns windows using {@code windowFn}
     */
    static <T, W extends BoundedWindow> AssignWindowsRunner<T, W> create(WindowFn<? super T, W> windowFn) {
        // Unchecked narrowing of the element type parameter. NOTE(review): this assumes WindowFn
        // only consumes elements of its first type parameter (contravariant use) -- confirm
        // against the WindowFn API.
        @SuppressWarnings("unchecked")
        WindowFn<T, W> typedWindowFn = (WindowFn<T, W>) windowFn;
        return new AssignWindowsRunner<T, W>(typedWindowFn);
    }

    private AssignWindowsRunner(WindowFn<T, W> windowFn) {
        this.windowFn = windowFn;
    }

    /**
     * Assigns new windows to {@code input} using the wrapped {@link WindowFn}.
     *
     * @param input the element to re-window; the context handed to the window function exposes
     *     its value, its timestamp and its single current window
     * @return a new {@link WindowedValue} with the same value, timestamp and pane as
     *     {@code input} and the windows returned by the window function
     * @throws Exception if the window function fails
     */
    WindowedValue<T> assignWindows(final WindowedValue<T> input) throws Exception {
        // AssignContext is a non-static inner class of WindowFn, so it has to be created through
        // a qualified "outer.new Inner()" expression bound to the WindowFn instance.
        WindowFn<T, W>.AssignContext ctxt = this.windowFn.new AssignContext() {

            @Override
            public T element() {
                return input.getValue();
            }

            @Override
            public Instant timestamp() {
                return input.getTimestamp();
            }

            @Override
            public BoundedWindow window() {
                // Evaluated lazily: only fails if the window function asks for the current
                // window and the input is not in exactly one window.
                return Iterables.getOnlyElement(input.getWindows());
            }
        };
        Collection<W> windows = this.windowFn.assignWindows(ctxt);
        return WindowedValue.of(input.getValue(), input.getTimestamp(), windows, input.getPane());
    }

    /**
     * Builds the per-element mapping function for a {@code window_into} {@link
     * RunnerApi.PTransform} by decoding its payload into a {@link WindowFn}.
     *
     * @param <T> type of the element values flowing through the transform
     */
    @VisibleForTesting
    static class AssignWindowsMapFnFactory<T>
    implements MapFnRunners.WindowedValueMapFnFactory<T, T> {
        AssignWindowsMapFnFactory() {
        }

        /**
         * Validates the transform and returns a function that re-windows each element.
         *
         * @param ptransformId id of the transform (not used by this factory)
         * @param ptransform the transform definition; must carry the window-into URN and have
         *     exactly one input and one output
         * @return a function applying the decoded {@link WindowFn} to each {@link WindowedValue}
         * @throws IOException if the transform payload cannot be parsed
         * @throws IllegalArgumentException if the URN, input count or output count is unexpected
         */
        @Override
        public ThrowingFunction<WindowedValue<T>, WindowedValue<T>> forPTransform(String ptransformId, RunnerApi.PTransform ptransform) throws IOException {
            Preconditions.checkArgument(
                ASSIGN_WINDOWS_URN.equals(ptransform.getSpec().getUrn()),
                "Expected URN %s but got %s",
                ASSIGN_WINDOWS_URN,
                ptransform.getSpec().getUrn());
            Preconditions.checkArgument(ptransform.getInputsCount() == 1, "Expected only one input");
            Preconditions.checkArgument(ptransform.getOutputsCount() == 1, "Expected only one output");
            RunnerApi.WindowIntoPayload payload = RunnerApi.WindowIntoPayload.parseFrom(ptransform.getSpec().getPayload());
            // The proto carries no static element type, so the decoded WindowFn<?, ?> has to be
            // narrowed to T with an unchecked cast before it can be bound to WindowedValue<T>.
            @SuppressWarnings("unchecked")
            WindowFn<T, ?> windowFn = (WindowFn<T, ?>) WindowingStrategyTranslation.windowFnFromProto(payload.getWindowFn());
            return AssignWindowsRunner.create(windowFn)::assignWindows;
        }
    }

    /** Registers the runner factory for the window-into transform URN. */
    public static class Registrar
    implements PTransformRunnerFactory.Registrar {
        /**
         * @return a single-entry map from the window-into URN to a factory backed by
         *     {@link AssignWindowsMapFnFactory}
         */
        @Override
        public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
            return ImmutableMap.of(ASSIGN_WINDOWS_URN, MapFnRunners.forWindowedValueMapFnFactory(new AssignWindowsMapFnFactory<>()));
        }
    }
}

