/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.control;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.EmbeddedSdkHarness;
import org.apache.beam.runners.fnexecution.control.BundleCheckpointHandler;
import org.apache.beam.runners.fnexecution.control.BundleFinalizationHandler;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.BundleSplitHandler;
import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.RemoteOutputReceiver;
import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
import org.apache.beam.runners.fnexecution.data.FnDataService;
import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
import org.apache.beam.runners.fnexecution.state.StateDelegator;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.sdk.util.construction.PipelineTranslation;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.verification.VerificationMode;

@RunWith(value=JUnit4.class)
public class SdkHarnessClientTest {
    /** Fails any test in this class that runs for more than 600 seconds. */
    @Rule
    public transient Timeout globalTimeout = Timeout.seconds((long)600L);
    /** Mocked control client; {@code setup()} wraps it into {@code sdkHarnessClient}. */
    @Mock
    public FnApiControlClient fnApiControlClient;
    /** Mocked data service; handed to {@code sdkHarnessClient} in {@code setup()}. */
    @Mock
    public FnDataService dataService;
    /** Captures the receiver passed to {@code FnDataService.registerReceiver} in the cleanup tests. */
    @Captor
    ArgumentCaptor<CloseableFnDataReceiver<BeamFnApi.Elements>> outputReceiverCaptor;
    /**
     * Embedded SDK harness rule; supplies the data endpoint used in {@code setup()} and the client
     * used by {@code testNewBundleAndProcessElements()}.
     */
    @Rule
    public EmbeddedSdkHarness harness = EmbeddedSdkHarness.create();
    /** Declares the exception expected by {@code testRegisterWithStateRequiresStateDelegator()}. */
    @Rule
    public ExpectedException thrown = ExpectedException.none();
    /** Client built in {@code setup()} on top of the two mocks above. */
    private SdkHarnessClient sdkHarnessClient;
    /** Process bundle descriptor with id {@code "my_id"}, assembled in {@code setup()}. */
    private BeamFnApi.ProcessBundleDescriptor descriptor;
    /** Transform id under which the gRPC read node is registered in {@code descriptor}. */
    private static final String SDK_GRPC_READ_TRANSFORM = "read";
    /** Transform id under which the gRPC write node is registered in {@code descriptor}. */
    private static final String SDK_GRPC_WRITE_TRANSFORM = "write";

    /**
     * Builds the fixtures shared by every test: the mock-backed {@code sdkHarnessClient} and the
     * {@code "my_id"} process bundle descriptor.
     *
     * The descriptor carries the {@code "proc"} transform of a small create/ParDo pipeline together
     * with a gRPC read node and a gRPC write node, both bound to the embedded harness data endpoint
     * and to the {@code "wire_coder"} coder. Raw types are retained from the decompiled source.
     */
    @Before
    public void setup() throws Exception {
        MockitoAnnotations.initMocks(this);
        this.sdkHarnessClient = SdkHarnessClient.usingFnApiClient(this.fnApiControlClient, this.dataService);

        // Translate a minimal create -> proc pipeline and keep only its components.
        Pipeline pipeline = Pipeline.create();
        TupleTag mainOutput = new TupleTag();
        PCollection created = (PCollection)pipeline.apply("create", (PTransform)Create.of((Object)"foo", (Object[])new String[0]));
        created.apply("proc", (PTransform)ParDo.of((DoFn)new TestFn()).withOutputTags(mainOutput, TupleTagList.empty()));
        RunnerApi.Components components = PipelineTranslation.toProto(pipeline).getComponents();

        BeamFnApi.ProcessBundleDescriptor.Builder bundleBuilder = BeamFnApi.ProcessBundleDescriptor.newBuilder()
            .setId("my_id")
            .putAllEnvironments(components.getEnvironmentsMap())
            .putAllWindowingStrategies(components.getWindowingStrategiesMap())
            .putAllCoders(components.getCodersMap());

        // Register the coder referenced by the gRPC port below.
        Coder windowedStringCoder = (Coder)WindowedValue.getFullCoder((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        bundleBuilder.putCoders("wire_coder", CoderTranslation.toProto(windowedStringCoder).getCoder());

        RunnerApi.PTransform proc = components.getTransformsOrThrow("proc");
        BeamFnApi.RemoteGrpcPort port = BeamFnApi.RemoteGrpcPort.newBuilder()
            .setApiServiceDescriptor(this.harness.dataEndpoint())
            .setCoderId("wire_coder")
            .build();

        // "proc" is expected to have exactly one input and one output PCollection.
        String inputId = Iterables.getOnlyElement(proc.getInputsMap().values());
        RemoteGrpcPortRead readNode = RemoteGrpcPortRead.readFromPort(port, inputId);
        String outputId = Iterables.getOnlyElement(proc.getOutputsMap().values());
        RemoteGrpcPortWrite writeNode = RemoteGrpcPortWrite.writeToPort(outputId, port);

        // Copy every PCollection that "proc" touches into the descriptor.
        proc.getInputsMap().values().forEach(id -> bundleBuilder.putPcollections(id, components.getPcollectionsOrThrow(id)));
        proc.getOutputsMap().values().forEach(id -> bundleBuilder.putPcollections(id, components.getPcollectionsOrThrow(id)));

        bundleBuilder.putTransforms("proc", proc);
        bundleBuilder.putTransforms(SDK_GRPC_READ_TRANSFORM, readNode.toPTransform());
        bundleBuilder.putTransforms(SDK_GRPC_WRITE_TRANSFORM, writeNode.toPTransform());
        this.descriptor = bundleBuilder.build();
    }

    /** Requesting a processor for a descriptor registers that descriptor with the control client. */
    @Test
    public void testRegister() throws Exception {
        BeamFnApi.ProcessBundleDescriptor registered = BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("descriptor1").build();
        Coder intCoder = WindowedValue.FullWindowedValueCoder.of((Coder)VarIntCoder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        List<RemoteInputDestination> inputs = Collections.singletonList(RemoteInputDestination.of(intCoder, SDK_GRPC_READ_TRANSFORM));

        this.sdkHarnessClient.getProcessor(registered, inputs);

        Mockito.verify(this.fnApiControlClient).registerProcessBundleDescriptor(registered);
    }

    /**
     * Distinct descriptors yield distinct processors, while asking again for an already-seen
     * descriptor returns the very same processor instance.
     */
    @Test
    public void testRegisterCachesBundleProcessors() throws Exception {
        BeamFnApi.ProcessBundleDescriptor first = BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("descriptor1").build();
        BeamFnApi.ProcessBundleDescriptor second = BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("descriptor2").build();
        Coder intCoder = WindowedValue.FullWindowedValueCoder.of((Coder)VarIntCoder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        List<RemoteInputDestination> inputs = Collections.singletonList(RemoteInputDestination.of(intCoder, SDK_GRPC_READ_TRANSFORM));

        SdkHarnessClient.BundleProcessor firstProcessor = this.sdkHarnessClient.getProcessor(first, inputs);
        SdkHarnessClient.BundleProcessor secondProcessor = this.sdkHarnessClient.getProcessor(second, inputs);

        Assert.assertNotSame(firstProcessor, secondProcessor);
        SdkHarnessClient.BundleProcessor firstAgain = this.sdkHarnessClient.getProcessor(first, inputs);
        Assert.assertSame(firstProcessor, firstAgain);
    }

    /**
     * A descriptor that names a state API endpoint is passed to the {@code getProcessor} overload
     * that takes no state delegator; the test expects an {@link IllegalStateException} whose
     * message contains {@code "containing a state"}.
     */
    @Test
    public void testRegisterWithStateRequiresStateDelegator() throws Exception {
        Endpoints.ApiServiceDescriptor.Builder stateEndpoint = Endpoints.ApiServiceDescriptor.newBuilder().setUrl("foo");
        BeamFnApi.ProcessBundleDescriptor statefulDescriptor = BeamFnApi.ProcessBundleDescriptor.newBuilder()
            .setId("test")
            .setStateApiServiceDescriptor(stateEndpoint)
            .build();
        Coder intCoder = WindowedValue.FullWindowedValueCoder.of((Coder)VarIntCoder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        List<RemoteInputDestination> inputs = Collections.singletonList(RemoteInputDestination.of(intCoder, SDK_GRPC_READ_TRANSFORM));

        // Expectations must be declared before the call that is supposed to throw.
        this.thrown.expect(IllegalStateException.class);
        this.thrown.expectMessage("containing a state");

        this.sdkHarnessClient.getProcessor(statefulDescriptor, inputs);
    }

    /**
     * Opens a bundle with no output receivers, completes the process-bundle response while the
     * bundle is still open, and expects the implicit close to finish without throwing.
     */
    @Test
    public void testNewBundleNoDataDoesNotCrash() throws Exception {
        CompletableFuture<BeamFnApi.InstructionResponse> bundleResult = new CompletableFuture<>();
        Mockito.when(this.fnApiControlClient.handle(ArgumentMatchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(bundleResult);
        Coder stringCoder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        List<RemoteInputDestination> inputs = Collections.singletonList(RemoteInputDestination.of(stringCoder, SDK_GRPC_READ_TRANSFORM));
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(this.descriptor, inputs);
        BeamFnDataOutboundAggregator aggregator = Mockito.mock(BeamFnDataOutboundAggregator.class);
        Mockito.when(this.dataService.createOutboundAggregator((Supplier)ArgumentMatchers.any(), ArgumentMatchers.anyBoolean())).thenReturn(aggregator);

        try (SdkHarnessClient.BundleProcessor.ActiveBundle activeBundle = processor.newBundle(Collections.emptyMap(), BundleProgressHandler.ignored())) {
            // Complete the bundle with an empty response before try-with-resources closes it.
            BeamFnApi.InstructionResponse emptyResponse = BeamFnApi.InstructionResponse.newBuilder()
                .setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance())
                .build();
            bundleResult.complete(emptyResponse);
        }
    }

    /**
     * Closes an already-completed bundle twice in a row; the test passes as long as neither call
     * throws.
     */
    @Test
    public void testClosingActiveBundleMultipleTimesIsNoop() throws Exception {
        CompletableFuture<BeamFnApi.InstructionResponse> bundleResult = new CompletableFuture<>();
        Mockito.when(this.fnApiControlClient.handle(ArgumentMatchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(bundleResult);
        Coder stringCoder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        List<RemoteInputDestination> inputs = Collections.singletonList(RemoteInputDestination.of(stringCoder, SDK_GRPC_READ_TRANSFORM));
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(this.descriptor, inputs);
        BeamFnDataOutboundAggregator aggregator = Mockito.mock(BeamFnDataOutboundAggregator.class);
        Mockito.when(this.dataService.createOutboundAggregator((Supplier)ArgumentMatchers.any(), ArgumentMatchers.anyBoolean())).thenReturn(aggregator);

        SdkHarnessClient.BundleProcessor.ActiveBundle activeBundle = processor.newBundle(Collections.emptyMap(), BundleProgressHandler.ignored());
        BeamFnApi.InstructionResponse emptyResponse = BeamFnApi.InstructionResponse.newBuilder()
            .setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance())
            .build();
        bundleResult.complete(emptyResponse);

        // Deliberately not try-with-resources: close() is invoked explicitly, twice.
        activeBundle.close();
        activeBundle.close();
    }

    /**
     * After a bundle has completed and been closed, {@code requestProgress()} and {@code split()}
     * must not reach the control client: the only interactions allowed on the mock are the single
     * descriptor registration and the single {@code handle} call made earlier.
     */
    @Test
    public void testProgressAndSplitCallsAreIgnoredWhenBundleIsComplete() throws Exception {
        CompletableFuture<BeamFnApi.InstructionResponse> bundleResult = new CompletableFuture<>();
        Mockito.when(this.fnApiControlClient.handle(ArgumentMatchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(bundleResult);
        Coder stringCoder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        List<RemoteInputDestination> inputs = Collections.singletonList(RemoteInputDestination.of(stringCoder, SDK_GRPC_READ_TRANSFORM));
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(this.descriptor, inputs);
        BeamFnDataOutboundAggregator aggregator = Mockito.mock(BeamFnDataOutboundAggregator.class);
        Mockito.when(this.dataService.createOutboundAggregator((Supplier)ArgumentMatchers.any(), ArgumentMatchers.anyBoolean())).thenReturn(aggregator);

        SdkHarnessClient.BundleProcessor.ActiveBundle activeBundle = processor.newBundle(Collections.emptyMap(), BundleProgressHandler.ignored());
        BeamFnApi.InstructionResponse emptyResponse = BeamFnApi.InstructionResponse.newBuilder()
            .setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance())
            .build();
        bundleResult.complete(emptyResponse);
        activeBundle.close();

        // Account for the interactions that happened while the bundle was live.
        Mockito.verify(this.fnApiControlClient).registerProcessBundleDescriptor(ArgumentMatchers.any(BeamFnApi.ProcessBundleDescriptor.class));
        Mockito.verify(this.fnApiControlClient).handle(ArgumentMatchers.any(BeamFnApi.InstructionRequest.class));

        // Calls made after completion must leave the control client untouched.
        activeBundle.requestProgress();
        activeBundle.split(0.0);
        Mockito.verifyNoMoreInteractions(this.fnApiControlClient);
    }

    /**
     * Verifies that the progress handler sees {@code onProgress} before {@code onCompleted} even
     * when the process-bundle response is completed (after 1 second) before the outstanding
     * progress response (after 2 seconds).
     *
     * The scheduler used to complete the two futures is shut down in a {@code finally} block so the
     * test no longer leaves its worker thread behind.
     */
    @Test
    public void testProgressHandlerOnCompletedHappensAfterOnProgress() throws Exception {
        CompletableFuture<BeamFnApi.InstructionResponse> processBundleResponseFuture = new CompletableFuture<>();
        CompletableFuture<BeamFnApi.InstructionResponse> progressResponseFuture = new CompletableFuture<>();
        // Route each request kind to its own future; anything else is a test bug.
        Mockito.when(this.fnApiControlClient.handle(ArgumentMatchers.any(BeamFnApi.InstructionRequest.class))).thenAnswer(invocationOnMock -> {
            BeamFnApi.InstructionRequest request = invocationOnMock.getArgument(0);
            switch (request.getRequestCase()) {
                case PROCESS_BUNDLE:
                    return processBundleResponseFuture;
                case PROCESS_BUNDLE_PROGRESS:
                    return progressResponseFuture;
                default:
                    throw new IllegalArgumentException("Unexpected request " + request);
            }
        });
        Coder coder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        List<RemoteInputDestination> remoteInputs = Collections.singletonList(RemoteInputDestination.of(coder, SDK_GRPC_READ_TRANSFORM));
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(this.descriptor, remoteInputs);
        Mockito.when(this.dataService.createOutboundAggregator((Supplier)ArgumentMatchers.any(), ArgumentMatchers.anyBoolean())).thenReturn(Mockito.mock(BeamFnDataOutboundAggregator.class));
        BundleProgressHandler mockProgressHandler = Mockito.mock(BundleProgressHandler.class);
        SdkHarnessClient.BundleProcessor.ActiveBundle activeBundle = processor.newBundle(Collections.emptyMap(), mockProgressHandler);
        BeamFnApi.ProcessBundleResponse response = BeamFnApi.ProcessBundleResponse.newBuilder().putMonitoringData("test", ByteString.EMPTY).build();
        BeamFnApi.ProcessBundleProgressResponse progressResponse = BeamFnApi.ProcessBundleProgressResponse.newBuilder().putMonitoringData("test2", ByteString.EMPTY).build();

        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            activeBundle.requestProgress();
            // The bundle response is completed first, the progress response a second later.
            executor.schedule(() -> processBundleResponseFuture.complete(BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build()), 1L, TimeUnit.SECONDS);
            executor.schedule(() -> progressResponseFuture.complete(BeamFnApi.InstructionResponse.newBuilder().setProcessBundleProgress(progressResponse).build()), 2L, TimeUnit.SECONDS);
            activeBundle.close();
        } finally {
            // shutdown() (not shutdownNow()): by default already-scheduled delayed tasks still run,
            // so this only releases the worker thread once they are done.
            executor.shutdown();
        }

        InOrder inOrder = Mockito.inOrder(mockProgressHandler);
        inOrder.verify(mockProgressHandler).onProgress(ArgumentMatchers.eq(progressResponse));
        inOrder.verify(mockProgressHandler).onCompleted(ArgumentMatchers.eq(response));
    }

    /**
     * Verifies that the split handler receives the split response before the checkpoint handler
     * receives the bundle response, even when the process-bundle response is completed (after 1
     * second) before the outstanding split response (after 2 seconds).
     *
     * The scheduler used to complete the two futures is shut down in a {@code finally} block so the
     * test no longer leaves its worker thread behind.
     */
    @Test
    public void testCheckpointHappensAfterAnySplitCalls() throws Exception {
        CompletableFuture<BeamFnApi.InstructionResponse> processBundleResponseFuture = new CompletableFuture<>();
        CompletableFuture<BeamFnApi.InstructionResponse> splitResponseFuture = new CompletableFuture<>();
        // Route each request kind to its own future; anything else is a test bug.
        Mockito.when(this.fnApiControlClient.handle(ArgumentMatchers.any(BeamFnApi.InstructionRequest.class))).thenAnswer(invocationOnMock -> {
            BeamFnApi.InstructionRequest request = invocationOnMock.getArgument(0);
            switch (request.getRequestCase()) {
                case PROCESS_BUNDLE:
                    return processBundleResponseFuture;
                case PROCESS_BUNDLE_SPLIT:
                    return splitResponseFuture;
                default:
                    throw new IllegalArgumentException("Unexpected request " + request);
            }
        });
        Coder coder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        List<RemoteInputDestination> remoteInputs = Collections.singletonList(RemoteInputDestination.of(coder, SDK_GRPC_READ_TRANSFORM));
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(this.descriptor, remoteInputs);
        Mockito.when(this.dataService.createOutboundAggregator((Supplier)ArgumentMatchers.any(), ArgumentMatchers.anyBoolean())).thenReturn(Mockito.mock(BeamFnDataOutboundAggregator.class));
        BundleCheckpointHandler mockCheckpointHandler = Mockito.mock(BundleCheckpointHandler.class);
        BundleSplitHandler mockSplitHandler = Mockito.mock(BundleSplitHandler.class);
        BundleFinalizationHandler mockFinalizationHandler = Mockito.mock(BundleFinalizationHandler.class);
        SdkHarnessClient.BundleProcessor.ActiveBundle activeBundle = processor.newBundle(Collections.emptyMap(), Collections.emptyMap(), StateRequestHandler.unsupported(), BundleProgressHandler.ignored(), mockSplitHandler, mockCheckpointHandler, mockFinalizationHandler);
        BeamFnApi.ProcessBundleResponse response = BeamFnApi.ProcessBundleResponse.newBuilder().addResidualRoots(BeamFnApi.DelayedBundleApplication.newBuilder().setApplication(BeamFnApi.BundleApplication.newBuilder().setTransformId("test").build()).build()).build();
        BeamFnApi.ProcessBundleSplitResponse splitResponse = BeamFnApi.ProcessBundleSplitResponse.newBuilder().addChannelSplits(BeamFnApi.ProcessBundleSplitResponse.ChannelSplit.newBuilder().setTransformId("test2")).build();

        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            activeBundle.split(0.5);
            // The bundle response is completed first, the split response a second later.
            executor.schedule(() -> processBundleResponseFuture.complete(BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build()), 1L, TimeUnit.SECONDS);
            executor.schedule(() -> splitResponseFuture.complete(BeamFnApi.InstructionResponse.newBuilder().setProcessBundleSplit(splitResponse).build()), 2L, TimeUnit.SECONDS);
            activeBundle.close();
        } finally {
            // shutdown() (not shutdownNow()): by default already-scheduled delayed tasks still run,
            // so this only releases the worker thread once they are done.
            executor.shutdown();
        }

        InOrder inOrder = Mockito.inOrder(mockCheckpointHandler, mockSplitHandler);
        inOrder.verify(mockSplitHandler).split(ArgumentMatchers.eq(splitResponse));
        inOrder.verify(mockCheckpointHandler).onCheckpoint(ArgumentMatchers.eq(response));
    }

    /**
     * Runs one bundle through the embedded harness client: feeds {@code "foo"}, {@code "bar"} and
     * {@code "baz"} into the single input receiver and expects the collected outputs to be, in any
     * order, {@code "spam"}, {@code "ham"} and {@code "eggs"} in the global window.
     *
     * Raw types are retained from the decompiled source.
     */
    @Test
    public void testNewBundleAndProcessElements() throws Exception {
        SdkHarnessClient embeddedClient = this.harness.client();
        Coder inputCoder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        SdkHarnessClient.BundleProcessor processor = embeddedClient.getProcessor(this.descriptor, Collections.singletonList(RemoteInputDestination.of(inputCoder, SDK_GRPC_READ_TRANSFORM)));
        ArrayList outputs = new ArrayList();

        try (SdkHarnessClient.BundleProcessor.ActiveBundle activeBundle = processor.newBundle(Collections.singletonMap(SDK_GRPC_WRITE_TRANSFORM, RemoteOutputReceiver.of((Coder)WindowedValue.FullWindowedValueCoder.of((Coder)LengthPrefixCoder.of((Coder)StringUtf8Coder.of()), (Coder)GlobalWindow.Coder.INSTANCE), outputs::add)), BundleProgressHandler.ignored())) {
            FnDataReceiver inputReceiver = (FnDataReceiver)Iterables.getOnlyElement(activeBundle.getInputReceivers().values());
            for (String element : new String[] {"foo", "bar", "baz"}) {
                inputReceiver.accept(WindowedValue.valueInGlobalWindow(element));
            }
        }

        // Outputs are only inspected after the bundle has been closed.
        WindowedValue[] expected = new WindowedValue[] {
            WindowedValue.valueInGlobalWindow((Object)"spam"),
            WindowedValue.valueInGlobalWindow((Object)"ham"),
            WindowedValue.valueInGlobalWindow((Object)"eggs")
        };
        MatcherAssert.assertThat(outputs, (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])expected));
    }

    /**
     * Makes the outbound aggregator's {@code sendOrCollectBufferedDataAndFinishOutboundStreams()}
     * throw and checks the cleanup that follows: the same exception reaches the caller, the output
     * receiver is never unregistered, and the captured receiver rejects further elements.
     */
    @Test
    public void handleCleanupWhenInputSenderFails() throws Exception {
        RuntimeException sendFailure = new RuntimeException();
        BeamFnDataOutboundAggregator failingSender = Mockito.mock(BeamFnDataOutboundAggregator.class);
        CompletableFuture<BeamFnApi.InstructionResponse> bundleResult = new CompletableFuture<>();
        Mockito.when(this.fnApiControlClient.handle(ArgumentMatchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(bundleResult);
        Coder stringCoder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        List<RemoteInputDestination> inputs = Collections.singletonList(RemoteInputDestination.of(stringCoder, SDK_GRPC_READ_TRANSFORM));
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(this.descriptor, inputs);
        // Capture the receiver that the bundle registers so it can be probed after the failure.
        Mockito.doNothing().when(this.dataService).registerReceiver((String)ArgumentMatchers.any(), (CloseableFnDataReceiver)this.outputReceiverCaptor.capture());
        Mockito.when(this.dataService.createOutboundAggregator((Supplier)ArgumentMatchers.any(), ArgumentMatchers.anyBoolean())).thenReturn(failingSender);
        Mockito.doThrow(sendFailure).when(failingSender).sendOrCollectBufferedDataAndFinishOutboundStreams();
        BundleProgressHandler progressHandler = Mockito.mock(BundleProgressHandler.class);

        try {
            Map outputReceivers = (Map)ImmutableMap.of((Object)SDK_GRPC_WRITE_TRANSFORM, (Object)RemoteOutputReceiver.of((Coder)ByteArrayCoder.of(), (FnDataReceiver)Mockito.mock(FnDataReceiver.class)));
            try (SdkHarnessClient.BundleProcessor.ActiveBundle activeBundle = processor.newBundle(outputReceivers, progressHandler)) {
                // Intentionally empty: the process-bundle future is never completed here.
            }
            Assert.fail("Exception expected");
        } catch (Exception e) {
            Assert.assertEquals(sendFailure, e);
            Mockito.verify(this.dataService, Mockito.never()).unregisterReceiver((String)ArgumentMatchers.any());
            Assert.assertThrows("Inbound observer closed.", Exception.class, () -> this.outputReceiverCaptor.getValue().accept(BeamFnApi.Elements.getDefaultInstance()));
        }
    }

    /**
     * State-enabled variant of the input-sender failure test: the outbound aggregator's
     * {@code sendOrCollectBufferedDataAndFinishOutboundStreams()} throws, and the test checks that
     * the same exception reaches the caller, the state registration is aborted, the output receiver
     * is never unregistered, and the captured receiver rejects further elements.
     *
     * The receiver captor is now attached to {@code registerReceiver}. Without that stubbing
     * {@code outputReceiverCaptor.getValue()} itself throws because nothing was captured, which
     * made the final {@code assertThrows(Exception.class, ...)} pass without ever touching the
     * receiver.
     */
    @Test
    public void handleCleanupWithStateWhenInputSenderFails() throws Exception {
        RuntimeException testException = new RuntimeException();
        BeamFnDataOutboundAggregator mockInputSender = Mockito.mock(BeamFnDataOutboundAggregator.class);
        StateDelegator mockStateDelegator = Mockito.mock(StateDelegator.class);
        StateDelegator.Registration mockStateRegistration = Mockito.mock(StateDelegator.Registration.class);
        Mockito.when(mockStateDelegator.registerForProcessBundleInstructionId((String)ArgumentMatchers.any(), (StateRequestHandler)ArgumentMatchers.any())).thenReturn(mockStateRegistration);
        StateRequestHandler mockStateHandler = Mockito.mock(StateRequestHandler.class);
        Mockito.when((Object)mockStateHandler.getCacheTokens()).thenReturn(Collections.emptyList());
        BundleProgressHandler mockProgressHandler = Mockito.mock(BundleProgressHandler.class);
        CompletableFuture<BeamFnApi.InstructionResponse> processBundleResponseFuture = new CompletableFuture<>();
        Mockito.when(this.fnApiControlClient.handle(ArgumentMatchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(processBundleResponseFuture);
        Coder coder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(this.descriptor, Collections.singletonList(RemoteInputDestination.of(coder, SDK_GRPC_READ_TRANSFORM)), mockStateDelegator);
        // Capture the receiver the bundle registers, mirroring handleCleanupWhenInputSenderFails.
        // NOTE(review): assumes the failed bundle closes this receiver, as the non-state variant
        // asserts - confirm by running the test.
        Mockito.doNothing().when(this.dataService).registerReceiver((String)ArgumentMatchers.any(), (CloseableFnDataReceiver)this.outputReceiverCaptor.capture());
        Mockito.when(this.dataService.createOutboundAggregator((Supplier)ArgumentMatchers.any(), ArgumentMatchers.anyBoolean())).thenReturn(mockInputSender);
        Mockito.doThrow(testException).when(mockInputSender).sendOrCollectBufferedDataAndFinishOutboundStreams();

        try {
            Map outputReceivers = (Map)ImmutableMap.of((Object)SDK_GRPC_WRITE_TRANSFORM, (Object)RemoteOutputReceiver.of((Coder)ByteArrayCoder.of(), (FnDataReceiver)Mockito.mock(FnDataReceiver.class)));
            try (SdkHarnessClient.BundleProcessor.ActiveBundle activeBundle = processor.newBundle(outputReceivers, mockStateHandler, mockProgressHandler)) {
                // Intentionally empty: the process-bundle future is never completed here.
            }
            Assert.fail("Exception expected");
        } catch (Exception e) {
            Assert.assertEquals(testException, e);
            Mockito.verify(mockStateRegistration).abort();
            Mockito.verify(this.dataService, Mockito.never()).unregisterReceiver((String)ArgumentMatchers.any());
            Assert.assertThrows("Inbound observer closed.", Exception.class, () -> this.outputReceiverCaptor.getValue().accept(BeamFnApi.Elements.getDefaultInstance()));
        }
    }

    /**
     * Completes the process-bundle future exceptionally while the bundle is open and checks the
     * cleanup that follows: closing surfaces an {@link ExecutionException} whose cause is the test
     * exception, the output receiver is never unregistered, and the captured receiver rejects
     * further elements.
     *
     * The receiver captor is now attached to {@code registerReceiver}. Without that stubbing
     * {@code outputReceiverCaptor.getValue()} itself throws because nothing was captured, which
     * made the final {@code assertThrows(Exception.class, ...)} pass without ever touching the
     * receiver.
     */
    @Test
    public void handleCleanupWhenProcessingBundleFails() throws Exception {
        RuntimeException testException = new RuntimeException();
        BeamFnDataOutboundAggregator mockInputSender = Mockito.mock(BeamFnDataOutboundAggregator.class);
        CompletableFuture<BeamFnApi.InstructionResponse> processBundleResponseFuture = new CompletableFuture<>();
        Mockito.when(this.fnApiControlClient.handle(ArgumentMatchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(processBundleResponseFuture);
        Coder coder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        List<RemoteInputDestination> remoteInputs = Collections.singletonList(RemoteInputDestination.of(coder, SDK_GRPC_READ_TRANSFORM));
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(this.descriptor, remoteInputs);
        // Capture the receiver the bundle registers, mirroring handleCleanupWhenInputSenderFails.
        // NOTE(review): assumes the failed bundle closes this receiver - confirm by running the test.
        Mockito.doNothing().when(this.dataService).registerReceiver((String)ArgumentMatchers.any(), (CloseableFnDataReceiver)this.outputReceiverCaptor.capture());
        Mockito.when(this.dataService.createOutboundAggregator((Supplier)ArgumentMatchers.any(), ArgumentMatchers.anyBoolean())).thenReturn(mockInputSender);
        BundleProgressHandler mockProgressHandler = Mockito.mock(BundleProgressHandler.class);

        try {
            Map outputReceivers = (Map)ImmutableMap.of((Object)SDK_GRPC_WRITE_TRANSFORM, (Object)RemoteOutputReceiver.of((Coder)ByteArrayCoder.of(), (FnDataReceiver)Mockito.mock(FnDataReceiver.class)));
            try (SdkHarnessClient.BundleProcessor.ActiveBundle activeBundle = processor.newBundle(outputReceivers, mockProgressHandler)) {
                // Fail the bundle before try-with-resources closes it.
                processBundleResponseFuture.completeExceptionally(testException);
            }
            Assert.fail("Exception expected");
        } catch (ExecutionException e) {
            Assert.assertEquals(testException, e.getCause());
            Mockito.verify(this.dataService, Mockito.never()).unregisterReceiver((String)ArgumentMatchers.any());
            Assert.assertThrows("Inbound observer closed.", Exception.class, () -> this.outputReceiverCaptor.getValue().accept(BeamFnApi.Elements.getDefaultInstance()));
        }
    }

    /**
     * State-enabled variant of the processing-failure test: the process-bundle future is completed
     * exceptionally while the bundle is open, and the test checks that closing surfaces an
     * {@link ExecutionException} whose cause is the test exception, the state registration is
     * aborted, the output receiver is never unregistered, and the captured receiver rejects further
     * elements.
     *
     * The receiver captor is now attached to {@code registerReceiver}. Without that stubbing
     * {@code outputReceiverCaptor.getValue()} itself throws because nothing was captured, which
     * made the final {@code assertThrows(Exception.class, ...)} pass without ever touching the
     * receiver.
     */
    @Test
    public void handleCleanupWithStateWhenProcessingBundleFails() throws Exception {
        RuntimeException testException = new RuntimeException();
        BeamFnDataOutboundAggregator mockInputSender = Mockito.mock(BeamFnDataOutboundAggregator.class);
        StateDelegator mockStateDelegator = Mockito.mock(StateDelegator.class);
        StateDelegator.Registration mockStateRegistration = Mockito.mock(StateDelegator.Registration.class);
        Mockito.when(mockStateDelegator.registerForProcessBundleInstructionId((String)ArgumentMatchers.any(), (StateRequestHandler)ArgumentMatchers.any())).thenReturn(mockStateRegistration);
        StateRequestHandler mockStateHandler = Mockito.mock(StateRequestHandler.class);
        Mockito.when((Object)mockStateHandler.getCacheTokens()).thenReturn(Collections.emptyList());
        BundleProgressHandler mockProgressHandler = Mockito.mock(BundleProgressHandler.class);
        CompletableFuture<BeamFnApi.InstructionResponse> processBundleResponseFuture = new CompletableFuture<>();
        Mockito.when(this.fnApiControlClient.handle(ArgumentMatchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(processBundleResponseFuture);
        Coder coder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(this.descriptor, Collections.singletonList(RemoteInputDestination.of(coder, SDK_GRPC_READ_TRANSFORM)), mockStateDelegator);
        // Capture the receiver the bundle registers, mirroring handleCleanupWhenInputSenderFails.
        // NOTE(review): assumes the failed bundle closes this receiver - confirm by running the test.
        Mockito.doNothing().when(this.dataService).registerReceiver((String)ArgumentMatchers.any(), (CloseableFnDataReceiver)this.outputReceiverCaptor.capture());
        Mockito.when(this.dataService.createOutboundAggregator((Supplier)ArgumentMatchers.any(), ArgumentMatchers.anyBoolean())).thenReturn(mockInputSender);

        try {
            Map outputReceivers = (Map)ImmutableMap.of((Object)SDK_GRPC_WRITE_TRANSFORM, (Object)RemoteOutputReceiver.of((Coder)ByteArrayCoder.of(), (FnDataReceiver)Mockito.mock(FnDataReceiver.class)));
            try (SdkHarnessClient.BundleProcessor.ActiveBundle activeBundle = processor.newBundle(outputReceivers, mockStateHandler, mockProgressHandler)) {
                // Fail the bundle before try-with-resources closes it.
                processBundleResponseFuture.completeExceptionally(testException);
            }
            Assert.fail("Exception expected");
        } catch (ExecutionException e) {
            Assert.assertEquals(testException, e.getCause());
            Mockito.verify(mockStateRegistration).abort();
            Mockito.verify(this.dataService, Mockito.never()).unregisterReceiver((String)ArgumentMatchers.any());
            Assert.assertThrows("Inbound observer closed.", Exception.class, () -> this.outputReceiverCaptor.getValue().accept(BeamFnApi.Elements.getDefaultInstance()));
        }
    }

    /**
     * Exercises bundle cleanup, without a state delegator, after the registered output receiver
     * has been closed by the test itself.
     *
     * <p>The process-bundle reply is completed normally, the receiver captured from
     * {@code registerReceiver} is closed, and closing the bundle is then expected to throw.
     */
    @Test
    public void handleCleanupWhenAwaitingOnClosingOutputReceivers() throws Exception {
        // Every instruction request resolves through this future, which the test completes by hand.
        CompletableFuture<BeamFnApi.InstructionResponse> pendingReply = new CompletableFuture<BeamFnApi.InstructionResponse>();
        Mockito.when((Object)this.fnApiControlClient.handle((BeamFnApi.InstructionRequest)Matchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(pendingReply);

        WindowedValue.FullWindowedValueCoder inputCoder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        List<RemoteInputDestination> remoteInputs = Collections.singletonList(RemoteInputDestination.of((Coder)inputCoder, (String)SDK_GRPC_READ_TRANSFORM));
        SdkHarnessClient.BundleProcessor bundleProcessor = this.sdkHarnessClient.getProcessor(this.descriptor, remoteInputs);

        // Record whatever receiver is handed to registerReceiver so it can be closed further down.
        ((FnDataService)Mockito.doNothing().when((Object)this.dataService)).registerReceiver((String)Matchers.any(), (CloseableFnDataReceiver)this.outputReceiverCaptor.capture());
        BeamFnDataOutboundAggregator inputSender = (BeamFnDataOutboundAggregator)Mockito.mock(BeamFnDataOutboundAggregator.class);
        Mockito.when((Object)this.dataService.createOutboundAggregator((Supplier)Matchers.any(), ArgumentMatchers.anyBoolean())).thenReturn((Object)inputSender);

        BundleProgressHandler progressHandler = (BundleProgressHandler)Mockito.mock(BundleProgressHandler.class);
        FnDataReceiver outputSink = (FnDataReceiver)Mockito.mock(FnDataReceiver.class);
        Map outputReceivers = (Map)ImmutableMap.of((Object)SDK_GRPC_WRITE_TRANSFORM, (Object)RemoteOutputReceiver.of((Coder)ByteArrayCoder.of(), (FnDataReceiver)outputSink));
        SdkHarnessClient.BundleProcessor.ActiveBundle bundle = bundleProcessor.newBundle(outputReceivers, progressHandler);

        // Reply with an empty (default) process-bundle response.
        BeamFnApi.InstructionResponse reply = BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance()).build();
        pendingReply.complete(reply);

        // Close the captured receiver before the bundle is closed; closing the bundle must then fail.
        ((CloseableFnDataReceiver)this.outputReceiverCaptor.getValue()).close();
        // NOTE(review): the leading string is the first argument of assertThrows; if this is
        // org.junit.Assert it is only the failure message, and the thrown exception's text is
        // not checked -- confirm that is the intent.
        Assert.assertThrows((String)"Inbound observer closed.", Exception.class, () -> ((RemoteBundle)bundle).close());
    }

    /**
     * Same scenario as the stateless cleanup test, but the processor is built with a
     * {@link StateDelegator} and the bundle is opened with a {@link StateRequestHandler}.
     *
     * <p>The process-bundle reply is completed normally, the receiver captured from
     * {@code registerReceiver} is closed, and closing the bundle is then expected to throw.
     */
    @Test
    public void handleCleanupWithStateWhenAwaitingOnClosingOutputReceivers() throws Exception {
        // State plumbing: the delegator hands back a mock registration for any instruction id/handler.
        StateDelegator.Registration stateRegistration = (StateDelegator.Registration)Mockito.mock(StateDelegator.Registration.class);
        StateDelegator stateDelegator = (StateDelegator)Mockito.mock(StateDelegator.class);
        Mockito.when((Object)stateDelegator.registerForProcessBundleInstructionId((String)Matchers.any(), (StateRequestHandler)Matchers.any())).thenReturn((Object)stateRegistration);
        StateRequestHandler stateHandler = (StateRequestHandler)Mockito.mock(StateRequestHandler.class);
        Mockito.when((Object)stateHandler.getCacheTokens()).thenReturn(Collections.emptyList());

        // Every instruction request resolves through this future, which the test completes by hand.
        CompletableFuture<BeamFnApi.InstructionResponse> pendingReply = new CompletableFuture<BeamFnApi.InstructionResponse>();
        Mockito.when((Object)this.fnApiControlClient.handle((BeamFnApi.InstructionRequest)Matchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(pendingReply);

        WindowedValue.FullWindowedValueCoder inputCoder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        List<RemoteInputDestination> remoteInputs = Collections.singletonList(RemoteInputDestination.of((Coder)inputCoder, (String)SDK_GRPC_READ_TRANSFORM));
        SdkHarnessClient.BundleProcessor bundleProcessor = this.sdkHarnessClient.getProcessor(this.descriptor, remoteInputs, stateDelegator);

        // Record whatever receiver is handed to registerReceiver so it can be closed further down.
        ((FnDataService)Mockito.doNothing().when((Object)this.dataService)).registerReceiver((String)Matchers.any(), (CloseableFnDataReceiver)this.outputReceiverCaptor.capture());
        BeamFnDataOutboundAggregator inputSender = (BeamFnDataOutboundAggregator)Mockito.mock(BeamFnDataOutboundAggregator.class);
        Mockito.when((Object)this.dataService.createOutboundAggregator((Supplier)Matchers.any(), ArgumentMatchers.anyBoolean())).thenReturn((Object)inputSender);

        BundleProgressHandler progressHandler = (BundleProgressHandler)Mockito.mock(BundleProgressHandler.class);
        FnDataReceiver outputSink = (FnDataReceiver)Mockito.mock(FnDataReceiver.class);
        Map outputReceivers = (Map)ImmutableMap.of((Object)SDK_GRPC_WRITE_TRANSFORM, (Object)RemoteOutputReceiver.of((Coder)ByteArrayCoder.of(), (FnDataReceiver)outputSink));
        SdkHarnessClient.BundleProcessor.ActiveBundle bundle = bundleProcessor.newBundle(outputReceivers, stateHandler, progressHandler);

        // Reply with an empty (default) process-bundle response.
        BeamFnApi.InstructionResponse reply = BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance()).build();
        pendingReply.complete(reply);

        // Close the captured receiver before the bundle is closed; closing the bundle must then fail.
        ((CloseableFnDataReceiver)this.outputReceiverCaptor.getValue()).close();
        // NOTE(review): the leading string is the first argument of assertThrows; if this is
        // org.junit.Assert it is only the failure message, and the thrown exception's text is
        // not checked -- confirm that is the intent.
        Assert.assertThrows((String)"Inbound observer closed.", Exception.class, () -> ((RemoteBundle)bundle).close());
    }

    /**
     * Verifies that the cache tokens reported by the {@link StateRequestHandler} are carried in
     * the process-bundle request issued when a new bundle is opened.
     *
     * <p>The control client is stubbed to answer every request with an already-completed empty
     * response. After {@code newBundle} the test expects exactly one {@code handle} call, checks
     * that the captured request is a {@code PROCESS_BUNDLE} request, and checks that its cache
     * token list equals the list returned by {@code getCacheTokens()}.
     *
     * @throws InterruptedException declared by the test signature
     */
    @Test
    public void verifyCacheTokensAreUsedInNewBundleRequest() throws InterruptedException {
        BeamFnDataOutboundAggregator mockInputSender = (BeamFnDataOutboundAggregator)Mockito.mock(BeamFnDataOutboundAggregator.class);
        Mockito.when((Object)this.fnApiControlClient.handle((BeamFnApi.InstructionRequest)Matchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(CompletableFuture.completedFuture(BeamFnApi.InstructionResponse.newBuilder().build()));
        BeamFnApi.ProcessBundleDescriptor descriptor1 = BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("descriptor1").build();
        List<RemoteInputDestination> remoteInputs = Collections.singletonList(RemoteInputDestination.of((Coder)WindowedValue.FullWindowedValueCoder.of((Coder)VarIntCoder.of(), (Coder)GlobalWindow.Coder.INSTANCE), (String)SDK_GRPC_READ_TRANSFORM));
        SdkHarnessClient.BundleProcessor processor1 = this.sdkHarnessClient.getProcessor(descriptor1, remoteInputs);
        Mockito.when((Object)this.dataService.createOutboundAggregator((Supplier)Matchers.any(), ArgumentMatchers.anyBoolean())).thenReturn((Object)mockInputSender);

        // The state handler advertises a single (default-instance) cache token.
        StateRequestHandler stateRequestHandler = (StateRequestHandler)Mockito.mock(StateRequestHandler.class);
        List<BeamFnApi.ProcessBundleRequest.CacheToken> cacheTokens = Collections.singletonList(BeamFnApi.ProcessBundleRequest.CacheToken.newBuilder().getDefaultInstanceForType());
        Mockito.when((Object)stateRequestHandler.getCacheTokens()).thenReturn(cacheTokens);

        processor1.newBundle((Map)ImmutableMap.of((Object)SDK_GRPC_WRITE_TRANSFORM, (Object)RemoteOutputReceiver.of((Coder)ByteArrayCoder.of(), (FnDataReceiver)((FnDataReceiver)Mockito.mock(FnDataReceiver.class)))), stateRequestHandler, BundleProgressHandler.ignored());

        // Typed captor and list: no raw types, so no downcast is needed on the captured request.
        ArgumentCaptor<BeamFnApi.InstructionRequest> reqCaptor = ArgumentCaptor.forClass(BeamFnApi.InstructionRequest.class);
        ((FnApiControlClient)Mockito.verify((Object)this.fnApiControlClient, (VerificationMode)Mockito.times((int)1))).handle(reqCaptor.capture());
        List<BeamFnApi.InstructionRequest> requests = reqCaptor.getAllValues();
        BeamFnApi.InstructionRequest capturedRequest = requests.get(0);

        MatcherAssert.assertThat((Object)capturedRequest.getRequestCase(), (Matcher)org.hamcrest.Matchers.is((Object)BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE));
        MatcherAssert.assertThat((Object)capturedRequest.getProcessBundle().getCacheTokensList(), (Matcher)org.hamcrest.Matchers.is(cacheTokens));
    }

    /**
     * Verifies that a process-bundle response containing a residual root is forwarded to the
     * checkpoint handler.
     *
     * <p>After the bundle is closed the progress handler must have seen {@code onCompleted} and
     * the checkpoint handler {@code onCheckpoint}, both with the response; the finalization and
     * split handlers must not have been touched.
     */
    @Test
    public void testBundleCheckpointCallback() throws Exception {
        // Every instruction request resolves through this future, which the test completes by hand.
        CompletableFuture<BeamFnApi.InstructionResponse> pendingReply = new CompletableFuture<BeamFnApi.InstructionResponse>();
        Mockito.when((Object)this.fnApiControlClient.handle((BeamFnApi.InstructionRequest)Matchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(pendingReply);

        WindowedValue.FullWindowedValueCoder inputCoder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        List<RemoteInputDestination> remoteInputs = Collections.singletonList(RemoteInputDestination.of((Coder)inputCoder, (String)SDK_GRPC_READ_TRANSFORM));
        SdkHarnessClient.BundleProcessor bundleProcessor = this.sdkHarnessClient.getProcessor(this.descriptor, remoteInputs);

        BeamFnDataOutboundAggregator inputSender = (BeamFnDataOutboundAggregator)Mockito.mock(BeamFnDataOutboundAggregator.class);
        Mockito.when((Object)this.dataService.createOutboundAggregator((Supplier)Matchers.any(), ArgumentMatchers.anyBoolean())).thenReturn((Object)inputSender);

        BundleProgressHandler progressHandler = (BundleProgressHandler)Mockito.mock(BundleProgressHandler.class);
        BundleSplitHandler splitHandler = (BundleSplitHandler)Mockito.mock(BundleSplitHandler.class);
        BundleCheckpointHandler checkpointHandler = (BundleCheckpointHandler)Mockito.mock(BundleCheckpointHandler.class);
        BundleFinalizationHandler finalizationHandler = (BundleFinalizationHandler)Mockito.mock(BundleFinalizationHandler.class);

        // A single default residual root is what makes this response a checkpoint.
        BeamFnApi.ProcessBundleResponse checkpointResponse = BeamFnApi.ProcessBundleResponse.newBuilder().addResidualRoots(BeamFnApi.DelayedBundleApplication.getDefaultInstance()).build();
        BeamFnApi.InstructionResponse reply = BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(checkpointResponse).build();

        // The state handler lambda rejects every request: no state access is expected here.
        try (SdkHarnessClient.BundleProcessor.ActiveBundle bundle = bundleProcessor.newBundle(Collections.emptyMap(), Collections.emptyMap(), request -> {
            throw new UnsupportedOperationException();
        }, progressHandler, splitHandler, checkpointHandler, finalizationHandler)) {
            pendingReply.complete(reply);
        }

        ((BundleProgressHandler)Mockito.verify((Object)progressHandler)).onCompleted(checkpointResponse);
        ((BundleCheckpointHandler)Mockito.verify((Object)checkpointHandler)).onCheckpoint(checkpointResponse);
        Mockito.verifyNoMoreInteractions(finalizationHandler, splitHandler);
    }

    /**
     * Verifies that a process-bundle response flagged as requiring finalization is reported to
     * the finalization handler under the bundle's id.
     *
     * <p>After the bundle is closed the progress handler must have seen {@code onCompleted} with
     * the response and the finalization handler {@code requestsFinalization} with the bundle id;
     * the checkpoint and split handlers must not have been touched.
     */
    @Test
    public void testBundleFinalizationCallback() throws Exception {
        // Every instruction request resolves through this future, which the test completes by hand.
        CompletableFuture<BeamFnApi.InstructionResponse> pendingReply = new CompletableFuture<BeamFnApi.InstructionResponse>();
        Mockito.when((Object)this.fnApiControlClient.handle((BeamFnApi.InstructionRequest)Matchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(pendingReply);

        WindowedValue.FullWindowedValueCoder inputCoder = WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        List<RemoteInputDestination> remoteInputs = Collections.singletonList(RemoteInputDestination.of((Coder)inputCoder, (String)SDK_GRPC_READ_TRANSFORM));
        SdkHarnessClient.BundleProcessor bundleProcessor = this.sdkHarnessClient.getProcessor(this.descriptor, remoteInputs);

        BeamFnDataOutboundAggregator inputSender = (BeamFnDataOutboundAggregator)Mockito.mock(BeamFnDataOutboundAggregator.class);
        Mockito.when((Object)this.dataService.createOutboundAggregator((Supplier)Matchers.any(), ArgumentMatchers.anyBoolean())).thenReturn((Object)inputSender);

        BundleProgressHandler progressHandler = (BundleProgressHandler)Mockito.mock(BundleProgressHandler.class);
        BundleSplitHandler splitHandler = (BundleSplitHandler)Mockito.mock(BundleSplitHandler.class);
        BundleCheckpointHandler checkpointHandler = (BundleCheckpointHandler)Mockito.mock(BundleCheckpointHandler.class);
        BundleFinalizationHandler finalizationHandler = (BundleFinalizationHandler)Mockito.mock(BundleFinalizationHandler.class);

        BeamFnApi.ProcessBundleResponse finalizingResponse = BeamFnApi.ProcessBundleResponse.newBuilder().setRequiresFinalization(true).build();
        BeamFnApi.InstructionResponse reply = BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(finalizingResponse).build();

        // The id is read while the bundle is open and checked against the handler afterwards.
        String bundleId;
        // The state handler lambda rejects every request: no state access is expected here.
        try (SdkHarnessClient.BundleProcessor.ActiveBundle bundle = bundleProcessor.newBundle(Collections.emptyMap(), Collections.emptyMap(), request -> {
            throw new UnsupportedOperationException();
        }, progressHandler, splitHandler, checkpointHandler, finalizationHandler)) {
            bundleId = bundle.getId();
            pendingReply.complete(reply);
        }

        ((BundleProgressHandler)Mockito.verify((Object)progressHandler)).onCompleted(finalizingResponse);
        ((BundleFinalizationHandler)Mockito.verify((Object)finalizationHandler)).requestsFinalization(bundleId);
        Mockito.verifyNoMoreInteractions(checkpointHandler, splitHandler);
    }

    /**
     * Minimal {@link DoFn} used as a fixture: emits {@code "spam"} for the element {@code "foo"},
     * {@code "ham"} for {@code "bar"}, and {@code "eggs"} for anything else.
     */
    private static class TestFn
    extends DoFn<String, String> {
        private TestFn() {
        }

        /**
         * Maps the current element to its fixed output string and emits it.
         *
         * @param context process context supplying the element and receiving the output
         */
        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext context) {
            // Constant-first equals keeps a null element on the "eggs" path instead of throwing.
            String emitted = "foo".equals(context.element())
                ? "spam"
                : "bar".equals(context.element()) ? "ham" : "eggs";
            context.output((Object)emitted);
        }
    }
}

