/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import org.apache.beam.fn.harness.AssignWindowsRunner;
import org.apache.beam.fn.harness.MapFnRunners;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.PTransformRunnerFactoryTestContext;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.sdk.util.construction.Environments;
import org.apache.beam.sdk.util.construction.SdkComponents;
import org.apache.beam.sdk.util.construction.WindowingStrategyTranslation;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class AssignWindowsRunnerTest
implements Serializable {
    /** JUnit rule used by tests that expect the code under test to throw. */
    @Rule
    public transient ExpectedException thrown = ExpectedException.none();

    /**
     * Factory exercised by the {@code factoryCreatesFrom*} tests.
     *
     * <p>Created with the diamond operator so that no raw type is instantiated. Like
     * {@link #thrown}, the field is {@code transient} because this test class is declared
     * {@link Serializable}. NOTE(review): presumably the class is serializable so that the
     * anonymous {@link WindowFn} instances, which capture the enclosing test, can be serialized -
     * confirm.
     */
    private transient AssignWindowsRunner.AssignWindowsMapFnFactory<?> factory =
        new AssignWindowsRunner.AssignWindowsMapFnFactory<>();

    /**
     * Checks that a {@link FixedWindows} function replaces the input window with the single window
     * it assigns for the element's timestamp, while value, timestamp and pane are carried over.
     *
     * @throws Exception if window assignment fails unexpectedly
     */
    @Test
    public void singleInputSingleOutputSucceeds() throws Exception {
        FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(10L));
        AssignWindowsRunner<Integer, IntervalWindow> runner = AssignWindowsRunner.create(windowFn);

        // Input built with valueInGlobalWindow; the expected output carries TIMESTAMP_MIN_VALUE and NO_FIRING.
        MatcherAssert.assertThat(
            runner.assignWindows(WindowedValue.valueInGlobalWindow(1)),
            Matchers.equalTo(
                WindowedValue.of(
                    1,
                    BoundedWindow.TIMESTAMP_MIN_VALUE,
                    windowFn.assignWindow(BoundedWindow.TIMESTAMP_MIN_VALUE),
                    PaneInfo.NO_FIRING)));

        // Element that already sits in an unrelated interval window: only the window may change.
        MatcherAssert.assertThat(
            runner.assignWindows(
                WindowedValue.of(
                    2,
                    new Instant(-10L),
                    new IntervalWindow(new Instant(-120000L), Duration.standardMinutes(3L)),
                    PaneInfo.ON_TIME_AND_ONLY_FIRING)),
            Matchers.equalTo(
                WindowedValue.of(
                    2,
                    new Instant(-10L),
                    windowFn.assignWindow(new Instant(-10L)),
                    PaneInfo.ON_TIME_AND_ONLY_FIRING)));
    }

    /**
     * Checks that a {@link SlidingWindows} function (4 minute windows every 2 minutes) places one
     * input element into both overlapping windows that contain its timestamp, keeping the pane of
     * the input.
     *
     * @throws Exception if window assignment fails unexpectedly
     */
    @Test
    public void singleInputMultipleOutputSucceeds() throws Exception {
        SlidingWindows windowFn =
            SlidingWindows.of(Duration.standardMinutes(4L)).every(Duration.standardMinutes(2L));
        AssignWindowsRunner<Integer, IntervalWindow> runner = AssignWindowsRunner.create(windowFn);

        // Three consecutive 4 minute windows starting at -4min, -2min and 0 relative to the epoch.
        IntervalWindow firstWindow =
            new IntervalWindow(
                new Instant(0L).minus(Duration.standardMinutes(4L)), Duration.standardMinutes(4L));
        IntervalWindow secondWindow =
            new IntervalWindow(
                new Instant(0L).minus(Duration.standardMinutes(2L)), Duration.standardMinutes(4L));
        IntervalWindow thirdWindow =
            new IntervalWindow(new Instant(0L), Duration.standardMinutes(4L));

        // Timestamp just before the epoch: expected in the first and second window.
        WindowedValue<Integer> firstValue =
            WindowedValue.timestampedValueInGlobalWindow(-3, new Instant(-12L));
        MatcherAssert.assertThat(
            runner.assignWindows(firstValue),
            Matchers.equalTo(
                WindowedValue.of(
                    -3,
                    new Instant(-12L),
                    ImmutableSet.of(firstWindow, secondWindow),
                    firstValue.getPane())));

        // Timestamp just after the epoch: expected in the second and third window.
        WindowedValue<Integer> secondValue =
            WindowedValue.of(
                3,
                new Instant(12L),
                new IntervalWindow(new Instant(-12L), Duration.standardMinutes(24L)),
                PaneInfo.ON_TIME_AND_ONLY_FIRING);
        MatcherAssert.assertThat(
            runner.assignWindows(secondValue),
            Matchers.equalTo(
                WindowedValue.of(
                    3,
                    new Instant(12L),
                    ImmutableSet.of(secondWindow, thirdWindow),
                    secondValue.getPane())));
    }

    /**
     * Runs window assignment through {@link MapFnRunners} for an element that sits in two input
     * windows and expects two outputs, each carrying the full set of windows returned by the
     * {@link WindowFn}.
     *
     * @throws Exception if building or running the transform fails unexpectedly
     */
    @Test
    public void multipleInputWindowsAsMapFnSucceeds() throws Exception {
        WindowFn<Object, BoundedWindow> windowFn =
            new WindowFn<Object, BoundedWindow>() {
                @Override
                public Collection<BoundedWindow> assignWindows(AssignContext c) {
                    // Result intentionally discarded. NOTE(review): presumably this only checks
                    // that window() is callable in this path - confirm.
                    c.window();
                    // No (Object) casts here: they would make the set an ImmutableSet<Object>,
                    // which is not a Collection<BoundedWindow>.
                    return ImmutableSet.of(
                        GlobalWindow.INSTANCE,
                        new IntervalWindow(new Instant(-500L), Duration.standardMinutes(3L)));
                }

                @Override
                public void mergeWindows(MergeContext c) {
                    throw new UnsupportedOperationException();
                }

                @Override
                public WindowMappingFn<BoundedWindow> getDefaultWindowMappingFn() {
                    throw new UnsupportedOperationException();
                }

                @Override
                public boolean isCompatible(WindowFn<?, ?> other) {
                    throw new UnsupportedOperationException();
                }

                @Override
                public Coder<BoundedWindow> windowCoder() {
                    throw new UnsupportedOperationException();
                }
            };

        SdkComponents components = SdkComponents.create();
        components.registerEnvironment(Environments.createDockerEnvironment("java"));
        RunnerApi.PCollection pCollection =
            RunnerApi.PCollection.newBuilder().setUniqueName("input").setCoderId("coder-id").build();
        RunnerApi.Coder coder = CoderTranslation.toProto(VarIntCoder.of()).getCoder();

        RunnerApi.PTransform windowInto =
            RunnerApi.PTransform.newBuilder()
                .putInputs("in", "input")
                .putOutputs("out", "output")
                .setSpec(
                    RunnerApi.FunctionSpec.newBuilder()
                        .setUrn("beam:transform:window_into:v1")
                        .setPayload(
                            RunnerApi.WindowIntoPayload.newBuilder()
                                .setWindowFn(
                                    WindowingStrategyTranslation.toProto(windowFn, components))
                                .build()
                                .toByteString()))
                .build();
        PTransformRunnerFactoryTestContext context =
            PTransformRunnerFactoryTestContext.builder("ptransform", windowInto)
                .components(
                    RunnerApi.Components.newBuilder()
                        .putAllPcollections(Collections.singletonMap("input", pCollection))
                        .putAllCoders(Collections.singletonMap("coder-id", coder))
                        .build())
                .build();

        // Collect everything the runner emits on the "output" PCollection.
        Collection<WindowedValue<?>> outputs = new ArrayList<>();
        context.addPCollectionConsumer("output", outputs::add);

        MapFnRunners.forWindowedValueMapFnFactory(
                new AssignWindowsRunner.AssignWindowsMapFnFactory<>())
            .createRunnerForPTransform(context);

        // A single element that lives in two input windows.
        WindowedValue<Integer> value =
            WindowedValue.of(
                2,
                new Instant(-10L),
                ImmutableList.of(
                    new IntervalWindow(new Instant(-22L), Duration.standardMinutes(5L)),
                    new IntervalWindow(new Instant(-120000L), Duration.standardMinutes(3L))),
                PaneInfo.ON_TIME_AND_ONLY_FIRING);
        context.getPCollectionConsumer("input").accept(value);

        // Two identical outputs are expected, matching the two input windows of the element.
        WindowedValue<Integer> expected =
            WindowedValue.of(
                2,
                new Instant(-10L),
                ImmutableSet.of(
                    GlobalWindow.INSTANCE,
                    new IntervalWindow(new Instant(-500L), Duration.standardMinutes(3L))),
                PaneInfo.ON_TIME_AND_ONLY_FIRING);
        MatcherAssert.assertThat(outputs, Matchers.containsInAnyOrder(expected, expected));
    }

    /**
     * Expects an {@link IllegalArgumentException} when the runner is invoked directly with an
     * element that sits in two windows and the {@link WindowFn} asks the context for its window.
     *
     * @throws Exception the expected exception, verified through the {@code thrown} rule
     */
    @Test
    public void multipleInputWindowsThrows() throws Exception {
        WindowFn<Object, BoundedWindow> windowFn =
            new WindowFn<Object, BoundedWindow>() {
                @Override
                public Collection<BoundedWindow> assignWindows(AssignContext c) throws Exception {
                    // Re-emits whatever window the context reports for the element.
                    return Collections.singleton(c.window());
                }

                @Override
                public void mergeWindows(MergeContext c) throws Exception {
                    throw new UnsupportedOperationException();
                }

                @Override
                public WindowMappingFn<BoundedWindow> getDefaultWindowMappingFn() {
                    throw new UnsupportedOperationException();
                }

                @Override
                public boolean isCompatible(WindowFn<?, ?> other) {
                    throw new UnsupportedOperationException();
                }

                @Override
                public Coder<BoundedWindow> windowCoder() {
                    throw new UnsupportedOperationException();
                }
            };
        AssignWindowsRunner<Integer, BoundedWindow> runner = AssignWindowsRunner.create(windowFn);

        // The expectation must be registered before the call that is supposed to throw.
        this.thrown.expect(IllegalArgumentException.class);
        runner.assignWindows(
            WindowedValue.of(
                2,
                new Instant(-10L),
                ImmutableList.of(
                    new IntervalWindow(new Instant(-22L), Duration.standardMinutes(5L)),
                    new IntervalWindow(new Instant(-120000L), Duration.standardMinutes(3L))),
                PaneInfo.ON_TIME_AND_ONLY_FIRING));
    }

    /**
     * Builds a window-into transform whose payload carries the custom {@code TestWindowFn} and
     * checks that the function produced by the factory assigns the window that
     * {@code TestWindowFn} itself would assign.
     *
     * @throws Exception if translating the transform or applying the function fails
     */
    @Test
    public void factoryCreatesFromJavaWindowFn() throws Exception {
        SdkComponents components = SdkComponents.create();
        components.registerEnvironment(Environments.createDockerEnvironment("java"));
        RunnerApi.PTransform windowPTransform =
            RunnerApi.PTransform.newBuilder()
                .putInputs("in", "input")
                .putOutputs("out", "output")
                .setSpec(
                    RunnerApi.FunctionSpec.newBuilder()
                        .setUrn("beam:transform:window_into:v1")
                        .setPayload(
                            RunnerApi.WindowIntoPayload.newBuilder()
                                .setWindowFn(
                                    WindowingStrategyTranslation.toProto(
                                        new TestWindowFn(), components))
                                .build()
                                .toByteString())
                        .build())
                .build();

        // The factory field is declared with a wildcard, so its element type is unknown here; the
        // single unchecked conversion is confined to this declaration.
        @SuppressWarnings({"unchecked", "rawtypes"})
        ThrowingFunction<WindowedValue<?>, WindowedValue<?>> fn =
            (ThrowingFunction) this.factory.forPTransform("transform", windowPTransform);

        MatcherAssert.assertThat(
            fn.apply(
                WindowedValue.of(
                    22L,
                    new Instant(5L),
                    new IntervalWindow(new Instant(0L), new Instant(20027L)),
                    PaneInfo.ON_TIME_AND_ONLY_FIRING)),
            Matchers.equalTo(
                WindowedValue.of(
                    22L,
                    new Instant(5L),
                    new TestWindowFn().assignWindow(new Instant(5L)),
                    PaneInfo.ON_TIME_AND_ONLY_FIRING)));
    }

    /**
     * Builds a window-into transform whose payload carries {@link Sessions} with a 12 minute gap
     * and checks that the function produced by the factory assigns a 12 minute window starting at
     * the element's timestamp.
     *
     * @throws Exception if translating the transform or applying the function fails
     */
    @Test
    public void factoryCreatesFromKnownWindowFn() throws Exception {
        SdkComponents components = SdkComponents.create();
        components.registerEnvironment(Environments.createDockerEnvironment("java"));
        RunnerApi.PTransform windowPTransform =
            RunnerApi.PTransform.newBuilder()
                .putInputs("in", "input")
                .putOutputs("out", "output")
                .setSpec(
                    RunnerApi.FunctionSpec.newBuilder()
                        .setUrn("beam:transform:window_into:v1")
                        .setPayload(
                            RunnerApi.WindowIntoPayload.newBuilder()
                                .setWindowFn(
                                    WindowingStrategyTranslation.toProto(
                                        Sessions.withGapDuration(Duration.standardMinutes(12L)),
                                        components))
                                .build()
                                .toByteString())
                        .build())
                .build();

        // The factory field is declared with a wildcard, so its element type is unknown here; the
        // single unchecked conversion is confined to this declaration.
        @SuppressWarnings({"unchecked", "rawtypes"})
        ThrowingFunction<WindowedValue<?>, WindowedValue<?>> fn =
            (ThrowingFunction) this.factory.forPTransform("transform", windowPTransform);

        WindowedValue<?> output =
            fn.apply(
                WindowedValue.of(
                    22L,
                    new Instant(5L),
                    new IntervalWindow(new Instant(0L), new Instant(20027L)),
                    PaneInfo.ON_TIME_AND_ONLY_FIRING));

        MatcherAssert.assertThat(
            output,
            Matchers.equalTo(
                WindowedValue.of(
                    22L,
                    new Instant(5L),
                    new IntervalWindow(new Instant(5L), Duration.standardMinutes(12L)),
                    PaneInfo.ON_TIME_AND_ONLY_FIRING)));
    }

    private static class TestWindowFn
    extends PartitioningWindowFn<Object, IntervalWindow> {
        private TestWindowFn() {
        }

        public IntervalWindow assignWindow(Instant timestamp) {
            return new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE.maxTimestamp());
        }

        public boolean isCompatible(WindowFn<?, ?> other) {
            return ((Object)((Object)this)).equals(other);
        }

        public Coder<IntervalWindow> windowCoder() {
            return IntervalWindow.IntervalWindowCoder.of();
        }
    }
}

