/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.FnApiDoFnRunner;
import org.apache.beam.fn.harness.HandlesSplits;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.PTransformRunnerFactoryTestContext;
import org.apache.beam.fn.harness.control.BundleProgressReporter;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.state.FakeBeamFnStateClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.MetricUpdates;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.TimerMap;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.ResetDateTimeProvider;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.sdk.util.construction.ParDoTranslation;
import org.apache.beam.sdk.util.construction.PipelineTranslation;
import org.apache.beam.sdk.util.construction.RehydratedComponents;
import org.apache.beam.sdk.util.construction.SdkComponents;
import org.apache.beam.sdk.util.construction.Timer;
import org.apache.beam.sdk.util.construction.graph.ProtoOverrides;
import org.apache.beam.sdk.util.construction.graph.SplittableParDoExpander;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Timestamp;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.util.Durations;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsMapContaining;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadablePeriod;
import org.joda.time.format.PeriodFormat;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=Enclosed.class)
public class FnApiDoFnRunnerTest
implements Serializable {

    @RunWith(value=JUnit4.class)
    public static class SplitTest {
        @Rule
        public final ExpectedException expected = ExpectedException.none();
        private IntervalWindow window1;
        private IntervalWindow window2;
        private IntervalWindow window3;
        private WindowedValue<String> currentElement;
        private OffsetRange currentRestriction;
        private Instant currentWatermarkEstimatorState;
        private Instant initialWatermark;
        KV<Instant, Instant> watermarkAndState;
        private static final String PROCESS_TRANSFORM_ID = "processPTransformId";
        private static final String TRUNCATE_TRANSFORM_ID = "truncatePTransformId";
        private static final String PROCESS_INPUT_ID = "processInputId";
        private static final String TRUNCATE_INPUT_ID = "truncateInputId";
        private static final String PROCESS_OUTPUT_ID = "processOutputId";
        private static final String TRUNCATE_OUTPUT_ID = "truncateOutputId";

        private KV<WindowedValue, WindowedValue> createSplitInWindow(OffsetRange primaryRestriction, OffsetRange residualRestriction, BoundedWindow window) {
            return KV.of((Object)WindowedValue.of((Object)KV.of((Object)((String)this.currentElement.getValue()), (Object)KV.of((Object)primaryRestriction, (Object)this.currentWatermarkEstimatorState)), (Instant)this.currentElement.getTimestamp(), (BoundedWindow)window, (PaneInfo)this.currentElement.getPane()), (Object)WindowedValue.of((Object)KV.of((Object)((String)this.currentElement.getValue()), (Object)KV.of((Object)residualRestriction, (Object)((Instant)this.watermarkAndState.getValue()))), (Instant)this.currentElement.getTimestamp(), (BoundedWindow)window, (PaneInfo)this.currentElement.getPane()));
        }

        private KV<WindowedValue, WindowedValue> createSplitAcrossWindows(List<BoundedWindow> primaryWindows, List<BoundedWindow> residualWindows) {
            return KV.of(primaryWindows.isEmpty() ? null : WindowedValue.of((Object)KV.of((Object)((String)this.currentElement.getValue()), (Object)KV.of((Object)this.currentRestriction, (Object)this.currentWatermarkEstimatorState)), (Instant)this.currentElement.getTimestamp(), primaryWindows, (PaneInfo)this.currentElement.getPane()), residualWindows.isEmpty() ? null : WindowedValue.of((Object)KV.of((Object)((String)this.currentElement.getValue()), (Object)KV.of((Object)this.currentRestriction, (Object)this.currentWatermarkEstimatorState)), (Instant)this.currentElement.getTimestamp(), residualWindows, (PaneInfo)this.currentElement.getPane()));
        }

        private KV<WindowedValue, WindowedValue> createSplitWithSizeInWindow(OffsetRange primaryRestriction, OffsetRange residualRestriction, BoundedWindow window) {
            return KV.of((Object)WindowedValue.of((Object)KV.of((Object)KV.of((Object)((String)this.currentElement.getValue()), (Object)KV.of((Object)primaryRestriction, (Object)this.currentWatermarkEstimatorState)), (Object)(primaryRestriction.getTo() - primaryRestriction.getFrom())), (Instant)this.currentElement.getTimestamp(), (BoundedWindow)window, (PaneInfo)this.currentElement.getPane()), (Object)WindowedValue.of((Object)KV.of((Object)KV.of((Object)((String)this.currentElement.getValue()), (Object)KV.of((Object)residualRestriction, (Object)((Instant)this.watermarkAndState.getValue()))), (Object)(residualRestriction.getTo() - residualRestriction.getFrom())), (Instant)this.currentElement.getTimestamp(), (BoundedWindow)window, (PaneInfo)this.currentElement.getPane()));
        }

        private KV<WindowedValue, WindowedValue> createSplitWithSizeAcrossWindows(List<BoundedWindow> primaryWindows, List<BoundedWindow> residualWindows) {
            return KV.of(primaryWindows.isEmpty() ? null : WindowedValue.of((Object)KV.of((Object)KV.of((Object)((String)this.currentElement.getValue()), (Object)KV.of((Object)this.currentRestriction, (Object)this.currentWatermarkEstimatorState)), (Object)(this.currentRestriction.getTo() - this.currentRestriction.getFrom())), (Instant)this.currentElement.getTimestamp(), primaryWindows, (PaneInfo)this.currentElement.getPane()), residualWindows.isEmpty() ? null : WindowedValue.of((Object)KV.of((Object)KV.of((Object)((String)this.currentElement.getValue()), (Object)KV.of((Object)this.currentRestriction, (Object)this.currentWatermarkEstimatorState)), (Object)(this.currentRestriction.getTo() - this.currentRestriction.getFrom())), (Instant)this.currentElement.getTimestamp(), residualWindows, (PaneInfo)this.currentElement.getPane()));
        }

        @Before
        public void setUp() {
            this.window1 = new IntervalWindow(Instant.ofEpochMilli((long)0L), Instant.ofEpochMilli((long)10L));
            this.window2 = new IntervalWindow(Instant.ofEpochMilli((long)10L), Instant.ofEpochMilli((long)20L));
            this.window3 = new IntervalWindow(Instant.ofEpochMilli((long)20L), Instant.ofEpochMilli((long)30L));
            this.currentElement = WindowedValue.of((Object)"a", (Instant)Instant.ofEpochMilli((long)57L), (Collection)ImmutableList.of((Object)this.window1, (Object)this.window2, (Object)this.window3), (PaneInfo)PaneInfo.NO_FIRING);
            this.currentRestriction = new OffsetRange(0L, 100L);
            this.currentWatermarkEstimatorState = Instant.ofEpochMilli((long)21L);
            this.initialWatermark = Instant.ofEpochMilli((long)25L);
            this.watermarkAndState = KV.of((Object)Instant.ofEpochMilli((long)42L), (Object)Instant.ofEpochMilli((long)42L));
        }

        @Test
        public void testScaledProgress() throws Exception {
            RestrictionTracker.Progress elementProgress = RestrictionTracker.Progress.from((double)2.0, (double)8.0);
            RestrictionTracker.Progress scaledResult = FnApiDoFnRunner.scaleProgress((RestrictionTracker.Progress)elementProgress, (int)0, (int)1);
            Assert.assertEquals(2.0, scaledResult.getWorkCompleted(), 0.0);
            Assert.assertEquals(8.0, scaledResult.getWorkRemaining(), 0.0);
            scaledResult = FnApiDoFnRunner.scaleProgress((RestrictionTracker.Progress)elementProgress, (int)0, (int)3);
            Assert.assertEquals(2.0, scaledResult.getWorkCompleted(), 0.0);
            Assert.assertEquals(28.0, scaledResult.getWorkRemaining(), 0.0);
            scaledResult = FnApiDoFnRunner.scaleProgress((RestrictionTracker.Progress)elementProgress, (int)1, (int)3);
            Assert.assertEquals(12.0, scaledResult.getWorkCompleted(), 0.0);
            Assert.assertEquals(18.0, scaledResult.getWorkRemaining(), 0.0);
            scaledResult = FnApiDoFnRunner.scaleProgress((RestrictionTracker.Progress)elementProgress, (int)2, (int)3);
            Assert.assertEquals(22.0, scaledResult.getWorkCompleted(), 0.0);
            Assert.assertEquals(8.0, scaledResult.getWorkRemaining(), 0.0);
        }

        @Test
        public void testComputeSplitForProcessOrTruncateWithNullTrackerAndSplitDelegate() throws Exception {
            this.expected.expect(IllegalArgumentException.class);
            FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window1, (List)ImmutableList.copyOf((Collection)this.currentElement.getWindows()), (Object)this.currentWatermarkEstimatorState, (double)0.0, null, null, null, (int)0, (int)3);
        }

        @Test
        public void testComputeSplitForProcessOrTruncateWithNotNullTrackerAndDelegate() throws Exception {
            this.expected.expect(IllegalArgumentException.class);
            FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window1, (List)ImmutableList.copyOf((Collection)this.currentElement.getWindows()), (Object)this.currentWatermarkEstimatorState, (double)0.0, (RestrictionTracker)new OffsetRangeTracker(this.currentRestriction), (HandlesSplits)this.createSplitDelegate(0.3, 0.0, null), null, (int)0, (int)3);
        }

        @Test
        public void testComputeSplitForProcessOrTruncateWithInvalidWatermarkAndState() throws Exception {
            this.expected.expect(NullPointerException.class);
            FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window1, (List)ImmutableList.copyOf((Collection)this.currentElement.getWindows()), (Object)this.currentWatermarkEstimatorState, (double)0.0, (RestrictionTracker)new OffsetRangeTracker(this.currentRestriction), null, null, (int)0, (int)3);
        }

        @Test
        public void testTrySplitForProcessCheckpointOnFirstWindow() throws Exception {
            ImmutableList windows = ImmutableList.copyOf((Collection)this.currentElement.getWindows());
            OffsetRangeTracker tracker = new OffsetRangeTracker(this.currentRestriction);
            tracker.tryClaim(Long.valueOf(30L));
            FnApiDoFnRunner.SplitResultsWithStopIndex result = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window1, (List)windows, (Object)this.currentWatermarkEstimatorState, (double)0.0, (RestrictionTracker)tracker, null, this.watermarkAndState, (int)0, (int)3);
            Assert.assertEquals(1L, result.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> expectedElementSplit = this.createSplitInWindow(new OffsetRange(0L, 31L), new OffsetRange(31L, 100L), (BoundedWindow)this.window1);
            KV<WindowedValue, WindowedValue> expectedWindowSplit = this.createSplitAcrossWindows((List<BoundedWindow>)ImmutableList.of(), (List<BoundedWindow>)ImmutableList.of((Object)this.window2, (Object)this.window3));
            Assert.assertEquals(expectedElementSplit.getKey(), result.getWindowSplit().getPrimarySplitRoot());
            Assert.assertEquals(expectedElementSplit.getValue(), result.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(expectedWindowSplit.getKey(), result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(expectedWindowSplit.getValue(), result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForProcessCheckpointOnFirstWindowAfterOneSplit() throws Exception {
            ImmutableList windows = ImmutableList.copyOf((Collection)this.currentElement.getWindows());
            OffsetRangeTracker tracker = new OffsetRangeTracker(this.currentRestriction);
            tracker.tryClaim(Long.valueOf(30L));
            FnApiDoFnRunner.SplitResultsWithStopIndex result = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window1, (List)windows, (Object)this.currentWatermarkEstimatorState, (double)0.0, (RestrictionTracker)tracker, null, this.watermarkAndState, (int)0, (int)2);
            Assert.assertEquals(1L, result.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> expectedElementSplit = this.createSplitInWindow(new OffsetRange(0L, 31L), new OffsetRange(31L, 100L), (BoundedWindow)this.window1);
            KV<WindowedValue, WindowedValue> expectedWindowSplit = this.createSplitAcrossWindows((List<BoundedWindow>)ImmutableList.of(), (List<BoundedWindow>)ImmutableList.of((Object)this.window2));
            Assert.assertEquals(expectedElementSplit.getKey(), result.getWindowSplit().getPrimarySplitRoot());
            Assert.assertEquals(expectedElementSplit.getValue(), result.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(expectedWindowSplit.getKey(), result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(expectedWindowSplit.getValue(), result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForProcessSplitOnFirstWindow() throws Exception {
            ImmutableList windows = ImmutableList.copyOf((Collection)this.currentElement.getWindows());
            OffsetRangeTracker tracker = new OffsetRangeTracker(this.currentRestriction);
            tracker.tryClaim(Long.valueOf(30L));
            FnApiDoFnRunner.SplitResultsWithStopIndex result = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window1, (List)windows, (Object)this.currentWatermarkEstimatorState, (double)0.2, (RestrictionTracker)tracker, null, this.watermarkAndState, (int)0, (int)3);
            Assert.assertEquals(1L, result.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> expectedElementSplit = this.createSplitInWindow(new OffsetRange(0L, 84L), new OffsetRange(84L, 100L), (BoundedWindow)this.window1);
            KV<WindowedValue, WindowedValue> expectedWindowSplit = this.createSplitAcrossWindows((List<BoundedWindow>)ImmutableList.of(), (List<BoundedWindow>)ImmutableList.of((Object)this.window2, (Object)this.window3));
            Assert.assertEquals(expectedElementSplit.getKey(), result.getWindowSplit().getPrimarySplitRoot());
            Assert.assertEquals(expectedElementSplit.getValue(), result.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(expectedWindowSplit.getKey(), result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(expectedWindowSplit.getValue(), result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForProcessSplitOnMiddleWindow() throws Exception {
            ImmutableList windows = ImmutableList.copyOf((Collection)this.currentElement.getWindows());
            OffsetRangeTracker tracker = new OffsetRangeTracker(this.currentRestriction);
            tracker.tryClaim(Long.valueOf(30L));
            FnApiDoFnRunner.SplitResultsWithStopIndex result = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window2, (List)windows, (Object)this.currentWatermarkEstimatorState, (double)0.2, (RestrictionTracker)tracker, null, this.watermarkAndState, (int)1, (int)3);
            Assert.assertEquals(2L, result.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> expectedElementSplit = this.createSplitInWindow(new OffsetRange(0L, 63L), new OffsetRange(63L, 100L), (BoundedWindow)this.window2);
            KV<WindowedValue, WindowedValue> expectedWindowSplit = this.createSplitAcrossWindows((List<BoundedWindow>)ImmutableList.of((Object)this.window1), (List<BoundedWindow>)ImmutableList.of((Object)this.window3));
            Assert.assertEquals(expectedElementSplit.getKey(), result.getWindowSplit().getPrimarySplitRoot());
            Assert.assertEquals(expectedElementSplit.getValue(), result.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(expectedWindowSplit.getKey(), result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(expectedWindowSplit.getValue(), result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForProcessSplitOnLastWindow() throws Exception {
            ImmutableList windows = ImmutableList.copyOf((Collection)this.currentElement.getWindows());
            OffsetRangeTracker tracker = new OffsetRangeTracker(this.currentRestriction);
            tracker.tryClaim(Long.valueOf(30L));
            FnApiDoFnRunner.SplitResultsWithStopIndex result = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window3, (List)windows, (Object)this.currentWatermarkEstimatorState, (double)0.2, (RestrictionTracker)tracker, null, this.watermarkAndState, (int)2, (int)3);
            Assert.assertEquals(3L, result.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> expectedElementSplit = this.createSplitInWindow(new OffsetRange(0L, 44L), new OffsetRange(44L, 100L), (BoundedWindow)this.window3);
            KV<WindowedValue, WindowedValue> expectedWindowSplit = this.createSplitAcrossWindows((List<BoundedWindow>)ImmutableList.of((Object)this.window1, (Object)this.window2), (List<BoundedWindow>)ImmutableList.of());
            Assert.assertEquals(expectedElementSplit.getKey(), result.getWindowSplit().getPrimarySplitRoot());
            Assert.assertEquals(expectedElementSplit.getValue(), result.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(expectedWindowSplit.getKey(), result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(expectedWindowSplit.getValue(), result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForProcessSplitOnFirstWindowFallback() throws Exception {
            ImmutableList windows = ImmutableList.copyOf((Collection)this.currentElement.getWindows());
            OffsetRangeTracker tracker = new OffsetRangeTracker(this.currentRestriction);
            tracker.tryClaim(Long.valueOf(100L));
            Assert.assertNull(tracker.trySplit(0.0));
            FnApiDoFnRunner.SplitResultsWithStopIndex result = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window3, (List)windows, (Object)this.currentWatermarkEstimatorState, (double)0.0, (RestrictionTracker)tracker, null, this.watermarkAndState, (int)0, (int)3);
            Assert.assertEquals(1L, result.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> expectedWindowSplit = this.createSplitAcrossWindows((List<BoundedWindow>)ImmutableList.of((Object)this.window1), (List<BoundedWindow>)ImmutableList.of((Object)this.window2, (Object)this.window3));
            Assert.assertNull(result.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(result.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(expectedWindowSplit.getKey(), result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(expectedWindowSplit.getValue(), result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForProcessSplitOnLastWindowWhenNoElementSplit() throws Exception {
            ImmutableList windows = ImmutableList.copyOf((Collection)this.currentElement.getWindows());
            OffsetRangeTracker tracker = new OffsetRangeTracker(this.currentRestriction);
            tracker.tryClaim(Long.valueOf(100L));
            Assert.assertNull(tracker.trySplit(0.0));
            FnApiDoFnRunner.SplitResultsWithStopIndex result = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window3, (List)windows, (Object)this.currentWatermarkEstimatorState, (double)0.0, (RestrictionTracker)tracker, null, this.watermarkAndState, (int)2, (int)3);
            Assert.assertNull(result);
        }

        @Test
        public void testTrySplitForProcessOnWindowBoundaryRoundUp() throws Exception {
            ImmutableList windows = ImmutableList.copyOf((Collection)this.currentElement.getWindows());
            OffsetRangeTracker tracker = new OffsetRangeTracker(this.currentRestriction);
            tracker.tryClaim(Long.valueOf(30L));
            FnApiDoFnRunner.SplitResultsWithStopIndex result = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window2, (List)windows, (Object)this.currentWatermarkEstimatorState, (double)0.6, (RestrictionTracker)tracker, null, this.watermarkAndState, (int)0, (int)3);
            Assert.assertEquals(2L, result.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> expectedWindowSplit = this.createSplitAcrossWindows((List<BoundedWindow>)ImmutableList.of((Object)this.window1, (Object)this.window2), (List<BoundedWindow>)ImmutableList.of((Object)this.window3));
            Assert.assertNull(result.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(result.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(expectedWindowSplit.getKey(), result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(expectedWindowSplit.getValue(), result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForProcessOnWindowBoundaryRoundDown() throws Exception {
            ImmutableList windows = ImmutableList.copyOf((Collection)this.currentElement.getWindows());
            OffsetRangeTracker tracker = new OffsetRangeTracker(this.currentRestriction);
            tracker.tryClaim(Long.valueOf(30L));
            FnApiDoFnRunner.SplitResultsWithStopIndex result = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window2, (List)windows, (Object)this.currentWatermarkEstimatorState, (double)0.3, (RestrictionTracker)tracker, null, this.watermarkAndState, (int)0, (int)3);
            Assert.assertEquals(1L, result.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> expectedWindowSplit = this.createSplitAcrossWindows((List<BoundedWindow>)ImmutableList.of((Object)this.window1), (List<BoundedWindow>)ImmutableList.of((Object)this.window2, (Object)this.window3));
            Assert.assertNull(result.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(result.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(expectedWindowSplit.getKey(), result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(expectedWindowSplit.getValue(), result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForProcessOnWindowBoundaryRoundDownOnLastWindow() throws Exception {
            ImmutableList windows = ImmutableList.copyOf((Collection)this.currentElement.getWindows());
            OffsetRangeTracker tracker = new OffsetRangeTracker(this.currentRestriction);
            tracker.tryClaim(Long.valueOf(30L));
            FnApiDoFnRunner.SplitResultsWithStopIndex result = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window2, (List)windows, (Object)this.currentWatermarkEstimatorState, (double)0.9, (RestrictionTracker)tracker, null, this.watermarkAndState, (int)0, (int)3);
            Assert.assertEquals(2L, result.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> expectedWindowSplit = this.createSplitAcrossWindows((List<BoundedWindow>)ImmutableList.of((Object)this.window1, (Object)this.window2), (List<BoundedWindow>)ImmutableList.of((Object)this.window3));
            Assert.assertNull(result.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(result.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(expectedWindowSplit.getKey(), result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(expectedWindowSplit.getValue(), result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        private HandlesSplits createSplitDelegate(final double progress, final double expectedFraction, final HandlesSplits.SplitResult result) {
            return new HandlesSplits(){

                public HandlesSplits.SplitResult trySplit(double fractionOfRemainder) {
                    Preconditions.checkArgument((fractionOfRemainder == expectedFraction ? 1 : 0) != 0);
                    return result;
                }

                public double getProgress() {
                    return progress;
                }
            };
        }

        @Test
        public void testTrySplitForTruncateCheckpointOnFirstWindow() throws Exception {
            ImmutableList windows = ImmutableList.copyOf((Collection)this.currentElement.getWindows());
            HandlesSplits.SplitResult splitResult = HandlesSplits.SplitResult.of((List)ImmutableList.of((Object)BeamFnApi.BundleApplication.getDefaultInstance()), (List)ImmutableList.of((Object)BeamFnApi.DelayedBundleApplication.getDefaultInstance()));
            HandlesSplits splitDelegate = this.createSplitDelegate(0.3, 0.0, splitResult);
            FnApiDoFnRunner.SplitResultsWithStopIndex result = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window1, (List)windows, (Object)this.currentWatermarkEstimatorState, (double)0.0, null, (HandlesSplits)splitDelegate, null, (int)0, (int)3);
            Assert.assertEquals(1L, result.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> expectedWindowSplit = this.createSplitAcrossWindows((List<BoundedWindow>)ImmutableList.of(), (List<BoundedWindow>)ImmutableList.of((Object)this.window2, (Object)this.window3));
            Assert.assertEquals(splitResult, result.getDownstreamSplit());
            Assert.assertNull(result.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(result.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(expectedWindowSplit.getKey(), result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(expectedWindowSplit.getValue(), result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForTruncateCheckpointOnFirstWindowAfterOneSplit() throws Exception {
            ImmutableList windows = ImmutableList.copyOf((Collection)this.currentElement.getWindows());
            HandlesSplits.SplitResult splitResult = HandlesSplits.SplitResult.of((List)ImmutableList.of((Object)BeamFnApi.BundleApplication.getDefaultInstance()), (List)ImmutableList.of((Object)BeamFnApi.DelayedBundleApplication.getDefaultInstance()));
            HandlesSplits splitDelegate = this.createSplitDelegate(0.3, 0.0, splitResult);
            FnApiDoFnRunner.SplitResultsWithStopIndex result = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window1, (List)windows, (Object)this.currentWatermarkEstimatorState, (double)0.0, null, (HandlesSplits)splitDelegate, null, (int)0, (int)2);
            Assert.assertEquals(1L, result.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> expectedWindowSplit = this.createSplitAcrossWindows((List<BoundedWindow>)ImmutableList.of(), (List<BoundedWindow>)ImmutableList.of((Object)this.window2));
            Assert.assertEquals(splitResult, result.getDownstreamSplit());
            Assert.assertNull(result.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(result.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(expectedWindowSplit.getKey(), result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(expectedWindowSplit.getValue(), result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForTruncateSplitOnFirstWindow() throws Exception {
            ImmutableList windows = ImmutableList.copyOf((Collection)this.currentElement.getWindows());
            HandlesSplits.SplitResult splitResult = HandlesSplits.SplitResult.of((List)ImmutableList.of((Object)BeamFnApi.BundleApplication.getDefaultInstance()), (List)ImmutableList.of((Object)BeamFnApi.DelayedBundleApplication.getDefaultInstance()));
            HandlesSplits splitDelegate = this.createSplitDelegate(0.3, 0.54, splitResult);
            FnApiDoFnRunner.SplitResultsWithStopIndex result = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window1, (List)windows, (Object)this.currentWatermarkEstimatorState, (double)0.2, null, (HandlesSplits)splitDelegate, null, (int)0, (int)3);
            Assert.assertEquals(1L, result.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> expectedWindowSplit = this.createSplitAcrossWindows((List<BoundedWindow>)ImmutableList.of(), (List<BoundedWindow>)ImmutableList.of((Object)this.window2, (Object)this.window3));
            Assert.assertEquals(splitResult, result.getDownstreamSplit());
            Assert.assertNull(result.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(result.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(expectedWindowSplit.getKey(), result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(expectedWindowSplit.getValue(), result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForTruncateSplitOnMiddleWindow() throws Exception {
            ImmutableList windows = ImmutableList.copyOf((Collection)this.currentElement.getWindows());
            HandlesSplits.SplitResult splitResult = HandlesSplits.SplitResult.of((List)ImmutableList.of((Object)BeamFnApi.BundleApplication.getDefaultInstance()), (List)ImmutableList.of((Object)BeamFnApi.DelayedBundleApplication.getDefaultInstance()));
            HandlesSplits splitDelegate = this.createSplitDelegate(0.3, 0.34, splitResult);
            FnApiDoFnRunner.SplitResultsWithStopIndex result = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window1, (List)windows, (Object)this.currentWatermarkEstimatorState, (double)0.2, null, (HandlesSplits)splitDelegate, null, (int)1, (int)3);
            Assert.assertEquals(2L, result.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> expectedWindowSplit = this.createSplitAcrossWindows((List<BoundedWindow>)ImmutableList.of((Object)this.window1), (List<BoundedWindow>)ImmutableList.of((Object)this.window3));
            Assert.assertEquals(splitResult, result.getDownstreamSplit());
            Assert.assertNull(result.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(result.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(expectedWindowSplit.getKey(), result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(expectedWindowSplit.getValue(), result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForTruncateSplitOnLastWindow() throws Exception {
            ImmutableList windows = ImmutableList.copyOf((Collection)this.currentElement.getWindows());
            HandlesSplits.SplitResult splitResult = HandlesSplits.SplitResult.of((List)ImmutableList.of((Object)BeamFnApi.BundleApplication.getDefaultInstance()), (List)ImmutableList.of((Object)BeamFnApi.DelayedBundleApplication.getDefaultInstance()));
            HandlesSplits splitDelegate = this.createSplitDelegate(0.3, 0.2, splitResult);
            FnApiDoFnRunner.SplitResultsWithStopIndex result = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window1, (List)windows, (Object)this.currentWatermarkEstimatorState, (double)0.2, null, (HandlesSplits)splitDelegate, null, (int)2, (int)3);
            Assert.assertEquals(3L, result.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> expectedWindowSplit = this.createSplitAcrossWindows((List<BoundedWindow>)ImmutableList.of((Object)this.window1, (Object)this.window2), (List<BoundedWindow>)ImmutableList.of());
            Assert.assertEquals(splitResult, result.getDownstreamSplit());
            Assert.assertNull(result.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(result.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(expectedWindowSplit.getKey(), result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(expectedWindowSplit.getValue(), result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForTruncateSplitOnFirstWindowFallback() throws Exception {
            ImmutableList windows = ImmutableList.copyOf((Collection)this.currentElement.getWindows());
            HandlesSplits.SplitResult unusedSplitResult = HandlesSplits.SplitResult.of((List)ImmutableList.of((Object)BeamFnApi.BundleApplication.getDefaultInstance()), (List)ImmutableList.of((Object)BeamFnApi.DelayedBundleApplication.getDefaultInstance()));
            HandlesSplits splitDelegate = this.createSplitDelegate(1.0, 0.0, unusedSplitResult);
            FnApiDoFnRunner.SplitResultsWithStopIndex result = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window1, (List)windows, (Object)this.currentWatermarkEstimatorState, (double)0.0, null, (HandlesSplits)splitDelegate, null, (int)0, (int)3);
            Assert.assertEquals(1L, result.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> expectedWindowSplit = this.createSplitAcrossWindows((List<BoundedWindow>)ImmutableList.of((Object)this.window1), (List<BoundedWindow>)ImmutableList.of((Object)this.window2, (Object)this.window3));
            Assert.assertNull(result.getDownstreamSplit());
            Assert.assertNull(result.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(result.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(expectedWindowSplit.getKey(), result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(expectedWindowSplit.getValue(), result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForTruncateSplitOnLastWindowWhenNoElementSplit() throws Exception {
            ImmutableList windows = ImmutableList.copyOf((Collection)this.currentElement.getWindows());
            HandlesSplits splitDelegate = this.createSplitDelegate(1.0, 0.0, null);
            FnApiDoFnRunner.SplitResultsWithStopIndex result = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window1, (List)windows, (Object)this.currentWatermarkEstimatorState, (double)0.0, null, (HandlesSplits)splitDelegate, null, (int)2, (int)3);
            Assert.assertNull(result);
        }

        @Test
        public void testTrySplitForTruncateOnWindowBoundaryRoundUp() throws Exception {
            ImmutableList windows = ImmutableList.copyOf((Collection)this.currentElement.getWindows());
            HandlesSplits.SplitResult unusedSplitResult = HandlesSplits.SplitResult.of((List)ImmutableList.of((Object)BeamFnApi.BundleApplication.getDefaultInstance()), (List)ImmutableList.of((Object)BeamFnApi.DelayedBundleApplication.getDefaultInstance()));
            HandlesSplits splitDelegate = this.createSplitDelegate(0.3, 0.0, unusedSplitResult);
            FnApiDoFnRunner.SplitResultsWithStopIndex result = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window1, (List)windows, (Object)this.currentWatermarkEstimatorState, (double)0.6, null, (HandlesSplits)splitDelegate, null, (int)0, (int)3);
            Assert.assertEquals(2L, result.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> expectedWindowSplit = this.createSplitAcrossWindows((List<BoundedWindow>)ImmutableList.of((Object)this.window1, (Object)this.window2), (List<BoundedWindow>)ImmutableList.of((Object)this.window3));
            Assert.assertNull(result.getDownstreamSplit());
            Assert.assertNull(result.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(result.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(expectedWindowSplit.getKey(), result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(expectedWindowSplit.getValue(), result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForTruncateOnWindowBoundaryRoundDown() throws Exception {
            ImmutableList windows = ImmutableList.copyOf((Collection)this.currentElement.getWindows());
            HandlesSplits.SplitResult unusedSplitResult = HandlesSplits.SplitResult.of((List)ImmutableList.of((Object)BeamFnApi.BundleApplication.getDefaultInstance()), (List)ImmutableList.of((Object)BeamFnApi.DelayedBundleApplication.getDefaultInstance()));
            HandlesSplits splitDelegate = this.createSplitDelegate(0.3, 0.0, unusedSplitResult);
            FnApiDoFnRunner.SplitResultsWithStopIndex result = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window1, (List)windows, (Object)this.currentWatermarkEstimatorState, (double)0.3, null, (HandlesSplits)splitDelegate, null, (int)0, (int)3);
            Assert.assertEquals(1L, result.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> expectedWindowSplit = this.createSplitAcrossWindows((List<BoundedWindow>)ImmutableList.of((Object)this.window1), (List<BoundedWindow>)ImmutableList.of((Object)this.window2, (Object)this.window3));
            Assert.assertNull(result.getDownstreamSplit());
            Assert.assertNull(result.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(result.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(expectedWindowSplit.getKey(), result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(expectedWindowSplit.getValue(), result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForTruncateOnWindowBoundaryRoundDownOnLastWindow() throws Exception {
            ImmutableList windows = ImmutableList.copyOf((Collection)this.currentElement.getWindows());
            HandlesSplits.SplitResult unusedSplitResult = HandlesSplits.SplitResult.of((List)ImmutableList.of((Object)BeamFnApi.BundleApplication.getDefaultInstance()), (List)ImmutableList.of((Object)BeamFnApi.DelayedBundleApplication.getDefaultInstance()));
            HandlesSplits splitDelegate = this.createSplitDelegate(0.3, 0.0, unusedSplitResult);
            FnApiDoFnRunner.SplitResultsWithStopIndex result = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, (Object)this.currentRestriction, (BoundedWindow)this.window1, (List)windows, (Object)this.currentWatermarkEstimatorState, (double)0.6, null, (HandlesSplits)splitDelegate, null, (int)0, (int)3);
            Assert.assertEquals(2L, result.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> expectedWindowSplit = this.createSplitAcrossWindows((List<BoundedWindow>)ImmutableList.of((Object)this.window1, (Object)this.window2), (List<BoundedWindow>)ImmutableList.of((Object)this.window3));
            Assert.assertNull(result.getDownstreamSplit());
            Assert.assertNull(result.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(result.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(expectedWindowSplit.getKey(), result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(expectedWindowSplit.getValue(), result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testConstructSplitResultWithInvalidElementSplits() throws Exception {
            this.expected.expect(IllegalArgumentException.class);
            FnApiDoFnRunner.constructSplitResult((FnApiDoFnRunner.WindowedSplitResult)FnApiDoFnRunner.WindowedSplitResult.forRoots(null, (WindowedValue)WindowedValue.valueInGlobalWindow((Object)"elementPrimary"), (WindowedValue)WindowedValue.valueInGlobalWindow((Object)"elementResidual"), null), (HandlesSplits.SplitResult)HandlesSplits.SplitResult.of((List)ImmutableList.of((Object)BeamFnApi.BundleApplication.getDefaultInstance()), (List)ImmutableList.of((Object)BeamFnApi.DelayedBundleApplication.getDefaultInstance())), (Coder)WindowedValue.getFullCoder((Coder)VoidCoder.of(), (Coder)GlobalWindow.Coder.INSTANCE), (Instant)Instant.now(), null, (String)"ptransformId", (String)"inputId", (Collection)ImmutableList.of((Object)"outputId"), null);
        }

        private Coder getFullInputCoder(Coder elementCoder, Coder restrictionCoder, Coder watermarkStateCoder, Coder windowCoder) {
            KvCoder inputCoder = KvCoder.of((Coder)KvCoder.of((Coder)elementCoder, (Coder)KvCoder.of((Coder)restrictionCoder, (Coder)watermarkStateCoder)), (Coder)DoubleCoder.of());
            return WindowedValue.getFullCoder((Coder)inputCoder, (Coder)windowCoder);
        }

        private HandlesSplits.SplitResult getProcessElementSplit(String transformId, String inputId) {
            return HandlesSplits.SplitResult.of((List)ImmutableList.of((Object)BeamFnApi.BundleApplication.newBuilder().setTransformId(transformId).setInputId(inputId).build()), (List)ImmutableList.of((Object)BeamFnApi.DelayedBundleApplication.newBuilder().setApplication(BeamFnApi.BundleApplication.newBuilder().setTransformId(transformId).setInputId(inputId).build()).setRequestedTimeDelay(Durations.fromMillis((long)1000L)).build()));
        }

        private Timestamp toTimestamp(Instant time) {
            return Timestamp.newBuilder().setSeconds(time.getMillis() / 1000L).setNanos((int)(time.getMillis() % 1000L) * 1000000).build();
        }

        @Test
        public void testConstructSplitResultWithElementSplitFromDelegate() throws Exception {
            Coder fullInputCoder = this.getFullInputCoder((Coder)StringUtf8Coder.of(), (Coder)OffsetRange.Coder.of(), (Coder)InstantCoder.of(), IntervalWindow.getCoder());
            HandlesSplits.SplitResult elementSplit = this.getProcessElementSplit(PROCESS_TRANSFORM_ID, PROCESS_INPUT_ID);
            HandlesSplits.SplitResult result = FnApiDoFnRunner.constructSplitResult(null, (HandlesSplits.SplitResult)elementSplit, (Coder)fullInputCoder, null, null, (String)TRUNCATE_TRANSFORM_ID, (String)TRUNCATE_INPUT_ID, (Collection)ImmutableList.of((Object)TRUNCATE_OUTPUT_ID), null);
            Assert.assertEquals(elementSplit.getPrimaryRoots(), result.getPrimaryRoots());
            Assert.assertEquals(elementSplit.getResidualRoots(), result.getResidualRoots());
        }

        @Test
        public void testConstructSplitResultWithElementSplitFromTracker() throws Exception {
            Coder fullInputCoder = this.getFullInputCoder((Coder)StringUtf8Coder.of(), (Coder)OffsetRange.Coder.of(), (Coder)InstantCoder.of(), IntervalWindow.getCoder());
            KV<WindowedValue, WindowedValue> elementSplit = this.createSplitWithSizeInWindow(new OffsetRange(0L, 31L), new OffsetRange(31L, 100L), (BoundedWindow)this.window1);
            HandlesSplits.SplitResult result = FnApiDoFnRunner.constructSplitResult((FnApiDoFnRunner.WindowedSplitResult)FnApiDoFnRunner.WindowedSplitResult.forRoots(null, (WindowedValue)((WindowedValue)elementSplit.getKey()), (WindowedValue)((WindowedValue)elementSplit.getValue()), null), null, (Coder)fullInputCoder, null, this.watermarkAndState, (String)PROCESS_TRANSFORM_ID, (String)PROCESS_INPUT_ID, (Collection)ImmutableList.of((Object)PROCESS_OUTPUT_ID), (Duration)Duration.millis((long)100L));
            Assert.assertEquals(1L, result.getPrimaryRoots().size());
            BeamFnApi.BundleApplication primaryRoot = (BeamFnApi.BundleApplication)result.getPrimaryRoots().get(0);
            Assert.assertEquals(PROCESS_TRANSFORM_ID, primaryRoot.getTransformId());
            Assert.assertEquals(PROCESS_INPUT_ID, primaryRoot.getInputId());
            Assert.assertEquals(elementSplit.getKey(), fullInputCoder.decode(primaryRoot.getElement().newInput()));
            Assert.assertEquals(1L, result.getResidualRoots().size());
            BeamFnApi.DelayedBundleApplication residualRoot = (BeamFnApi.DelayedBundleApplication)result.getResidualRoots().get(0);
            Assert.assertEquals(Durations.fromMillis((long)100L), residualRoot.getRequestedTimeDelay());
            Assert.assertEquals(PROCESS_TRANSFORM_ID, residualRoot.getApplication().getTransformId());
            Assert.assertEquals(PROCESS_INPUT_ID, residualRoot.getApplication().getInputId());
            Assert.assertEquals(this.toTimestamp((Instant)this.watermarkAndState.getValue()), residualRoot.getApplication().getOutputWatermarksMap().get(PROCESS_OUTPUT_ID));
            Assert.assertEquals(elementSplit.getValue(), fullInputCoder.decode(residualRoot.getApplication().getElement().newInput()));
        }

        @Test
        public void testConstructSplitResultWithOnlyWindowSplits() throws Exception {
            Coder fullInputCoder = this.getFullInputCoder((Coder)StringUtf8Coder.of(), (Coder)OffsetRange.Coder.of(), (Coder)InstantCoder.of(), IntervalWindow.getCoder());
            KV<WindowedValue, WindowedValue> windowSplit = this.createSplitWithSizeAcrossWindows((List<BoundedWindow>)ImmutableList.of((Object)this.window1), (List<BoundedWindow>)ImmutableList.of((Object)this.window2, (Object)this.window3));
            HandlesSplits.SplitResult result = FnApiDoFnRunner.constructSplitResult((FnApiDoFnRunner.WindowedSplitResult)FnApiDoFnRunner.WindowedSplitResult.forRoots((WindowedValue)((WindowedValue)windowSplit.getKey()), null, null, (WindowedValue)((WindowedValue)windowSplit.getValue())), null, (Coder)fullInputCoder, (Instant)this.initialWatermark, this.watermarkAndState, (String)PROCESS_TRANSFORM_ID, (String)PROCESS_INPUT_ID, (Collection)ImmutableList.of((Object)PROCESS_OUTPUT_ID), (Duration)Duration.millis((long)100L));
            Assert.assertEquals(1L, result.getPrimaryRoots().size());
            BeamFnApi.BundleApplication primaryRoot = (BeamFnApi.BundleApplication)result.getPrimaryRoots().get(0);
            Assert.assertEquals(PROCESS_TRANSFORM_ID, primaryRoot.getTransformId());
            Assert.assertEquals(PROCESS_INPUT_ID, primaryRoot.getInputId());
            Assert.assertEquals(windowSplit.getKey(), fullInputCoder.decode(primaryRoot.getElement().newInput()));
            Assert.assertEquals(1L, result.getResidualRoots().size());
            BeamFnApi.DelayedBundleApplication residualRoot = (BeamFnApi.DelayedBundleApplication)result.getResidualRoots().get(0);
            Assert.assertEquals(org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Duration.getDefaultInstance(), residualRoot.getRequestedTimeDelay());
            Assert.assertEquals(PROCESS_TRANSFORM_ID, residualRoot.getApplication().getTransformId());
            Assert.assertEquals(PROCESS_INPUT_ID, residualRoot.getApplication().getInputId());
            Assert.assertEquals(this.toTimestamp(this.initialWatermark), residualRoot.getApplication().getOutputWatermarksMap().get(PROCESS_OUTPUT_ID));
            Assert.assertEquals(windowSplit.getValue(), fullInputCoder.decode(residualRoot.getApplication().getElement().newInput()));
        }

        @Test
        public void testConstructSplitResultWithElementAndWindowSplitFromProcess() throws Exception {
            Coder fullInputCoder = this.getFullInputCoder((Coder)StringUtf8Coder.of(), (Coder)OffsetRange.Coder.of(), (Coder)InstantCoder.of(), IntervalWindow.getCoder());
            KV<WindowedValue, WindowedValue> windowSplit = this.createSplitWithSizeAcrossWindows((List<BoundedWindow>)ImmutableList.of((Object)this.window1), (List<BoundedWindow>)ImmutableList.of((Object)this.window3));
            KV<WindowedValue, WindowedValue> elementSplit = this.createSplitWithSizeInWindow(new OffsetRange(0L, 31L), new OffsetRange(31L, 100L), (BoundedWindow)this.window2);
            HandlesSplits.SplitResult result = FnApiDoFnRunner.constructSplitResult((FnApiDoFnRunner.WindowedSplitResult)FnApiDoFnRunner.WindowedSplitResult.forRoots((WindowedValue)((WindowedValue)windowSplit.getKey()), (WindowedValue)((WindowedValue)elementSplit.getKey()), (WindowedValue)((WindowedValue)elementSplit.getValue()), (WindowedValue)((WindowedValue)windowSplit.getValue())), null, (Coder)fullInputCoder, (Instant)this.initialWatermark, this.watermarkAndState, (String)PROCESS_TRANSFORM_ID, (String)PROCESS_INPUT_ID, (Collection)ImmutableList.of((Object)PROCESS_OUTPUT_ID), (Duration)Duration.millis((long)100L));
            Assert.assertEquals(2L, result.getPrimaryRoots().size());
            BeamFnApi.BundleApplication windowPrimary = (BeamFnApi.BundleApplication)result.getPrimaryRoots().get(0);
            BeamFnApi.BundleApplication elementPrimary = (BeamFnApi.BundleApplication)result.getPrimaryRoots().get(1);
            Assert.assertEquals(PROCESS_TRANSFORM_ID, windowPrimary.getTransformId());
            Assert.assertEquals(PROCESS_INPUT_ID, windowPrimary.getInputId());
            Assert.assertEquals(windowSplit.getKey(), fullInputCoder.decode(windowPrimary.getElement().newInput()));
            Assert.assertEquals(PROCESS_TRANSFORM_ID, elementPrimary.getTransformId());
            Assert.assertEquals(PROCESS_INPUT_ID, elementPrimary.getInputId());
            Assert.assertEquals(elementSplit.getKey(), fullInputCoder.decode(elementPrimary.getElement().newInput()));
            Assert.assertEquals(2L, result.getResidualRoots().size());
            BeamFnApi.DelayedBundleApplication windowResidual = (BeamFnApi.DelayedBundleApplication)result.getResidualRoots().get(0);
            BeamFnApi.DelayedBundleApplication elementResidual = (BeamFnApi.DelayedBundleApplication)result.getResidualRoots().get(1);
            Assert.assertEquals(org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Duration.getDefaultInstance(), windowResidual.getRequestedTimeDelay());
            Assert.assertEquals(PROCESS_TRANSFORM_ID, windowResidual.getApplication().getTransformId());
            Assert.assertEquals(PROCESS_INPUT_ID, windowResidual.getApplication().getInputId());
            Assert.assertEquals(this.toTimestamp(this.initialWatermark), windowResidual.getApplication().getOutputWatermarksMap().get(PROCESS_OUTPUT_ID));
            Assert.assertEquals(windowSplit.getValue(), fullInputCoder.decode(windowResidual.getApplication().getElement().newInput()));
            Assert.assertEquals(Durations.fromMillis((long)100L), elementResidual.getRequestedTimeDelay());
            Assert.assertEquals(PROCESS_TRANSFORM_ID, elementResidual.getApplication().getTransformId());
            Assert.assertEquals(PROCESS_INPUT_ID, elementResidual.getApplication().getInputId());
            Assert.assertEquals(this.toTimestamp((Instant)this.watermarkAndState.getValue()), elementResidual.getApplication().getOutputWatermarksMap().get(PROCESS_OUTPUT_ID));
            Assert.assertEquals(elementSplit.getValue(), fullInputCoder.decode(elementResidual.getApplication().getElement().newInput()));
        }

        @Test
        public void testConstructSplitResultWithElementAndWindowSplitFromTruncate() throws Exception {
            Coder fullInputCoder = this.getFullInputCoder((Coder)StringUtf8Coder.of(), (Coder)OffsetRange.Coder.of(), (Coder)InstantCoder.of(), IntervalWindow.getCoder());
            KV<WindowedValue, WindowedValue> windowSplit = this.createSplitWithSizeAcrossWindows((List<BoundedWindow>)ImmutableList.of((Object)this.window1), (List<BoundedWindow>)ImmutableList.of((Object)this.window3));
            HandlesSplits.SplitResult elementSplit = this.getProcessElementSplit(PROCESS_TRANSFORM_ID, PROCESS_INPUT_ID);
            HandlesSplits.SplitResult result = FnApiDoFnRunner.constructSplitResult((FnApiDoFnRunner.WindowedSplitResult)FnApiDoFnRunner.WindowedSplitResult.forRoots((WindowedValue)((WindowedValue)windowSplit.getKey()), null, null, (WindowedValue)((WindowedValue)windowSplit.getValue())), (HandlesSplits.SplitResult)elementSplit, (Coder)fullInputCoder, (Instant)this.initialWatermark, this.watermarkAndState, (String)TRUNCATE_TRANSFORM_ID, (String)TRUNCATE_INPUT_ID, (Collection)ImmutableList.of((Object)TRUNCATE_OUTPUT_ID), (Duration)Duration.millis((long)100L));
            Assert.assertEquals(2L, result.getPrimaryRoots().size());
            BeamFnApi.BundleApplication windowPrimary = (BeamFnApi.BundleApplication)result.getPrimaryRoots().get(0);
            BeamFnApi.BundleApplication elementPrimary = (BeamFnApi.BundleApplication)result.getPrimaryRoots().get(1);
            Assert.assertEquals(TRUNCATE_TRANSFORM_ID, windowPrimary.getTransformId());
            Assert.assertEquals(TRUNCATE_INPUT_ID, windowPrimary.getInputId());
            Assert.assertEquals(windowSplit.getKey(), fullInputCoder.decode(windowPrimary.getElement().newInput()));
            Assert.assertEquals(elementSplit.getPrimaryRoots().get(0), elementPrimary);
            Assert.assertEquals(2L, result.getResidualRoots().size());
            BeamFnApi.DelayedBundleApplication windowResidual = (BeamFnApi.DelayedBundleApplication)result.getResidualRoots().get(0);
            BeamFnApi.DelayedBundleApplication elementResidual = (BeamFnApi.DelayedBundleApplication)result.getResidualRoots().get(1);
            Assert.assertEquals(org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Duration.getDefaultInstance(), windowResidual.getRequestedTimeDelay());
            Assert.assertEquals(TRUNCATE_TRANSFORM_ID, windowResidual.getApplication().getTransformId());
            Assert.assertEquals(TRUNCATE_INPUT_ID, windowResidual.getApplication().getInputId());
            Assert.assertEquals(this.toTimestamp(this.initialWatermark), windowResidual.getApplication().getOutputWatermarksMap().get(TRUNCATE_OUTPUT_ID));
            Assert.assertEquals(windowSplit.getValue(), fullInputCoder.decode(windowResidual.getApplication().getElement().newInput()));
            Assert.assertEquals(elementSplit.getResidualRoots().get(0), elementResidual);
        }
    }

    @RunWith(value=JUnit4.class)
    public static class ExecutionTest
    implements Serializable {
        @Rule
        public transient ResetDateTimeProvider dateTimeProvider = new ResetDateTimeProvider();
        public static final String TEST_TRANSFORM_ID = "pTransformId";
        private static final MetricsApi.MonitoringInfo WORK_COMPLETED_MI = MetricsApi.MonitoringInfo.newBuilder().setUrn(MonitoringInfoConstants.Urns.WORK_COMPLETED).setType("beam:metrics:progress:v1").putLabels("PTRANSFORM", "pTransformId").build();
        private static final MetricsApi.MonitoringInfo WORK_REMAINING_MI = MetricsApi.MonitoringInfo.newBuilder().setUrn(MonitoringInfoConstants.Urns.WORK_REMAINING).setType("beam:metrics:progress:v1").putLabels("PTRANSFORM", "pTransformId").build();

        @Test
        public void testUsingUserState() throws Exception {
            Pipeline p = Pipeline.create();
            PCollection valuePCollection = (PCollection)p.apply((PTransform)Create.of((Object)KV.of((Object)"unused", (Object)"unused"), (Object[])new KV[0]));
            PCollection outputPCollection = (PCollection)valuePCollection.apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)new TestStatefulDoFn()));
            SdkComponents sdkComponents = SdkComponents.create((PipelineOptions)p.getOptions());
            RunnerApi.Pipeline pProto = PipelineTranslation.toProto((Pipeline)p, (SdkComponents)sdkComponents);
            String inputPCollectionId = sdkComponents.registerPCollection(valuePCollection);
            String outputPCollectionId = sdkComponents.registerPCollection(outputPCollection);
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(pProto.getComponents().getTransformsOrThrow(TEST_TRANSFORM_ID).getSubtransforms(0));
            FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(StringUtf8Coder.of(), ImmutableMap.of((Object)this.bagUserStateKey("value", "X"), Arrays.asList("X0"), (Object)this.bagUserStateKey("bag", "X"), Arrays.asList("X0"), (Object)this.bagUserStateKey("combine", "X"), Arrays.asList("X0")));
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).beamFnStateClient(fakeClient).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllCoders(pProto.getComponents().getCodersMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).build()).build();
            ArrayList mainOutputValues = new ArrayList();
            context.addPCollectionConsumer(outputPCollectionId, mainOutputValues::add);
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getStartBundleFunctions())).run();
            mainOutputValues.clear();
            MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            mainInput.accept((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"X1")));
            mainInput.accept((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"Y1")));
            mainInput.accept((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"X2")));
            mainInput.accept((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"Y2")));
            MatcherAssert.assertThat(mainOutputValues, Matchers.contains(WindowedValue.valueInGlobalWindow((Object)"value:X0"), WindowedValue.valueInGlobalWindow((Object)"bag:[X0]"), WindowedValue.valueInGlobalWindow((Object)"combine:X0"), WindowedValue.valueInGlobalWindow((Object)"value:null"), WindowedValue.valueInGlobalWindow((Object)"bag:[]"), WindowedValue.valueInGlobalWindow((Object)"combine:"), WindowedValue.valueInGlobalWindow((Object)"value:X1"), WindowedValue.valueInGlobalWindow((Object)"bag:[X0, X1]"), WindowedValue.valueInGlobalWindow((Object)"combine:X0X1"), WindowedValue.valueInGlobalWindow((Object)"value:Y1"), WindowedValue.valueInGlobalWindow((Object)"bag:[Y1]"), WindowedValue.valueInGlobalWindow((Object)"combine:Y1")));
            mainOutputValues.clear();
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getFinishBundleFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getTearDownFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            Assert.assertEquals(new FakeBeamFnStateClient(StringUtf8Coder.of(), ImmutableMap.builder().put((Object)this.bagUserStateKey("value", "X"), Arrays.asList("X2")).put((Object)this.bagUserStateKey("bag", "X"), Arrays.asList("X0", "X1", "X2")).put((Object)this.bagUserStateKey("combine", "X"), Arrays.asList("X0X1X2")).put((Object)this.bagUserStateKey("value", "Y"), Arrays.asList("Y2")).put((Object)this.bagUserStateKey("bag", "Y"), Arrays.asList("Y1", "Y2")).put((Object)this.bagUserStateKey("combine", "Y"), Arrays.asList("Y1Y2")).build()).getData(), fakeClient.getData());
        }

        private BeamFnApi.StateKey bagUserStateKey(String userStateId, String key) throws IOException {
            return BeamFnApi.StateKey.newBuilder().setBagUserState(BeamFnApi.StateKey.BagUserState.newBuilder().setTransformId(TEST_TRANSFORM_ID).setUserStateId(userStateId).setKey(this.encode(key)).setWindow(ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)GlobalWindow.Coder.INSTANCE, (Object)GlobalWindow.INSTANCE)))).build();
        }

        @Test
        public void testProcessElementWithSideInputsAndOutputs() throws Exception {
            Pipeline p = Pipeline.create();
            ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)p.getOptions().as(ExperimentalOptions.class)), (String)"beam_fn_api");
            ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)p.getOptions().as(ExperimentalOptions.class)), (String)"use_runner_v2");
            PCollection valuePCollection = (PCollection)p.apply((PTransform)Create.of((Object)"unused", (Object[])new String[0]));
            PCollectionView defaultSingletonSideInputView = (PCollectionView)valuePCollection.apply((PTransform)View.asSingleton().withDefaultValue((Object)"defaultSingletonValue"));
            PCollectionView singletonSideInputView = (PCollectionView)valuePCollection.apply((PTransform)View.asSingleton());
            PCollectionView iterableSideInputView = (PCollectionView)valuePCollection.apply((PTransform)View.asIterable());
            TupleTag<String> mainOutput = new TupleTag<String>("main"){};
            TupleTag<String> additionalOutput = new TupleTag<String>("additional"){};
            PCollectionTuple outputPCollection = (PCollectionTuple)valuePCollection.apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)new TestSideInputDoFn(defaultSingletonSideInputView, singletonSideInputView, iterableSideInputView, (TupleTag)additionalOutput)).withSideInputs(new PCollectionView[]{defaultSingletonSideInputView, singletonSideInputView, iterableSideInputView}).withOutputTags((TupleTag)mainOutput, TupleTagList.of((TupleTag)additionalOutput)));
            SdkComponents sdkComponents = SdkComponents.create((PipelineOptions)p.getOptions());
            RunnerApi.Pipeline pProto = PipelineTranslation.toProto((Pipeline)p, (SdkComponents)sdkComponents, (boolean)true);
            String inputPCollectionId = sdkComponents.registerPCollection(valuePCollection);
            String outputPCollectionId = sdkComponents.registerPCollection(outputPCollection.get((TupleTag)mainOutput));
            String additionalPCollectionId = sdkComponents.registerPCollection(outputPCollection.get((TupleTag)additionalOutput));
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(TEST_TRANSFORM_ID);
            ImmutableMap stateData = ImmutableMap.of((Object)this.iterableSideInputKey(singletonSideInputView.getTagInternal().getId()), Arrays.asList("singletonValue"), (Object)this.iterableSideInputKey(iterableSideInputView.getTagInternal().getId()), Arrays.asList("iterableValue1", "iterableValue2", "iterableValue3"));
            FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(StringUtf8Coder.of(), stateData, 1000);
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).beamFnStateClient(fakeClient).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllCoders(pProto.getComponents().getCodersMap()).putAllEnvironments(Collections.emptyMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).build()).build();
            ArrayList mainOutputValues = new ArrayList();
            ArrayList additionalOutputValues = new ArrayList();
            context.addPCollectionConsumer(outputPCollectionId, mainOutputValues::add);
            context.addPCollectionConsumer(additionalPCollectionId, additionalOutputValues::add);
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getStartBundleFunctions())).run();
            mainOutputValues.clear();
            MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId, additionalPCollectionId));
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            mainInput.accept((Object)WindowedValue.valueInGlobalWindow((Object)"X"));
            mainInput.accept((Object)WindowedValue.valueInGlobalWindow((Object)"Y"));
            MatcherAssert.assertThat(mainOutputValues, Matchers.contains(WindowedValue.valueInGlobalWindow((Object)"X:defaultSingletonValue"), WindowedValue.valueInGlobalWindow((Object)"X:singletonValue"), WindowedValue.valueInGlobalWindow((Object)"X:iterableValue1"), WindowedValue.valueInGlobalWindow((Object)"X:iterableValue2"), WindowedValue.valueInGlobalWindow((Object)"X:iterableValue3"), WindowedValue.valueInGlobalWindow((Object)"Y:defaultSingletonValue"), WindowedValue.valueInGlobalWindow((Object)"Y:singletonValue"), WindowedValue.valueInGlobalWindow((Object)"Y:iterableValue1"), WindowedValue.valueInGlobalWindow((Object)"Y:iterableValue2"), WindowedValue.valueInGlobalWindow((Object)"Y:iterableValue3")));
            MatcherAssert.assertThat(additionalOutputValues, Matchers.contains(WindowedValue.valueInGlobalWindow((Object)"X:additional"), WindowedValue.valueInGlobalWindow((Object)"Y:additional")));
            mainOutputValues.clear();
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getFinishBundleFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getTearDownFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            Assert.assertEquals(new FakeBeamFnStateClient(StringUtf8Coder.of(), stateData).getData(), fakeClient.getData());
        }

        @Test
        public void testProcessElementWithNonWindowObservingOptimization() throws Exception {
            Pipeline p = Pipeline.create();
            ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)p.getOptions().as(ExperimentalOptions.class)), (String)"beam_fn_api");
            ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)p.getOptions().as(ExperimentalOptions.class)), (String)"use_runner_v2");
            PCollection valuePCollection = (PCollection)((PCollection)p.apply((PTransform)Create.of((Object)"unused", (Object[])new String[0]))).apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)1L))));
            TupleTag<String> mainOutput = new TupleTag<String>("main"){};
            TupleTag<String> additionalOutput = new TupleTag<String>("additional"){};
            PCollectionTuple outputPCollection = (PCollectionTuple)valuePCollection.apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)new TestNonWindowObservingDoFn((TupleTag)additionalOutput)).withOutputTags((TupleTag)mainOutput, TupleTagList.of((TupleTag)additionalOutput)));
            SdkComponents sdkComponents = SdkComponents.create((PipelineOptions)p.getOptions());
            RunnerApi.Pipeline pProto = PipelineTranslation.toProto((Pipeline)p, (SdkComponents)sdkComponents, (boolean)true);
            String inputPCollectionId = sdkComponents.registerPCollection(valuePCollection);
            String outputPCollectionId = sdkComponents.registerPCollection(outputPCollection.get((TupleTag)mainOutput));
            String additionalPCollectionId = sdkComponents.registerPCollection(outputPCollection.get((TupleTag)additionalOutput));
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(TEST_TRANSFORM_ID);
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllCoders(pProto.getComponents().getCodersMap()).putAllEnvironments(Collections.emptyMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).build()).build();
            ArrayList mainOutputValues = new ArrayList();
            ArrayList additionalOutputValues = new ArrayList();
            context.addPCollectionConsumer(outputPCollectionId, mainOutputValues::add);
            context.addPCollectionConsumer(additionalPCollectionId, additionalOutputValues::add);
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getStartBundleFunctions())).run();
            mainOutputValues.clear();
            MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId, additionalPCollectionId));
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            mainInput.accept(this.valueInWindows("X", (BoundedWindow)new IntervalWindow(new Instant(0L), (ReadableDuration)Duration.standardMinutes((long)1L)), new BoundedWindow[]{new IntervalWindow(new Instant(10L), (ReadableDuration)Duration.standardMinutes((long)1L))}));
            mainInput.accept(this.valueInWindows("Y", (BoundedWindow)new IntervalWindow(new Instant(1000L), (ReadableDuration)Duration.standardMinutes((long)1L)), new BoundedWindow[]{new IntervalWindow(new Instant(1010L), (ReadableDuration)Duration.standardMinutes((long)1L))}));
            MatcherAssert.assertThat(mainOutputValues, Matchers.contains(this.valueInWindows("X:main", (BoundedWindow)new IntervalWindow(new Instant(0L), (ReadableDuration)Duration.standardMinutes((long)1L)), new BoundedWindow[]{new IntervalWindow(new Instant(10L), (ReadableDuration)Duration.standardMinutes((long)1L))}), this.valueInWindows("Y:main", (BoundedWindow)new IntervalWindow(new Instant(1000L), (ReadableDuration)Duration.standardMinutes((long)1L)), new BoundedWindow[]{new IntervalWindow(new Instant(1010L), (ReadableDuration)Duration.standardMinutes((long)1L))})));
            MatcherAssert.assertThat(additionalOutputValues, Matchers.contains(this.valueInWindows("X:additional", (BoundedWindow)new IntervalWindow(new Instant(0L), (ReadableDuration)Duration.standardMinutes((long)1L)), new BoundedWindow[]{new IntervalWindow(new Instant(10L), (ReadableDuration)Duration.standardMinutes((long)1L))}), this.valueInWindows("Y:additional", (BoundedWindow)new IntervalWindow(new Instant(1000L), (ReadableDuration)Duration.standardMinutes((long)1L)), new BoundedWindow[]{new IntervalWindow(new Instant(1010L), (ReadableDuration)Duration.standardMinutes((long)1L))})));
            mainOutputValues.clear();
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getFinishBundleFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getTearDownFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
        }

        @Test
        public void testSideInputIsAccessibleForDownstreamCallers() throws Exception {
            FixedWindows windowFn = FixedWindows.of((Duration)Duration.millis((long)1L));
            IntervalWindow windowA = windowFn.assignWindow(new Instant(1L));
            IntervalWindow windowB = windowFn.assignWindow(new Instant(2L));
            ByteString encodedWindowA = ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)windowFn.windowCoder(), (Object)windowA));
            ByteString encodedWindowB = ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)windowFn.windowCoder(), (Object)windowB));
            Pipeline p = Pipeline.create();
            ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)p.getOptions().as(ExperimentalOptions.class)), (String)"beam_fn_api");
            ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)p.getOptions().as(ExperimentalOptions.class)), (String)"use_runner_v2");
            PCollection valuePCollection = (PCollection)((PCollection)p.apply((PTransform)Create.of((Object)"unused", (Object[])new String[0]))).apply((PTransform)Window.into((WindowFn)windowFn));
            PCollectionView iterableSideInputView = (PCollectionView)valuePCollection.apply((PTransform)View.asIterable());
            PCollection outputPCollection = (PCollection)valuePCollection.apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)new TestSideInputIsAccessibleForDownstreamCallersDoFn(iterableSideInputView)).withSideInputs(new PCollectionView[]{iterableSideInputView}));
            SdkComponents sdkComponents = SdkComponents.create((PipelineOptions)p.getOptions());
            RunnerApi.Pipeline pProto = PipelineTranslation.toProto((Pipeline)p, (SdkComponents)sdkComponents, (boolean)true);
            String inputPCollectionId = sdkComponents.registerPCollection(valuePCollection);
            String outputPCollectionId = sdkComponents.registerPCollection(outputPCollection);
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(pProto.getComponents().getTransformsOrThrow(TEST_TRANSFORM_ID).getSubtransforms(0));
            ImmutableMap stateData = ImmutableMap.of((Object)this.iterableSideInputKey(iterableSideInputView.getTagInternal().getId(), encodedWindowA), Arrays.asList("iterableValue1A", "iterableValue2A", "iterableValue3A"), (Object)this.iterableSideInputKey(iterableSideInputView.getTagInternal().getId(), encodedWindowB), Arrays.asList("iterableValue1B", "iterableValue2B", "iterableValue3B"));
            FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(StringUtf8Coder.of(), stateData, 1000);
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).beamFnStateClient(fakeClient).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllCoders(pProto.getComponents().getCodersMap()).putAllEnvironments(Collections.emptyMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).build()).build();
            ArrayList mainOutputValues = new ArrayList();
            context.addPCollectionConsumer((String)Iterables.getOnlyElement(pTransform.getOutputsMap().values()), mainOutputValues::add);
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getStartBundleFunctions())).run();
            mainOutputValues.clear();
            MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            mainInput.accept(this.valueInWindows("X", (BoundedWindow)windowA, new BoundedWindow[0]));
            mainInput.accept(this.valueInWindows("Y", (BoundedWindow)windowB, new BoundedWindow[0]));
            MatcherAssert.assertThat(mainOutputValues, Matchers.hasSize(2));
            MatcherAssert.assertThat((Iterable)((WindowedValue)mainOutputValues.get(0)).getValue(), Matchers.contains("iterableValue1A", "iterableValue2A", "iterableValue3A"));
            MatcherAssert.assertThat((Iterable)((WindowedValue)mainOutputValues.get(1)).getValue(), Matchers.contains("iterableValue1B", "iterableValue2B", "iterableValue3B"));
            mainOutputValues.clear();
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getFinishBundleFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getTearDownFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            Assert.assertEquals(new FakeBeamFnStateClient(StringUtf8Coder.of(), stateData).getData(), fakeClient.getData());
        }

        public MetricUpdates.MetricUpdate create(String stepName, MetricName name, long value) {
            return MetricUpdates.MetricUpdate.create(MetricKey.create((String)stepName, (MetricName)name), value);
        }

        @Test
        @Ignore(value="https://github.com/apache/beam/issues/20872")
        public void testUsingMetrics() throws Exception {
            MetricsContainerStepMap metricsContainerRegistry = new MetricsContainerStepMap();
            MetricsContainerImpl metricsContainer = metricsContainerRegistry.getUnboundContainer();
            Closeable closeable = MetricsEnvironment.scopedMetricsContainer((MetricsContainer)metricsContainer);
            FixedWindows windowFn = FixedWindows.of((Duration)Duration.millis((long)1L));
            IntervalWindow windowA = windowFn.assignWindow(new Instant(1L));
            IntervalWindow windowB = windowFn.assignWindow(new Instant(2L));
            ByteString encodedWindowA = ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)windowFn.windowCoder(), (Object)windowA));
            ByteString encodedWindowB = ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)windowFn.windowCoder(), (Object)windowB));
            Pipeline p = Pipeline.create();
            PCollection valuePCollection = (PCollection)((PCollection)p.apply((PTransform)Create.of((Object)"unused", (Object[])new String[0]))).apply((PTransform)Window.into((WindowFn)windowFn));
            PCollectionView iterableSideInputView = (PCollectionView)valuePCollection.apply((PTransform)View.asIterable());
            PCollection outputPCollection = (PCollection)valuePCollection.apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)new TestSideInputIsAccessibleForDownstreamCallersDoFn(iterableSideInputView)).withSideInputs(new PCollectionView[]{iterableSideInputView}));
            SdkComponents sdkComponents = SdkComponents.create((PipelineOptions)p.getOptions());
            RunnerApi.Pipeline pProto = PipelineTranslation.toProto((Pipeline)p, (SdkComponents)sdkComponents, (boolean)true);
            String inputPCollectionId = sdkComponents.registerPCollection(valuePCollection);
            String outputPCollectionId = sdkComponents.registerPCollection(outputPCollection);
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(pProto.getComponents().getTransformsOrThrow(TEST_TRANSFORM_ID).getSubtransforms(0));
            ImmutableMap stateData = ImmutableMap.of((Object)this.iterableSideInputKey(iterableSideInputView.getTagInternal().getId(), encodedWindowA), Arrays.asList("iterableValue1A", "iterableValue2A", "iterableValue3A"), (Object)this.iterableSideInputKey(iterableSideInputView.getTagInternal().getId(), encodedWindowB), Arrays.asList("iterableValue1B", "iterableValue2B", "iterableValue3B"));
            FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(StringUtf8Coder.of(), stateData);
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).beamFnStateClient(fakeClient).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllCoders(pProto.getComponents().getCodersMap()).putAllEnvironments(Collections.emptyMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).build()).build();
            ArrayList mainOutputValues = new ArrayList();
            context.addPCollectionConsumer((String)Iterables.getOnlyElement(pTransform.getOutputsMap().values()), mainOutputValues::add);
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getStartBundleFunctions())).run();
            mainOutputValues.clear();
            MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            mainInput.accept(this.valueInWindows("X", (BoundedWindow)windowA, new BoundedWindow[0]));
            mainInput.accept(this.valueInWindows("Y", (BoundedWindow)windowB, new BoundedWindow[0]));
            mainOutputValues.clear();
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getFinishBundleFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getTearDownFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            ArrayList<MetricsApi.MonitoringInfo> expected = new ArrayList<MetricsApi.MonitoringInfo>();
            SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
            builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
            builder.setLabel("PCOLLECTION", "Window.Into()/Window.Assign.out");
            builder.setInt64SumValue(2L);
            expected.add(builder.build());
            builder = new SimpleMonitoringInfoBuilder();
            builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
            builder.setLabel("PCOLLECTION", "pTransformId/ParMultiDo(TestSideInputIsAccessibleForDownstreamCallers).output");
            builder.setInt64SumValue(2L);
            expected.add(builder.build());
            builder = new SimpleMonitoringInfoBuilder();
            builder.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64).setLabel("NAMESPACE", TestSideInputIsAccessibleForDownstreamCallersDoFn.class.getName()).setLabel("NAME", "userCountedElems");
            builder.setLabel("PTRANSFORM", TEST_TRANSFORM_ID);
            builder.setInt64SumValue(2L);
            expected.add(builder.build());
            builder = new SimpleMonitoringInfoBuilder();
            builder.setUrn(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE);
            builder.setLabel("PCOLLECTION", "Window.Into()/Window.Assign.out");
            builder.setInt64DistributionValue(DistributionData.create(4L, 2L, 2L, 2L));
            expected.add(builder.build());
            builder = new SimpleMonitoringInfoBuilder();
            builder.setUrn(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE);
            builder.setLabel("PCOLLECTION", "pTransformId/ParMultiDo(TestSideInputIsAccessibleForDownstreamCallers).output");
            builder.setInt64DistributionValue(DistributionData.create(10L, 2L, 5L, 5L));
            expected.add(builder.build());
            closeable.close();
            ArrayList<MetricsApi.MonitoringInfo> result = new ArrayList<MetricsApi.MonitoringInfo>();
            for (MetricsApi.MonitoringInfo mi : metricsContainerRegistry.getMonitoringInfos()) {
                result.add(mi);
            }
            MatcherAssert.assertThat(result, Matchers.containsInAnyOrder(expected.toArray()));
        }

        @Test
        public void testTimers() throws Exception {
            this.dateTimeProvider.setDateTimeFixed(10000L);
            Pipeline p = Pipeline.create();
            PCollection valuePCollection = (PCollection)p.apply((PTransform)Create.of((Object)KV.of((Object)"unused", (Object)"unused"), (Object[])new KV[0]));
            PCollection outputPCollection = (PCollection)valuePCollection.apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)new TestTimerfulDoFn()));
            SdkComponents sdkComponents = SdkComponents.create();
            sdkComponents.registerEnvironment(RunnerApi.Environment.getDefaultInstance());
            RunnerApi.Pipeline pProto = PipelineTranslation.toProto((Pipeline)p, (SdkComponents)sdkComponents);
            String inputPCollectionId = sdkComponents.registerPCollection(valuePCollection);
            String outputPCollectionId = sdkComponents.registerPCollection(outputPCollection);
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(pProto.getComponents().getTransformsOrThrow(TEST_TRANSFORM_ID).getSubtransforms(0)).toBuilder().build();
            FakeBeamFnStateClient fakeStateClient = new FakeBeamFnStateClient(StringUtf8Coder.of(), ImmutableMap.of((Object)this.bagUserStateKey("bag", "X"), Arrays.asList("X0"), (Object)this.bagUserStateKey("bag", "A"), Arrays.asList("A0"), (Object)this.bagUserStateKey("bag", "C"), Arrays.asList("C0")));
            ArrayList mainOutputValues = new ArrayList();
            TestBeamFnDataOutboundAggregator aggregator = new TestBeamFnDataOutboundAggregator(() -> "57L");
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).beamFnStateClient(fakeStateClient).processBundleInstructionId("57L").components(RunnerApi.Components.newBuilder().putAllCoders(pProto.getComponents().getCodersMap()).putAllEnvironments(Collections.emptyMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).build()).outboundAggregators((Map<Endpoints.ApiServiceDescriptor, BeamFnDataOutboundAggregator>)ImmutableMap.of((Object)Endpoints.ApiServiceDescriptor.getDefaultInstance(), (Object)((Object)aggregator))).timerApiServiceDescriptor(Endpoints.ApiServiceDescriptor.getDefaultInstance()).build();
            context.addPCollectionConsumer(outputPCollectionId, mainOutputValues::add);
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getStartBundleFunctions())).run();
            mainOutputValues.clear();
            MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));
            LogicalEndpoint eventTimer = LogicalEndpoint.timer((String)"57L", (String)TEST_TRANSFORM_ID, (String)"ts-event");
            LogicalEndpoint processingTimer = LogicalEndpoint.timer((String)"57L", (String)TEST_TRANSFORM_ID, (String)"ts-processing");
            LogicalEndpoint eventFamilyTimer = LogicalEndpoint.timer((String)"57L", (String)TEST_TRANSFORM_ID, (String)"tfs-event-family");
            LogicalEndpoint processingFamilyTimer = LogicalEndpoint.timer((String)"57L", (String)TEST_TRANSFORM_ID, (String)"tfs-processing-family");
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            mainInput.accept((Object)WindowedValue.timestampedValueInGlobalWindow((Object)KV.of((Object)"X", (Object)"X1"), (Instant)new Instant(1000L)));
            mainInput.accept((Object)WindowedValue.timestampedValueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"Y1"), (Instant)new Instant(1100L)));
            mainInput.accept((Object)WindowedValue.timestampedValueInGlobalWindow((Object)KV.of((Object)"X", (Object)"X2"), (Instant)new Instant(1200L)));
            mainInput.accept((Object)WindowedValue.timestampedValueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"Y2"), (Instant)new Instant(1300L)));
            context.getIncomingTimerEndpoint(eventTimer.getTimerFamilyId()).getReceiver().accept(this.timerInGlobalWindow("A", new Instant(1400L), new Instant(2400L)));
            context.getIncomingTimerEndpoint(eventTimer.getTimerFamilyId()).getReceiver().accept(this.timerInGlobalWindow("B", new Instant(1500L), new Instant(2500L)));
            context.getIncomingTimerEndpoint(eventTimer.getTimerFamilyId()).getReceiver().accept(this.timerInGlobalWindow("A", new Instant(1600L), new Instant(2600L)));
            context.getIncomingTimerEndpoint(processingTimer.getTimerFamilyId()).getReceiver().accept(this.timerInGlobalWindow("X", new Instant(1700L), new Instant(2700L)));
            context.getIncomingTimerEndpoint(processingTimer.getTimerFamilyId()).getReceiver().accept(this.timerInGlobalWindow("C", new Instant(1800L), new Instant(2800L)));
            context.getIncomingTimerEndpoint(processingTimer.getTimerFamilyId()).getReceiver().accept(this.timerInGlobalWindow("B", new Instant(1500L), new Instant(10032L)));
            context.getIncomingTimerEndpoint(eventFamilyTimer.getTimerFamilyId()).getReceiver().accept(this.dynamicTimerInGlobalWindow("B", "event-timer2", new Instant(2000L), new Instant(1650L)));
            context.getIncomingTimerEndpoint(processingFamilyTimer.getTimerFamilyId()).getReceiver().accept(this.dynamicTimerInGlobalWindow("Y", "processing-timer2", new Instant(2100L), new Instant(3100L)));
            MatcherAssert.assertThat(mainOutputValues, Matchers.contains(WindowedValue.timestampedValueInGlobalWindow((Object)"key:X mainX[X0]", (Instant)new Instant(1000L)), WindowedValue.timestampedValueInGlobalWindow((Object)"key:Y mainY[]", (Instant)new Instant(1100L)), WindowedValue.timestampedValueInGlobalWindow((Object)"key:X mainX[X0, X1]", (Instant)new Instant(1200L)), WindowedValue.timestampedValueInGlobalWindow((Object)"key:Y mainY[Y1]", (Instant)new Instant(1300L)), WindowedValue.timestampedValueInGlobalWindow((Object)"key:A event[A0]", (Instant)new Instant(1400L)), WindowedValue.timestampedValueInGlobalWindow((Object)"key:B event[]", (Instant)new Instant(1500L)), WindowedValue.timestampedValueInGlobalWindow((Object)"key:A event[A0, event]", (Instant)new Instant(1400L)), WindowedValue.timestampedValueInGlobalWindow((Object)"key:A event[A0, event, event]", (Instant)new Instant(1400L)), WindowedValue.timestampedValueInGlobalWindow((Object)"key:A event[A0, event, event, event]", (Instant)new Instant(1400L)), WindowedValue.timestampedValueInGlobalWindow((Object)"key:A event[A0, event, event, event, event]", (Instant)new Instant(1400L)), WindowedValue.timestampedValueInGlobalWindow((Object)"key:A event[A0, event, event, event, event, event]", (Instant)new Instant(1400L)), WindowedValue.timestampedValueInGlobalWindow((Object)"key:A event[A0, event, event, event, event, event, event]", (Instant)new Instant(1400L)), WindowedValue.timestampedValueInGlobalWindow((Object)"key:C processing[C0]", (Instant)new Instant(1800L)), WindowedValue.timestampedValueInGlobalWindow((Object)"key:B processing[event]", (Instant)new Instant(1500L)), WindowedValue.timestampedValueInGlobalWindow((Object)"key:B event[event, processing]", (Instant)new Instant(1500L)), WindowedValue.timestampedValueInGlobalWindow((Object)"key:B event[event, processing, event]", (Instant)new Instant(1500L)), WindowedValue.timestampedValueInGlobalWindow((Object)"key:B event[event, processing, event, event]", (Instant)new Instant(1500L)), WindowedValue.timestampedValueInGlobalWindow((Object)"key:B event-family[event, processing, event, event, event]", (Instant)new Instant(2000L)), WindowedValue.timestampedValueInGlobalWindow((Object)"key:Y processing-family[Y1, Y2]", (Instant)new Instant(2100L))));
            mainOutputValues.clear();
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getFinishBundleFunctions())).run();
            MatcherAssert.assertThat(aggregator.getOutputTimers().get(eventTimer), Matchers.contains(this.clearedTimerInGlobalWindow("X"), this.timerInGlobalWindow("Y", new Instant(2100L), new Instant(2181L)), this.timerInGlobalWindow("A", new Instant(1400L), new Instant(2617L)), this.timerInGlobalWindow("B", new Instant(2000L), new Instant(2071L)), this.timerInGlobalWindow("C", new Instant(1800L), new Instant(1861L))));
            MatcherAssert.assertThat(aggregator.getOutputTimers().get(processingTimer), Matchers.contains(this.clearedTimerInGlobalWindow("X"), this.timerInGlobalWindow("Y", new Instant(2100L), new Instant(10082L)), this.timerInGlobalWindow("A", new Instant(1400L), new Instant(10032L)), this.timerInGlobalWindow("B", new Instant(2000L), new Instant(10072L)), this.timerInGlobalWindow("C", new Instant(1800L), new Instant(10062L))));
            MatcherAssert.assertThat(aggregator.getOutputTimers().get(eventFamilyTimer), Matchers.containsInAnyOrder(this.dynamicTimerInGlobalWindow("X", "event-timer1", new Instant(1200L), new Instant(1203L)), this.clearedTimerInGlobalWindow("X", "to-delete-event"), this.clearedTimerInGlobalWindow("Y", "to-delete-event"), this.dynamicTimerInGlobalWindow("Y", "event-timer1", new Instant(2100L), new Instant(2183L)), this.dynamicTimerInGlobalWindow("A", "event-timer1", new Instant(1400L), new Instant(2619L)), this.dynamicTimerInGlobalWindow("B", "event-timer1", new Instant(2000L), new Instant(2073L)), this.dynamicTimerInGlobalWindow("C", "event-timer1", new Instant(1800L), new Instant(1863L))));
            MatcherAssert.assertThat(aggregator.getOutputTimers().get(processingFamilyTimer), Matchers.containsInAnyOrder(this.dynamicTimerInGlobalWindow("X", "processing-timer1", new Instant(1200L), new Instant(10004L)), this.clearedTimerInGlobalWindow("X", "to-delete-processing"), this.dynamicTimerInGlobalWindow("Y", "processing-timer1", new Instant(2100L), new Instant(10084L)), this.clearedTimerInGlobalWindow("Y", "to-delete-processing"), this.dynamicTimerInGlobalWindow("A", "processing-timer1", new Instant(1400L), new Instant(10034L)), this.dynamicTimerInGlobalWindow("B", "processing-timer1", new Instant(2000L), new Instant(10074L)), this.dynamicTimerInGlobalWindow("C", "processing-timer1", new Instant(1800L), new Instant(10064L))));
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getTearDownFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            Assert.assertEquals(new FakeBeamFnStateClient(StringUtf8Coder.of(), ImmutableMap.builder().put((Object)this.bagUserStateKey("bag", "X"), Arrays.asList("X0", "X1", "X2")).put((Object)this.bagUserStateKey("bag", "Y"), Arrays.asList("Y1", "Y2", "processing-family")).put((Object)this.bagUserStateKey("bag", "A"), Arrays.asList("A0", "event", "event", "event", "event", "event", "event", "event")).put((Object)this.bagUserStateKey("bag", "B"), Arrays.asList("event", "processing", "event", "event", "event", "event-family")).put((Object)this.bagUserStateKey("bag", "C"), Arrays.asList("C0", "processing")).build()).getData(), fakeStateClient.getData());
        }

        private <K> Timer<K> timerInGlobalWindow(K userKey, Instant holdTimestamp, Instant fireTimestamp) {
            return this.dynamicTimerInGlobalWindow(userKey, "", holdTimestamp, fireTimestamp);
        }

        private <K> Timer<K> clearedTimerInGlobalWindow(K userKey) {
            return this.clearedTimerInGlobalWindow(userKey, "");
        }

        private <K> Timer<K> clearedTimerInGlobalWindow(K userKey, String dynamicTimerTag) {
            return Timer.cleared(userKey, (String)dynamicTimerTag, Collections.singletonList(GlobalWindow.INSTANCE));
        }

        private <K> Timer<K> dynamicTimerInGlobalWindow(K userKey, String dynamicTimerTag, Instant holdTimestamp, Instant fireTimestamp) {
            return Timer.of(userKey, (String)dynamicTimerTag, Collections.singletonList(GlobalWindow.INSTANCE), (Instant)fireTimestamp, (Instant)holdTimestamp, (PaneInfo)PaneInfo.NO_FIRING);
        }

        private <T> WindowedValue<T> valueInWindows(T value, BoundedWindow window, BoundedWindow ... windows) {
            return WindowedValue.of(value, (Instant)window.maxTimestamp(), (Collection)ImmutableList.builder().add((Object)window).add((Object[])windows).build(), (PaneInfo)PaneInfo.NO_FIRING);
        }

        private BeamFnApi.StateKey iterableSideInputKey(String sideInputId) throws IOException {
            return this.iterableSideInputKey(sideInputId, ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)GlobalWindow.Coder.INSTANCE, (Object)GlobalWindow.INSTANCE)));
        }

        private BeamFnApi.StateKey iterableSideInputKey(String sideInputId, ByteString windowKey) {
            return BeamFnApi.StateKey.newBuilder().setIterableSideInput(BeamFnApi.StateKey.IterableSideInput.newBuilder().setTransformId(TEST_TRANSFORM_ID).setSideInputId(sideInputId).setWindow(windowKey)).build();
        }

        private ByteString encode(String ... values) throws IOException {
            ByteStringOutputStream out = new ByteStringOutputStream();
            for (String value : values) {
                StringUtf8Coder.of().encode(value, (OutputStream)out);
            }
            return out.toByteString();
        }

        @Test
        public void testRegistration() {
            for (PTransformRunnerFactory.Registrar registrar : ServiceLoader.load(PTransformRunnerFactory.Registrar.class)) {
                if (!(registrar instanceof FnApiDoFnRunner.Registrar)) continue;
                MatcherAssert.assertThat(registrar.getPTransformRunnerFactories(), IsMapContaining.hasKey("beam:transform:pardo:v1"));
                return;
            }
            Assert.fail("Expected registrar not found.");
        }

        @Test
        public void testProcessElementForSizedElementAndRestriction() throws Exception {
            Pipeline p = Pipeline.create();
            ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)p.getOptions().as(ExperimentalOptions.class)), (String)"beam_fn_api");
            ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)p.getOptions().as(ExperimentalOptions.class)), (String)"use_runner_v2");
            PCollection valuePCollection = (PCollection)p.apply((PTransform)Create.of((Object)"unused", (Object[])new String[0]));
            PCollectionView singletonSideInputView = (PCollectionView)valuePCollection.apply((PTransform)View.asSingleton());
            WindowObservingTestSplittableDoFn doFn = new WindowObservingTestSplittableDoFn(singletonSideInputView);
            valuePCollection.apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)doFn).withSideInputs(new PCollectionView[]{singletonSideInputView}));
            RunnerApi.Pipeline pProto = ProtoOverrides.updateTransform((String)"beam:transform:pardo:v1", (RunnerApi.Pipeline)PipelineTranslation.toProto((Pipeline)p, (SdkComponents)SdkComponents.create((PipelineOptions)p.getOptions()), (boolean)true), (ProtoOverrides.TransformReplacement)SplittableParDoExpander.createSizedReplacement());
            String expandedTransformId = (String)((Map.Entry)Iterables.find(pProto.getComponents().getTransformsMap().entrySet(), entry -> ((RunnerApi.PTransform)entry.getValue()).getSpec().getUrn().equals("beam:transform:sdf_process_sized_element_and_restrictions:v1") && ((RunnerApi.PTransform)entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID))).getKey();
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(expandedTransformId);
            String inputPCollectionId = pTransform.getInputsOrThrow(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform));
            RunnerApi.PCollection inputPCollection = pProto.getComponents().getPcollectionsOrThrow(inputPCollectionId);
            RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents((RunnerApi.Components)pProto.getComponents());
            WindowedValue.FullWindowedValueCoder inputCoder = WindowedValue.getFullCoder((Coder)CoderTranslation.fromProto((RunnerApi.Coder)pProto.getComponents().getCodersOrThrow(inputPCollection.getCoderId()), (RehydratedComponents)rehydratedComponents, (CoderTranslation.TranslationContext)CoderTranslation.TranslationContext.DEFAULT), (Coder)CoderTranslation.fromProto((RunnerApi.Coder)pProto.getComponents().getCodersOrThrow(pProto.getComponents().getWindowingStrategiesOrThrow(inputPCollection.getWindowingStrategyId()).getWindowCoderId()), (RehydratedComponents)rehydratedComponents, (CoderTranslation.TranslationContext)CoderTranslation.TranslationContext.DEFAULT));
            String outputPCollectionId = pTransform.getOutputsOrThrow("output");
            ImmutableMap stateData = ImmutableMap.of((Object)this.iterableSideInputKey(singletonSideInputView.getTagInternal().getId(), ByteString.EMPTY), Arrays.asList("8"));
            FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(StringUtf8Coder.of(), stateData);
            BundleSplitListener.InMemory splitListener = BundleSplitListener.InMemory.create();
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).beamFnStateClient(fakeClient).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllCoders(pProto.getComponents().getCodersMap()).putAllEnvironments(Collections.emptyMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).build()).splitListener((BundleSplitListener)splitListener).build();
            ArrayList mainOutputValues = new ArrayList();
            context.addPCollectionConsumer(outputPCollectionId, mainOutputValues::add);
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getStartBundleFunctions())).run();
            mainOutputValues.clear();
            MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            MatcherAssert.assertThat(mainInput, Matchers.instanceOf(HandlesSplits.class));
            ExecutionTest.assertNoReportedProgress(context.getBundleProgressReporters());
            mainInput.accept((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(5L, 10L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)5.0)));
            ExecutionTest.assertNoReportedProgress(context.getBundleProgressReporters());
            BeamFnApi.BundleApplication primaryRoot = (BeamFnApi.BundleApplication)Iterables.getOnlyElement((Iterable)splitListener.getPrimaryRoots());
            BeamFnApi.DelayedBundleApplication residualRoot = (BeamFnApi.DelayedBundleApplication)Iterables.getOnlyElement((Iterable)splitListener.getResidualRoots());
            Assert.assertEquals(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform), primaryRoot.getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, primaryRoot.getTransformId());
            Assert.assertEquals(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform), residualRoot.getApplication().getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, residualRoot.getApplication().getTransformId());
            Assert.assertEquals(WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(5L, 8L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)3.0)), inputCoder.decode(primaryRoot.getElement().newInput()));
            Assert.assertEquals(WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(8L, 10L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)7L)))), (Object)2.0)), inputCoder.decode(residualRoot.getApplication().getElement().newInput()));
            Instant expectedOutputWatermark = GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)7L));
            Assert.assertEquals(ImmutableMap.of((Object)"output", (Object)Timestamp.newBuilder().setSeconds(expectedOutputWatermark.getMillis() / 1000L).setNanos((int)(expectedOutputWatermark.getMillis() % 1000L) * 1000000).build()), residualRoot.getApplication().getOutputWatermarksMap());
            Assert.assertEquals(org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Duration.newBuilder().setSeconds(54L).setNanos(321000000).build(), residualRoot.getRequestedTimeDelay());
            splitListener.clear();
            ExecutionTest.assertNoReportedProgress(context.getBundleProgressReporters());
            mainInput.accept((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(0L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)2.0)));
            ExecutionTest.assertNoReportedProgress(context.getBundleProgressReporters());
            MatcherAssert.assertThat(mainOutputValues, Matchers.contains(WindowedValue.timestampedValueInGlobalWindow((Object)"5:5", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)5L))), WindowedValue.timestampedValueInGlobalWindow((Object)"5:6", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)6L))), WindowedValue.timestampedValueInGlobalWindow((Object)"5:7", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)7L))), WindowedValue.timestampedValueInGlobalWindow((Object)"2:0", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)0L))), WindowedValue.timestampedValueInGlobalWindow((Object)"2:1", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L)))));
            Assert.assertTrue(splitListener.getPrimaryRoots().isEmpty());
            Assert.assertTrue(splitListener.getResidualRoots().isEmpty());
            mainOutputValues.clear();
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            Future<HandlesSplits.SplitResult> trySplitFuture = executorService.submit(() -> {
                try {
                    doFn.waitForSplitElementToBeProcessed();
                    Assert.assertEquals(0.6, ((HandlesSplits)mainInput).getProgress(), 0.01);
                    ExecutionTest.assertReportedProgressEquals(context.getShortIdMap(), context.getBundleProgressReporters(), 3.0, 2.0);
                    HandlesSplits.SplitResult splitResult = ((HandlesSplits)mainInput).trySplit(0.0);
                    return splitResult;
                }
                finally {
                    doFn.trySplitPerformed();
                }
            });
            ExecutionTest.assertNoReportedProgress(context.getBundleProgressReporters());
            mainInput.accept((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"7", (Object)KV.of((Object)new OffsetRange(0L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)2.0)));
            HandlesSplits.SplitResult trySplitResult = trySplitFuture.get();
            ExecutionTest.assertNoReportedProgress(context.getBundleProgressReporters());
            MatcherAssert.assertThat(mainOutputValues, Matchers.contains(WindowedValue.timestampedValueInGlobalWindow((Object)"7:0", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)0L))), WindowedValue.timestampedValueInGlobalWindow((Object)"7:1", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L))), WindowedValue.timestampedValueInGlobalWindow((Object)"7:2", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)2L))), WindowedValue.timestampedValueInGlobalWindow((Object)"7:3", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)3L)))));
            BeamFnApi.BundleApplication primaryRoot2 = (BeamFnApi.BundleApplication)Iterables.getOnlyElement((Iterable)trySplitResult.getPrimaryRoots());
            BeamFnApi.DelayedBundleApplication residualRoot2 = (BeamFnApi.DelayedBundleApplication)Iterables.getOnlyElement((Iterable)trySplitResult.getResidualRoots());
            Assert.assertEquals(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform), primaryRoot2.getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, primaryRoot2.getTransformId());
            Assert.assertEquals(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform), residualRoot2.getApplication().getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, residualRoot2.getApplication().getTransformId());
            Assert.assertEquals(WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"7", (Object)KV.of((Object)new OffsetRange(0L, 4L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)4.0)), inputCoder.decode(primaryRoot2.getElement().newInput()));
            Assert.assertEquals(WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"7", (Object)KV.of((Object)new OffsetRange(4L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)2L)))), (Object)1.0)), inputCoder.decode(residualRoot2.getApplication().getElement().newInput()));
            Instant expectedOutputWatermark2 = GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)2L));
            Assert.assertEquals(ImmutableMap.of((Object)"output", (Object)Timestamp.newBuilder().setSeconds(expectedOutputWatermark2.getMillis() / 1000L).setNanos((int)(expectedOutputWatermark2.getMillis() % 1000L) * 1000000).build()), residualRoot2.getApplication().getOutputWatermarksMap());
            Assert.assertEquals(residualRoot2.getRequestedTimeDelay().getDefaultInstanceForType(), residualRoot2.getRequestedTimeDelay());
            Assert.assertTrue(splitListener.getPrimaryRoots().isEmpty());
            Assert.assertTrue(splitListener.getResidualRoots().isEmpty());
            mainOutputValues.clear();
            executorService.shutdown();
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getFinishBundleFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getTearDownFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            Assert.assertEquals(new FakeBeamFnStateClient(StringUtf8Coder.of(), stateData).getData(), fakeClient.getData());
        }

        @Test
        public void testProcessElementForSizedElementAndRestrictionSplitBeforeTryClaim() throws Exception {
            Pipeline p = Pipeline.create();
            ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)p.getOptions().as(ExperimentalOptions.class)), (String)"beam_fn_api");
            ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)p.getOptions().as(ExperimentalOptions.class)), (String)"use_runner_v2");
            PCollection valuePCollection = (PCollection)p.apply((PTransform)Create.of((Object)"unused", (Object[])new String[0]));
            PCollectionView singletonSideInputView = (PCollectionView)valuePCollection.apply((PTransform)View.asSingleton());
            WindowObservingTestSplittableDoFn doFn = new WindowObservingTestSplittableDoFn(singletonSideInputView);
            valuePCollection.apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)doFn).withSideInputs(new PCollectionView[]{singletonSideInputView}));
            RunnerApi.Pipeline pProto = ProtoOverrides.updateTransform((String)"beam:transform:pardo:v1", (RunnerApi.Pipeline)PipelineTranslation.toProto((Pipeline)p, (SdkComponents)SdkComponents.create((PipelineOptions)p.getOptions()), (boolean)true), (ProtoOverrides.TransformReplacement)SplittableParDoExpander.createSizedReplacement());
            String expandedTransformId = (String)((Map.Entry)Iterables.find(pProto.getComponents().getTransformsMap().entrySet(), entry -> ((RunnerApi.PTransform)entry.getValue()).getSpec().getUrn().equals("beam:transform:sdf_process_sized_element_and_restrictions:v1") && ((RunnerApi.PTransform)entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID))).getKey();
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(expandedTransformId);
            String inputPCollectionId = pTransform.getInputsOrThrow(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform));
            RunnerApi.PCollection inputPCollection = pProto.getComponents().getPcollectionsOrThrow(inputPCollectionId);
            RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents((RunnerApi.Components)pProto.getComponents());
            WindowedValue.FullWindowedValueCoder inputCoder = WindowedValue.getFullCoder((Coder)CoderTranslation.fromProto((RunnerApi.Coder)pProto.getComponents().getCodersOrThrow(inputPCollection.getCoderId()), (RehydratedComponents)rehydratedComponents, (CoderTranslation.TranslationContext)CoderTranslation.TranslationContext.DEFAULT), (Coder)CoderTranslation.fromProto((RunnerApi.Coder)pProto.getComponents().getCodersOrThrow(pProto.getComponents().getWindowingStrategiesOrThrow(inputPCollection.getWindowingStrategyId()).getWindowCoderId()), (RehydratedComponents)rehydratedComponents, (CoderTranslation.TranslationContext)CoderTranslation.TranslationContext.DEFAULT));
            String outputPCollectionId = pTransform.getOutputsOrThrow("output");
            ImmutableMap stateData = ImmutableMap.of((Object)this.iterableSideInputKey(singletonSideInputView.getTagInternal().getId(), ByteString.EMPTY), Arrays.asList("8"));
            FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(StringUtf8Coder.of(), stateData);
            BundleSplitListener.InMemory splitListener = BundleSplitListener.InMemory.create();
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).beamFnStateClient(fakeClient).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllCoders(pProto.getComponents().getCodersMap()).putAllEnvironments(Collections.emptyMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).build()).splitListener((BundleSplitListener)splitListener).build();
            ArrayList mainOutputValues = new ArrayList();
            context.addPCollectionConsumer(outputPCollectionId, mainOutputValues::add);
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getStartBundleFunctions())).run();
            mainOutputValues.clear();
            MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            MatcherAssert.assertThat(mainInput, Matchers.instanceOf(HandlesSplits.class));
            doFn.setupBlockProcess();
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            Future<HandlesSplits.SplitResult> trySplitFuture = executorService.submit(() -> {
                try {
                    doFn.waitForProcessEntered();
                    Assert.assertNull(((HandlesSplits)mainInput).trySplit(0.0));
                    doFn.unblockProcess();
                    doFn.waitForSplitElementToBeProcessed();
                    Assert.assertEquals(0.6, ((HandlesSplits)mainInput).getProgress(), 0.01);
                    ExecutionTest.assertReportedProgressEquals(context.getShortIdMap(), context.getBundleProgressReporters(), 3.0, 2.0);
                    HandlesSplits.SplitResult splitResult = ((HandlesSplits)mainInput).trySplit(0.0);
                    return splitResult;
                }
                finally {
                    doFn.trySplitPerformed();
                }
            });
            ExecutionTest.assertNoReportedProgress(context.getBundleProgressReporters());
            mainInput.accept((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"7", (Object)KV.of((Object)new OffsetRange(0L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)2.0)));
            HandlesSplits.SplitResult trySplitResult = trySplitFuture.get();
            ExecutionTest.assertNoReportedProgress(context.getBundleProgressReporters());
            MatcherAssert.assertThat(mainOutputValues, Matchers.contains(WindowedValue.timestampedValueInGlobalWindow((Object)"7:0", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)0L))), WindowedValue.timestampedValueInGlobalWindow((Object)"7:1", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L))), WindowedValue.timestampedValueInGlobalWindow((Object)"7:2", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)2L))), WindowedValue.timestampedValueInGlobalWindow((Object)"7:3", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)3L)))));
            BeamFnApi.BundleApplication primaryRoot = (BeamFnApi.BundleApplication)Iterables.getOnlyElement((Iterable)trySplitResult.getPrimaryRoots());
            BeamFnApi.DelayedBundleApplication residualRoot = (BeamFnApi.DelayedBundleApplication)Iterables.getOnlyElement((Iterable)trySplitResult.getResidualRoots());
            Assert.assertEquals(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform), primaryRoot.getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, primaryRoot.getTransformId());
            Assert.assertEquals(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform), residualRoot.getApplication().getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, residualRoot.getApplication().getTransformId());
            Assert.assertEquals(WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"7", (Object)KV.of((Object)new OffsetRange(0L, 4L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)4.0)), inputCoder.decode(primaryRoot.getElement().newInput()));
            Assert.assertEquals(WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"7", (Object)KV.of((Object)new OffsetRange(4L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)2L)))), (Object)1.0)), inputCoder.decode(residualRoot.getApplication().getElement().newInput()));
            Instant expectedOutputWatermark = GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)2L));
            Assert.assertEquals(ImmutableMap.of((Object)"output", (Object)Timestamp.newBuilder().setSeconds(expectedOutputWatermark.getMillis() / 1000L).setNanos((int)(expectedOutputWatermark.getMillis() % 1000L) * 1000000).build()), residualRoot.getApplication().getOutputWatermarksMap());
            Assert.assertEquals(residualRoot.getRequestedTimeDelay().getDefaultInstanceForType(), residualRoot.getRequestedTimeDelay());
            Assert.assertTrue(splitListener.getPrimaryRoots().isEmpty());
            Assert.assertTrue(splitListener.getResidualRoots().isEmpty());
            mainOutputValues.clear();
            executorService.shutdown();
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getFinishBundleFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getTearDownFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            Assert.assertEquals(new FakeBeamFnStateClient(StringUtf8Coder.of(), stateData).getData(), fakeClient.getData());
        }

        @Test
        public void testProcessElementForSizedElementAndRestrictionNoTryClaim() throws Exception {
            Pipeline p = Pipeline.create();
            ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)p.getOptions().as(ExperimentalOptions.class)), (String)"beam_fn_api");
            ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)p.getOptions().as(ExperimentalOptions.class)), (String)"use_runner_v2");
            PCollection valuePCollection = (PCollection)p.apply((PTransform)Create.of((Object)"unused", (Object[])new String[0]));
            PCollectionView singletonSideInputView = (PCollectionView)valuePCollection.apply((PTransform)View.asSingleton());
            WindowObservingTestSplittableDoFn doFn = new WindowObservingTestSplittableDoFn(singletonSideInputView);
            doFn.setAbortProcessing();
            valuePCollection.apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)doFn).withSideInputs(new PCollectionView[]{singletonSideInputView}));
            RunnerApi.Pipeline pProto = ProtoOverrides.updateTransform((String)"beam:transform:pardo:v1", (RunnerApi.Pipeline)PipelineTranslation.toProto((Pipeline)p, (SdkComponents)SdkComponents.create((PipelineOptions)p.getOptions()), (boolean)true), (ProtoOverrides.TransformReplacement)SplittableParDoExpander.createSizedReplacement());
            String expandedTransformId = (String)((Map.Entry)Iterables.find(pProto.getComponents().getTransformsMap().entrySet(), entry -> ((RunnerApi.PTransform)entry.getValue()).getSpec().getUrn().equals("beam:transform:sdf_process_sized_element_and_restrictions:v1") && ((RunnerApi.PTransform)entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID))).getKey();
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(expandedTransformId);
            String inputPCollectionId = pTransform.getInputsOrThrow(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform));
            RunnerApi.PCollection inputPCollection = pProto.getComponents().getPcollectionsOrThrow(inputPCollectionId);
            RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents((RunnerApi.Components)pProto.getComponents());
            WindowedValue.FullWindowedValueCoder inputCoder = WindowedValue.getFullCoder((Coder)CoderTranslation.fromProto((RunnerApi.Coder)pProto.getComponents().getCodersOrThrow(inputPCollection.getCoderId()), (RehydratedComponents)rehydratedComponents, (CoderTranslation.TranslationContext)CoderTranslation.TranslationContext.DEFAULT), (Coder)CoderTranslation.fromProto((RunnerApi.Coder)pProto.getComponents().getCodersOrThrow(pProto.getComponents().getWindowingStrategiesOrThrow(inputPCollection.getWindowingStrategyId()).getWindowCoderId()), (RehydratedComponents)rehydratedComponents, (CoderTranslation.TranslationContext)CoderTranslation.TranslationContext.DEFAULT));
            String outputPCollectionId = pTransform.getOutputsOrThrow("output");
            ImmutableMap stateData = ImmutableMap.of((Object)this.iterableSideInputKey(singletonSideInputView.getTagInternal().getId(), ByteString.EMPTY), Arrays.asList("8"));
            FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(StringUtf8Coder.of(), stateData);
            BundleSplitListener.InMemory splitListener = BundleSplitListener.InMemory.create();
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).beamFnStateClient(fakeClient).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllCoders(pProto.getComponents().getCodersMap()).putAllEnvironments(Collections.emptyMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).build()).splitListener((BundleSplitListener)splitListener).build();
            ArrayList mainOutputValues = new ArrayList();
            context.addPCollectionConsumer(outputPCollectionId, mainOutputValues::add);
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getStartBundleFunctions())).run();
            mainOutputValues.clear();
            MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            MatcherAssert.assertThat(mainInput, Matchers.instanceOf(HandlesSplits.class));
            ExecutionTest.assertNoReportedProgress(context.getBundleProgressReporters());
            mainInput.accept((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(5L, 10L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)5.0)));
            ExecutionTest.assertNoReportedProgress(context.getBundleProgressReporters());
            BeamFnApi.BundleApplication primaryRoot = (BeamFnApi.BundleApplication)Iterables.getOnlyElement((Iterable)splitListener.getPrimaryRoots());
            BeamFnApi.DelayedBundleApplication residualRoot = (BeamFnApi.DelayedBundleApplication)Iterables.getOnlyElement((Iterable)splitListener.getResidualRoots());
            Assert.assertEquals(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform), primaryRoot.getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, primaryRoot.getTransformId());
            Assert.assertEquals(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform), residualRoot.getApplication().getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, residualRoot.getApplication().getTransformId());
            Assert.assertEquals(WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(5L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)0.0)), inputCoder.decode(primaryRoot.getElement().newInput()));
            Assert.assertEquals(WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(5L, 10L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)5.0)), inputCoder.decode(residualRoot.getApplication().getElement().newInput()));
            MatcherAssert.assertThat(residualRoot.getApplication().getOutputWatermarksMap(), Matchers.anEmptyMap());
            Assert.assertEquals(org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Duration.newBuilder().setSeconds(54L).setNanos(321000000).build(), residualRoot.getRequestedTimeDelay());
            splitListener.clear();
        }

        private static void assertNoReportedProgress(List<BundleProgressReporter> reporters) {
            HashMap monitoringData = new HashMap();
            for (BundleProgressReporter reporter : reporters) {
                reporter.updateIntermediateMonitoringData(monitoringData);
            }
            MatcherAssert.assertThat(monitoringData.entrySet(), Matchers.empty());
        }

        private static void assertReportedProgressEquals(ShortIdMap shortIdMap, List<BundleProgressReporter> reporters, double expectedWorkCompleted, double expectedWorkRemaining) throws Exception {
            HashMap monitoringData = new HashMap();
            for (BundleProgressReporter reporter : reporters) {
                reporter.updateIntermediateMonitoringData(monitoringData);
            }
            String workCompletedShortId = shortIdMap.getOrCreateShortId(WORK_COMPLETED_MI);
            String workRemainingShortId = shortIdMap.getOrCreateShortId(WORK_REMAINING_MI);
            Assert.assertTrue(monitoringData.containsKey(workCompletedShortId));
            Assert.assertTrue(monitoringData.containsKey(workRemainingShortId));
            Assert.assertEquals(ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)IterableCoder.of((Coder)DoubleCoder.of()), Collections.singletonList(expectedWorkCompleted))), monitoringData.get(workCompletedShortId));
            Assert.assertEquals(ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)IterableCoder.of((Coder)DoubleCoder.of()), Collections.singletonList(expectedWorkRemaining))), monitoringData.get(workRemainingShortId));
        }

        @Test
        public void testProcessElementForWindowedSizedElementAndRestriction() throws Exception {
            Pipeline p = Pipeline.create();
            ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)p.getOptions().as(ExperimentalOptions.class)), (String)"beam_fn_api");
            ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)p.getOptions().as(ExperimentalOptions.class)), (String)"use_runner_v2");
            PCollection valuePCollection = (PCollection)p.apply((PTransform)Create.of((Object)"unused", (Object[])new String[0]));
            PCollectionView singletonSideInputView = (PCollectionView)valuePCollection.apply((PTransform)View.asSingleton());
            WindowObservingTestSplittableDoFn doFn = new WindowObservingTestSplittableDoFn(singletonSideInputView);
            ((PCollection)valuePCollection.apply((PTransform)Window.into((WindowFn)SlidingWindows.of((Duration)Duration.standardSeconds((long)1L))))).apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)doFn).withSideInputs(new PCollectionView[]{singletonSideInputView}));
            RunnerApi.Pipeline pProto = ProtoOverrides.updateTransform((String)"beam:transform:pardo:v1", (RunnerApi.Pipeline)PipelineTranslation.toProto((Pipeline)p, (SdkComponents)SdkComponents.create((PipelineOptions)p.getOptions()), (boolean)true), (ProtoOverrides.TransformReplacement)SplittableParDoExpander.createSizedReplacement());
            String expandedTransformId = (String)((Map.Entry)Iterables.find(pProto.getComponents().getTransformsMap().entrySet(), entry -> ((RunnerApi.PTransform)entry.getValue()).getSpec().getUrn().equals("beam:transform:sdf_process_sized_element_and_restrictions:v1") && ((RunnerApi.PTransform)entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID))).getKey();
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(expandedTransformId);
            String inputPCollectionId = pTransform.getInputsOrThrow(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform));
            RunnerApi.PCollection inputPCollection = pProto.getComponents().getPcollectionsOrThrow(inputPCollectionId);
            RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents((RunnerApi.Components)pProto.getComponents());
            WindowedValue.FullWindowedValueCoder inputCoder = WindowedValue.getFullCoder((Coder)CoderTranslation.fromProto((RunnerApi.Coder)pProto.getComponents().getCodersOrThrow(inputPCollection.getCoderId()), (RehydratedComponents)rehydratedComponents, (CoderTranslation.TranslationContext)CoderTranslation.TranslationContext.DEFAULT), (Coder)CoderTranslation.fromProto((RunnerApi.Coder)pProto.getComponents().getCodersOrThrow(pProto.getComponents().getWindowingStrategiesOrThrow(inputPCollection.getWindowingStrategyId()).getWindowCoderId()), (RehydratedComponents)rehydratedComponents, (CoderTranslation.TranslationContext)CoderTranslation.TranslationContext.DEFAULT));
            String outputPCollectionId = pTransform.getOutputsOrThrow("output");
            ImmutableMap stateData = ImmutableMap.of((Object)this.iterableSideInputKey(singletonSideInputView.getTagInternal().getId(), ByteString.EMPTY), Arrays.asList("8"));
            FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(StringUtf8Coder.of(), stateData);
            BundleSplitListener.InMemory splitListener = BundleSplitListener.InMemory.create();
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).beamFnStateClient(fakeClient).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).putAllCoders(pProto.getComponents().getCodersMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).build()).splitListener((BundleSplitListener)splitListener).build();
            ArrayList mainOutputValues = new ArrayList();
            context.addPCollectionConsumer(outputPCollectionId, mainOutputValues::add);
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getStartBundleFunctions())).run();
            mainOutputValues.clear();
            MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            MatcherAssert.assertThat(mainInput, Matchers.instanceOf(HandlesSplits.class));
            IntervalWindow window1 = new IntervalWindow(new Instant(5L), new Instant(10L));
            IntervalWindow window2 = new IntervalWindow(new Instant(6L), new Instant(11L));
            ExecutionTest.assertNoReportedProgress(context.getBundleProgressReporters());
            WindowedValue<KV> firstValue = this.valueInWindows(KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(5L, 10L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L)))), (Object)5.0), (BoundedWindow)window1, new BoundedWindow[]{window2});
            mainInput.accept(firstValue);
            ExecutionTest.assertNoReportedProgress(context.getBundleProgressReporters());
            BeamFnApi.BundleApplication primaryRoot = (BeamFnApi.BundleApplication)Iterables.getOnlyElement((Iterable)splitListener.getPrimaryRoots());
            Assert.assertEquals(2L, splitListener.getResidualRoots().size());
            BeamFnApi.DelayedBundleApplication residualRoot = (BeamFnApi.DelayedBundleApplication)splitListener.getResidualRoots().get(1);
            BeamFnApi.DelayedBundleApplication residualRootForUnprocessedWindows = (BeamFnApi.DelayedBundleApplication)splitListener.getResidualRoots().get(0);
            Assert.assertEquals(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform), primaryRoot.getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, primaryRoot.getTransformId());
            Assert.assertEquals(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform), residualRoot.getApplication().getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, residualRoot.getApplication().getTransformId());
            Instant expectedOutputWatermark = GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)7L));
            ImmutableMap expectedOutputWatmermarkMap = ImmutableMap.of((Object)"output", (Object)Timestamp.newBuilder().setSeconds(expectedOutputWatermark.getMillis() / 1000L).setNanos((int)(expectedOutputWatermark.getMillis() % 1000L) * 1000000).build());
            Instant initialWatermark = GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L));
            ImmutableMap expectedOutputWatmermarkMapForUnprocessedWindows = ImmutableMap.of((Object)"output", (Object)Timestamp.newBuilder().setSeconds(initialWatermark.getMillis() / 1000L).setNanos((int)(initialWatermark.getMillis() % 1000L) * 1000000).build());
            Assert.assertEquals(expectedOutputWatmermarkMap, residualRoot.getApplication().getOutputWatermarksMap());
            Assert.assertEquals(org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Duration.newBuilder().setSeconds(54L).setNanos(321000000).build(), residualRoot.getRequestedTimeDelay());
            Assert.assertEquals(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform), residualRootForUnprocessedWindows.getApplication().getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, residualRootForUnprocessedWindows.getApplication().getTransformId());
            Assert.assertEquals(residualRootForUnprocessedWindows.getRequestedTimeDelay().getDefaultInstanceForType(), residualRootForUnprocessedWindows.getRequestedTimeDelay());
            Assert.assertEquals(expectedOutputWatmermarkMapForUnprocessedWindows, residualRootForUnprocessedWindows.getApplication().getOutputWatermarksMap());
            Assert.assertEquals(ExecutionTest.decode(inputCoder, primaryRoot.getElement()), WindowedValue.of((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(5L, 8L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L)))), (Object)3.0), (Instant)firstValue.getTimestamp(), (BoundedWindow)window1, (PaneInfo)firstValue.getPane()));
            Assert.assertEquals(ExecutionTest.decode(inputCoder, residualRoot.getApplication().getElement()), WindowedValue.of((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(8L, 10L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)7L)))), (Object)2.0), (Instant)firstValue.getTimestamp(), (BoundedWindow)window1, (PaneInfo)firstValue.getPane()));
            Assert.assertEquals(ExecutionTest.decode(inputCoder, residualRootForUnprocessedWindows.getApplication().getElement()), WindowedValue.of((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(5L, 10L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L)))), (Object)5.0), (Instant)firstValue.getTimestamp(), (BoundedWindow)window2, (PaneInfo)firstValue.getPane()));
            splitListener.clear();
            ExecutionTest.assertNoReportedProgress(context.getBundleProgressReporters());
            WindowedValue<KV> secondValue = this.valueInWindows(KV.of((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(0L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L)))), (Object)2.0), (BoundedWindow)window1, new BoundedWindow[]{window2});
            mainInput.accept(secondValue);
            ExecutionTest.assertNoReportedProgress(context.getBundleProgressReporters());
            MatcherAssert.assertThat(mainOutputValues, Matchers.contains(WindowedValue.of((Object)"5:5", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)5L)), (BoundedWindow)window1, (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)"5:6", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)6L)), (BoundedWindow)window1, (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)"5:7", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)7L)), (BoundedWindow)window1, (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)"2:0", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)0L)), (BoundedWindow)window1, (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)"2:1", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L)), (BoundedWindow)window1, (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)"2:0", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)0L)), (BoundedWindow)window2, (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)"2:1", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L)), (BoundedWindow)window2, (PaneInfo)firstValue.getPane())));
            Assert.assertTrue(splitListener.getPrimaryRoots().isEmpty());
            Assert.assertTrue(splitListener.getResidualRoots().isEmpty());
            mainOutputValues.clear();
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            Future<HandlesSplits.SplitResult> trySplitFuture = executorService.submit(() -> {
                try {
                    doFn.waitForSplitElementToBeProcessed();
                    Assert.assertEquals(0.3, ((HandlesSplits)mainInput).getProgress(), 0.01);
                    ExecutionTest.assertReportedProgressEquals(context.getShortIdMap(), context.getBundleProgressReporters(), 3.0, 7.0);
                    HandlesSplits.SplitResult splitResult = ((HandlesSplits)mainInput).trySplit(0.0);
                    return splitResult;
                }
                finally {
                    doFn.trySplitPerformed();
                }
            });
            ExecutionTest.assertNoReportedProgress(context.getBundleProgressReporters());
            WindowedValue<KV> splitValue = this.valueInWindows(KV.of((Object)KV.of((Object)"7", (Object)KV.of((Object)new OffsetRange(0L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L)))), (Object)2.0), (BoundedWindow)window1, new BoundedWindow[]{window2});
            mainInput.accept(splitValue);
            HandlesSplits.SplitResult trySplitResult = trySplitFuture.get();
            ExecutionTest.assertNoReportedProgress(context.getBundleProgressReporters());
            MatcherAssert.assertThat(mainOutputValues, Matchers.contains(WindowedValue.of((Object)"7:0", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)0L)), (BoundedWindow)window1, (PaneInfo)splitValue.getPane()), WindowedValue.of((Object)"7:1", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L)), (BoundedWindow)window1, (PaneInfo)splitValue.getPane()), WindowedValue.of((Object)"7:2", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)2L)), (BoundedWindow)window1, (PaneInfo)splitValue.getPane()), WindowedValue.of((Object)"7:3", (Instant)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)3L)), (BoundedWindow)window1, (PaneInfo)splitValue.getPane())));
            BeamFnApi.BundleApplication primaryRoot2 = (BeamFnApi.BundleApplication)Iterables.getOnlyElement((Iterable)trySplitResult.getPrimaryRoots());
            Assert.assertEquals(2L, trySplitResult.getResidualRoots().size());
            BeamFnApi.DelayedBundleApplication residualRoot2 = (BeamFnApi.DelayedBundleApplication)trySplitResult.getResidualRoots().get(1);
            BeamFnApi.DelayedBundleApplication residualRootInUnprocessedWindows = (BeamFnApi.DelayedBundleApplication)trySplitResult.getResidualRoots().get(0);
            Assert.assertEquals(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform), primaryRoot2.getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, primaryRoot2.getTransformId());
            Assert.assertEquals(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform), residualRoot2.getApplication().getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, residualRoot2.getApplication().getTransformId());
            Assert.assertEquals(TEST_TRANSFORM_ID, residualRootInUnprocessedWindows.getApplication().getTransformId());
            Assert.assertEquals(residualRootInUnprocessedWindows.getRequestedTimeDelay().getDefaultInstanceForType(), residualRootInUnprocessedWindows.getRequestedTimeDelay());
            Instant initialWatermark2 = GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L));
            Instant expectedOutputWatermark2 = GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)2L));
            ImmutableMap expectedOutputWatermarkMapInUnprocessedResiduals = ImmutableMap.of((Object)"output", (Object)Timestamp.newBuilder().setSeconds(initialWatermark2.getMillis() / 1000L).setNanos((int)(initialWatermark2.getMillis() % 1000L) * 1000000).build());
            ImmutableMap expectedOutputWatermarkMap = ImmutableMap.of((Object)"output", (Object)Timestamp.newBuilder().setSeconds(expectedOutputWatermark2.getMillis() / 1000L).setNanos((int)(expectedOutputWatermark2.getMillis() % 1000L) * 1000000).build());
            Assert.assertEquals(expectedOutputWatermarkMapInUnprocessedResiduals, residualRootInUnprocessedWindows.getApplication().getOutputWatermarksMap());
            Assert.assertEquals(this.valueInWindows(KV.of((Object)KV.of((Object)"7", (Object)KV.of((Object)new OffsetRange(0L, 4L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L)))), (Object)4.0), (BoundedWindow)window1, new BoundedWindow[0]), inputCoder.decode(primaryRoot2.getElement().newInput()));
            Assert.assertEquals(this.valueInWindows(KV.of((Object)KV.of((Object)"7", (Object)KV.of((Object)new OffsetRange(4L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)2L)))), (Object)1.0), (BoundedWindow)window1, new BoundedWindow[0]), inputCoder.decode(residualRoot2.getApplication().getElement().newInput()));
            Assert.assertEquals(expectedOutputWatermarkMap, residualRoot2.getApplication().getOutputWatermarksMap());
            Assert.assertEquals(WindowedValue.of((Object)KV.of((Object)KV.of((Object)"7", (Object)KV.of((Object)new OffsetRange(0L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L)))), (Object)5.0), (Instant)splitValue.getTimestamp(), (BoundedWindow)window2, (PaneInfo)splitValue.getPane()), inputCoder.decode(residualRootInUnprocessedWindows.getApplication().getElement().newInput()));
            Assert.assertEquals(residualRoot2.getRequestedTimeDelay().getDefaultInstanceForType(), residualRoot2.getRequestedTimeDelay());
            Assert.assertTrue(splitListener.getPrimaryRoots().isEmpty());
            Assert.assertTrue(splitListener.getResidualRoots().isEmpty());
            mainOutputValues.clear();
            executorService.shutdown();
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getFinishBundleFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getTearDownFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            Assert.assertEquals(new FakeBeamFnStateClient(StringUtf8Coder.of(), stateData).getData(), fakeClient.getData());
        }

        private static <T> T decode(Coder<T> coder, ByteString value) {
            try {
                return (T)coder.decode(value.newInput());
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Test
        public void testProcessElementForPairWithRestriction() throws Exception {
            Pipeline p = Pipeline.create();
            PCollection valuePCollection = (PCollection)p.apply((PTransform)Create.of((Object)"unused", (Object[])new String[0]));
            PCollectionView singletonSideInputView = (PCollectionView)valuePCollection.apply((PTransform)View.asSingleton());
            valuePCollection.apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)new WindowObservingTestSplittableDoFn(singletonSideInputView)).withSideInputs(new PCollectionView[]{singletonSideInputView}));
            RunnerApi.Pipeline pProto = ProtoOverrides.updateTransform((String)"beam:transform:pardo:v1", (RunnerApi.Pipeline)PipelineTranslation.toProto((Pipeline)p, (SdkComponents)SdkComponents.create((PipelineOptions)p.getOptions()), (boolean)true), (ProtoOverrides.TransformReplacement)SplittableParDoExpander.createSizedReplacement());
            String expandedTransformId = (String)((Map.Entry)Iterables.find(pProto.getComponents().getTransformsMap().entrySet(), entry -> ((RunnerApi.PTransform)entry.getValue()).getSpec().getUrn().equals("beam:transform:sdf_pair_with_restriction:v1") && ((RunnerApi.PTransform)entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID))).getKey();
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(expandedTransformId);
            String inputPCollectionId = pTransform.getInputsOrThrow(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform));
            String outputPCollectionId = (String)Iterables.getOnlyElement(pTransform.getOutputsMap().values());
            FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(StringUtf8Coder.of(), ImmutableMap.of());
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).beamFnStateClient(fakeClient).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).putAllCoders(pProto.getComponents().getCodersMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).build()).build();
            ArrayList mainOutputValues = new ArrayList();
            context.addPCollectionConsumer(outputPCollectionId, mainOutputValues::add);
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            Assert.assertTrue(context.getStartBundleFunctions().isEmpty());
            mainOutputValues.clear();
            MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            mainInput.accept((Object)WindowedValue.valueInGlobalWindow((Object)"5"));
            mainInput.accept((Object)WindowedValue.valueInGlobalWindow((Object)"2"));
            MatcherAssert.assertThat(mainOutputValues, Matchers.contains(WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(0L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L))))), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(0L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L)))))));
            mainOutputValues.clear();
            Assert.assertTrue(context.getFinishBundleFunctions().isEmpty());
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getTearDownFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
        }

        @Test
        public void testProcessElementForWindowedPairWithRestriction() throws Exception {
            Pipeline p = Pipeline.create();
            PCollection valuePCollection = (PCollection)p.apply((PTransform)Create.of((Object)"unused", (Object[])new String[0]));
            PCollectionView singletonSideInputView = (PCollectionView)valuePCollection.apply((PTransform)View.asSingleton());
            ((PCollection)valuePCollection.apply((PTransform)Window.into((WindowFn)SlidingWindows.of((Duration)Duration.standardSeconds((long)1L))))).apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)new WindowObservingTestSplittableDoFn(singletonSideInputView)).withSideInputs(new PCollectionView[]{singletonSideInputView}));
            RunnerApi.Pipeline pProto = ProtoOverrides.updateTransform((String)"beam:transform:pardo:v1", (RunnerApi.Pipeline)PipelineTranslation.toProto((Pipeline)p, (SdkComponents)SdkComponents.create((PipelineOptions)p.getOptions()), (boolean)true), (ProtoOverrides.TransformReplacement)SplittableParDoExpander.createSizedReplacement());
            String expandedTransformId = (String)((Map.Entry)Iterables.find(pProto.getComponents().getTransformsMap().entrySet(), entry -> ((RunnerApi.PTransform)entry.getValue()).getSpec().getUrn().equals("beam:transform:sdf_pair_with_restriction:v1") && ((RunnerApi.PTransform)entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID))).getKey();
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(expandedTransformId);
            String inputPCollectionId = pTransform.getInputsOrThrow(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform));
            String outputPCollectionId = (String)Iterables.getOnlyElement(pTransform.getOutputsMap().values());
            FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(StringUtf8Coder.of(), ImmutableMap.of());
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).beamFnStateClient(fakeClient).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).putAllCoders(pProto.getComponents().getCodersMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).build()).build();
            ArrayList mainOutputValues = new ArrayList();
            context.addPCollectionConsumer(outputPCollectionId, mainOutputValues::add);
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            Assert.assertTrue(context.getStartBundleFunctions().isEmpty());
            mainOutputValues.clear();
            MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            IntervalWindow window1 = new IntervalWindow(new Instant(5L), new Instant(10L));
            IntervalWindow window2 = new IntervalWindow(new Instant(6L), new Instant(11L));
            WindowedValue<String> firstValue = this.valueInWindows("5", (BoundedWindow)window1, new BoundedWindow[]{window2});
            WindowedValue<String> secondValue = this.valueInWindows("2", (BoundedWindow)window1, new BoundedWindow[]{window2});
            mainInput.accept(firstValue);
            mainInput.accept(secondValue);
            MatcherAssert.assertThat(mainOutputValues, Matchers.contains(WindowedValue.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(0L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L)))), (Instant)firstValue.getTimestamp(), (BoundedWindow)window1, (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(0L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L)))), (Instant)firstValue.getTimestamp(), (BoundedWindow)window2, (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(0L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L)))), (Instant)secondValue.getTimestamp(), (BoundedWindow)window1, (PaneInfo)secondValue.getPane()), WindowedValue.of((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(0L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L)))), (Instant)secondValue.getTimestamp(), (BoundedWindow)window2, (PaneInfo)secondValue.getPane())));
            mainOutputValues.clear();
            Assert.assertTrue(context.getFinishBundleFunctions().isEmpty());
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getTearDownFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
        }

        @Test
        public void testProcessElementForWindowedPairWithRestrictionWithNonWindowObservingOptimization() throws Exception {
            Pipeline p = Pipeline.create();
            PCollection valuePCollection = (PCollection)p.apply((PTransform)Create.of((Object)"unused", (Object[])new String[0]));
            valuePCollection.apply((PTransform)View.asSingleton());
            ((PCollection)valuePCollection.apply((PTransform)Window.into((WindowFn)SlidingWindows.of((Duration)Duration.standardSeconds((long)1L))))).apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)new NonWindowObservingTestSplittableDoFn()));
            RunnerApi.Pipeline pProto = ProtoOverrides.updateTransform((String)"beam:transform:pardo:v1", (RunnerApi.Pipeline)PipelineTranslation.toProto((Pipeline)p, (SdkComponents)SdkComponents.create((PipelineOptions)p.getOptions()), (boolean)true), (ProtoOverrides.TransformReplacement)SplittableParDoExpander.createSizedReplacement());
            String expandedTransformId = (String)((Map.Entry)Iterables.find(pProto.getComponents().getTransformsMap().entrySet(), entry -> ((RunnerApi.PTransform)entry.getValue()).getSpec().getUrn().equals("beam:transform:sdf_pair_with_restriction:v1") && ((RunnerApi.PTransform)entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID))).getKey();
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(expandedTransformId);
            String inputPCollectionId = pTransform.getInputsOrThrow(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform));
            String outputPCollectionId = (String)Iterables.getOnlyElement(pTransform.getOutputsMap().values());
            FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(StringUtf8Coder.of(), ImmutableMap.of());
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).beamFnStateClient(fakeClient).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).putAllCoders(pProto.getComponents().getCodersMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).build()).build();
            ArrayList mainOutputValues = new ArrayList();
            context.addPCollectionConsumer(outputPCollectionId, mainOutputValues::add);
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            Assert.assertTrue(context.getStartBundleFunctions().isEmpty());
            mainOutputValues.clear();
            MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            IntervalWindow window1 = new IntervalWindow(new Instant(5L), new Instant(10L));
            IntervalWindow window2 = new IntervalWindow(new Instant(6L), new Instant(11L));
            WindowedValue<String> firstValue = this.valueInWindows("5", (BoundedWindow)window1, new BoundedWindow[]{window2});
            WindowedValue<String> secondValue = this.valueInWindows("2", (BoundedWindow)window1, new BoundedWindow[]{window2});
            mainInput.accept(firstValue);
            mainInput.accept(secondValue);
            MatcherAssert.assertThat(mainOutputValues, Matchers.contains(WindowedValue.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(0L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L)))), (Instant)firstValue.getTimestamp(), (Collection)ImmutableList.of((Object)window1, (Object)window2), (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(0L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L)))), (Instant)secondValue.getTimestamp(), (Collection)ImmutableList.of((Object)window1, (Object)window2), (PaneInfo)secondValue.getPane())));
            mainOutputValues.clear();
            Assert.assertTrue(context.getFinishBundleFunctions().isEmpty());
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getTearDownFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
        }

        @Test
        public void testProcessElementForSplitAndSizeRestriction() throws Exception {
            Pipeline p = Pipeline.create();
            PCollection valuePCollection = (PCollection)p.apply((PTransform)Create.of((Object)"unused", (Object[])new String[0]));
            PCollectionView singletonSideInputView = (PCollectionView)valuePCollection.apply((PTransform)View.asSingleton());
            valuePCollection.apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)new WindowObservingTestSplittableDoFn(singletonSideInputView)).withSideInputs(new PCollectionView[]{singletonSideInputView}));
            RunnerApi.Pipeline pProto = ProtoOverrides.updateTransform((String)"beam:transform:pardo:v1", (RunnerApi.Pipeline)PipelineTranslation.toProto((Pipeline)p, (SdkComponents)SdkComponents.create((PipelineOptions)p.getOptions()), (boolean)true), (ProtoOverrides.TransformReplacement)SplittableParDoExpander.createSizedReplacement());
            String expandedTransformId = (String)((Map.Entry)Iterables.find(pProto.getComponents().getTransformsMap().entrySet(), entry -> ((RunnerApi.PTransform)entry.getValue()).getSpec().getUrn().equals("beam:transform:sdf_split_and_size_restrictions:v1") && ((RunnerApi.PTransform)entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID))).getKey();
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(expandedTransformId);
            String inputPCollectionId = pTransform.getInputsOrThrow(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform));
            String outputPCollectionId = (String)Iterables.getOnlyElement(pTransform.getOutputsMap().values());
            FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(StringUtf8Coder.of(), ImmutableMap.of());
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).beamFnStateClient(fakeClient).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).putAllCoders(pProto.getComponents().getCodersMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).build()).build();
            ArrayList mainOutputValues = new ArrayList();
            KvCoder coder = KvCoder.of((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)KvCoder.of((Coder)OffsetRange.Coder.of(), (Coder)InstantCoder.of())), (Coder)DoubleCoder.of());
            context.addPCollectionConsumer(outputPCollectionId, mainOutputValues::add);
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            Assert.assertTrue(context.getStartBundleFunctions().isEmpty());
            mainOutputValues.clear();
            MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            mainInput.accept((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(0L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE))));
            mainInput.accept((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(0L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE))));
            MatcherAssert.assertThat(mainOutputValues, Matchers.contains(WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(0L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)2.0)), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(2L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)3.0)), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(0L, 1L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)1.0)), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(1L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)1.0))));
            mainOutputValues.clear();
            Assert.assertTrue(context.getFinishBundleFunctions().isEmpty());
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getTearDownFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
        }

        @Test
        public void testProcessElementForWindowedSplitAndSizeRestriction() throws Exception {
            Pipeline p = Pipeline.create();
            PCollection valuePCollection = (PCollection)p.apply((PTransform)Create.of((Object)"unused", (Object[])new String[0]));
            PCollectionView singletonSideInputView = (PCollectionView)valuePCollection.apply((PTransform)View.asSingleton());
            ((PCollection)valuePCollection.apply((PTransform)Window.into((WindowFn)SlidingWindows.of((Duration)Duration.standardSeconds((long)1L))))).apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)new WindowObservingTestSplittableDoFn(singletonSideInputView)).withSideInputs(new PCollectionView[]{singletonSideInputView}));
            RunnerApi.Pipeline pProto = ProtoOverrides.updateTransform((String)"beam:transform:pardo:v1", (RunnerApi.Pipeline)PipelineTranslation.toProto((Pipeline)p, (SdkComponents)SdkComponents.create((PipelineOptions)p.getOptions()), (boolean)true), (ProtoOverrides.TransformReplacement)SplittableParDoExpander.createSizedReplacement());
            String expandedTransformId = (String)((Map.Entry)Iterables.find(pProto.getComponents().getTransformsMap().entrySet(), entry -> ((RunnerApi.PTransform)entry.getValue()).getSpec().getUrn().equals("beam:transform:sdf_split_and_size_restrictions:v1") && ((RunnerApi.PTransform)entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID))).getKey();
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(expandedTransformId);
            String inputPCollectionId = pTransform.getInputsOrThrow(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform));
            String outputPCollectionId = (String)Iterables.getOnlyElement(pTransform.getOutputsMap().values());
            FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(StringUtf8Coder.of(), ImmutableMap.of());
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).beamFnStateClient(fakeClient).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).putAllCoders(pProto.getComponents().getCodersMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).build()).build();
            ArrayList mainOutputValues = new ArrayList();
            KvCoder coder = KvCoder.of((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)KvCoder.of((Coder)OffsetRange.Coder.of(), (Coder)InstantCoder.of())), (Coder)DoubleCoder.of());
            context.addPCollectionConsumer(outputPCollectionId, mainOutputValues::add);
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            Assert.assertTrue(context.getStartBundleFunctions().isEmpty());
            mainOutputValues.clear();
            MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            IntervalWindow window1 = new IntervalWindow(new Instant(5L), new Instant(10L));
            IntervalWindow window2 = new IntervalWindow(new Instant(6L), new Instant(11L));
            WindowedValue<KV> firstValue = this.valueInWindows(KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(0L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (BoundedWindow)window1, new BoundedWindow[]{window2});
            WindowedValue<KV> secondValue = this.valueInWindows(KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(0L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (BoundedWindow)window1, new BoundedWindow[]{window2});
            mainInput.accept(firstValue);
            mainInput.accept(secondValue);
            MatcherAssert.assertThat(mainOutputValues, Matchers.contains(WindowedValue.of((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(0L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)2.0), (Instant)firstValue.getTimestamp(), (BoundedWindow)window1, (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(2L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)3.0), (Instant)firstValue.getTimestamp(), (BoundedWindow)window1, (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(0L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)2.0), (Instant)firstValue.getTimestamp(), (BoundedWindow)window2, (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(2L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)3.0), (Instant)firstValue.getTimestamp(), (BoundedWindow)window2, (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)KV.of((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(0L, 1L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)1.0), (Instant)firstValue.getTimestamp(), (BoundedWindow)window1, (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)KV.of((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(1L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)1.0), (Instant)firstValue.getTimestamp(), (BoundedWindow)window1, (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)KV.of((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(0L, 1L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)1.0), (Instant)firstValue.getTimestamp(), (BoundedWindow)window2, (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)KV.of((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(1L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)1.0), (Instant)firstValue.getTimestamp(), (BoundedWindow)window2, (PaneInfo)firstValue.getPane())));
            mainOutputValues.clear();
            Assert.assertTrue(context.getFinishBundleFunctions().isEmpty());
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getTearDownFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
        }

        @Test
        public void testProcessElementForWindowedSplitAndSizeRestrictionWithNonWindowObservingOptimization() throws Exception {
            Pipeline p = Pipeline.create();
            PCollection valuePCollection = (PCollection)p.apply((PTransform)Create.of((Object)"unused", (Object[])new String[0]));
            ((PCollection)valuePCollection.apply((PTransform)Window.into((WindowFn)SlidingWindows.of((Duration)Duration.standardSeconds((long)1L))))).apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)new NonWindowObservingTestSplittableDoFn()));
            RunnerApi.Pipeline pProto = ProtoOverrides.updateTransform((String)"beam:transform:pardo:v1", (RunnerApi.Pipeline)PipelineTranslation.toProto((Pipeline)p, (SdkComponents)SdkComponents.create((PipelineOptions)p.getOptions()), (boolean)true), (ProtoOverrides.TransformReplacement)SplittableParDoExpander.createSizedReplacement());
            String expandedTransformId = (String)((Map.Entry)Iterables.find(pProto.getComponents().getTransformsMap().entrySet(), entry -> ((RunnerApi.PTransform)entry.getValue()).getSpec().getUrn().equals("beam:transform:sdf_split_and_size_restrictions:v1") && ((RunnerApi.PTransform)entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID))).getKey();
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(expandedTransformId);
            String inputPCollectionId = pTransform.getInputsOrThrow(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform));
            String outputPCollectionId = (String)Iterables.getOnlyElement(pTransform.getOutputsMap().values());
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).putAllCoders(pProto.getComponents().getCodersMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).build()).build();
            ArrayList mainOutputValues = new ArrayList();
            KvCoder coder = KvCoder.of((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)KvCoder.of((Coder)OffsetRange.Coder.of(), (Coder)InstantCoder.of())), (Coder)DoubleCoder.of());
            context.addPCollectionConsumer(outputPCollectionId, mainOutputValues::add);
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            Assert.assertTrue(context.getStartBundleFunctions().isEmpty());
            mainOutputValues.clear();
            MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            IntervalWindow window1 = new IntervalWindow(new Instant(5L), new Instant(10L));
            IntervalWindow window2 = new IntervalWindow(new Instant(6L), new Instant(11L));
            WindowedValue<KV> firstValue = this.valueInWindows(KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(0L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (BoundedWindow)window1, new BoundedWindow[]{window2});
            WindowedValue<KV> secondValue = this.valueInWindows(KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(0L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (BoundedWindow)window1, new BoundedWindow[]{window2});
            mainInput.accept(firstValue);
            mainInput.accept(secondValue);
            MatcherAssert.assertThat(mainOutputValues, Matchers.contains(WindowedValue.of((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(0L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)2.0), (Instant)firstValue.getTimestamp(), (Collection)ImmutableList.of((Object)window1, (Object)window2), (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(2L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)3.0), (Instant)firstValue.getTimestamp(), (Collection)ImmutableList.of((Object)window1, (Object)window2), (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)KV.of((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(0L, 1L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)1.0), (Instant)firstValue.getTimestamp(), (Collection)ImmutableList.of((Object)window1, (Object)window2), (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)KV.of((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(1L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)1.0), (Instant)firstValue.getTimestamp(), (Collection)ImmutableList.of((Object)window1, (Object)window2), (PaneInfo)firstValue.getPane())));
            mainOutputValues.clear();
            Assert.assertTrue(context.getFinishBundleFunctions().isEmpty());
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getTearDownFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
        }

        private static HandlesSplits.SplitResult createSplitResult(double fractionOfRemainder) {
            ByteStringOutputStream primaryBytes = new ByteStringOutputStream();
            ByteStringOutputStream residualBytes = new ByteStringOutputStream();
            try {
                DoubleCoder.of().encode(Double.valueOf(fractionOfRemainder), (OutputStream)primaryBytes);
                DoubleCoder.of().encode(Double.valueOf(1.0 - fractionOfRemainder), (OutputStream)residualBytes);
            }
            catch (Exception exception) {
                // empty catch block
            }
            return HandlesSplits.SplitResult.of((List)ImmutableList.of((Object)BeamFnApi.BundleApplication.newBuilder().setElement(primaryBytes.toByteString()).setInputId("mainInputId-process").setTransformId("processPTransfromId").build()), (List)ImmutableList.of((Object)BeamFnApi.DelayedBundleApplication.newBuilder().setApplication(BeamFnApi.BundleApplication.newBuilder().setElement(residualBytes.toByteString()).setInputId("mainInputId-process").setTransformId("processPTransfromId").build()).build()));
        }

        @Test
        public void testProcessElementForTruncateAndSizeRestrictionForwardSplitWhenObservingWindows() throws Exception {
            Pipeline p = Pipeline.create();
            PCollection valuePCollection = (PCollection)p.apply((PTransform)Create.of((Object)"unused", (Object[])new String[0]));
            PCollectionView singletonSideInputView = (PCollectionView)valuePCollection.apply((PTransform)View.asSingleton());
            WindowObservingTestSplittableDoFn doFn = WindowObservingTestSplittableDoFn.forSplitAtTruncate((PCollectionView<String>)singletonSideInputView);
            ((PCollection)valuePCollection.apply((PTransform)Window.into((WindowFn)SlidingWindows.of((Duration)Duration.standardSeconds((long)1L))))).apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)doFn).withSideInputs(new PCollectionView[]{singletonSideInputView}));
            RunnerApi.Pipeline pProto = ProtoOverrides.updateTransform((String)"beam:transform:pardo:v1", (RunnerApi.Pipeline)PipelineTranslation.toProto((Pipeline)p, (SdkComponents)SdkComponents.create((PipelineOptions)p.getOptions()), (boolean)true), (ProtoOverrides.TransformReplacement)SplittableParDoExpander.createTruncateReplacement());
            String expandedTransformId = (String)((Map.Entry)Iterables.find(pProto.getComponents().getTransformsMap().entrySet(), entry -> ((RunnerApi.PTransform)entry.getValue()).getSpec().getUrn().equals("beam:transform:sdf_truncate_sized_restrictions:v1") && ((RunnerApi.PTransform)entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID))).getKey();
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(expandedTransformId);
            String inputPCollectionId = pTransform.getInputsOrThrow(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform));
            RunnerApi.PCollection inputPCollection = pProto.getComponents().getPcollectionsOrThrow(inputPCollectionId);
            RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents((RunnerApi.Components)pProto.getComponents());
            WindowedValue.FullWindowedValueCoder inputCoder = WindowedValue.getFullCoder((Coder)CoderTranslation.fromProto((RunnerApi.Coder)pProto.getComponents().getCodersOrThrow(inputPCollection.getCoderId()), (RehydratedComponents)rehydratedComponents, (CoderTranslation.TranslationContext)CoderTranslation.TranslationContext.DEFAULT), (Coder)CoderTranslation.fromProto((RunnerApi.Coder)pProto.getComponents().getCodersOrThrow(pProto.getComponents().getWindowingStrategiesOrThrow(inputPCollection.getWindowingStrategyId()).getWindowCoderId()), (RehydratedComponents)rehydratedComponents, (CoderTranslation.TranslationContext)CoderTranslation.TranslationContext.DEFAULT));
            String outputPCollectionId = (String)Iterables.getOnlyElement(pTransform.getOutputsMap().values());
            FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(StringUtf8Coder.of(), ImmutableMap.of());
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).beamFnStateClient(fakeClient).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllCoders(pProto.getComponents().getCodersMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).putAllEnvironments(Collections.emptyMap()).putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).build()).build();
            ArrayList<WindowedValue<KV<KV<String, OffsetRange>, Double>>> mainOutputValues = new ArrayList<WindowedValue<KV<KV<String, OffsetRange>, Double>>>();
            KvCoder coder = KvCoder.of((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)KvCoder.of((Coder)OffsetRange.Coder.of(), (Coder)InstantCoder.of())), (Coder)DoubleCoder.of());
            context.addPCollectionConsumer(outputPCollectionId, new SplittableFnDataReceiver(mainOutputValues));
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            MatcherAssert.assertThat(mainInput, Matchers.instanceOf(HandlesSplits.class));
            mainOutputValues.clear();
            IntervalWindow window1 = new IntervalWindow(new Instant(5L), new Instant(10L));
            IntervalWindow window2 = new IntervalWindow(new Instant(6L), new Instant(11L));
            IntervalWindow window3 = new IntervalWindow(new Instant(7L), new Instant(12L));
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            Future<HandlesSplits.SplitResult> trySplitFuture = executorService.submit(() -> {
                try {
                    doFn.waitForSplitElementToBeProcessed();
                    HandlesSplits.SplitResult result = ((HandlesSplits)mainInput).trySplit(0.0);
                    Assert.assertNotNull(result);
                    HandlesSplits.SplitResult splitResult = result;
                    return splitResult;
                }
                finally {
                    doFn.trySplitPerformed();
                }
            });
            WindowedValue<KV> splitValue = this.valueInWindows(KV.of((Object)KV.of((Object)"7", (Object)KV.of((Object)new OffsetRange(0L, 6L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)6.0), (BoundedWindow)window1, new BoundedWindow[]{window2, window3});
            mainInput.accept(splitValue);
            HandlesSplits.SplitResult trySplitResult = trySplitFuture.get();
            MatcherAssert.assertThat(mainOutputValues, Matchers.contains(WindowedValue.of((Object)KV.of((Object)KV.of((Object)"7", (Object)KV.of((Object)new OffsetRange(0L, 3L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)3.0), (Instant)splitValue.getTimestamp(), (BoundedWindow)window1, (PaneInfo)splitValue.getPane()), WindowedValue.of((Object)KV.of((Object)KV.of((Object)"7", (Object)KV.of((Object)new OffsetRange(0L, 3L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)3.0), (Instant)splitValue.getTimestamp(), (BoundedWindow)window2, (PaneInfo)splitValue.getPane())));
            HandlesSplits.SplitResult expectedElementSplit = ExecutionTest.createSplitResult(0.0);
            BeamFnApi.BundleApplication expectedElementSplitPrimary = (BeamFnApi.BundleApplication)Iterables.getOnlyElement((Iterable)expectedElementSplit.getPrimaryRoots());
            ByteStringOutputStream primaryBytes = new ByteStringOutputStream();
            inputCoder.encode((Object)WindowedValue.of((Object)KV.of((Object)KV.of((Object)"7", (Object)KV.of((Object)new OffsetRange(0L, 6L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)6.0), (Instant)splitValue.getTimestamp(), (BoundedWindow)window1, (PaneInfo)splitValue.getPane()), (OutputStream)primaryBytes);
            BeamFnApi.BundleApplication expectedWindowedPrimary = BeamFnApi.BundleApplication.newBuilder().setElement(primaryBytes.toByteString()).setInputId(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform)).setTransformId(TEST_TRANSFORM_ID).build();
            BeamFnApi.DelayedBundleApplication expectedElementSplitResidual = (BeamFnApi.DelayedBundleApplication)Iterables.getOnlyElement((Iterable)expectedElementSplit.getResidualRoots());
            ByteStringOutputStream residualBytes = new ByteStringOutputStream();
            inputCoder.encode((Object)WindowedValue.of((Object)KV.of((Object)KV.of((Object)"7", (Object)KV.of((Object)new OffsetRange(0L, 6L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)6.0), (Instant)splitValue.getTimestamp(), (BoundedWindow)window3, (PaneInfo)splitValue.getPane()), (OutputStream)residualBytes);
            BeamFnApi.DelayedBundleApplication expectedWindowedResidual = BeamFnApi.DelayedBundleApplication.newBuilder().setApplication(BeamFnApi.BundleApplication.newBuilder().setElement(residualBytes.toByteString()).setInputId(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform)).setTransformId(TEST_TRANSFORM_ID).build()).build();
            MatcherAssert.assertThat(trySplitResult.getPrimaryRoots(), Matchers.contains(expectedWindowedPrimary, expectedElementSplitPrimary));
            MatcherAssert.assertThat(trySplitResult.getResidualRoots(), Matchers.contains(expectedWindowedResidual, expectedElementSplitResidual));
        }

        @Test
        public void testProcessElementForTruncateAndSizeRestrictionForwardSplitWithoutObservingWindow() throws Exception {
            Pipeline p = Pipeline.create();
            PCollection valuePCollection = (PCollection)p.apply((PTransform)Create.of((Object)"unused", (Object[])new String[0]));
            valuePCollection.apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)new NonWindowObservingTestSplittableDoFn()));
            RunnerApi.Pipeline pProto = ProtoOverrides.updateTransform((String)"beam:transform:pardo:v1", (RunnerApi.Pipeline)PipelineTranslation.toProto((Pipeline)p, (SdkComponents)SdkComponents.create((PipelineOptions)p.getOptions()), (boolean)true), (ProtoOverrides.TransformReplacement)SplittableParDoExpander.createTruncateReplacement());
            String expandedTransformId = (String)((Map.Entry)Iterables.find(pProto.getComponents().getTransformsMap().entrySet(), entry -> ((RunnerApi.PTransform)entry.getValue()).getSpec().getUrn().equals("beam:transform:sdf_truncate_sized_restrictions:v1") && ((RunnerApi.PTransform)entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID))).getKey();
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(expandedTransformId);
            String inputPCollectionId = pTransform.getInputsOrThrow(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform));
            String outputPCollectionId = (String)Iterables.getOnlyElement(pTransform.getOutputsMap().values());
            FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(StringUtf8Coder.of(), ImmutableMap.of());
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).beamFnStateClient(fakeClient).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllCoders(pProto.getComponents().getCodersMap()).putAllEnvironments(Collections.emptyMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).build()).build();
            ArrayList<WindowedValue<KV<KV<String, OffsetRange>, Double>>> mainOutputValues = new ArrayList<WindowedValue<KV<KV<String, OffsetRange>, Double>>>();
            KvCoder coder = KvCoder.of((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)OffsetRange.Coder.of()), (Coder)DoubleCoder.of());
            context.addPCollectionConsumer(outputPCollectionId, new SplittableFnDataReceiver(mainOutputValues));
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            MatcherAssert.assertThat(mainInput, Matchers.instanceOf(HandlesSplits.class));
            Assert.assertEquals(0.7, ((HandlesSplits)mainInput).getProgress(), 0.0);
            Assert.assertEquals(ExecutionTest.createSplitResult(0.4), ((HandlesSplits)mainInput).trySplit(0.4));
        }

        @Test
        public void testProcessElementForTruncateAndSizeRestriction() throws Exception {
            Pipeline p = Pipeline.create();
            PCollection valuePCollection = (PCollection)p.apply((PTransform)Create.of((Object)"unused", (Object[])new String[0]));
            valuePCollection.apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)new NonWindowObservingTestSplittableDoFn()));
            RunnerApi.Pipeline pProto = ProtoOverrides.updateTransform((String)"beam:transform:pardo:v1", (RunnerApi.Pipeline)PipelineTranslation.toProto((Pipeline)p, (SdkComponents)SdkComponents.create((PipelineOptions)p.getOptions()), (boolean)true), (ProtoOverrides.TransformReplacement)SplittableParDoExpander.createTruncateReplacement());
            String expandedTransformId = (String)((Map.Entry)Iterables.find(pProto.getComponents().getTransformsMap().entrySet(), entry -> ((RunnerApi.PTransform)entry.getValue()).getSpec().getUrn().equals("beam:transform:sdf_truncate_sized_restrictions:v1") && ((RunnerApi.PTransform)entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID))).getKey();
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(expandedTransformId);
            String inputPCollectionId = pTransform.getInputsOrThrow(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform));
            String outputPCollectionId = (String)Iterables.getOnlyElement(pTransform.getOutputsMap().values());
            FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(StringUtf8Coder.of(), ImmutableMap.of());
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).beamFnStateClient(fakeClient).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllCoders(pProto.getComponents().getCodersMap()).putAllEnvironments(Collections.emptyMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).build()).build();
            ArrayList<WindowedValue<KV<KV<String, OffsetRange>, Double>>> mainOutputValues = new ArrayList<WindowedValue<KV<KV<String, OffsetRange>, Double>>>();
            KvCoder coder = KvCoder.of((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)KvCoder.of((Coder)OffsetRange.Coder.of(), (Coder)InstantCoder.of())), (Coder)DoubleCoder.of());
            context.addPCollectionConsumer(outputPCollectionId, new SplittableFnDataReceiver(mainOutputValues));
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            Assert.assertTrue(context.getStartBundleFunctions().isEmpty());
            mainOutputValues.clear();
            MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            MatcherAssert.assertThat(mainInput, Matchers.instanceOf(HandlesSplits.class));
            mainInput.accept((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(0L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)5.0)));
            mainInput.accept((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(0L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)2.0)));
            MatcherAssert.assertThat(mainOutputValues, Matchers.contains(WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(0L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)2.0)), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(0L, 1L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)1.0))));
            mainOutputValues.clear();
            Assert.assertTrue(context.getFinishBundleFunctions().isEmpty());
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getTearDownFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
        }

        @Test
        public void testProcessElementForWindowedTruncateAndSizeRestriction() throws Exception {
            Pipeline p = Pipeline.create();
            PCollection valuePCollection = (PCollection)p.apply((PTransform)Create.of((Object)"unused", (Object[])new String[0]));
            PCollectionView singletonSideInputView = (PCollectionView)valuePCollection.apply((PTransform)View.asSingleton());
            ((PCollection)valuePCollection.apply((PTransform)Window.into((WindowFn)SlidingWindows.of((Duration)Duration.standardSeconds((long)1L))))).apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)new WindowObservingTestSplittableDoFn(singletonSideInputView)).withSideInputs(new PCollectionView[]{singletonSideInputView}));
            RunnerApi.Pipeline pProto = ProtoOverrides.updateTransform((String)"beam:transform:pardo:v1", (RunnerApi.Pipeline)PipelineTranslation.toProto((Pipeline)p, (SdkComponents)SdkComponents.create((PipelineOptions)p.getOptions()), (boolean)true), (ProtoOverrides.TransformReplacement)SplittableParDoExpander.createTruncateReplacement());
            String expandedTransformId = (String)((Map.Entry)Iterables.find(pProto.getComponents().getTransformsMap().entrySet(), entry -> ((RunnerApi.PTransform)entry.getValue()).getSpec().getUrn().equals("beam:transform:sdf_truncate_sized_restrictions:v1") && ((RunnerApi.PTransform)entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID))).getKey();
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(expandedTransformId);
            String inputPCollectionId = pTransform.getInputsOrThrow(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform));
            String outputPCollectionId = (String)Iterables.getOnlyElement(pTransform.getOutputsMap().values());
            FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(StringUtf8Coder.of(), ImmutableMap.of());
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).beamFnStateClient(fakeClient).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllCoders(pProto.getComponents().getCodersMap()).putAllEnvironments(Collections.emptyMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).build()).build();
            ArrayList<WindowedValue<KV<KV<String, OffsetRange>, Double>>> mainOutputValues = new ArrayList<WindowedValue<KV<KV<String, OffsetRange>, Double>>>();
            KvCoder coder = KvCoder.of((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)KvCoder.of((Coder)OffsetRange.Coder.of(), (Coder)InstantCoder.of())), (Coder)DoubleCoder.of());
            context.addPCollectionConsumer(outputPCollectionId, new SplittableFnDataReceiver(mainOutputValues));
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            Assert.assertTrue(context.getStartBundleFunctions().isEmpty());
            mainOutputValues.clear();
            MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            MatcherAssert.assertThat(mainInput, Matchers.instanceOf(HandlesSplits.class));
            IntervalWindow window1 = new IntervalWindow(new Instant(5L), new Instant(10L));
            IntervalWindow window2 = new IntervalWindow(new Instant(6L), new Instant(11L));
            WindowedValue<KV> firstValue = this.valueInWindows(KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(0L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)5.0), (BoundedWindow)window1, new BoundedWindow[]{window2});
            WindowedValue<KV> secondValue = this.valueInWindows(KV.of((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(0L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)2.0), (BoundedWindow)window1, new BoundedWindow[]{window2});
            mainInput.accept(firstValue);
            mainInput.accept(secondValue);
            MatcherAssert.assertThat(mainOutputValues, Matchers.contains(WindowedValue.of((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(0L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)2.0), (Instant)firstValue.getTimestamp(), (BoundedWindow)window1, (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(0L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)2.0), (Instant)firstValue.getTimestamp(), (BoundedWindow)window2, (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)KV.of((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(0L, 1L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)1.0), (Instant)firstValue.getTimestamp(), (BoundedWindow)window1, (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)KV.of((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(0L, 1L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)1.0), (Instant)firstValue.getTimestamp(), (BoundedWindow)window2, (PaneInfo)firstValue.getPane())));
            mainOutputValues.clear();
            Assert.assertTrue(context.getFinishBundleFunctions().isEmpty());
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getTearDownFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
        }

        @Test
        public void testProcessElementForWindowedTruncateAndSizeRestrictionWithNonWindowObservingOptimization() throws Exception {
            Pipeline p = Pipeline.create();
            PCollection valuePCollection = (PCollection)p.apply((PTransform)Create.of((Object)"unused", (Object[])new String[0]));
            ((PCollection)valuePCollection.apply((PTransform)Window.into((WindowFn)SlidingWindows.of((Duration)Duration.standardSeconds((long)1L))))).apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)new NonWindowObservingTestSplittableDoFn()));
            RunnerApi.Pipeline pProto = ProtoOverrides.updateTransform((String)"beam:transform:pardo:v1", (RunnerApi.Pipeline)PipelineTranslation.toProto((Pipeline)p, (SdkComponents)SdkComponents.create((PipelineOptions)p.getOptions()), (boolean)true), (ProtoOverrides.TransformReplacement)SplittableParDoExpander.createTruncateReplacement());
            String expandedTransformId = (String)((Map.Entry)Iterables.find(pProto.getComponents().getTransformsMap().entrySet(), entry -> ((RunnerApi.PTransform)entry.getValue()).getSpec().getUrn().equals("beam:transform:sdf_truncate_sized_restrictions:v1") && ((RunnerApi.PTransform)entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID))).getKey();
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(expandedTransformId);
            String inputPCollectionId = pTransform.getInputsOrThrow(ParDoTranslation.getMainInputName((RunnerApi.PTransformOrBuilder)pTransform));
            String outputPCollectionId = (String)Iterables.getOnlyElement(pTransform.getOutputsMap().values());
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllCoders(pProto.getComponents().getCodersMap()).putAllEnvironments(Collections.emptyMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).build()).build();
            ArrayList<WindowedValue<KV<KV<String, OffsetRange>, Double>>> mainOutputValues = new ArrayList<WindowedValue<KV<KV<String, OffsetRange>, Double>>>();
            KvCoder coder = KvCoder.of((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)KvCoder.of((Coder)OffsetRange.Coder.of(), (Coder)InstantCoder.of())), (Coder)DoubleCoder.of());
            context.addPCollectionConsumer(outputPCollectionId, new SplittableFnDataReceiver(mainOutputValues));
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            Assert.assertTrue(context.getStartBundleFunctions().isEmpty());
            mainOutputValues.clear();
            MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            MatcherAssert.assertThat(mainInput, Matchers.instanceOf(HandlesSplits.class));
            IntervalWindow window1 = new IntervalWindow(new Instant(5L), new Instant(10L));
            IntervalWindow window2 = new IntervalWindow(new Instant(6L), new Instant(11L));
            WindowedValue<KV> firstValue = this.valueInWindows(KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(0L, 5L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)5.0), (BoundedWindow)window1, new BoundedWindow[]{window2});
            WindowedValue<KV> secondValue = this.valueInWindows(KV.of((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(0L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)2.0), (BoundedWindow)window1, new BoundedWindow[]{window2});
            mainInput.accept(firstValue);
            mainInput.accept(secondValue);
            MatcherAssert.assertThat(mainOutputValues, Matchers.contains(WindowedValue.of((Object)KV.of((Object)KV.of((Object)"5", (Object)KV.of((Object)new OffsetRange(0L, 2L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)2.0), (Instant)firstValue.getTimestamp(), (Collection)ImmutableList.of((Object)window1, (Object)window2), (PaneInfo)firstValue.getPane()), WindowedValue.of((Object)KV.of((Object)KV.of((Object)"2", (Object)KV.of((Object)new OffsetRange(0L, 1L), (Object)GlobalWindow.TIMESTAMP_MIN_VALUE)), (Object)1.0), (Instant)firstValue.getTimestamp(), (Collection)ImmutableList.of((Object)window1, (Object)window2), (PaneInfo)firstValue.getPane())));
            mainOutputValues.clear();
            Assert.assertTrue(context.getFinishBundleFunctions().isEmpty());
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getTearDownFunctions())).run();
            MatcherAssert.assertThat(mainOutputValues, Matchers.empty());
        }

        @Test
        public void testDoFnSkewNotAllowed() throws Exception {
            Pipeline p = Pipeline.create();
            PCollection valuePCollection = (PCollection)p.apply((PTransform)Create.of((Object)"0", (Object[])new String[]{"1"}));
            PCollection outputPCollection = (PCollection)valuePCollection.apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)new SkewingDoFn(Duration.ZERO)));
            SdkComponents sdkComponents = SdkComponents.create((PipelineOptions)p.getOptions());
            RunnerApi.Pipeline pProto = PipelineTranslation.toProto((Pipeline)p, (SdkComponents)sdkComponents);
            String inputPCollectionId = sdkComponents.registerPCollection(valuePCollection);
            String outputPCollectionId = sdkComponents.registerPCollection(outputPCollection);
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(pProto.getComponents().getTransformsOrThrow(TEST_TRANSFORM_ID).getSubtransforms(0));
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllCoders(pProto.getComponents().getCodersMap()).putAllEnvironments(Collections.emptyMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).build()).build();
            ArrayList<WindowedValue<String>> mainOutputValues = new ArrayList<WindowedValue<String>>();
            StringUtf8Coder coder = StringUtf8Coder.of();
            context.addPCollectionConsumer(outputPCollectionId, new OutputFnDataReceiver(mainOutputValues));
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            mainOutputValues.clear();
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            mainInput.accept((Object)WindowedValue.valueInGlobalWindow((Object)"0"));
            String message = Assert.assertThrows(UserCodeException.class, () -> mainInput.accept((Object)WindowedValue.timestampedValueInGlobalWindow((Object)"1", (Instant)new Instant(0L)))).getMessage();
            MatcherAssert.assertThat(message, Matchers.allOf(Matchers.containsString(String.format("timestamp %s", new Instant(0L).minus((ReadableDuration)Duration.millis((long)1L)))), Matchers.containsString(String.format("allowed skew (%s)", PeriodFormat.getDefault().print((ReadablePeriod)Duration.ZERO.toPeriod())))));
        }

        @Test
        public void testDoFnSkewAllowed() throws Exception {
            Pipeline p = Pipeline.create();
            PCollection valuePCollection = (PCollection)p.apply((PTransform)Create.of((Object)"0", (Object[])new String[]{"3"}));
            PCollection outputPCollection = (PCollection)valuePCollection.apply(TEST_TRANSFORM_ID, (PTransform)ParDo.of((DoFn)new SkewingDoFn(Duration.millis((long)5L))));
            SdkComponents sdkComponents = SdkComponents.create((PipelineOptions)p.getOptions());
            RunnerApi.Pipeline pProto = PipelineTranslation.toProto((Pipeline)p, (SdkComponents)sdkComponents);
            String inputPCollectionId = sdkComponents.registerPCollection(valuePCollection);
            String outputPCollectionId = sdkComponents.registerPCollection(outputPCollection);
            RunnerApi.PTransform pTransform = pProto.getComponents().getTransformsOrThrow(pProto.getComponents().getTransformsOrThrow(TEST_TRANSFORM_ID).getSubtransforms(0));
            ArrayList<WindowedValue<String>> mainOutputValues = new ArrayList<WindowedValue<String>>();
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform).processBundleInstructionId("57").components(RunnerApi.Components.newBuilder().putAllCoders(pProto.getComponents().getCodersMap()).putAllEnvironments(Collections.emptyMap()).putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()).putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()).build()).build();
            StringUtf8Coder coder = StringUtf8Coder.of();
            context.addPCollectionConsumer(outputPCollectionId, new OutputFnDataReceiver(mainOutputValues));
            new FnApiDoFnRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            mainOutputValues.clear();
            FnDataReceiver mainInput = context.getPCollectionConsumer(inputPCollectionId);
            mainInput.accept((Object)WindowedValue.valueInGlobalWindow((Object)"0"));
            mainInput.accept((Object)WindowedValue.timestampedValueInGlobalWindow((Object)"3", (Instant)new Instant(0L)));
        }

        private static class OutputFnDataReceiver
        implements FnDataReceiver<WindowedValue> {
            private final List<WindowedValue<String>> mainOutputValues;

            OutputFnDataReceiver(List<WindowedValue<String>> mainOutputValues) {
                this.mainOutputValues = mainOutputValues;
            }

            public void accept(WindowedValue input) throws Exception {
                this.mainOutputValues.add((WindowedValue<String>)input);
            }
        }

        private static class SkewingDoFn
        extends DoFn<String, String> {
            private final Duration allowedSkew;

            private SkewingDoFn(Duration allowedSkew) {
                this.allowedSkew = allowedSkew;
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext context) {
                Duration duration = Duration.millis((long)Long.valueOf((String)context.element()));
                context.outputWithTimestamp((Object)((String)context.element()), context.timestamp().minus((ReadableDuration)duration));
            }

            public Duration getAllowedTimestampSkew() {
                return this.allowedSkew;
            }
        }

        private static class SplittableFnDataReceiver
        implements HandlesSplits,
        FnDataReceiver<WindowedValue> {
            private final List<WindowedValue<KV<KV<String, OffsetRange>, Double>>> mainOutputValues;

            SplittableFnDataReceiver(List<WindowedValue<KV<KV<String, OffsetRange>, Double>>> mainOutputValues) {
                this.mainOutputValues = mainOutputValues;
            }

            public HandlesSplits.SplitResult trySplit(double fractionOfRemainder) {
                return ExecutionTest.createSplitResult(fractionOfRemainder);
            }

            public double getProgress() {
                return 0.7;
            }

            public void accept(WindowedValue input) throws Exception {
                this.mainOutputValues.add((WindowedValue<KV<KV<String, OffsetRange>, Double>>)input);
            }
        }

        static class WindowObservingTestSplittableDoFn
        extends NonWindowObservingTestSplittableDoFn {
            private final PCollectionView<String> singletonSideInput;
            private static final long PROCESSED_WINDOW = 1L;
            private boolean splitAtTruncate = false;
            private long processedWindowCount = 0L;

            private WindowObservingTestSplittableDoFn(PCollectionView<String> singletonSideInput) {
                this.singletonSideInput = singletonSideInput;
            }

            private static WindowObservingTestSplittableDoFn forSplitAtTruncate(PCollectionView<String> singletonSideInput) {
                WindowObservingTestSplittableDoFn doFn = new WindowObservingTestSplittableDoFn(singletonSideInput);
                doFn.splitAtTruncate = true;
                return doFn;
            }

            @Override
            @DoFn.ProcessElement
            public DoFn.ProcessContinuation processElement(DoFn.ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker, ManualWatermarkEstimator<Instant> watermarkEstimator) throws Exception {
                this.enterProcessAndBlockIfEnabled();
                long checkpointUpperBound = Long.parseLong((String)context.sideInput(this.singletonSideInput));
                long position = ((OffsetRange)tracker.currentRestriction()).getFrom();
                boolean claimStatus = true;
                while (!this.shouldAbortProcessing() && (claimStatus = tracker.tryClaim((Object)position))) {
                    if (position == 3L) {
                        this.splitElementProcessed();
                        this.waitForTrySplitPerformed();
                    }
                    context.outputWithTimestamp((Object)((String)context.element() + ":" + position), GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)position)));
                    watermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)position)));
                    if (++position != checkpointUpperBound) continue;
                }
                if (!claimStatus) {
                    return DoFn.ProcessContinuation.stop();
                }
                return DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis((long)54321L));
            }

            public Duration getAllowedTimestampSkew() {
                return Duration.millis((long)Long.MAX_VALUE);
            }

            @Override
            @DoFn.TruncateRestriction
            public RestrictionTracker.TruncateResult<OffsetRange> truncateRestriction(@DoFn.Restriction OffsetRange range) throws Exception {
                if (this.splitAtTruncate && this.processedWindowCount == 1L) {
                    this.splitElementProcessed();
                    this.waitForTrySplitPerformed();
                }
                ++this.processedWindowCount;
                return RestrictionTracker.TruncateResult.of((Object)new OffsetRange(range.getFrom(), range.getTo() / 2L));
            }
        }

        static class NonWindowObservingTestSplittableDoFn
        extends DoFn<String, String> {
            private static final ConcurrentMap<String, Latches> DOFN_INSTANCE_TO_LATCHES = new ConcurrentHashMap<String, Latches>();
            private static final long SPLIT_ELEMENT = 3L;
            private static final long CHECKPOINT_UPPER_BOUND = 8L;
            private final String uuid = UUID.randomUUID().toString();

            private Latches getLatches() {
                return DOFN_INSTANCE_TO_LATCHES.computeIfAbsent(this.uuid, uuid -> new Latches());
            }

            public void splitElementProcessed() {
                this.getLatches().splitElementProcessedLatch.countDown();
            }

            public void waitForSplitElementToBeProcessed() throws InterruptedException {
                if (!this.getLatches().splitElementProcessedLatch.await(30L, TimeUnit.SECONDS)) {
                    Assert.fail("Failed to wait for trySplit to occur.");
                }
            }

            public void trySplitPerformed() {
                this.getLatches().trySplitPerformedLatch.countDown();
            }

            public void waitForTrySplitPerformed() throws InterruptedException {
                if (!this.getLatches().trySplitPerformedLatch.await(30L, TimeUnit.SECONDS)) {
                    Assert.fail("Failed to wait for trySplit to occur.");
                }
            }

            public void setupBlockProcess() {
                this.getLatches().blockProcessLatch = new CountDownLatch(1);
            }

            public void enterProcessAndBlockIfEnabled() throws InterruptedException {
                this.getLatches().processEnteredLatch.countDown();
                if (!this.getLatches().blockProcessLatch.await(30L, TimeUnit.SECONDS)) {
                    Assert.fail("Failed to wait for unblockProcess to occur.");
                }
            }

            public void waitForProcessEntered() throws InterruptedException {
                if (!this.getLatches().processEnteredLatch.await(5L, TimeUnit.SECONDS)) {
                    Assert.fail("Failed to wait for process to begin.");
                }
            }

            public void unblockProcess() throws InterruptedException {
                this.getLatches().blockProcessLatch.countDown();
            }

            public void setAbortProcessing() {
                this.getLatches().abortProcessing.set(true);
            }

            public boolean shouldAbortProcessing() {
                return this.getLatches().abortProcessing.get();
            }

            private NonWindowObservingTestSplittableDoFn() {
            }

            @DoFn.ProcessElement
            public DoFn.ProcessContinuation processElement(DoFn.ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker, ManualWatermarkEstimator<Instant> watermarkEstimator) throws Exception {
                long checkpointUpperBound = 8L;
                long position = ((OffsetRange)tracker.currentRestriction()).getFrom();
                boolean claimStatus = true;
                while (!this.shouldAbortProcessing() && (claimStatus = tracker.tryClaim((Object)position))) {
                    if (position == 3L) {
                        this.splitElementProcessed();
                        this.waitForTrySplitPerformed();
                    }
                    context.outputWithTimestamp((Object)((String)context.element() + ":" + position), GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)position)));
                    watermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)position)));
                    if (++position != checkpointUpperBound) continue;
                }
                if (!claimStatus) {
                    return DoFn.ProcessContinuation.stop();
                }
                return DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis((long)54321L));
            }

            @DoFn.GetInitialRestriction
            public OffsetRange restriction(@DoFn.Element String element) {
                return new OffsetRange(0L, (long)Integer.parseInt(element));
            }

            @DoFn.NewTracker
            public RestrictionTracker<OffsetRange, Long> newTracker(@DoFn.Restriction OffsetRange restriction) {
                return new OffsetRangeTracker(restriction);
            }

            @DoFn.SplitRestriction
            public void splitRange(@DoFn.Restriction OffsetRange range, DoFn.OutputReceiver<OffsetRange> receiver) {
                receiver.output((Object)new OffsetRange(range.getFrom(), (range.getFrom() + range.getTo()) / 2L));
                receiver.output((Object)new OffsetRange((range.getFrom() + range.getTo()) / 2L, range.getTo()));
            }

            @DoFn.TruncateRestriction
            public RestrictionTracker.TruncateResult<OffsetRange> truncateRestriction(@DoFn.Restriction OffsetRange range) throws Exception {
                return RestrictionTracker.TruncateResult.of((Object)new OffsetRange(range.getFrom(), range.getTo() / 2L));
            }

            @DoFn.GetInitialWatermarkEstimatorState
            public Instant getInitialWatermarkEstimatorState() {
                return GlobalWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)1L));
            }

            @DoFn.NewWatermarkEstimator
            public WatermarkEstimators.Manual newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant watermark) {
                return new WatermarkEstimators.Manual(watermark);
            }

            static class Latches {
                CountDownLatch blockProcessLatch = new CountDownLatch(0);
                CountDownLatch processEnteredLatch = new CountDownLatch(1);
                CountDownLatch splitElementProcessedLatch = new CountDownLatch(1);
                CountDownLatch trySplitPerformedLatch = new CountDownLatch(1);
                AtomicBoolean abortProcessing = new AtomicBoolean();
            }
        }

        private static class TestTimerfulDoFn
        extends DoFn<KV<String, String>, String> {
            @DoFn.StateId(value="bag")
            private final StateSpec<BagState<String>> bagStateSpec = StateSpecs.bag((Coder)StringUtf8Coder.of());
            @DoFn.TimerId(value="event")
            private final TimerSpec eventTimerSpec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);
            @DoFn.TimerId(value="processing")
            private final TimerSpec processingTimerSpec = TimerSpecs.timer((TimeDomain)TimeDomain.PROCESSING_TIME);
            @DoFn.TimerFamily(value="event-family")
            private final TimerSpec eventTimerFamilySpec = TimerSpecs.timerMap((TimeDomain)TimeDomain.EVENT_TIME);
            @DoFn.TimerFamily(value="processing-family")
            private final TimerSpec processingTimerFamilySpec = TimerSpecs.timerMap((TimeDomain)TimeDomain.PROCESSING_TIME);

            private TestTimerfulDoFn() {
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext context, @DoFn.StateId(value="bag") BagState<String> bagState, @DoFn.TimerId(value="event") org.apache.beam.sdk.state.Timer eventTimeTimer, @DoFn.TimerId(value="processing") org.apache.beam.sdk.state.Timer processingTimeTimer, @DoFn.TimerFamily(value="event-family") TimerMap eventTimerFamily, @DoFn.TimerFamily(value="processing-family") TimerMap processingTimerFamily) {
                context.output((Object)("key:" + (String)((KV)context.element()).getKey() + " main" + (String)((KV)context.element()).getKey() + Iterables.toString((Iterable)bagState.read())));
                bagState.add((Object)((String)((KV)context.element()).getValue()));
                eventTimeTimer.withOutputTimestamp(context.timestamp()).set(context.timestamp().plus((ReadableDuration)Duration.millis((long)1L)));
                eventTimeTimer.clear();
                processingTimeTimer.offset(Duration.millis((long)2L));
                processingTimeTimer.setRelative();
                processingTimeTimer.clear();
                eventTimerFamily.get("event-timer1").withOutputTimestamp(context.timestamp()).set(context.timestamp().plus((ReadableDuration)Duration.millis((long)3L)));
                eventTimerFamily.get("to-delete-event").set(context.timestamp().plus((ReadableDuration)Duration.millis((long)5L)));
                eventTimerFamily.get("to-delete-event").clear();
                processingTimerFamily.get("processing-timer1").offset(Duration.millis((long)4L)).setRelative();
                processingTimerFamily.get("to-delete-processing").offset(Duration.millis((long)4L)).setRelative();
                processingTimerFamily.get("to-delete-processing").clear();
            }

            @DoFn.OnTimer(value="event")
            public void eventTimer(DoFn.OnTimerContext context, @DoFn.Key String key, @DoFn.StateId(value="bag") BagState<String> bagState, @DoFn.TimerId(value="event") org.apache.beam.sdk.state.Timer eventTimeTimer, @DoFn.TimerId(value="processing") org.apache.beam.sdk.state.Timer processingTimeTimer, @DoFn.TimerFamily(value="event-family") TimerMap eventTimerFamily, @DoFn.TimerFamily(value="processing-family") TimerMap processingTimerFamily) {
                context.output((Object)("key:" + key + " event" + Iterables.toString((Iterable)bagState.read())));
                bagState.add((Object)"event");
                eventTimeTimer.withOutputTimestamp(context.timestamp()).set(context.fireTimestamp().plus((ReadableDuration)Duration.millis((long)31L)));
                processingTimeTimer.offset(Duration.millis((long)32L));
                processingTimeTimer.setRelative();
                eventTimerFamily.get("event-timer1").withOutputTimestamp(context.timestamp()).set(context.fireTimestamp().plus((ReadableDuration)Duration.millis((long)33L)));
                processingTimerFamily.get("processing-timer1").offset(Duration.millis((long)34L)).setRelative();
            }

            @DoFn.OnTimer(value="processing")
            public void processingTimer(DoFn.OnTimerContext context, @DoFn.Key String key, @DoFn.StateId(value="bag") BagState<String> bagState, @DoFn.TimerId(value="event") org.apache.beam.sdk.state.Timer eventTimeTimer, @DoFn.TimerId(value="processing") org.apache.beam.sdk.state.Timer processingTimeTimer, @DoFn.TimerFamily(value="event-family") TimerMap eventTimerFamily, @DoFn.TimerFamily(value="processing-family") TimerMap processingTimerFamily) {
                context.output((Object)("key:" + key + " processing" + Iterables.toString((Iterable)bagState.read())));
                bagState.add((Object)"processing");
                eventTimeTimer.withOutputTimestamp(context.timestamp()).set(context.timestamp().plus((ReadableDuration)Duration.millis((long)61L)));
                processingTimeTimer.offset(Duration.millis((long)62L));
                processingTimeTimer.setRelative();
                eventTimerFamily.get("event-timer1").withOutputTimestamp(context.timestamp()).set(context.timestamp().plus((ReadableDuration)Duration.millis((long)63L)));
                processingTimerFamily.get("processing-timer1").offset(Duration.millis((long)64L)).setRelative();
            }

            @DoFn.OnTimerFamily(value="event-family")
            public void eventFamilyOnTimer(DoFn.OnTimerContext context, @DoFn.Key String key, @DoFn.Timestamp Instant ts, @DoFn.StateId(value="bag") BagState<String> bagState, @DoFn.TimerId(value="event") org.apache.beam.sdk.state.Timer eventTimeTimer, @DoFn.TimerId(value="processing") org.apache.beam.sdk.state.Timer processingTimeTimer, @DoFn.TimerFamily(value="event-family") TimerMap eventTimerFamily, @DoFn.TimerFamily(value="processing-family") TimerMap processingTimerFamily) {
                context.output((Object)("key:" + key + " event-family" + Iterables.toString((Iterable)bagState.read())));
                bagState.add((Object)"event-family");
                eventTimeTimer.withOutputTimestamp(context.timestamp()).set(context.timestamp().plus((ReadableDuration)Duration.millis((long)71L)));
                processingTimeTimer.offset(Duration.millis((long)72L));
                processingTimeTimer.setRelative();
                eventTimerFamily.get("event-timer1").withOutputTimestamp(context.timestamp()).set(context.timestamp().plus((ReadableDuration)Duration.millis((long)73L)));
                processingTimerFamily.get("processing-timer1").offset(Duration.millis((long)74L)).setRelative();
            }

            @DoFn.OnTimerFamily(value="processing-family")
            public void processingFamilyOnTimer(DoFn.OnTimerContext context, @DoFn.Key String key, @DoFn.StateId(value="bag") BagState<String> bagState, @DoFn.TimerId(value="event") org.apache.beam.sdk.state.Timer eventTimeTimer, @DoFn.TimerId(value="processing") org.apache.beam.sdk.state.Timer processingTimeTimer, @DoFn.TimerFamily(value="event-family") TimerMap eventTimerFamily, @DoFn.TimerFamily(value="processing-family") TimerMap processingTimerFamily) {
                context.output((Object)("key:" + key + " processing-family" + Iterables.toString((Iterable)bagState.read())));
                bagState.add((Object)"processing-family");
                eventTimeTimer.withOutputTimestamp(context.timestamp()).set(context.timestamp().plus((ReadableDuration)Duration.millis((long)81L)));
                processingTimeTimer.offset(Duration.millis((long)82L));
                processingTimeTimer.setRelative();
                eventTimerFamily.get("event-timer1").withOutputTimestamp(context.timestamp()).set(context.timestamp().plus((ReadableDuration)Duration.millis((long)83L)));
                processingTimerFamily.get("processing-timer1").offset(Duration.millis((long)84L)).setRelative();
            }
        }

        private class TestBeamFnDataOutboundAggregator
        extends BeamFnDataOutboundAggregator {
            private Map<LogicalEndpoint, List<Timer<?>>> timers;
            private Map<LogicalEndpoint, List<WindowedValue<String>>> dataOutput;
            private Supplier<String> processBundleRequestIdSupplier;

            public TestBeamFnDataOutboundAggregator(Supplier<String> bundleIdSupplier) {
                super(PipelineOptionsFactory.create(), bundleIdSupplier, null, false);
                this.timers = new HashMap();
                this.dataOutput = new HashMap<LogicalEndpoint, List<WindowedValue<String>>>();
                this.processBundleRequestIdSupplier = bundleIdSupplier;
            }

            public Map<LogicalEndpoint, List<Timer<?>>> getOutputTimers() {
                return this.timers;
            }

            public Map<LogicalEndpoint, List<WindowedValue<String>>> getOutputData() {
                return this.dataOutput;
            }

            public <T> FnDataReceiver<T> registerOutputDataLocation(String pTransformId, Coder<T> coder) {
                return data -> this.dataOutput.computeIfAbsent(LogicalEndpoint.data((String)this.processBundleRequestIdSupplier.get(), (String)pTransformId), e -> new ArrayList()).add((WindowedValue)data);
            }

            public <T> FnDataReceiver<T> registerOutputTimersLocation(String pTransformId, String timerFamilyId, Coder<T> coder) {
                return data -> this.timers.computeIfAbsent(LogicalEndpoint.timer((String)this.processBundleRequestIdSupplier.get(), (String)pTransformId, (String)timerFamilyId), e -> new ArrayList()).add((Timer)data);
            }
        }

        private static class TestSideInputIsAccessibleForDownstreamCallersDoFn
        extends DoFn<String, Iterable<String>> {
            public static final String USER_COUNTER_NAME = "userCountedElems";
            private final Counter countedElements = Metrics.counter(TestSideInputIsAccessibleForDownstreamCallersDoFn.class, (String)"userCountedElems");
            private final PCollectionView<Iterable<String>> iterableSideInput;

            private TestSideInputIsAccessibleForDownstreamCallersDoFn(PCollectionView<Iterable<String>> iterableSideInput) {
                this.iterableSideInput = iterableSideInput;
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext context) {
                this.countedElements.inc();
                context.output((Object)((Iterable)context.sideInput(this.iterableSideInput)));
            }
        }

        private static class TestNonWindowObservingDoFn
        extends DoFn<String, String> {
            private final TupleTag<String> additionalOutput;

            private TestNonWindowObservingDoFn(TupleTag<String> additionalOutput) {
                this.additionalOutput = additionalOutput;
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext context) {
                context.output((Object)((String)context.element() + ":main"));
                context.output(this.additionalOutput, (Object)((String)context.element() + ":additional"));
            }
        }

        private static class TestSideInputDoFn
        extends DoFn<String, String> {
            private final PCollectionView<String> defaultSingletonSideInput;
            private final PCollectionView<String> singletonSideInput;
            private final PCollectionView<Iterable<String>> iterableSideInput;
            private final TupleTag<String> additionalOutput;

            private TestSideInputDoFn(PCollectionView<String> defaultSingletonSideInput, PCollectionView<String> singletonSideInput, PCollectionView<Iterable<String>> iterableSideInput, TupleTag<String> additionalOutput) {
                this.defaultSingletonSideInput = defaultSingletonSideInput;
                this.singletonSideInput = singletonSideInput;
                this.iterableSideInput = iterableSideInput;
                this.additionalOutput = additionalOutput;
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext context) {
                context.output((Object)((String)context.element() + ":" + (String)context.sideInput(this.defaultSingletonSideInput)));
                context.output((Object)((String)context.element() + ":" + (String)context.sideInput(this.singletonSideInput)));
                for (String sideInputValue : (Iterable)context.sideInput(this.iterableSideInput)) {
                    context.output((Object)((String)context.element() + ":" + sideInputValue));
                }
                context.output(this.additionalOutput, (Object)((String)context.element() + ":additional"));
            }
        }

        private static class TestStatefulDoFn
        extends DoFn<KV<String, String>, String> {
            @DoFn.StateId(value="value")
            private final StateSpec<ValueState<String>> valueStateSpec = StateSpecs.value((Coder)StringUtf8Coder.of());
            @DoFn.StateId(value="bag")
            private final StateSpec<BagState<String>> bagStateSpec = StateSpecs.bag((Coder)StringUtf8Coder.of());
            @DoFn.StateId(value="combine")
            private final StateSpec<CombiningState<String, String, String>> combiningStateSpec = StateSpecs.combining((Coder)StringUtf8Coder.of(), (Combine.CombineFn)new ConcatCombineFn());

            private TestStatefulDoFn() {
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext context, @DoFn.StateId(value="value") ValueState<String> valueState, @DoFn.StateId(value="bag") BagState<String> bagState, @DoFn.StateId(value="combine") CombiningState<String, String, String> combiningState) {
                context.output((Object)("value:" + (String)valueState.read()));
                valueState.write((Object)((String)((KV)context.element()).getValue()));
                context.output((Object)("bag:" + Iterables.toString((Iterable)bagState.read())));
                bagState.add((Object)((String)((KV)context.element()).getValue()));
                context.output((Object)("combine:" + (String)combiningState.read()));
                combiningState.add((Object)((String)((KV)context.element()).getValue()));
            }
        }

        private static class ConcatCombineFn
        extends Combine.CombineFn<String, String, String> {
            private ConcatCombineFn() {
            }

            public String createAccumulator() {
                return "";
            }

            public String addInput(String accumulator, String input) {
                return accumulator.concat(input);
            }

            public String mergeAccumulators(Iterable<String> accumulators) {
                StringBuilder builder = new StringBuilder();
                for (String value : accumulators) {
                    builder.append(value);
                }
                return builder.toString();
            }

            public String extractOutput(String accumulator) {
                return accumulator;
            }
        }
    }
}

