/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.fn.harness.BeamFnDataReadRunner;
import org.apache.beam.fn.harness.HandlesSplits;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.PTransformRunnerFactoryTestContext;
import org.apache.beam.fn.harness.control.BundleProgressReporter;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoEncodings;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.DataEndpoint;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.hamcrest.MatcherAssert;
import org.hamcrest.collection.IsMapContaining;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.Parameterized;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(value=Enclosed.class)
public class BeamFnDataReadRunnerTest {
    private static final Coder<String> ELEMENT_CODER = StringUtf8Coder.of();
    private static final String ELEMENT_CODER_SPEC_ID = "string-coder-id";
    private static final Coder<WindowedValue<String>> CODER = WindowedValue.getFullCoder(ELEMENT_CODER, (Coder)GlobalWindow.Coder.INSTANCE);
    private static final String CODER_SPEC_ID = "windowed-string-coder-id";
    private static final RunnerApi.Coder CODER_SPEC;
    private static final RunnerApi.Components COMPONENTS;
    private static final BeamFnApi.RemoteGrpcPort PORT_SPEC;
    private static final String DEFAULT_BUNDLE_ID = "57";
    private static final String INPUT_TRANSFORM_ID = "1";
    private static final String PTRANSFORM_ID = "ptransform_id";
    private static final MetricsApi.MonitoringInfo DATA_CHANNEL_READ_IDX_MONITORING_INFO;

    private static BeamFnDataReadRunner<String> createReadRunner(FnDataReceiver<WindowedValue<String>> consumer, String pTransformId) throws Exception {
        String localOutputId = "outputPC";
        RunnerApi.PTransform pTransform = RemoteGrpcPortRead.readFromPort((BeamFnApi.RemoteGrpcPort)PORT_SPEC, (String)localOutputId).toPTransform();
        PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(pTransformId, pTransform).processBundleInstructionId(DEFAULT_BUNDLE_ID).components(RunnerApi.Components.newBuilder().putAllPcollections((Map)ImmutableMap.of((Object)localOutputId, (Object)RunnerApi.PCollection.newBuilder().setCoderId(ELEMENT_CODER_SPEC_ID).build())).putAllCoders(COMPONENTS.getCodersMap()).putAllWindowingStrategies(COMPONENTS.getWindowingStrategiesMap()).build()).build();
        context.addPCollectionConsumer(localOutputId, consumer);
        return new BeamFnDataReadRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
    }

    private static void assertIntermediateMonitoringDataDataChannelReadIndexEquals(ShortIdMap shortIdMap, Collection<BundleProgressReporter> reporters, long expectedIndex) {
        HashMap monitoringData = new HashMap();
        for (BundleProgressReporter reporter : reporters) {
            reporter.updateIntermediateMonitoringData(monitoringData);
        }
        String shortId = shortIdMap.getOrCreateShortId(DATA_CHANNEL_READ_IDX_MONITORING_INFO);
        Assert.assertTrue(monitoringData.containsKey(shortId));
        Assert.assertEquals(expectedIndex, MonitoringInfoEncodings.decodeInt64Counter((ByteString)monitoringData.get(shortId)));
    }

    private static void assertFinalMonitoringDataDataChannelReadIndexEquals(ShortIdMap shortIdMap, Collection<BundleProgressReporter> reporters, long expectedIndex) {
        HashMap monitoringData = new HashMap();
        for (BundleProgressReporter reporter : reporters) {
            reporter.updateFinalMonitoringData(monitoringData);
        }
        String shortId = shortIdMap.getOrCreateShortId(DATA_CHANNEL_READ_IDX_MONITORING_INFO);
        Assert.assertTrue(monitoringData.containsKey(shortId));
        Assert.assertEquals(expectedIndex, MonitoringInfoEncodings.decodeInt64Counter((ByteString)monitoringData.get(shortId)));
    }

    private static BeamFnApi.ProcessBundleSplitResponse executeSplit(BeamFnDataReadRunner<String> readRunner, String pTransformId, String bundleId, long index, double fractionOfRemainder, long inputElements, List<Long> allowedSplitPoints) throws Exception {
        for (long i = -1L; i < index; ++i) {
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)Long.valueOf(i).toString()));
        }
        BeamFnApi.ProcessBundleSplitRequest request = BeamFnApi.ProcessBundleSplitRequest.newBuilder().setInstructionId(bundleId).putDesiredSplits(pTransformId, BeamFnApi.ProcessBundleSplitRequest.DesiredSplit.newBuilder().setEstimatedInputElements(inputElements).setFractionOfRemainder(fractionOfRemainder).addAllAllowedSplitPoints(allowedSplitPoints).build()).build();
        BeamFnApi.ProcessBundleSplitResponse.Builder responseBuilder = BeamFnApi.ProcessBundleSplitResponse.newBuilder();
        readRunner.trySplit(request, responseBuilder);
        return responseBuilder.build();
    }

    private static BeamFnApi.ProcessBundleSplitResponse channelSplitResult(long firstResidualIndex) {
        return BeamFnApi.ProcessBundleSplitResponse.newBuilder().addChannelSplits(BeamFnApi.ProcessBundleSplitResponse.ChannelSplit.newBuilder().setLastPrimaryElement(firstResidualIndex - 1L).setFirstResidualElement(firstResidualIndex).build()).build();
    }

    private static BeamFnApi.ProcessBundleSplitResponse elementSplitResult(long index, double fractionOfRemainder) {
        return BeamFnApi.ProcessBundleSplitResponse.newBuilder().addPrimaryRoots(BeamFnApi.BundleApplication.newBuilder().setInputId(String.format("primary%.1f", fractionOfRemainder)).build()).addResidualRoots(BeamFnApi.DelayedBundleApplication.newBuilder().setApplication(BeamFnApi.BundleApplication.newBuilder().setInputId(String.format("residual%.1f", 1.0 - fractionOfRemainder)).build()).build()).addChannelSplits(BeamFnApi.ProcessBundleSplitResponse.ChannelSplit.newBuilder().setLastPrimaryElement(index - 1L).setFirstResidualElement(index + 1L).build()).build();
    }

    static {
        PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder().setApiServiceDescriptor(Endpoints.ApiServiceDescriptor.getDefaultInstance()).setCoderId(CODER_SPEC_ID).build();
        try {
            RunnerApi.MessageWithComponents coderAndComponents = CoderTranslation.toProto(CODER);
            CODER_SPEC = coderAndComponents.getCoder();
            COMPONENTS = coderAndComponents.getComponents().toBuilder().putCoders(CODER_SPEC_ID, CODER_SPEC).putCoders(ELEMENT_CODER_SPEC_ID, CoderTranslation.toProto(ELEMENT_CODER).getCoder()).build();
        }
        catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
        DATA_CHANNEL_READ_IDX_MONITORING_INFO = MetricsApi.MonitoringInfo.newBuilder().setUrn(MonitoringInfoConstants.Urns.DATA_CHANNEL_READ_INDEX).setType("beam:metrics:sum_int64:v1").putLabels("PTRANSFORM", INPUT_TRANSFORM_ID).build();
    }

    private static abstract class SplittingReceiver
    implements FnDataReceiver<WindowedValue<String>>,
    HandlesSplits {
        private SplittingReceiver() {
        }

        public HandlesSplits.SplitResult trySplit(double fractionOfRemainder) {
            return HandlesSplits.SplitResult.of(Collections.singletonList(BeamFnApi.BundleApplication.newBuilder().setInputId(String.format("primary%.1f", fractionOfRemainder)).build()), Collections.singletonList(BeamFnApi.DelayedBundleApplication.newBuilder().setApplication(BeamFnApi.BundleApplication.newBuilder().setInputId(String.format("residual%.1f", 1.0 - fractionOfRemainder)).build()).build()));
        }
    }

    @RunWith(value=Parameterized.class)
    public static class ElementSplitWithAllowedSplitPointsTest {
        @Parameterized.Parameter(value=0)
        public BeamFnApi.ProcessBundleSplitResponse expectedResponse;
        @Parameterized.Parameter(value=1)
        public long index;
        @Parameterized.Parameter(value=2)
        public double elementProgress;
        @Parameterized.Parameter(value=3)
        public double fractionOfRemainder;
        @Parameterized.Parameter(value=4)
        public long bufferSize;
        @Parameterized.Parameter(value=5)
        public List<Long> allowedSplitPoints;

        @Parameterized.Parameters
        public static Iterable<Object[]> data() {
            return ImmutableList.builder().add((Object)new Object[]{BeamFnDataReadRunnerTest.elementSplitResult(2L, 0.6), 2L, 0, 0.2, 5L, ImmutableList.of((Object)1L, (Object)2L, (Object)3L, (Object)4L, (Object)5L)}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(4L), 2L, 0, 0.2, 5L, ImmutableList.of((Object)1L, (Object)2L, (Object)4L, (Object)5L)}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(5L), 2L, 0, 0.2, 5L, ImmutableList.of((Object)1L, (Object)2L, (Object)5L)}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(3L), 2L, 0, 0.2, 5L, ImmutableList.of((Object)1L, (Object)3L, (Object)4L, (Object)5L)}).build();
        }

        @Test
        public void testElementSplittingWithAllowedSplitPoints() throws Exception {
            SplittingReceiver splittingReceiver = Mockito.mock(SplittingReceiver.class);
            Mockito.when(splittingReceiver.getProgress()).thenReturn(this.elementProgress);
            Mockito.when(splittingReceiver.trySplit(Matchers.anyDouble())).thenCallRealMethod();
            BeamFnDataReadRunner readRunner = BeamFnDataReadRunnerTest.createReadRunner((FnDataReceiver<WindowedValue<String>>)splittingReceiver, BeamFnDataReadRunnerTest.PTRANSFORM_ID);
            Assert.assertEquals(this.expectedResponse, BeamFnDataReadRunnerTest.executeSplit((BeamFnDataReadRunner<String>)readRunner, BeamFnDataReadRunnerTest.PTRANSFORM_ID, BeamFnDataReadRunnerTest.DEFAULT_BUNDLE_ID, this.index, this.fractionOfRemainder, this.bufferSize, this.allowedSplitPoints));
        }
    }

    @RunWith(value=Parameterized.class)
    public static class ElementSplitTest {
        @Parameterized.Parameter(value=0)
        public BeamFnApi.ProcessBundleSplitResponse expectedResponse;
        @Parameterized.Parameter(value=1)
        public long index;
        @Parameterized.Parameter(value=2)
        public double elementProgress;
        @Parameterized.Parameter(value=3)
        public double fractionOfRemainder;
        @Parameterized.Parameter(value=4)
        public long bufferSize;

        @Parameterized.Parameters
        public static Iterable<Object[]> data() {
            return ImmutableList.builder().add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(2L), 0L, 0, 0.51, 4L}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(2L), 0L, 0, 0.49, 4L}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(1L), 0L, 0, 0.26, 4L}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(1L), 0L, 0, 0.25, 4L}).add((Object)new Object[]{BeamFnDataReadRunnerTest.elementSplitResult(0L, 0.8), 0L, 0, 0.2, 4L}).add((Object)new Object[]{BeamFnDataReadRunnerTest.elementSplitResult(0L, 0.5), 0L, 0, 0.125, 4L}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(1L), 0L, 0.5, 0.2, 4L}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(3L), 2L, 0, 0.6, 4L}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(4L), 2L, 0.9, 0.6, 4L}).add((Object)new Object[]{BeamFnDataReadRunnerTest.elementSplitResult(2L, 0.6), 2L, 0.5, 0.2, 4L}).build();
        }

        @Test
        public void testElementSplit() throws Exception {
            SplittingReceiver splittingReceiver = Mockito.mock(SplittingReceiver.class);
            Mockito.when(splittingReceiver.getProgress()).thenReturn(this.elementProgress);
            Mockito.when(splittingReceiver.trySplit(Matchers.anyDouble())).thenCallRealMethod();
            BeamFnDataReadRunner readRunner = BeamFnDataReadRunnerTest.createReadRunner((FnDataReceiver<WindowedValue<String>>)splittingReceiver, BeamFnDataReadRunnerTest.PTRANSFORM_ID);
            Assert.assertEquals(this.expectedResponse, BeamFnDataReadRunnerTest.executeSplit((BeamFnDataReadRunner<String>)readRunner, BeamFnDataReadRunnerTest.PTRANSFORM_ID, BeamFnDataReadRunnerTest.DEFAULT_BUNDLE_ID, this.index, this.fractionOfRemainder, this.bufferSize, Collections.EMPTY_LIST));
        }
    }

    @RunWith(value=Parameterized.class)
    public static class ChannelSplitWithAllowedSplitPointsTest {
        @Parameterized.Parameter(value=0)
        public BeamFnApi.ProcessBundleSplitResponse expectedResponse;
        @Parameterized.Parameter(value=1)
        public long index;
        @Parameterized.Parameter(value=2)
        public double fractionOfRemainder;
        @Parameterized.Parameter(value=3)
        public long bufferSize;
        @Parameterized.Parameter(value=4)
        public List<Long> allowedSplitPoints;

        @Parameterized.Parameters
        public static Iterable<Object[]> data() {
            return ImmutableList.builder().add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(4L), 0L, 0.25, 16L, ImmutableList.of((Object)2L, (Object)3L, (Object)4L, (Object)5L)}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(5L), 0L, 0.25, 16L, ImmutableList.of((Object)2L, (Object)3L, (Object)5L)}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(3L), 0L, 0.25, 16L, ImmutableList.of((Object)2L, (Object)3L, (Object)6L)}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(5L), 0L, 0.25, 16L, ImmutableList.of((Object)5L, (Object)6L, (Object)7L)}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(3L), 0L, 0.25, 16L, ImmutableList.of((Object)1L, (Object)2L, (Object)3L)}).add((Object)new Object[]{BeamFnApi.ProcessBundleSplitResponse.getDefaultInstance(), 5L, 0.25, 16L, ImmutableList.of((Object)1L, (Object)2L, (Object)3L)}).build();
        }

        @Test
        public void testChannelSplittingWithAllowedSplitPoints() throws Exception {
            ArrayList outputValues = new ArrayList();
            BeamFnDataReadRunner readRunner = BeamFnDataReadRunnerTest.createReadRunner((FnDataReceiver<WindowedValue<String>>)outputValues::add, BeamFnDataReadRunnerTest.PTRANSFORM_ID);
            Assert.assertEquals(this.expectedResponse, BeamFnDataReadRunnerTest.executeSplit((BeamFnDataReadRunner<String>)readRunner, BeamFnDataReadRunnerTest.PTRANSFORM_ID, BeamFnDataReadRunnerTest.DEFAULT_BUNDLE_ID, this.index, this.fractionOfRemainder, this.bufferSize, this.allowedSplitPoints));
        }
    }

    @RunWith(value=Parameterized.class)
    public static class ChannelSplitTest {
        @Parameterized.Parameter(value=0)
        public BeamFnApi.ProcessBundleSplitResponse expectedResponse;
        @Parameterized.Parameter(value=1)
        public long index;
        @Parameterized.Parameter(value=2)
        public double elementProgress;
        @Parameterized.Parameter(value=3)
        public double fractionOfRemainder;
        @Parameterized.Parameter(value=4)
        public long bufferSize;

        @Parameterized.Parameters
        public static Iterable<Object[]> data() {
            return ImmutableList.builder().add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(1L), 0L, 0, 0, 16L}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(4L), 0L, 0, 0.24, 16L}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(4L), 0L, 0, 0.25, 16L}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(4L), 0L, 0, 0.26, 16L}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(8L), 0L, 0, 0.5, 16L}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(9L), 2, 0, 0.5, 16L}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(11L), 6L, 0, 0.5, 16L}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(1L), 0L, 0.5, 0.25, 4L}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(2L), 0L, 0.9, 0.25, 4L}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(2L), 1L, 0, 0.25, 4L}).add((Object)new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(2L), 1L, 0.1, 0.25, 4L}).build();
        }

        @Test
        public void testChannelSplit() throws Exception {
            SplittingReceiver splittingReceiver = Mockito.mock(SplittingReceiver.class);
            Mockito.when(splittingReceiver.getProgress()).thenReturn(this.elementProgress);
            BeamFnDataReadRunner readRunner = BeamFnDataReadRunnerTest.createReadRunner((FnDataReceiver<WindowedValue<String>>)splittingReceiver, BeamFnDataReadRunnerTest.PTRANSFORM_ID);
            Assert.assertEquals(this.expectedResponse, BeamFnDataReadRunnerTest.executeSplit((BeamFnDataReadRunner<String>)readRunner, BeamFnDataReadRunnerTest.PTRANSFORM_ID, BeamFnDataReadRunnerTest.DEFAULT_BUNDLE_ID, this.index, this.fractionOfRemainder, this.bufferSize, Collections.EMPTY_LIST));
        }
    }

    @RunWith(value=JUnit4.class)
    public static class BeamFnDataReadRunnerExecutionTest {
        @Rule
        public TestExecutors.TestExecutorService executor = TestExecutors.from(Executors::newCachedThreadPool);

        @Before
        public void setUp() {
            MockitoAnnotations.initMocks(this);
        }

        @Test
        public void testCreatingAndProcessingBeamFnDataReadRunner() throws Exception {
            ArrayList outputValues = new ArrayList();
            String localOutputId = "outputPC";
            RunnerApi.PTransform pTransform = RemoteGrpcPortRead.readFromPort((BeamFnApi.RemoteGrpcPort)PORT_SPEC, (String)localOutputId).toPTransform();
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(BeamFnDataReadRunnerTest.INPUT_TRANSFORM_ID, pTransform).processBundleInstructionId(BeamFnDataReadRunnerTest.DEFAULT_BUNDLE_ID).components(RunnerApi.Components.newBuilder().putAllPcollections((Map)ImmutableMap.of((Object)localOutputId, (Object)RunnerApi.PCollection.newBuilder().setCoderId(BeamFnDataReadRunnerTest.ELEMENT_CODER_SPEC_ID).build())).putAllCoders(COMPONENTS.getCodersMap()).putAllWindowingStrategies(COMPONENTS.getWindowingStrategiesMap()).build()).build();
            context.addPCollectionConsumer(localOutputId, outputValues::add);
            new BeamFnDataReadRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            MatcherAssert.assertThat(context.getTearDownFunctions(), org.hamcrest.Matchers.empty());
            MatcherAssert.assertThat(context.getStartBundleFunctions(), org.hamcrest.Matchers.empty());
            MatcherAssert.assertThat(context.getPCollectionConsumers().keySet(), org.hamcrest.Matchers.containsInAnyOrder(localOutputId));
            MatcherAssert.assertThat(context.getIncomingDataEndpoints().keySet(), org.hamcrest.Matchers.hasSize(1));
            DataEndpoint endpoint = (DataEndpoint)Iterables.getOnlyElement((Iterable)context.getIncomingDataEndpoints().get(PORT_SPEC.getApiServiceDescriptor()));
            Assert.assertEquals(BeamFnDataReadRunnerTest.INPUT_TRANSFORM_ID, endpoint.getTransformId());
            Assert.assertEquals(CODER, endpoint.getCoder());
            endpoint.getReceiver().accept((Object)WindowedValue.valueInGlobalWindow((Object)"TestValue"));
            MatcherAssert.assertThat(outputValues, org.hamcrest.Matchers.contains(WindowedValue.valueInGlobalWindow((Object)"TestValue")));
            outputValues.clear();
            ((ThrowingRunnable)Iterables.getOnlyElement(context.getFinishBundleFunctions())).run();
        }

        @Test
        public void testReuseForMultipleBundles() throws Exception {
            String localOutputId = "outputPC";
            ArrayList outputValues = new ArrayList();
            AtomicReference<String> bundleId = new AtomicReference<String>("0");
            PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(BeamFnDataReadRunnerTest.INPUT_TRANSFORM_ID, RemoteGrpcPortRead.readFromPort((BeamFnApi.RemoteGrpcPort)PORT_SPEC, (String)localOutputId).toPTransform()).processBundleInstructionIdSupplier(bundleId::get).components(RunnerApi.Components.newBuilder().putAllPcollections((Map)ImmutableMap.of((Object)localOutputId, (Object)RunnerApi.PCollection.newBuilder().setCoderId(BeamFnDataReadRunnerTest.ELEMENT_CODER_SPEC_ID).build())).putAllCoders(COMPONENTS.getCodersMap()).putAllWindowingStrategies(COMPONENTS.getWindowingStrategiesMap()).build()).build();
            context.addPCollectionConsumer(localOutputId, outputValues::add);
            BeamFnDataReadRunner readRunner = new BeamFnDataReadRunner.Factory().createRunnerForPTransform((PTransformRunnerFactory.Context)context);
            MatcherAssert.assertThat(context.getIncomingDataEndpoints().keySet(), org.hamcrest.Matchers.hasSize(1));
            DataEndpoint endpoint = (DataEndpoint)Iterables.getOnlyElement((Iterable)context.getIncomingDataEndpoints().get(PORT_SPEC.getApiServiceDescriptor()));
            BeamFnDataReadRunnerTest.assertIntermediateMonitoringDataDataChannelReadIndexEquals(context.getShortIdMap(), context.getBundleProgressReporters(), -1L);
            endpoint.getReceiver().accept((Object)WindowedValue.valueInGlobalWindow((Object)"ABC"));
            BeamFnDataReadRunnerTest.assertIntermediateMonitoringDataDataChannelReadIndexEquals(context.getShortIdMap(), context.getBundleProgressReporters(), 0L);
            endpoint.getReceiver().accept((Object)WindowedValue.valueInGlobalWindow((Object)"DEF"));
            BeamFnDataReadRunnerTest.assertIntermediateMonitoringDataDataChannelReadIndexEquals(context.getShortIdMap(), context.getBundleProgressReporters(), 1L);
            readRunner.blockTillReadFinishes();
            BeamFnDataReadRunnerTest.assertIntermediateMonitoringDataDataChannelReadIndexEquals(context.getShortIdMap(), context.getBundleProgressReporters(), 2L);
            BeamFnDataReadRunnerTest.assertFinalMonitoringDataDataChannelReadIndexEquals(context.getShortIdMap(), context.getBundleProgressReporters(), 2L);
            MatcherAssert.assertThat(outputValues, org.hamcrest.Matchers.contains(WindowedValue.valueInGlobalWindow((Object)"ABC"), WindowedValue.valueInGlobalWindow((Object)"DEF")));
            bundleId.set(null);
            readRunner.reset();
            outputValues.clear();
            BeamFnDataReadRunnerTest.assertIntermediateMonitoringDataDataChannelReadIndexEquals(context.getShortIdMap(), context.getBundleProgressReporters(), -1L);
            bundleId.set(BeamFnDataReadRunnerTest.INPUT_TRANSFORM_ID);
            endpoint.getReceiver().accept((Object)WindowedValue.valueInGlobalWindow((Object)"GHI"));
            BeamFnDataReadRunnerTest.assertIntermediateMonitoringDataDataChannelReadIndexEquals(context.getShortIdMap(), context.getBundleProgressReporters(), 0L);
            endpoint.getReceiver().accept((Object)WindowedValue.valueInGlobalWindow((Object)"JKL"));
            BeamFnDataReadRunnerTest.assertIntermediateMonitoringDataDataChannelReadIndexEquals(context.getShortIdMap(), context.getBundleProgressReporters(), 1L);
            readRunner.blockTillReadFinishes();
            BeamFnDataReadRunnerTest.assertIntermediateMonitoringDataDataChannelReadIndexEquals(context.getShortIdMap(), context.getBundleProgressReporters(), 2L);
            BeamFnDataReadRunnerTest.assertFinalMonitoringDataDataChannelReadIndexEquals(context.getShortIdMap(), context.getBundleProgressReporters(), 2L);
            MatcherAssert.assertThat(outputValues, org.hamcrest.Matchers.contains(WindowedValue.valueInGlobalWindow((Object)"GHI"), WindowedValue.valueInGlobalWindow((Object)"JKL")));
        }

        @Test
        public void testRegistration() {
            for (PTransformRunnerFactory.Registrar registrar : ServiceLoader.load(PTransformRunnerFactory.Registrar.class)) {
                if (!(registrar instanceof BeamFnDataReadRunner.Registrar)) continue;
                MatcherAssert.assertThat(registrar.getPTransformRunnerFactories(), IsMapContaining.hasKey("beam:runner:source:v1"));
                return;
            }
            Assert.fail("Expected registrar not found.");
        }

        @Test
        public void testSplittingBeforeStartBundle() throws Exception {
            ArrayList outputValues = new ArrayList();
            BeamFnDataReadRunner readRunner = BeamFnDataReadRunnerTest.createReadRunner((FnDataReceiver<WindowedValue<String>>)outputValues::add, BeamFnDataReadRunnerTest.PTRANSFORM_ID);
            Assert.assertEquals(BeamFnDataReadRunnerTest.channelSplitResult(5L), BeamFnDataReadRunnerTest.executeSplit((BeamFnDataReadRunner<String>)readRunner, BeamFnDataReadRunnerTest.PTRANSFORM_ID, BeamFnDataReadRunnerTest.DEFAULT_BUNDLE_ID, -1L, 0.5, 10L, Collections.EMPTY_LIST));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"A"));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"B"));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"C"));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"D"));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"E"));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"F"));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"G"));
            MatcherAssert.assertThat(outputValues, org.hamcrest.Matchers.contains(WindowedValue.valueInGlobalWindow((Object)"A"), WindowedValue.valueInGlobalWindow((Object)"B"), WindowedValue.valueInGlobalWindow((Object)"C"), WindowedValue.valueInGlobalWindow((Object)"D"), WindowedValue.valueInGlobalWindow((Object)"E")));
        }

        @Test
        public void testSplittingWhenNoElementsProcessed() throws Exception {
            ArrayList outputValues = new ArrayList();
            BeamFnDataReadRunner readRunner = BeamFnDataReadRunnerTest.createReadRunner((FnDataReceiver<WindowedValue<String>>)outputValues::add, BeamFnDataReadRunnerTest.PTRANSFORM_ID);
            Assert.assertEquals(BeamFnDataReadRunnerTest.channelSplitResult(5L), BeamFnDataReadRunnerTest.executeSplit((BeamFnDataReadRunner<String>)readRunner, BeamFnDataReadRunnerTest.PTRANSFORM_ID, BeamFnDataReadRunnerTest.DEFAULT_BUNDLE_ID, -1L, 0.5, 10L, Collections.EMPTY_LIST));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"A"));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"B"));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"C"));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"D"));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"E"));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"F"));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"G"));
            MatcherAssert.assertThat(outputValues, org.hamcrest.Matchers.contains(WindowedValue.valueInGlobalWindow((Object)"A"), WindowedValue.valueInGlobalWindow((Object)"B"), WindowedValue.valueInGlobalWindow((Object)"C"), WindowedValue.valueInGlobalWindow((Object)"D"), WindowedValue.valueInGlobalWindow((Object)"E")));
        }

        @Test
        public void testSplittingWhenSomeElementsProcessed() throws Exception {
            ArrayList outputValues = new ArrayList();
            BeamFnDataReadRunner readRunner = BeamFnDataReadRunnerTest.createReadRunner((FnDataReceiver<WindowedValue<String>>)outputValues::add, BeamFnDataReadRunnerTest.PTRANSFORM_ID);
            Assert.assertEquals(BeamFnDataReadRunnerTest.channelSplitResult(6L), BeamFnDataReadRunnerTest.executeSplit((BeamFnDataReadRunner<String>)readRunner, BeamFnDataReadRunnerTest.PTRANSFORM_ID, BeamFnDataReadRunnerTest.DEFAULT_BUNDLE_ID, 1L, 0.5, 10L, Collections.EMPTY_LIST));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)BeamFnDataReadRunnerTest.INPUT_TRANSFORM_ID));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"2"));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"3"));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"4"));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"5"));
            MatcherAssert.assertThat(outputValues, org.hamcrest.Matchers.contains(WindowedValue.valueInGlobalWindow((Object)"-1"), WindowedValue.valueInGlobalWindow((Object)"0"), WindowedValue.valueInGlobalWindow((Object)BeamFnDataReadRunnerTest.INPUT_TRANSFORM_ID), WindowedValue.valueInGlobalWindow((Object)"2"), WindowedValue.valueInGlobalWindow((Object)"3"), WindowedValue.valueInGlobalWindow((Object)"4")));
        }

        @Test
        public void testSplittingAfterReuse() throws Exception {
            ArrayList outputValues = new ArrayList();
            BeamFnDataReadRunner readRunner = BeamFnDataReadRunnerTest.createReadRunner((FnDataReceiver<WindowedValue<String>>)outputValues::add, BeamFnDataReadRunnerTest.PTRANSFORM_ID);
            Assert.assertEquals(BeamFnApi.ProcessBundleSplitResponse.getDefaultInstance(), BeamFnDataReadRunnerTest.executeSplit((BeamFnDataReadRunner<String>)readRunner, BeamFnDataReadRunnerTest.PTRANSFORM_ID, "previousBundleId", 1L, 0.25, 10L, Collections.EMPTY_LIST));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)BeamFnDataReadRunnerTest.INPUT_TRANSFORM_ID));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"2"));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"3"));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"4"));
            readRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow((Object)"5"));
            MatcherAssert.assertThat(outputValues, org.hamcrest.Matchers.contains(WindowedValue.valueInGlobalWindow((Object)"-1"), WindowedValue.valueInGlobalWindow((Object)"0"), WindowedValue.valueInGlobalWindow((Object)BeamFnDataReadRunnerTest.INPUT_TRANSFORM_ID), WindowedValue.valueInGlobalWindow((Object)"2"), WindowedValue.valueInGlobalWindow((Object)"3"), WindowedValue.valueInGlobalWindow((Object)"4"), WindowedValue.valueInGlobalWindow((Object)"5")));
        }
    }
}

