/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.data;

import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.fn.harness.HandlesSplits;
import org.apache.beam.fn.harness.control.BundleProgressReporter;
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.debug.DataSampler;
import org.apache.beam.fn.harness.logging.BeamFnLoggingMDC;
import org.apache.beam.fn.harness.logging.LoggingClient;
import org.apache.beam.fn.harness.logging.LoggingClientFactory;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterable;
import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterator;
import org.apache.beam.sdk.util.construction.SdkComponents;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.BindableService;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.inprocess.InProcessServerBuilder;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(value=JUnit4.class)
public class PCollectionConsumerRegistryTest {
    private static final Counter TEST_USER_COUNTER = Metrics.counter((String)"foo", (String)"bar");
    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    private static final String P_COLLECTION_A = "pCollectionA";
    private static final String P_COLLECTION_B = "pCollectionB";
    private static final BeamFnApi.ProcessBundleDescriptor TEST_DESCRIPTOR;
    private ExecutionStateSampler sampler;

    @Before
    public void setUp() throws Exception {
        this.sampler = new ExecutionStateSampler(PipelineOptionsFactory.create(), System::currentTimeMillis);
    }

    @After
    public void tearDown() throws Exception {
        MetricsEnvironment.setCurrentContainer(null);
        this.sampler.stop();
    }

    @Test
    public void singleConsumer() throws Exception {
        String pTransformIdA = "pTransformIdA";
        ShortIdMap shortIds = new ShortIdMap();
        BundleProgressReporter.InMemory reporterAndRegistrar = new BundleProgressReporter.InMemory();
        PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(this.sampler.create(), shortIds, (BundleProgressReporter.Registrar)reporterAndRegistrar, TEST_DESCRIPTOR);
        FnDataReceiver consumerA1 = Mockito.mock(FnDataReceiver.class);
        consumers.register(P_COLLECTION_A, "pTransformIdA", "pTransformIdAName", consumerA1);
        FnDataReceiver wrapperConsumer = consumers.getMultiplexingConsumer(P_COLLECTION_A);
        String elementValue = "elem";
        WindowedValue element = WindowedValue.valueInGlobalWindow((Object)elementValue);
        int numElements = 10;
        for (int i = 0; i < numElements; ++i) {
            wrapperConsumer.accept((Object)element);
        }
        Mockito.verify(consumerA1, Mockito.times(numElements)).accept((Object)element);
        ArrayList<MetricsApi.MonitoringInfo> expected = new ArrayList<MetricsApi.MonitoringInfo>();
        SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
        builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
        builder.setLabel("PCOLLECTION", P_COLLECTION_A);
        builder.setInt64SumValue(numElements);
        expected.add(builder.build());
        long elementByteSize = StringUtf8Coder.of().getEncodedElementByteSize(elementValue);
        builder = new SimpleMonitoringInfoBuilder();
        builder.setUrn(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE);
        builder.setLabel("PCOLLECTION", P_COLLECTION_A);
        builder.setInt64DistributionValue(DistributionData.create((long)numElements * elementByteSize, numElements, elementByteSize, elementByteSize));
        expected.add(builder.build());
        HashMap<String, ByteString> actualData = new HashMap<String, ByteString>();
        reporterAndRegistrar.updateFinalMonitoringData(actualData);
        Iterable result = Iterables.filter(shortIds.toMonitoringInfo(actualData), monitoringInfo -> monitoringInfo.containsLabels("PCOLLECTION"));
        MatcherAssert.assertThat(result, Matchers.containsInAnyOrder(expected.toArray()));
    }

    @Test
    public void singleConsumerException() throws Exception {
        String pTransformId = "pTransformId";
        String message = "testException";
        ShortIdMap shortIds = new ShortIdMap();
        BundleProgressReporter.InMemory reporterAndRegistrar = new BundleProgressReporter.InMemory();
        PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(this.sampler.create(), shortIds, (BundleProgressReporter.Registrar)reporterAndRegistrar, TEST_DESCRIPTOR);
        FnDataReceiver consumer = Mockito.mock(FnDataReceiver.class);
        consumers.register(P_COLLECTION_A, "pTransformId", "pTransformIdName", consumer);
        FnDataReceiver wrapperConsumer = consumers.getMultiplexingConsumer(P_COLLECTION_A);
        Mockito.doThrow(new Exception("testException")).when(consumer).accept((Object)((WindowedValue)Mockito.any()));
        this.expectedException.expectMessage("testException");
        this.expectedException.expect(Exception.class);
        wrapperConsumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)"elem"));
    }

    @Test
    public void noConsumers() throws Exception {
        ShortIdMap shortIds = new ShortIdMap();
        BundleProgressReporter.InMemory reporterAndRegistrar = new BundleProgressReporter.InMemory();
        PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(this.sampler.create(), shortIds, (BundleProgressReporter.Registrar)reporterAndRegistrar, TEST_DESCRIPTOR);
        FnDataReceiver wrapperConsumer = consumers.getMultiplexingConsumer(P_COLLECTION_A);
        String elementValue = "elem";
        WindowedValue element = WindowedValue.valueInGlobalWindow((Object)elementValue);
        int numElements = 10;
        for (int i = 0; i < numElements; ++i) {
            wrapperConsumer.accept((Object)element);
        }
        ArrayList<MetricsApi.MonitoringInfo> expected = new ArrayList<MetricsApi.MonitoringInfo>();
        SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
        builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
        builder.setLabel("PCOLLECTION", P_COLLECTION_A);
        builder.setInt64SumValue(numElements);
        expected.add(builder.build());
        long elementByteSize = StringUtf8Coder.of().getEncodedElementByteSize(elementValue);
        builder = new SimpleMonitoringInfoBuilder();
        builder.setUrn(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE);
        builder.setLabel("PCOLLECTION", P_COLLECTION_A);
        builder.setInt64DistributionValue(DistributionData.create((long)numElements * elementByteSize, numElements, elementByteSize, elementByteSize));
        expected.add(builder.build());
        HashMap<String, ByteString> actualData = new HashMap<String, ByteString>();
        reporterAndRegistrar.updateFinalMonitoringData(actualData);
        Iterable result = Iterables.filter(shortIds.toMonitoringInfo(actualData), monitoringInfo -> monitoringInfo.containsLabels("PCOLLECTION"));
        MatcherAssert.assertThat(result, Matchers.containsInAnyOrder(expected.toArray()));
    }

    @Test
    public void multipleConsumersSamePCollection() throws Exception {
        String pTransformIdA = "pTransformIdA";
        String pTransformIdB = "pTransformIdB";
        ShortIdMap shortIds = new ShortIdMap();
        BundleProgressReporter.InMemory reporterAndRegistrar = new BundleProgressReporter.InMemory();
        PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(this.sampler.create(), shortIds, (BundleProgressReporter.Registrar)reporterAndRegistrar, TEST_DESCRIPTOR);
        FnDataReceiver consumerA1 = Mockito.mock(FnDataReceiver.class);
        FnDataReceiver consumerA2 = Mockito.mock(FnDataReceiver.class);
        consumers.register(P_COLLECTION_A, "pTransformIdA", "pTransformIdAName", consumerA1);
        consumers.register(P_COLLECTION_A, "pTransformIdB", "pTransformIdBName", consumerA2);
        FnDataReceiver wrapperConsumer = consumers.getMultiplexingConsumer(P_COLLECTION_A);
        String elementValue = "elem";
        WindowedValue element = WindowedValue.valueInGlobalWindow((Object)elementValue);
        int numElements = 10;
        for (int i = 0; i < numElements; ++i) {
            wrapperConsumer.accept((Object)element);
        }
        Mockito.verify(consumerA1, Mockito.times(numElements)).accept((Object)element);
        Mockito.verify(consumerA2, Mockito.times(numElements)).accept((Object)element);
        ArrayList<MetricsApi.MonitoringInfo> expected = new ArrayList<MetricsApi.MonitoringInfo>();
        SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
        builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
        builder.setLabel("PCOLLECTION", P_COLLECTION_A);
        builder.setInt64SumValue(numElements);
        expected.add(builder.build());
        long elementByteSize = StringUtf8Coder.of().getEncodedElementByteSize(elementValue);
        builder = new SimpleMonitoringInfoBuilder();
        builder.setUrn(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE);
        builder.setLabel("PCOLLECTION", P_COLLECTION_A);
        builder.setInt64DistributionValue(DistributionData.create((long)numElements * elementByteSize, numElements, elementByteSize, elementByteSize));
        expected.add(builder.build());
        HashMap<String, ByteString> actualData = new HashMap<String, ByteString>();
        reporterAndRegistrar.updateFinalMonitoringData(actualData);
        Iterable result = Iterables.filter(shortIds.toMonitoringInfo(actualData), monitoringInfo -> monitoringInfo.containsLabels("PCOLLECTION"));
        MatcherAssert.assertThat(result, Matchers.containsInAnyOrder(expected.toArray()));
    }

    @Test
    public void multipleConsumersSamePCollectionException() throws Exception {
        String pTransformId = "pTransformId";
        String message = "testException";
        ShortIdMap shortIds = new ShortIdMap();
        BundleProgressReporter.InMemory reporterAndRegistrar = new BundleProgressReporter.InMemory();
        PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(this.sampler.create(), shortIds, (BundleProgressReporter.Registrar)reporterAndRegistrar, TEST_DESCRIPTOR);
        FnDataReceiver consumerA1 = Mockito.mock(FnDataReceiver.class);
        FnDataReceiver consumerA2 = Mockito.mock(FnDataReceiver.class);
        consumers.register(P_COLLECTION_A, "pTransformId", "pTransformIdName", consumerA1);
        consumers.register(P_COLLECTION_A, "pTransformId", "pTransformIdName", consumerA2);
        FnDataReceiver wrapperConsumer = consumers.getMultiplexingConsumer(P_COLLECTION_A);
        Mockito.doThrow(new Exception("testException")).when(consumerA2).accept((Object)((WindowedValue)Mockito.any()));
        this.expectedException.expectMessage("testException");
        this.expectedException.expect(Exception.class);
        wrapperConsumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)"elem"));
    }

    @Test
    public void throwsOnRegisteringAfterMultiplexingConsumerWasInitialized() throws Exception {
        String pTransformId = "pTransformId";
        ShortIdMap shortIds = new ShortIdMap();
        BundleProgressReporter.InMemory reporterAndRegistrar = new BundleProgressReporter.InMemory();
        PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(this.sampler.create(), shortIds, (BundleProgressReporter.Registrar)reporterAndRegistrar, TEST_DESCRIPTOR);
        FnDataReceiver consumerA1 = Mockito.mock(FnDataReceiver.class);
        FnDataReceiver consumerA2 = Mockito.mock(FnDataReceiver.class);
        consumers.register(P_COLLECTION_A, "pTransformId", "pTransformIdName", consumerA1);
        consumers.getMultiplexingConsumer(P_COLLECTION_A);
        this.expectedException.expect(RuntimeException.class);
        this.expectedException.expectMessage("cannot be register()-d after");
        consumers.register(P_COLLECTION_A, "pTransformId", "pTransformIdName", consumerA2);
    }

    @Test
    public void testMetricContainerUpdatedUponAcceptingElement() throws Exception {
        ExecutionStateSampler.ExecutionStateTracker executionStateTracker = this.sampler.create();
        MetricsEnvironment.setCurrentContainer((MetricsContainer)executionStateTracker.getMetricsContainer());
        ShortIdMap shortIds = new ShortIdMap();
        BundleProgressReporter.InMemory reporterAndRegistrar = new BundleProgressReporter.InMemory();
        executionStateTracker.start("testBundle");
        PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(executionStateTracker, shortIds, (BundleProgressReporter.Registrar)reporterAndRegistrar, TEST_DESCRIPTOR);
        consumers.register(P_COLLECTION_A, "pTransformA", "pTransformAName", unused -> TEST_USER_COUNTER.inc());
        consumers.register(P_COLLECTION_A, "pTransformB", "pTransformBName", unused -> TEST_USER_COUNTER.inc(2L));
        FnDataReceiver wrapperConsumer = consumers.getMultiplexingConsumer(P_COLLECTION_A);
        WindowedValue element = WindowedValue.valueInGlobalWindow((Object)"elem");
        wrapperConsumer.accept((Object)element);
        TEST_USER_COUNTER.inc(3L);
        Assert.assertEquals(1L, executionStateTracker.getMetricsContainerRegistry().getContainer("pTransformA").getCounter(TEST_USER_COUNTER.getName()).getCumulative());
        Assert.assertEquals(2L, executionStateTracker.getMetricsContainerRegistry().getContainer("pTransformB").getCounter(TEST_USER_COUNTER.getName()).getCumulative());
        Assert.assertEquals(3L, executionStateTracker.getMetricsContainerRegistry().getUnboundContainer().getCounter(TEST_USER_COUNTER.getName()).getCumulative());
    }

    @Test
    public void testHandlesSplitsPassedToOriginalConsumer() throws Exception {
        String pTransformIdA = "pTransformIdA";
        ShortIdMap shortIds = new ShortIdMap();
        BundleProgressReporter.InMemory reporterAndRegistrar = new BundleProgressReporter.InMemory();
        PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(this.sampler.create(), shortIds, (BundleProgressReporter.Registrar)reporterAndRegistrar, TEST_DESCRIPTOR);
        SplittingReceiver consumerA1 = Mockito.mock(SplittingReceiver.class);
        consumers.register(P_COLLECTION_A, "pTransformIdA", "pTransformIdAName", (FnDataReceiver)consumerA1);
        FnDataReceiver wrapperConsumer = consumers.getMultiplexingConsumer(P_COLLECTION_A);
        Assert.assertTrue(wrapperConsumer instanceof HandlesSplits);
        ((HandlesSplits)wrapperConsumer).getProgress();
        Mockito.verify(consumerA1).getProgress();
        ((HandlesSplits)wrapperConsumer).trySplit(0.3);
        Mockito.verify(consumerA1).trySplit(0.3);
    }

    @Test
    public void testLazyByteSizeEstimation() throws Exception {
        String pTransformIdA = "pTransformIdA";
        ShortIdMap shortIds = new ShortIdMap();
        BundleProgressReporter.InMemory reporterAndRegistrar = new BundleProgressReporter.InMemory();
        PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(this.sampler.create(), shortIds, (BundleProgressReporter.Registrar)reporterAndRegistrar, TEST_DESCRIPTOR);
        FnDataReceiver consumerA1 = Mockito.mock(FnDataReceiver.class);
        consumers.register(P_COLLECTION_B, "pTransformIdA", "pTransformIdAName", consumerA1);
        FnDataReceiver wrapperConsumer = consumers.getMultiplexingConsumer(P_COLLECTION_B);
        String elementValue = "elem";
        long elementByteSize = StringUtf8Coder.of().getEncodedElementByteSize(elementValue);
        WindowedValue element = WindowedValue.valueInGlobalWindow(new TestElementByteSizeObservableIterable<String>(Arrays.asList(elementValue, elementValue), elementByteSize));
        int numElements = 10;
        Mockito.doAnswer(invocation -> {
            Object[] args = invocation.getArguments();
            WindowedValue arg = (WindowedValue)args[0];
            Iterator it = ((Iterable)arg.getValue()).iterator();
            while (it.hasNext()) {
                it.next();
            }
            return null;
        }).when(consumerA1).accept((Object)element);
        for (int i = 0; i < numElements; ++i) {
            wrapperConsumer.accept((Object)element);
        }
        Mockito.verify(consumerA1, Mockito.times(numElements)).accept((Object)element);
        ArrayList<MetricsApi.MonitoringInfo> expected = new ArrayList<MetricsApi.MonitoringInfo>();
        SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
        builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
        builder.setLabel("PCOLLECTION", P_COLLECTION_B);
        builder.setInt64SumValue(numElements);
        expected.add(builder.build());
        builder = new SimpleMonitoringInfoBuilder();
        builder.setUrn(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE);
        builder.setLabel("PCOLLECTION", P_COLLECTION_B);
        long expectedBytes = (elementByteSize + 1L) * 2L + 5L;
        builder.setInt64DistributionValue(DistributionData.create((long)numElements * expectedBytes, numElements, expectedBytes, expectedBytes));
        expected.add(builder.build());
        HashMap<String, ByteString> actualData = new HashMap<String, ByteString>();
        reporterAndRegistrar.updateFinalMonitoringData(actualData);
        Iterable result = Iterables.filter(shortIds.toMonitoringInfo(actualData), monitoringInfo -> monitoringInfo.containsLabels("PCOLLECTION"));
        MatcherAssert.assertThat(result, Matchers.containsInAnyOrder(expected.toArray()));
    }

    @Test
    public void dataSampling() throws Exception {
        String pTransformIdA = "pTransformIdA";
        ShortIdMap shortIds = new ShortIdMap();
        BundleProgressReporter.InMemory reporterAndRegistrar = new BundleProgressReporter.InMemory();
        DataSampler dataSampler = new DataSampler();
        PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(this.sampler.create(), shortIds, (BundleProgressReporter.Registrar)reporterAndRegistrar, TEST_DESCRIPTOR, dataSampler);
        FnDataReceiver consumerA1 = Mockito.mock(FnDataReceiver.class);
        consumers.register(P_COLLECTION_A, "pTransformIdA", "pTransformIdAName", consumerA1);
        FnDataReceiver wrapperConsumer = consumers.getMultiplexingConsumer(P_COLLECTION_A);
        String elementValue = "elem";
        WindowedValue element = WindowedValue.valueInGlobalWindow((Object)elementValue);
        int numElements = 10;
        for (int i = 0; i < numElements; ++i) {
            wrapperConsumer.accept((Object)element);
        }
        BeamFnApi.InstructionRequest request = BeamFnApi.InstructionRequest.newBuilder().setSampleData(BeamFnApi.SampleDataRequest.newBuilder()).build();
        BeamFnApi.InstructionResponse response = dataSampler.handleDataSampleRequest(request).build();
        Map elementSamplesMap = response.getSampleData().getElementSamplesMap();
        Assert.assertFalse(elementSamplesMap.isEmpty());
        BeamFnApi.SampleDataResponse.ElementList elementList = (BeamFnApi.SampleDataResponse.ElementList)elementSamplesMap.get(P_COLLECTION_A);
        Assert.assertNotNull(elementList);
        ArrayList<BeamFnApi.SampledElement> expectedSamples = new ArrayList<BeamFnApi.SampledElement>();
        StringUtf8Coder coder = StringUtf8Coder.of();
        for (int i = 0; i < numElements; ++i) {
            ByteStringOutputStream stream = new ByteStringOutputStream();
            coder.encode(elementValue, (OutputStream)stream);
            expectedSamples.add(BeamFnApi.SampledElement.newBuilder().setElement(stream.toByteStringAndReset()).build());
        }
        Assert.assertTrue(elementList.getElementsList().containsAll(expectedSamples));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void logsExceptionWithTransformId() throws Exception {
        String pTransformId = "pTransformId";
        String message = "testException";
        String instructionId = "instruction";
        Exception thrownException = new Exception("testException");
        AtomicBoolean clientClosedStream = new AtomicBoolean();
        ConcurrentLinkedQueue values = new ConcurrentLinkedQueue();
        final AtomicReference outboundServerObserver = new AtomicReference();
        final CallStreamObserver inboundServerObserver = TestStreams.withOnNext(logEntries -> values.addAll(logEntries.getLogEntriesList())).withOnCompleted(() -> {
            clientClosedStream.set(true);
            ((StreamObserver)outboundServerObserver.get()).onCompleted();
        }).build();
        Endpoints.ApiServiceDescriptor apiServiceDescriptor = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString()).build();
        Server server = ((InProcessServerBuilder)InProcessServerBuilder.forName((String)apiServiceDescriptor.getUrl()).addService((BindableService)new BeamFnLoggingGrpc.BeamFnLoggingImplBase(){

            public StreamObserver<BeamFnApi.LogEntry.List> logging(StreamObserver<BeamFnApi.LogControl> outboundObserver) {
                outboundServerObserver.set(outboundObserver);
                return inboundServerObserver;
            }
        })).build();
        server.start();
        ManagedChannel channel = InProcessChannelBuilder.forName((String)apiServiceDescriptor.getUrl()).build();
        ExecutionStateSampler sampler = new ExecutionStateSampler(PipelineOptionsFactory.create(), System::currentTimeMillis);
        ExecutionStateSampler.ExecutionStateTracker stateTracker = sampler.create();
        stateTracker.start("process-bundle");
        ExecutionStateSampler.ExecutionState state = stateTracker.create("shortId", "pTransformId", "pTransformId", "process");
        state.activate();
        BeamFnLoggingMDC.setInstructionId((String)"instruction");
        BeamFnLoggingMDC.setStateTracker((ExecutionStateSampler.ExecutionStateTracker)stateTracker);
        try (LoggingClient ignored = LoggingClientFactory.createAndStart((PipelineOptions)PipelineOptionsFactory.create(), (Endpoints.ApiServiceDescriptor)apiServiceDescriptor, descriptor -> channel);){
            ShortIdMap shortIds = new ShortIdMap();
            BundleProgressReporter.InMemory reporterAndRegistrar = new BundleProgressReporter.InMemory();
            PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(stateTracker, shortIds, (BundleProgressReporter.Registrar)reporterAndRegistrar, TEST_DESCRIPTOR);
            FnDataReceiver consumer = Mockito.mock(FnDataReceiver.class);
            consumers.register(P_COLLECTION_A, "pTransformId", "pTransformIdName", consumer);
            FnDataReceiver wrapperConsumer = consumers.getMultiplexingConsumer(P_COLLECTION_A);
            Mockito.doThrow(thrownException).when(consumer).accept((Object)((WindowedValue)Mockito.any()));
            this.expectedException.expectMessage("testException");
            this.expectedException.expect(Exception.class);
            wrapperConsumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)"elem"));
        }
        finally {
            BeamFnApi.LogEntry expectedEntry = BeamFnApi.LogEntry.newBuilder().setInstructionId("instruction").setTransformId("pTransformId").setMessage("Failed to process element for bundle \"process-bundle\"").build();
            ArrayList entries = new ArrayList(values);
            Assert.assertEquals(1L, entries.size());
            BeamFnApi.LogEntry actualEntry = (BeamFnApi.LogEntry)entries.get(0);
            BeamFnApi.LogEntry actualEntryCulled = BeamFnApi.LogEntry.newBuilder().setInstructionId(actualEntry.getInstructionId()).setTransformId(actualEntry.getTransformId()).setMessage(actualEntry.getMessage()).build();
            Assert.assertEquals(expectedEntry, actualEntryCulled);
            server.shutdownNow();
        }
    }

    static {
        SdkComponents sdkComponents = SdkComponents.create();
        try {
            String utf8CoderId = sdkComponents.registerCoder((Coder)StringUtf8Coder.of());
            String iterableUtf8CoderId = sdkComponents.registerCoder((Coder)IterableCoder.of((Coder)StringUtf8Coder.of()));
            TEST_DESCRIPTOR = BeamFnApi.ProcessBundleDescriptor.newBuilder().putPcollections(P_COLLECTION_A, RunnerApi.PCollection.newBuilder().setCoderId(utf8CoderId).build()).putPcollections(P_COLLECTION_B, RunnerApi.PCollection.newBuilder().setCoderId(iterableUtf8CoderId).build()).putAllCoders(sdkComponents.toComponents().getCodersMap()).build();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static abstract class SplittingReceiver<T>
    implements FnDataReceiver<T>,
    HandlesSplits {
        private SplittingReceiver() {
        }
    }

    private static class TestElementByteSizeObservableIterable<T>
    extends ElementByteSizeObservableIterable<T, ElementByteSizeObservableIterator<T>> {
        private List<T> elements;
        private long elementByteSize;

        public TestElementByteSizeObservableIterable(List<T> elements, long elementByteSize) {
            this.elements = elements;
            this.elementByteSize = elementByteSize;
        }

        protected ElementByteSizeObservableIterator createIterator() {
            return new ElementByteSizeObservableIterator(){
                private int index = 0;

                public boolean hasNext() {
                    return this.index < elements.size();
                }

                public Object next() {
                    this.notifyValueReturned(elementByteSize);
                    return elements.get(this.index++);
                }
            };
        }
    }
}

