/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.control;

import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.FnHarness;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoMatchers;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.control.MapControlClientPool;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.RemoteOutputReceiver;
import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
import org.apache.beam.runners.fnexecution.data.FnDataService;
import org.apache.beam.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.runners.fnexecution.logging.LogWriter;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.runners.fnexecution.state.StateDelegator;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.server.FnService;
import org.apache.beam.sdk.fn.server.GrpcContextHeaderAccessorProvider;
import org.apache.beam.sdk.fn.server.GrpcFnServer;
import org.apache.beam.sdk.fn.server.HeaderAccessor;
import org.apache.beam.sdk.fn.server.InProcessServerFactory;
import org.apache.beam.sdk.fn.server.ServerFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.testing.ResetDateTimeProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.construction.PipelineTranslation;
import org.apache.beam.sdk.util.construction.Timer;
import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
import org.apache.beam.sdk.util.construction.graph.FusedPipeline;
import org.apache.beam.sdk.util.construction.graph.GreedyPipelineFuser;
import org.apache.beam.sdk.util.construction.graph.ProtoOverrides;
import org.apache.beam.sdk.util.construction.graph.SideInputReference;
import org.apache.beam.sdk.util.construction.graph.SplittableParDoExpander;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsEmptyIterable;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class RemoteExecutionTest
implements Serializable {
    @Rule
    public transient Timeout globalTimeout = Timeout.seconds((long)600L);
    @Rule
    public transient ResetDateTimeProvider resetDateTimeProvider = new ResetDateTimeProvider();
    private static final String WORKER_ID = "remote_test";
    private transient GrpcFnServer<FnApiControlClientPoolService> controlServer;
    private transient GrpcFnServer<GrpcDataService> dataServer;
    private transient GrpcFnServer<GrpcStateService> stateServer;
    private transient LogCapturer logCapturer;
    private transient GrpcFnServer<GrpcLoggingService> loggingServer;
    private transient GrpcStateService stateDelegator;
    private transient SdkHarnessClient controlClient;
    private transient ExecutorService serverExecutor;
    private transient ExecutorService sdkHarnessExecutor;
    private transient Future<?> sdkHarnessExecutorFuture;

    public void launchSdkHarness(PipelineOptions options) throws Exception {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build();
        this.serverExecutor = Executors.newCachedThreadPool(threadFactory);
        InProcessServerFactory serverFactory = InProcessServerFactory.create();
        this.dataServer = GrpcFnServer.allocatePortAndCreateFor((FnService)GrpcDataService.create((PipelineOptions)PipelineOptionsFactory.create(), (ExecutorService)this.serverExecutor, (OutboundObserverFactory)OutboundObserverFactory.serverDirect()), (ServerFactory)serverFactory);
        this.logCapturer = new LogCapturer();
        this.loggingServer = GrpcFnServer.allocatePortAndCreateFor((FnService)GrpcLoggingService.forWriter((LogWriter)this.logCapturer), (ServerFactory)serverFactory);
        this.stateDelegator = GrpcStateService.create();
        this.stateServer = GrpcFnServer.allocatePortAndCreateFor((FnService)this.stateDelegator, (ServerFactory)serverFactory);
        MapControlClientPool clientPool = MapControlClientPool.create();
        this.controlServer = GrpcFnServer.allocatePortAndCreateFor((FnService)FnApiControlClientPoolService.offeringClientsToPool((ControlClientPool.Sink)clientPool.getSink(), (HeaderAccessor)GrpcContextHeaderAccessorProvider.getHeaderAccessor()), (ServerFactory)serverFactory);
        this.sdkHarnessExecutor = Executors.newSingleThreadExecutor(threadFactory);
        this.sdkHarnessExecutorFuture = this.sdkHarnessExecutor.submit(() -> {
            try {
                FnHarness.main((String)WORKER_ID, (PipelineOptions)options, Collections.emptySet(), (Endpoints.ApiServiceDescriptor)this.loggingServer.getApiServiceDescriptor(), (Endpoints.ApiServiceDescriptor)this.controlServer.getApiServiceDescriptor(), null, (ManagedChannelFactory)ManagedChannelFactory.createInProcess(), (OutboundObserverFactory)OutboundObserverFactory.clientDirect(), (Cache)Caches.eternal());
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        InstructionRequestHandler controlClient = clientPool.getSource().take(WORKER_ID, java.time.Duration.ofSeconds(10L));
        this.controlClient = SdkHarnessClient.usingFnApiClient((InstructionRequestHandler)controlClient, (FnDataService)((FnDataService)this.dataServer.getService()));
    }

    @After
    public void tearDown() throws Exception {
        block3: {
            this.controlServer.close();
            this.stateServer.close();
            this.dataServer.close();
            this.loggingServer.close();
            this.controlClient.close();
            this.sdkHarnessExecutor.shutdownNow();
            this.serverExecutor.shutdownNow();
            try {
                this.sdkHarnessExecutorFuture.get();
            }
            catch (ExecutionException e) {
                Throwable ex = e.getCause();
                while (ex instanceof RuntimeException) {
                    ex = ex.getCause();
                }
                if (ex instanceof InterruptedException) break block3;
                throw e;
            }
        }
        this.logCapturer = null;
    }

    @Test
    public void testExecution() throws Exception {
        this.launchSdkHarness(PipelineOptionsFactory.create());
        Pipeline p = Pipeline.create();
        ((PCollection)((PCollection)((PCollection)((PCollection)p.apply("impulse", (PTransform)Impulse.create())).apply("create", (PTransform)ParDo.of((DoFn)new DoFn<byte[], String>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext ctxt) {
                ctxt.output((Object)"zero");
                ctxt.output((Object)"one");
                ctxt.output((Object)"two");
            }
        }))).apply("len", (PTransform)ParDo.of((DoFn)new DoFn<String, Long>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext ctxt) {
                ctxt.output((Object)((String)ctxt.element()).length());
            }
        }))).apply("addKeys", (PTransform)WithKeys.of((Object)"foo"))).setCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)BigEndianLongCoder.of())).apply("gbk", (PTransform)GroupByKey.create());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)p);
        FusedPipeline fused = GreedyPipelineFuser.fuse((RunnerApi.Pipeline)pipelineProto);
        Preconditions.checkState((fused.getFusedStages().size() == 1 ? 1 : 0) != 0, (Object)"Expected exactly one fused stage");
        ExecutableStage stage = (ExecutableStage)fused.getFusedStages().iterator().next();
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor = ProcessBundleDescriptors.fromExecutableStage((String)"my_stage", (ExecutableStage)stage, (Endpoints.ApiServiceDescriptor)this.dataServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations());
        Map remoteOutputCoders = descriptor.getRemoteOutputCoders();
        HashMap outputValues = new HashMap();
        HashMap<String, RemoteOutputReceiver> outputReceivers = new HashMap<String, RemoteOutputReceiver>();
        for (Map.Entry remoteOutputCoder : remoteOutputCoders.entrySet()) {
            List outputContents = Collections.synchronizedList(new ArrayList());
            outputValues.put((String)remoteOutputCoder.getKey(), outputContents);
            outputReceivers.put((String)remoteOutputCoder.getKey(), RemoteOutputReceiver.of((Coder)((Coder)remoteOutputCoder.getValue()), outputContents::add));
        }
        try (SdkHarnessClient.BundleProcessor.ActiveBundle bundle = processor.newBundle(outputReceivers, BundleProgressHandler.ignored());){
            ((FnDataReceiver)Iterables.getOnlyElement(bundle.getInputReceivers().values())).accept((Object)WindowedValue.valueInGlobalWindow((Object)new byte[0]));
        }
        for (Collection windowedValues : outputValues.values()) {
            MatcherAssert.assertThat((Object)windowedValues, (Matcher)Matchers.containsInAnyOrder((Object[])new Object[]{WindowedValue.valueInGlobalWindow(this.byteValueOf("foo", 4L)), WindowedValue.valueInGlobalWindow(this.byteValueOf("foo", 3L)), WindowedValue.valueInGlobalWindow(this.byteValueOf("foo", 3L))}));
        }
    }

    @Test
    public void testLogging() throws Exception {
        String instructionId;
        long startTime = System.currentTimeMillis();
        this.launchSdkHarness(PipelineOptionsFactory.create());
        Pipeline p = Pipeline.create();
        ((PCollection)((PCollection)((PCollection)((PCollection)p.apply("impulse", (PTransform)Impulse.create())).apply("create", (PTransform)ParDo.of((DoFn)new DoFn<byte[], String>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext ctxt) {
                ctxt.output((Object)"zero");
            }
        }))).apply("len", (PTransform)ParDo.of((DoFn)new DoFn<String, Long>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext ctxt) {
                Logger logger = LoggerFactory.getLogger(RemoteExecutionTest.class);
                logger.warn("TEST" + (String)ctxt.element());
                logger.error("TEST_EXCEPTION" + (String)ctxt.element(), (Throwable)new Exception());
            }
        }))).apply("addKeys", (PTransform)WithKeys.of((Object)"foo"))).setCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)BigEndianLongCoder.of())).apply("gbk", (PTransform)GroupByKey.create());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)p);
        FusedPipeline fused = GreedyPipelineFuser.fuse((RunnerApi.Pipeline)pipelineProto);
        Preconditions.checkState((fused.getFusedStages().size() == 1 ? 1 : 0) != 0, (Object)"Expected exactly one fused stage");
        ExecutableStage stage = (ExecutableStage)fused.getFusedStages().iterator().next();
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor = ProcessBundleDescriptors.fromExecutableStage((String)"my_stage", (ExecutableStage)stage, (Endpoints.ApiServiceDescriptor)this.dataServer.getApiServiceDescriptor());
        String ptransformId = null;
        for (Map.Entry entry : descriptor.getProcessBundleDescriptor().getTransformsMap().entrySet()) {
            if (!((RunnerApi.PTransform)entry.getValue()).getUniqueName().contains("len")) continue;
            ptransformId = (String)entry.getKey();
        }
        Assert.assertNotNull(ptransformId);
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations());
        Map remoteOutputCoders = descriptor.getRemoteOutputCoders();
        HashMap<String, RemoteOutputReceiver> outputReceivers = new HashMap<String, RemoteOutputReceiver>();
        for (Map.Entry remoteOutputCoder : remoteOutputCoders.entrySet()) {
            List outputContents = Collections.synchronizedList(new ArrayList());
            outputReceivers.put((String)remoteOutputCoder.getKey(), RemoteOutputReceiver.of((Coder)((Coder)remoteOutputCoder.getValue()), outputContents::add));
        }
        try (SdkHarnessClient.BundleProcessor.ActiveBundle bundle = processor.newBundle(outputReceivers, BundleProgressHandler.ignored());){
            instructionId = bundle.getId();
            ((FnDataReceiver)Iterables.getOnlyElement(bundle.getInputReceivers().values())).accept((Object)WindowedValue.valueInGlobalWindow((Object)new byte[0]));
        }
        while (System.currentTimeMillis() - startTime < 30000L) {
            BeamFnApi.LogEntry[] logs = this.logCapturer.capturedLogs.toArray(new BeamFnApi.LogEntry[0]);
            boolean foundPTransformLog = false;
            boolean foundExceptionLog = false;
            for (BeamFnApi.LogEntry log : logs) {
                MatcherAssert.assertThat((Object)(log.getTimestamp().getSeconds() * 1000L + (long)(log.getTimestamp().getNanos() / 1000000)), (Matcher)Matchers.allOf((Matcher)Matchers.greaterThanOrEqualTo((Comparable)Long.valueOf(startTime)), (Matcher)Matchers.lessThanOrEqualTo((Comparable)Long.valueOf(System.currentTimeMillis()))));
                MatcherAssert.assertThat((Object)log.getThread(), (Matcher)Matchers.not((Object)""));
                MatcherAssert.assertThat((Object)log.getLogLocation(), (Matcher)Matchers.not((Object)""));
                if ("TESTzero".equals(log.getMessage())) {
                    MatcherAssert.assertThat((Object)log.getSeverity(), (Matcher)Matchers.equalTo((Object)BeamFnApi.LogEntry.Severity.Enum.WARN));
                    MatcherAssert.assertThat((Object)log.getInstructionId(), (Matcher)Matchers.equalTo((Object)instructionId));
                    MatcherAssert.assertThat((Object)log.getLogLocation(), (Matcher)Matchers.equalTo((Object)RemoteExecutionTest.class.getCanonicalName()));
                    MatcherAssert.assertThat((Object)log.getTransformId(), (Matcher)Matchers.equalTo((Object)ptransformId));
                    MatcherAssert.assertThat((Object)log.getTrace(), (Matcher)Matchers.equalTo((Object)""));
                    foundPTransformLog = true;
                    continue;
                }
                if (!"TEST_EXCEPTIONzero".equals(log.getMessage())) continue;
                MatcherAssert.assertThat((Object)log.getSeverity(), (Matcher)Matchers.equalTo((Object)BeamFnApi.LogEntry.Severity.Enum.ERROR));
                MatcherAssert.assertThat((Object)log.getInstructionId(), (Matcher)Matchers.equalTo((Object)instructionId));
                MatcherAssert.assertThat((Object)log.getLogLocation(), (Matcher)Matchers.equalTo((Object)RemoteExecutionTest.class.getCanonicalName()));
                MatcherAssert.assertThat((Object)log.getTransformId(), (Matcher)Matchers.equalTo((Object)ptransformId));
                MatcherAssert.assertThat((Object)log.getTrace(), (Matcher)Matchers.containsString((String)"RemoteExecutionTest"));
                foundExceptionLog = true;
            }
            if (foundPTransformLog && foundExceptionLog) break;
            Thread.sleep(500L);
        }
    }

    @Test
    public void testBundleProcessorThrowsExecutionExceptionWhenUserCodeThrows() throws Exception {
        this.launchSdkHarness(PipelineOptionsFactory.create());
        Pipeline p = Pipeline.create();
        ((PCollection)((PCollection)p.apply("impulse", (PTransform)Impulse.create())).apply("create", (PTransform)ParDo.of((DoFn)new DoFn<byte[], KV<String, String>>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext ctxt) throws Exception {
                String element = (String)CoderUtils.decodeFromByteArray((Coder)StringUtf8Coder.of(), (byte[])((byte[])ctxt.element()));
                if (element.equals("X")) {
                    throw new Exception("testBundleExecutionFailure");
                }
                ctxt.output((Object)KV.of((Object)element, (Object)element));
            }
        }))).apply("gbk", (PTransform)GroupByKey.create());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)p);
        FusedPipeline fused = GreedyPipelineFuser.fuse((RunnerApi.Pipeline)pipelineProto);
        Preconditions.checkState((fused.getFusedStages().size() == 1 ? 1 : 0) != 0, (Object)"Expected exactly one fused stage");
        ExecutableStage stage = (ExecutableStage)fused.getFusedStages().iterator().next();
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor = ProcessBundleDescriptors.fromExecutableStage((String)"my_stage", (ExecutableStage)stage, (Endpoints.ApiServiceDescriptor)this.dataServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations());
        Map remoteOutputCoders = descriptor.getRemoteOutputCoders();
        HashMap outputValues = new HashMap();
        HashMap<String, RemoteOutputReceiver> outputReceivers = new HashMap<String, RemoteOutputReceiver>();
        for (Map.Entry remoteOutputCoder : remoteOutputCoders.entrySet()) {
            List outputContents = Collections.synchronizedList(new ArrayList());
            outputValues.put((String)remoteOutputCoder.getKey(), outputContents);
            outputReceivers.put((String)remoteOutputCoder.getKey(), RemoteOutputReceiver.of((Coder)((Coder)remoteOutputCoder.getValue()), outputContents::add));
        }
        try (SdkHarnessClient.BundleProcessor.ActiveBundle bundle = processor.newBundle(outputReceivers, BundleProgressHandler.ignored());){
            ((FnDataReceiver)Iterables.getOnlyElement(bundle.getInputReceivers().values())).accept((Object)WindowedValue.valueInGlobalWindow((Object)CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"Y")));
        }
        try {
            bundle = processor.newBundle(outputReceivers, BundleProgressHandler.ignored());
            try {
                ((FnDataReceiver)Iterables.getOnlyElement(bundle.getInputReceivers().values())).accept((Object)WindowedValue.valueInGlobalWindow((Object)CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"X")));
            }
            finally {
                if (bundle != null) {
                    bundle.close();
                }
            }
            Assert.fail();
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("testBundleExecutionFailure"));
        }
        bundle = processor.newBundle(outputReceivers, BundleProgressHandler.ignored());
        try {
            ((FnDataReceiver)Iterables.getOnlyElement(bundle.getInputReceivers().values())).accept((Object)WindowedValue.valueInGlobalWindow((Object)CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"Z")));
        }
        finally {
            if (bundle != null) {
                bundle.close();
            }
        }
        for (Collection windowedValues : outputValues.values()) {
            MatcherAssert.assertThat((Object)windowedValues, (Matcher)Matchers.containsInAnyOrder((Object[])new Object[]{WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"Y")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Z", (Object)"Z"))}));
        }
    }

    @Test
    public void testExecutionWithSideInput() throws Exception {
        this.launchSdkHarness(PipelineOptionsFactory.create());
        Pipeline p = Pipeline.create();
        ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)p.getOptions().as(ExperimentalOptions.class)), (String)"beam_fn_api");
        ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)p.getOptions().as(ExperimentalOptions.class)), (String)"use_runner_v2");
        PCollection input = ((PCollection)((PCollection)p.apply("impulse", (PTransform)Impulse.create())).apply("create", (PTransform)ParDo.of((DoFn)new DoFn<byte[], String>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext ctxt) {
                ctxt.output((Object)"zero");
                ctxt.output((Object)"one");
                ctxt.output((Object)"two");
            }
        }))).setCoder((Coder)StringUtf8Coder.of());
        final PCollectionView iterableView = (PCollectionView)input.apply("createIterableSideInput", (PTransform)View.asIterable());
        final PCollectionView multimapView = (PCollectionView)((PCollection)input.apply((PTransform)WithKeys.of((Object)"key"))).apply("createMultimapSideInput", (PTransform)View.asMultimap());
        ((PCollection)input.apply("readSideInput", (PTransform)ParDo.of((DoFn)new DoFn<String, KV<String, String>>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext context) {
                for (String string : (Iterable)context.sideInput(iterableView)) {
                    context.output((Object)KV.of((Object)((String)context.element()), (Object)string));
                }
                for (Map.Entry entry : ((Map)context.sideInput(multimapView)).entrySet()) {
                    for (String value : (Iterable)entry.getValue()) {
                        context.output((Object)KV.of((Object)((String)context.element()), (Object)((String)entry.getKey() + ":" + value)));
                    }
                }
            }
        }).withSideInputs(new PCollectionView[]{iterableView, multimapView}))).setCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)StringUtf8Coder.of())).apply("gbk", (PTransform)GroupByKey.create());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)p);
        FusedPipeline fused = GreedyPipelineFuser.fuse((RunnerApi.Pipeline)pipelineProto);
        Optional optionalStage = Iterables.tryFind((Iterable)fused.getFusedStages(), stage -> !stage.getSideInputs().isEmpty());
        Preconditions.checkState((boolean)optionalStage.isPresent(), (Object)"Expected a stage with side inputs.");
        ExecutableStage stage2 = (ExecutableStage)optionalStage.get();
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor = ProcessBundleDescriptors.fromExecutableStage((String)"test_stage", (ExecutableStage)stage2, (Endpoints.ApiServiceDescriptor)this.dataServer.getApiServiceDescriptor(), (Endpoints.ApiServiceDescriptor)this.stateServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations(), (StateDelegator)this.stateDelegator);
        Map remoteOutputCoders = descriptor.getRemoteOutputCoders();
        HashMap outputValues = new HashMap();
        HashMap<String, RemoteOutputReceiver> outputReceivers = new HashMap<String, RemoteOutputReceiver>();
        for (Map.Entry remoteOutputCoder : remoteOutputCoders.entrySet()) {
            List outputContents = Collections.synchronizedList(new ArrayList());
            outputValues.put((String)remoteOutputCoder.getKey(), outputContents);
            outputReceivers.put((String)remoteOutputCoder.getKey(), RemoteOutputReceiver.of((Coder)((Coder)remoteOutputCoder.getValue()), outputContents::add));
        }
        StateRequestHandler stateRequestHandler = StateRequestHandlers.forSideInputHandlerFactory((Map)descriptor.getSideInputSpecs(), (StateRequestHandlers.SideInputHandlerFactory)new StateRequestHandlers.SideInputHandlerFactory(){

            public <V, W extends BoundedWindow> StateRequestHandlers.IterableSideInputHandler<V, W> forIterableSideInput(String pTransformId, String sideInputId, final Coder<V> elementCoder, Coder<W> windowCoder) {
                return new StateRequestHandlers.IterableSideInputHandler<V, W>(){

                    public Iterable<V> get(W window) {
                        return Arrays.asList("A", "B", "C");
                    }

                    public Coder<V> elementCoder() {
                        return elementCoder;
                    }
                };
            }

            public <K, V, W extends BoundedWindow> StateRequestHandlers.MultimapSideInputHandler<K, V, W> forMultimapSideInput(String pTransformId, String sideInputId, final KvCoder<K, V> elementCoder, Coder<W> windowCoder) {
                return new StateRequestHandlers.MultimapSideInputHandler<K, V, W>(){

                    public Iterable<K> get(W window) {
                        return Arrays.asList("key1", "key2");
                    }

                    public Iterable<V> get(K key, W window) {
                        if ("key1".equals(key)) {
                            return Arrays.asList("H", "I", "J");
                        }
                        if ("key2".equals(key)) {
                            return Arrays.asList("M", "N", "O");
                        }
                        return Collections.emptyList();
                    }

                    public Coder<K> keyCoder() {
                        return elementCoder.getKeyCoder();
                    }

                    public Coder<V> valueCoder() {
                        return elementCoder.getValueCoder();
                    }
                };
            }
        });
        BundleProgressHandler progressHandler = BundleProgressHandler.ignored();
        try (SdkHarnessClient.BundleProcessor.ActiveBundle bundle = processor.newBundle(outputReceivers, stateRequestHandler, progressHandler);){
            ((FnDataReceiver)Iterables.getOnlyElement(bundle.getInputReceivers().values())).accept((Object)WindowedValue.valueInGlobalWindow((Object)"X"));
            ((FnDataReceiver)Iterables.getOnlyElement(bundle.getInputReceivers().values())).accept((Object)WindowedValue.valueInGlobalWindow((Object)"Y"));
        }
        for (Collection windowedValues : outputValues.values()) {
            MatcherAssert.assertThat((Object)windowedValues, (Matcher)Matchers.containsInAnyOrder((Object[])new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"A")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"B")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"C")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"key1:H")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"key1:I")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"key1:J")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"key2:M")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"key2:N")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"key2:O")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"A")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"B")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"C")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"key1:H")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"key1:I")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"key1:J")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"key2:M")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"key2:N")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"key2:O"))}));
        }
    }

    @Test
    public void testExecutionWithSideInputCaching() throws Exception {
        Pipeline p = Pipeline.create();
        ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)p.getOptions().as(ExperimentalOptions.class)), (String)"beam_fn_api");
        ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)p.getOptions().as(ExperimentalOptions.class)), (String)"use_runner_v2");
        this.launchSdkHarness(p.getOptions());
        PCollection input = ((PCollection)((PCollection)p.apply("impulse", (PTransform)Impulse.create())).apply("create", (PTransform)ParDo.of((DoFn)new DoFn<byte[], String>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext ctxt) {
                ctxt.output((Object)"zero");
                ctxt.output((Object)"one");
                ctxt.output((Object)"two");
            }
        }))).setCoder((Coder)StringUtf8Coder.of());
        final PCollectionView iterableView = (PCollectionView)input.apply("createIterableSideInput", (PTransform)View.asIterable());
        final PCollectionView multimapView = (PCollectionView)((PCollection)input.apply((PTransform)WithKeys.of((Object)"key"))).apply("createMultimapSideInput", (PTransform)View.asMultimap());
        ((PCollection)input.apply("readSideInput", (PTransform)ParDo.of((DoFn)new DoFn<String, KV<String, String>>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext context) {
                for (String string : (Iterable)context.sideInput(iterableView)) {
                    context.output((Object)KV.of((Object)((String)context.element()), (Object)string));
                }
                for (Map.Entry entry : ((Map)context.sideInput(multimapView)).entrySet()) {
                    for (String value : (Iterable)entry.getValue()) {
                        context.output((Object)KV.of((Object)((String)context.element()), (Object)((String)entry.getKey() + ":" + value)));
                    }
                }
            }
        }).withSideInputs(new PCollectionView[]{iterableView, multimapView}))).setCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)StringUtf8Coder.of())).apply("gbk", (PTransform)GroupByKey.create());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)p);
        FusedPipeline fused = GreedyPipelineFuser.fuse((RunnerApi.Pipeline)pipelineProto);
        Optional optionalStage = Iterables.tryFind((Iterable)fused.getFusedStages(), stage -> !stage.getSideInputs().isEmpty());
        Preconditions.checkState((boolean)optionalStage.isPresent(), (Object)"Expected a stage with side inputs.");
        ExecutableStage stage2 = (ExecutableStage)optionalStage.get();
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor = ProcessBundleDescriptors.fromExecutableStage((String)"test_stage", (ExecutableStage)stage2, (Endpoints.ApiServiceDescriptor)this.dataServer.getApiServiceDescriptor(), (Endpoints.ApiServiceDescriptor)this.stateServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations(), (StateDelegator)this.stateDelegator);
        Map remoteOutputCoders = descriptor.getRemoteOutputCoders();
        HashMap outputValues = new HashMap();
        HashMap<String, RemoteOutputReceiver> outputReceivers = new HashMap<String, RemoteOutputReceiver>();
        for (Map.Entry remoteOutputCoder : remoteOutputCoders.entrySet()) {
            List outputContents = Collections.synchronizedList(new ArrayList());
            outputValues.put((String)remoteOutputCoder.getKey(), outputContents);
            outputReceivers.put((String)remoteOutputCoder.getKey(), RemoteOutputReceiver.of((Coder)((Coder)remoteOutputCoder.getValue()), outputContents::add));
        }
        StoringStateRequestHandler stateRequestHandler = new StoringStateRequestHandler(StateRequestHandlers.forSideInputHandlerFactory((Map)descriptor.getSideInputSpecs(), (StateRequestHandlers.SideInputHandlerFactory)new StateRequestHandlers.SideInputHandlerFactory(){

            public <V, W extends BoundedWindow> StateRequestHandlers.IterableSideInputHandler<V, W> forIterableSideInput(String pTransformId, String sideInputId, final Coder<V> elementCoder, Coder<W> windowCoder) {
                return new StateRequestHandlers.IterableSideInputHandler<V, W>(){

                    public Iterable<V> get(W window) {
                        return Arrays.asList("A", "B", "C");
                    }

                    public Coder<V> elementCoder() {
                        return elementCoder;
                    }
                };
            }

            public <K, V, W extends BoundedWindow> StateRequestHandlers.MultimapSideInputHandler<K, V, W> forMultimapSideInput(String pTransformId, String sideInputId, final KvCoder<K, V> elementCoder, Coder<W> windowCoder) {
                return new StateRequestHandlers.MultimapSideInputHandler<K, V, W>(){

                    public Iterable<K> get(W window) {
                        return Arrays.asList("key1", "key2");
                    }

                    public Iterable<V> get(K key, W window) {
                        if ("key1".equals(key)) {
                            return Arrays.asList("H", "I", "J");
                        }
                        if ("key2".equals(key)) {
                            return Arrays.asList("M", "N", "O");
                        }
                        return Collections.emptyList();
                    }

                    public Coder<K> keyCoder() {
                        return elementCoder.getKeyCoder();
                    }

                    public Coder<V> valueCoder() {
                        return elementCoder.getValueCoder();
                    }
                };
            }
        }));
        String transformId = ((SideInputReference)Iterables.get((Iterable)stage2.getSideInputs(), (int)0)).transform().getId();
        stateRequestHandler.addCacheToken(BeamFnApi.ProcessBundleRequest.CacheToken.newBuilder().setSideInput(BeamFnApi.ProcessBundleRequest.CacheToken.SideInput.newBuilder().setSideInputId(iterableView.getTagInternal().getId()).setTransformId(transformId).build()).setToken(ByteString.copyFromUtf8((String)"IterableSideInputToken")).build());
        stateRequestHandler.addCacheToken(BeamFnApi.ProcessBundleRequest.CacheToken.newBuilder().setSideInput(BeamFnApi.ProcessBundleRequest.CacheToken.SideInput.newBuilder().setSideInputId(multimapView.getTagInternal().getId()).setTransformId(transformId).build()).setToken(ByteString.copyFromUtf8((String)"MulitmapSideInputToken")).build());
        BundleProgressHandler progressHandler = BundleProgressHandler.ignored();
        try (SdkHarnessClient.BundleProcessor.ActiveBundle bundle = processor.newBundle(outputReceivers, (StateRequestHandler)stateRequestHandler, progressHandler);){
            ((FnDataReceiver)Iterables.getOnlyElement(bundle.getInputReceivers().values())).accept((Object)WindowedValue.valueInGlobalWindow((Object)"X"));
        }
        bundle = processor.newBundle(outputReceivers, (StateRequestHandler)stateRequestHandler, progressHandler);
        try {
            ((FnDataReceiver)Iterables.getOnlyElement(bundle.getInputReceivers().values())).accept((Object)WindowedValue.valueInGlobalWindow((Object)"Y"));
        }
        finally {
            if (bundle != null) {
                bundle.close();
            }
        }
        for (Collection windowedValues : outputValues.values()) {
            MatcherAssert.assertThat((Object)windowedValues, (Matcher)Matchers.containsInAnyOrder((Object[])new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"A")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"B")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"C")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"key1:H")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"key1:I")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"key1:J")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"key2:M")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"key2:N")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"key2:O")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"A")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"B")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"C")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"key1:H")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"key1:I")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"key1:J")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"key2:M")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"key2:N")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"Y", (Object)"key2:O"))}));
        }
        Assert.assertEquals((long)4L, (long)stateRequestHandler.receivedRequests.size());
        Assert.assertEquals((Object)((BeamFnApi.StateRequest)stateRequestHandler.receivedRequests.get(0)).getStateKey().getIterableSideInput(), (Object)BeamFnApi.StateKey.IterableSideInput.newBuilder().setSideInputId(iterableView.getTagInternal().getId()).setTransformId(transformId).build());
        Assert.assertEquals((Object)((BeamFnApi.StateRequest)stateRequestHandler.receivedRequests.get(1)).getStateKey().getMultimapKeysSideInput(), (Object)BeamFnApi.StateKey.MultimapKeysSideInput.newBuilder().setSideInputId(multimapView.getTagInternal().getId()).setTransformId(transformId).build());
        Assert.assertEquals((Object)((BeamFnApi.StateRequest)stateRequestHandler.receivedRequests.get(2)).getStateKey().getMultimapSideInput(), (Object)BeamFnApi.StateKey.MultimapSideInput.newBuilder().setSideInputId(multimapView.getTagInternal().getId()).setTransformId(transformId).setKey(RemoteExecutionTest.encode("key1")).build());
        Assert.assertEquals((Object)((BeamFnApi.StateRequest)stateRequestHandler.receivedRequests.get(3)).getStateKey().getMultimapSideInput(), (Object)BeamFnApi.StateKey.MultimapSideInput.newBuilder().setSideInputId(multimapView.getTagInternal().getId()).setTransformId(transformId).setKey(RemoteExecutionTest.encode("key2")).build());
    }

    private static ByteString encode(String value) throws Exception {
        ByteStringOutputStream output = new ByteStringOutputStream();
        StringUtf8Coder.of().encode(value, (OutputStream)output);
        return output.toByteString();
    }

    @Test
    public void testMetrics() throws Exception {
        this.launchSdkHarness(PipelineOptionsFactory.fromArgs((String[])new String[]{"--experiments=state_sampling_period_millis=10"}).create());
        final MetricsDoFn metricsDoFn = new MetricsDoFn();
        Pipeline p = Pipeline.create();
        PCollection input = ((PCollection)((PCollection)p.apply("impulse", (PTransform)Impulse.create())).apply("create", (PTransform)ParDo.of((DoFn)metricsDoFn))).setCoder((Coder)StringUtf8Coder.of());
        ParDo.SingleOutput pardo = ParDo.of((DoFn)new DoFn<String, String>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext ctxt) {
                ctxt.output((Object)((String)ctxt.element()));
                ctxt.output((Object)((String)ctxt.element()));
            }
        });
        ((PCollection)input.apply("processA", (PTransform)pardo)).setCoder((Coder)StringUtf8Coder.of());
        ((PCollection)input.apply("processB", (PTransform)pardo)).setCoder((Coder)StringUtf8Coder.of());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)p);
        FusedPipeline fused = GreedyPipelineFuser.fuse((RunnerApi.Pipeline)pipelineProto);
        Optional optionalStage = Iterables.tryFind((Iterable)fused.getFusedStages(), stage -> true);
        Preconditions.checkState((boolean)optionalStage.isPresent(), (Object)"Expected a stage with side inputs.");
        ExecutableStage stage2 = (ExecutableStage)optionalStage.get();
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor = ProcessBundleDescriptors.fromExecutableStage((String)"test_stage", (ExecutableStage)stage2, (Endpoints.ApiServiceDescriptor)this.dataServer.getApiServiceDescriptor(), (Endpoints.ApiServiceDescriptor)this.stateServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations(), (StateDelegator)this.stateDelegator);
        Map remoteOutputCoders = descriptor.getRemoteOutputCoders();
        HashMap<String, RemoteOutputReceiver> outputReceivers = new HashMap<String, RemoteOutputReceiver>();
        for (Map.Entry remoteOutputCoder : remoteOutputCoders.entrySet()) {
            List outputContents = Collections.synchronizedList(new ArrayList());
            outputReceivers.put((String)remoteOutputCoder.getKey(), RemoteOutputReceiver.of((Coder)((Coder)remoteOutputCoder.getValue()), outputContents::add));
        }
        final AtomicReference progressMonitoringInfos = new AtomicReference();
        String testPTransformId = "create-ParMultiDo-Metrics-";
        BundleProgressHandler progressHandler = new BundleProgressHandler(){

            public void onProgress(BeamFnApi.ProcessBundleProgressResponse response) {
                progressMonitoringInfos.set(response.getMonitoringInfosList());
                ((CountDownLatch)MetricsDoFn.ALLOW_COMPLETION.get(metricsDoFn.uuid)).countDown();
                ArrayList<Object> matchers = new ArrayList<Object>();
                SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "processUserCounter");
                builder.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                builder.setInt64SumValue(1L);
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "startUserCounter");
                builder.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                builder.setInt64SumValue(10L);
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "finishUserCounter");
                builder.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                matchers.add(Matchers.not((Matcher)MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build())));
                builder.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "processUserDistribution");
                builder.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                builder.setInt64DistributionValue(DistributionData.create((long)1L, (long)1L, (long)1L, (long)1L));
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "startUserDistribution");
                builder.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                builder.setInt64DistributionValue(DistributionData.create((long)10L, (long)1L, (long)10L, (long)10L));
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "finishUserDistribution");
                builder.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                matchers.add(Matchers.not((Matcher)MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build())));
                MatcherAssert.assertThat((Object)response.getMonitoringInfosList(), (Matcher)Matchers.hasItems((Matcher[])matchers.toArray(new Matcher[0])));
            }

            public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
                ArrayList<Object> matchers = new ArrayList<Object>();
                SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "processUserCounter");
                builder.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                builder.setInt64SumValue(1L);
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "startUserCounter");
                builder.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                builder.setInt64SumValue(10L);
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "finishUserCounter");
                builder.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                builder.setInt64SumValue(100L);
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "processUserDistribution");
                builder.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                builder.setInt64DistributionValue(DistributionData.create((long)1L, (long)1L, (long)1L, (long)1L));
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "startUserDistribution");
                builder.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                builder.setInt64DistributionValue(DistributionData.create((long)10L, (long)1L, (long)10L, (long)10L));
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "finishUserDistribution");
                builder.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                builder.setInt64DistributionValue(DistributionData.create((long)100L, (long)1L, (long)100L, (long)100L));
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
                builder.setLabel("PCOLLECTION", "impulse.out");
                builder.setInt64SumValue(1L);
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
                builder.setLabel("PCOLLECTION", "create/ParMultiDo(Metrics).output");
                builder.setInt64SumValue(3L);
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
                builder.setLabel("PCOLLECTION", "processA/ParMultiDo(Anonymous).output");
                builder.setInt64SumValue(6L);
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
                builder.setLabel("PCOLLECTION", "processB/ParMultiDo(Anonymous).output");
                builder.setInt64SumValue(6L);
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn("beam:metric:pardo_execution_time:start_bundle_msecs:v1");
                builder.setType("beam:metrics:sum_int64:v1");
                builder.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                matchers.add(Matchers.allOf((Matcher)MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()), (Matcher)MonitoringInfoMatchers.counterValueGreaterThanOrEqualTo((long)1L)));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn("beam:metric:pardo_execution_time:process_bundle_msecs:v1");
                builder.setType("beam:metrics:sum_int64:v1");
                builder.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                matchers.add(Matchers.allOf((Matcher)MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()), (Matcher)MonitoringInfoMatchers.counterValueGreaterThanOrEqualTo((long)1L)));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn("beam:metric:pardo_execution_time:finish_bundle_msecs:v1");
                builder.setType("beam:metrics:sum_int64:v1");
                builder.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                matchers.add(Matchers.allOf((Matcher)MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()), (Matcher)MonitoringInfoMatchers.counterValueGreaterThanOrEqualTo((long)1L)));
                List oldMonitoringInfos = (List)progressMonitoringInfos.get();
                if (oldMonitoringInfos == null) {
                    throw new IllegalStateException("Progress request did not complete before timeout allowing for bundle to complete.");
                }
                List mergedMonitoringInfos = RemoteExecutionTest.mergeMonitoringInfos(oldMonitoringInfos, response.getMonitoringInfosList());
                MatcherAssert.assertThat((Object)mergedMonitoringInfos, (Matcher)Matchers.hasItems((Matcher[])matchers.toArray(new Matcher[0])));
            }
        };
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try (SdkHarnessClient.BundleProcessor.ActiveBundle bundle = processor.newBundle(outputReceivers, StateRequestHandler.unsupported(), progressHandler);){
            ((FnDataReceiver)Iterables.getOnlyElement(bundle.getInputReceivers().values())).accept((Object)WindowedValue.valueInGlobalWindow((Object)CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"X")));
            executor.submit(() -> RemoteExecutionTest.lambda$testMetrics$4(metricsDoFn, (RemoteBundle)bundle));
        }
        executor.shutdown();
    }

    private static List<MetricsApi.MonitoringInfo> mergeMonitoringInfos(List<MetricsApi.MonitoringInfo> oldMonitoringInfos, List<MetricsApi.MonitoringInfo> newMonitoringInfos) {
        HashMap<MetricsApi.MonitoringInfo, MetricsApi.MonitoringInfo> miKeyToMiWithPayload = new HashMap<MetricsApi.MonitoringInfo, MetricsApi.MonitoringInfo>();
        for (MetricsApi.MonitoringInfo monitoringInfo : oldMonitoringInfos) {
            miKeyToMiWithPayload.put(monitoringInfo.toBuilder().clearPayload().build(), monitoringInfo);
        }
        for (MetricsApi.MonitoringInfo monitoringInfo : newMonitoringInfos) {
            miKeyToMiWithPayload.put(monitoringInfo.toBuilder().clearPayload().build(), monitoringInfo);
        }
        return new ArrayList<MetricsApi.MonitoringInfo>(miKeyToMiWithPayload.values());
    }

    @Test
    public void testExecutionWithUserState() throws Exception {
        this.launchSdkHarness(PipelineOptionsFactory.create());
        Pipeline p = Pipeline.create();
        String stateId = "foo";
        String stateId2 = "foo2";
        ((PCollection)((PCollection)((PCollection)p.apply("impulse", (PTransform)Impulse.create())).apply("create", (PTransform)ParDo.of((DoFn)new DoFn<byte[], KV<String, String>>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext ctxt) {
            }
        }))).setCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)StringUtf8Coder.of())).apply("userState", (PTransform)ParDo.of((DoFn)new DoFn<KV<String, String>, KV<String, String>>(){
            @DoFn.StateId(value="foo")
            private final StateSpec<BagState<String>> bufferState = StateSpecs.bag((Coder)StringUtf8Coder.of());
            @DoFn.StateId(value="foo2")
            private final StateSpec<BagState<String>> bufferState2 = StateSpecs.bag((Coder)StringUtf8Coder.of());

            @DoFn.ProcessElement
            public void processElement(@DoFn.Element KV<String, String> element, @DoFn.StateId(value="foo") BagState<String> state, @DoFn.StateId(value="foo2") BagState<String> state2, DoFn.OutputReceiver<KV<String, String>> r) {
                for (String value : state.read()) {
                    r.output((Object)KV.of((Object)((String)element.getKey()), (Object)value));
                }
                state.add((Object)((String)element.getValue()));
                state2.clear();
            }
        }))).apply("gbk", (PTransform)GroupByKey.create());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)p);
        FusedPipeline fused = GreedyPipelineFuser.fuse((RunnerApi.Pipeline)pipelineProto);
        Optional optionalStage = Iterables.tryFind((Iterable)fused.getFusedStages(), stage -> !stage.getUserStates().isEmpty());
        Preconditions.checkState((boolean)optionalStage.isPresent(), (Object)"Expected a stage with user state.");
        ExecutableStage stage2 = (ExecutableStage)optionalStage.get();
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor = ProcessBundleDescriptors.fromExecutableStage((String)"test_stage", (ExecutableStage)stage2, (Endpoints.ApiServiceDescriptor)this.dataServer.getApiServiceDescriptor(), (Endpoints.ApiServiceDescriptor)this.stateServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations(), (StateDelegator)this.stateDelegator);
        Map remoteOutputCoders = descriptor.getRemoteOutputCoders();
        HashMap outputValues = new HashMap();
        HashMap<String, RemoteOutputReceiver> outputReceivers = new HashMap<String, RemoteOutputReceiver>();
        for (Map.Entry remoteOutputCoder : remoteOutputCoders.entrySet()) {
            List outputContents = Collections.synchronizedList(new ArrayList());
            outputValues.put((String)remoteOutputCoder.getKey(), outputContents);
            outputReceivers.put((String)remoteOutputCoder.getKey(), RemoteOutputReceiver.of((Coder)((Coder)remoteOutputCoder.getValue()), outputContents::add));
        }
        ImmutableMap userStateData = ImmutableMap.of((Object)"foo", new ArrayList<ByteString>(Arrays.asList(ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"A", (Coder.Context)Coder.Context.NESTED)), ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"B", (Coder.Context)Coder.Context.NESTED)), ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"C", (Coder.Context)Coder.Context.NESTED)))), (Object)"foo2", new ArrayList<ByteString>(Arrays.asList(ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"D", (Coder.Context)Coder.Context.NESTED)))));
        StateRequestHandler stateRequestHandler = StateRequestHandlers.forBagUserStateHandlerFactory((ProcessBundleDescriptors.ExecutableProcessBundleDescriptor)descriptor, (StateRequestHandlers.BagUserStateHandlerFactory)new StateRequestHandlers.BagUserStateHandlerFactory<ByteString, Object, BoundedWindow>((Map)userStateData){
            final /* synthetic */ Map val$userStateData;
            {
                this.val$userStateData = map;
            }

            public StateRequestHandlers.BagUserStateHandler<ByteString, Object, BoundedWindow> forUserState(String pTransformId, final String userStateId, Coder<ByteString> keyCoder, Coder<Object> valueCoder, Coder<BoundedWindow> windowCoder) {
                return new StateRequestHandlers.BagUserStateHandler<ByteString, Object, BoundedWindow>(){

                    public Iterable<Object> get(ByteString key, BoundedWindow window) {
                        return (Iterable)val$userStateData.get(userStateId);
                    }

                    public void append(ByteString key, BoundedWindow window, Iterator<Object> values) {
                        Iterators.addAll((Collection)((Collection)val$userStateData.get(userStateId)), values);
                    }

                    public void clear(ByteString key, BoundedWindow window) {
                        ((List)val$userStateData.get(userStateId)).clear();
                    }
                };
            }
        });
        try (SdkHarnessClient.BundleProcessor.ActiveBundle bundle = processor.newBundle(outputReceivers, stateRequestHandler, BundleProgressHandler.ignored());){
            ((FnDataReceiver)Iterables.getOnlyElement(bundle.getInputReceivers().values())).accept((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"Y")));
        }
        for (Collection windowedValues : outputValues.values()) {
            MatcherAssert.assertThat((Object)windowedValues, (Matcher)Matchers.containsInAnyOrder((Object[])new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"A")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"B")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"C"))}));
        }
        MatcherAssert.assertThat((Object)((List)userStateData.get("foo")), (Matcher)IsIterableContainingInOrder.contains((Object[])new ByteString[]{ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"A", (Coder.Context)Coder.Context.NESTED)), ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"B", (Coder.Context)Coder.Context.NESTED)), ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"C", (Coder.Context)Coder.Context.NESTED)), ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"Y", (Coder.Context)Coder.Context.NESTED))}));
        MatcherAssert.assertThat((Object)((List)userStateData.get("foo2")), (Matcher)IsEmptyIterable.emptyIterable());
    }

    @Test
    public void testExecutionWithUserStateCaching() throws Exception {
        Pipeline p = Pipeline.create();
        this.launchSdkHarness(p.getOptions());
        String stateId = "foo";
        String stateId2 = "bar";
        ((PCollection)((PCollection)((PCollection)p.apply("impulse", (PTransform)Impulse.create())).apply("create", (PTransform)ParDo.of((DoFn)new DoFn<byte[], KV<String, String>>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext ctxt) {
            }
        }))).setCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)StringUtf8Coder.of())).apply("userState", (PTransform)ParDo.of((DoFn)new DoFn<KV<String, String>, KV<String, String>>(){
            @DoFn.StateId(value="foo")
            private final StateSpec<BagState<String>> bufferState = StateSpecs.bag((Coder)StringUtf8Coder.of());
            @DoFn.StateId(value="bar")
            private final StateSpec<BagState<String>> bufferState2 = StateSpecs.bag((Coder)StringUtf8Coder.of());

            @DoFn.ProcessElement
            public void processElement(@DoFn.Element KV<String, String> element, @DoFn.StateId(value="foo") BagState<String> state, @DoFn.StateId(value="bar") BagState<String> state2, DoFn.OutputReceiver<KV<String, String>> r) {
                for (String value : state.read()) {
                    r.output((Object)KV.of((Object)((String)element.getKey()), (Object)value));
                }
                ReadableState isEmpty = state2.isEmpty();
                if (((Boolean)isEmpty.read()).booleanValue()) {
                    r.output((Object)KV.of((Object)((String)element.getKey()), (Object)"Empty"));
                } else {
                    state2.clear();
                }
            }
        }))).apply("gbk", (PTransform)GroupByKey.create());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)p);
        FusedPipeline fused = GreedyPipelineFuser.fuse((RunnerApi.Pipeline)pipelineProto);
        Optional optionalStage = Iterables.tryFind((Iterable)fused.getFusedStages(), stage -> !stage.getUserStates().isEmpty());
        Preconditions.checkState((boolean)optionalStage.isPresent(), (Object)"Expected a stage with user state.");
        ExecutableStage stage2 = (ExecutableStage)optionalStage.get();
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor = ProcessBundleDescriptors.fromExecutableStage((String)"test_stage", (ExecutableStage)stage2, (Endpoints.ApiServiceDescriptor)this.dataServer.getApiServiceDescriptor(), (Endpoints.ApiServiceDescriptor)this.stateServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations(), (StateDelegator)this.stateDelegator);
        Map remoteOutputCoders = descriptor.getRemoteOutputCoders();
        HashMap outputValues = new HashMap();
        HashMap<String, RemoteOutputReceiver> outputReceivers = new HashMap<String, RemoteOutputReceiver>();
        for (Map.Entry remoteOutputCoder : remoteOutputCoders.entrySet()) {
            List outputContents = Collections.synchronizedList(new ArrayList());
            outputValues.put((String)remoteOutputCoder.getKey(), outputContents);
            outputReceivers.put((String)remoteOutputCoder.getKey(), RemoteOutputReceiver.of((Coder)((Coder)remoteOutputCoder.getValue()), outputContents::add));
        }
        ImmutableMap userStateData = ImmutableMap.of((Object)"foo", new ArrayList<ByteString>(Arrays.asList(ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"A", (Coder.Context)Coder.Context.NESTED)), ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"B", (Coder.Context)Coder.Context.NESTED)), ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"C", (Coder.Context)Coder.Context.NESTED)))), (Object)"bar", new ArrayList<ByteString>(Arrays.asList(ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"D", (Coder.Context)Coder.Context.NESTED)))));
        StoringStateRequestHandler stateRequestHandler = new StoringStateRequestHandler(StateRequestHandlers.forBagUserStateHandlerFactory((ProcessBundleDescriptors.ExecutableProcessBundleDescriptor)descriptor, (StateRequestHandlers.BagUserStateHandlerFactory)new StateRequestHandlers.BagUserStateHandlerFactory<ByteString, Object, BoundedWindow>((Map)userStateData){
            final /* synthetic */ Map val$userStateData;
            {
                this.val$userStateData = map;
            }

            public StateRequestHandlers.BagUserStateHandler<ByteString, Object, BoundedWindow> forUserState(String pTransformId, final String userStateId, Coder<ByteString> keyCoder, Coder<Object> valueCoder, Coder<BoundedWindow> windowCoder) {
                return new StateRequestHandlers.BagUserStateHandler<ByteString, Object, BoundedWindow>(){

                    public Iterable<Object> get(ByteString key, BoundedWindow window) {
                        return (Iterable)val$userStateData.get(userStateId);
                    }

                    public void append(ByteString key, BoundedWindow window, Iterator<Object> values) {
                        Iterators.addAll((Collection)((Collection)val$userStateData.get(userStateId)), values);
                    }

                    public void clear(ByteString key, BoundedWindow window) {
                        ((List)val$userStateData.get(userStateId)).clear();
                    }
                };
            }
        }));
        try (SdkHarnessClient.BundleProcessor.ActiveBundle bundle = processor.newBundle(outputReceivers, (StateRequestHandler)stateRequestHandler, BundleProgressHandler.ignored());){
            ((FnDataReceiver)Iterables.getOnlyElement(bundle.getInputReceivers().values())).accept((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"Y")));
        }
        try (SdkHarnessClient.BundleProcessor.ActiveBundle bundle2 = processor.newBundle(outputReceivers, (StateRequestHandler)stateRequestHandler, BundleProgressHandler.ignored());){
            ((FnDataReceiver)Iterables.getOnlyElement(bundle2.getInputReceivers().values())).accept((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"Z")));
        }
        for (Collection windowedValues : outputValues.values()) {
            MatcherAssert.assertThat((Object)windowedValues, (Matcher)Matchers.containsInAnyOrder((Object[])new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"A")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"B")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"C")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"A")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"B")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"C")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"Empty"))}));
        }
        MatcherAssert.assertThat((Object)((List)userStateData.get("foo")), (Matcher)IsIterableContainingInOrder.contains((Object[])new ByteString[]{ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"A", (Coder.Context)Coder.Context.NESTED)), ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"B", (Coder.Context)Coder.Context.NESTED)), ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"C", (Coder.Context)Coder.Context.NESTED))}));
        MatcherAssert.assertThat((Object)((List)userStateData.get("bar")), (Matcher)IsEmptyIterable.emptyIterable());
        Assert.assertEquals((long)3L, (long)stateRequestHandler.getRequestCount());
        ByteStringOutputStream out = new ByteStringOutputStream();
        StringUtf8Coder.of().encode("X", (OutputStream)out);
        Assert.assertEquals((Object)"foo", (Object)((BeamFnApi.StateRequest)stateRequestHandler.receivedRequests.get(0)).getStateKey().getBagUserState().getUserStateId());
        Assert.assertEquals((Object)((BeamFnApi.StateRequest)stateRequestHandler.receivedRequests.get(0)).getStateKey().getBagUserState().getKey(), (Object)out.toByteString());
        Assert.assertTrue((boolean)((BeamFnApi.StateRequest)stateRequestHandler.receivedRequests.get(0)).hasGet());
        Assert.assertEquals((Object)"bar", (Object)((BeamFnApi.StateRequest)stateRequestHandler.receivedRequests.get(1)).getStateKey().getBagUserState().getUserStateId());
        Assert.assertEquals((Object)((BeamFnApi.StateRequest)stateRequestHandler.receivedRequests.get(1)).getStateKey().getBagUserState().getKey(), (Object)out.toByteString());
        Assert.assertTrue((boolean)((BeamFnApi.StateRequest)stateRequestHandler.receivedRequests.get(1)).hasGet());
        Assert.assertEquals((Object)"bar", (Object)((BeamFnApi.StateRequest)stateRequestHandler.receivedRequests.get(2)).getStateKey().getBagUserState().getUserStateId());
        Assert.assertEquals((Object)((BeamFnApi.StateRequest)stateRequestHandler.receivedRequests.get(2)).getStateKey().getBagUserState().getKey(), (Object)out.toByteString());
        Assert.assertTrue((boolean)((BeamFnApi.StateRequest)stateRequestHandler.receivedRequests.get(2)).hasClear());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - void declaration
     */
    @Test
    public void testExecutionWithTimer() throws Exception {
        this.launchSdkHarness(PipelineOptionsFactory.create());
        Pipeline p = Pipeline.create();
        ((PCollection)((PCollection)((PCollection)p.apply("impulse", (PTransform)Impulse.create())).apply("create", (PTransform)ParDo.of((DoFn)new DoFn<byte[], KV<String, String>>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext ctxt) {
            }
        }))).setCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)StringUtf8Coder.of())).apply("timer", (PTransform)ParDo.of((DoFn)new DoFn<KV<String, String>, KV<String, String>>(){
            @DoFn.TimerId(value="event")
            private final TimerSpec eventTimerSpec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);
            @DoFn.TimerId(value="processing")
            private final TimerSpec processingTimerSpec = TimerSpecs.timer((TimeDomain)TimeDomain.PROCESSING_TIME);

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext context, @DoFn.TimerId(value="event") org.apache.beam.sdk.state.Timer eventTimeTimer, @DoFn.TimerId(value="processing") org.apache.beam.sdk.state.Timer processingTimeTimer) {
                context.output((Object)KV.of((Object)("main" + (String)((KV)context.element()).getKey()), (Object)""));
                eventTimeTimer.withOutputTimestamp(context.timestamp()).set(context.timestamp().plus((ReadableDuration)Duration.millis((long)1L)));
                processingTimeTimer.offset(Duration.millis((long)2L));
                processingTimeTimer.setRelative();
            }

            @DoFn.OnTimer(value="event")
            public void eventTimer(DoFn.OnTimerContext context, @DoFn.Key String key, @DoFn.TimerId(value="event") org.apache.beam.sdk.state.Timer eventTimeTimer, @DoFn.TimerId(value="processing") org.apache.beam.sdk.state.Timer processingTimeTimer) {
                context.output((Object)KV.of((Object)"event", (Object)key));
                eventTimeTimer.withOutputTimestamp(context.timestamp()).set(context.fireTimestamp().plus((ReadableDuration)Duration.millis((long)11L)));
                processingTimeTimer.offset(Duration.millis((long)12L));
                processingTimeTimer.setRelative();
            }

            @DoFn.OnTimer(value="processing")
            public void processingTimer(DoFn.OnTimerContext context, @DoFn.Key String key, @DoFn.TimerId(value="event") org.apache.beam.sdk.state.Timer eventTimeTimer, @DoFn.TimerId(value="processing") org.apache.beam.sdk.state.Timer processingTimeTimer) {
                context.output((Object)KV.of((Object)"processing", (Object)key));
                eventTimeTimer.withOutputTimestamp(context.timestamp()).set(context.fireTimestamp().plus((ReadableDuration)Duration.millis((long)21L)));
                processingTimeTimer.offset(Duration.millis((long)22L));
                processingTimeTimer.setRelative();
            }

            @DoFn.OnWindowExpiration
            public void onWindowExpiration(@DoFn.Key String key, DoFn.OutputReceiver<KV<String, String>> outputReceiver) {
                outputReceiver.output((Object)KV.of((Object)"onWindowExpiration", (Object)key));
            }
        }))).apply("gbk", (PTransform)GroupByKey.create());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)p);
        FusedPipeline fused = GreedyPipelineFuser.fuse((RunnerApi.Pipeline)pipelineProto);
        Optional optionalStage = Iterables.tryFind((Iterable)fused.getFusedStages(), stage -> !stage.getTimers().isEmpty());
        Preconditions.checkState((boolean)optionalStage.isPresent(), (Object)"Expected a stage with timers.");
        ExecutableStage stage2 = (ExecutableStage)optionalStage.get();
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor = ProcessBundleDescriptors.fromExecutableStage((String)"test_stage", (ExecutableStage)stage2, (Endpoints.ApiServiceDescriptor)this.dataServer.getApiServiceDescriptor(), (Endpoints.ApiServiceDescriptor)this.stateServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations(), (StateDelegator)this.stateDelegator, descriptor.getTimerSpecs());
        HashMap<String, Object> outputValues = new HashMap<String, Object>();
        HashMap<String, RemoteOutputReceiver> outputReceivers = new HashMap<String, RemoteOutputReceiver>();
        for (Map.Entry remoteOutputCoder : descriptor.getRemoteOutputCoders().entrySet()) {
            List outputContents = Collections.synchronizedList(new ArrayList());
            outputValues.put((String)remoteOutputCoder.getKey(), outputContents);
            outputReceivers.put((String)remoteOutputCoder.getKey(), RemoteOutputReceiver.of((Coder)((Coder)remoteOutputCoder.getValue()), outputContents::add));
        }
        HashMap timerValues = new HashMap();
        HashMap<KV, RemoteOutputReceiver> timerReceivers = new HashMap<KV, RemoteOutputReceiver>();
        for (Map.Entry entry : descriptor.getTimerSpecs().entrySet()) {
            for (ProcessBundleDescriptors.TimerSpec timerSpec : ((Map)entry.getValue()).values()) {
                KV key = KV.of((Object)timerSpec.transformId(), (Object)timerSpec.timerId());
                List outputContents = Collections.synchronizedList(new ArrayList());
                timerValues.put(key, outputContents);
                timerReceivers.put(key, RemoteOutputReceiver.of((Coder)timerSpec.coder(), outputContents::add));
            }
        }
        ProcessBundleDescriptors.TimerSpec eventTimerSpec = null;
        Object var13_14 = null;
        ProcessBundleDescriptors.TimerSpec onWindowExpirationSpec = null;
        for (Map timerSpecs : descriptor.getTimerSpecs().values()) {
            for (ProcessBundleDescriptors.TimerSpec timerSpec : timerSpecs.values()) {
                if ("onWindowExpiration0".equals(timerSpec.timerId())) {
                    onWindowExpirationSpec = timerSpec;
                    continue;
                }
                if (TimeDomain.EVENT_TIME.equals((Object)timerSpec.getTimerSpec().getTimeDomain())) {
                    eventTimerSpec = timerSpec;
                    continue;
                }
                if (TimeDomain.PROCESSING_TIME.equals((Object)timerSpec.getTimerSpec().getTimeDomain())) {
                    ProcessBundleDescriptors.TimerSpec timerSpec2 = timerSpec;
                    continue;
                }
                Assert.fail((String)String.format("Unknown timer specification %s", timerSpec));
            }
        }
        DateTimeUtils.setCurrentMillisFixed((long)(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis() + 10000L));
        try {
            void var13_15;
            try (SdkHarnessClient.BundleProcessor.ActiveBundle bundle = processor.newBundle(outputReceivers, timerReceivers, StateRequestHandler.unsupported(), BundleProgressHandler.ignored(), null, null);){
                ((FnDataReceiver)Iterables.getOnlyElement(bundle.getInputReceivers().values())).accept((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"X")));
                ((FnDataReceiver)bundle.getTimerReceivers().get(KV.of((Object)eventTimerSpec.transformId(), (Object)eventTimerSpec.timerId()))).accept(this.timerForTest("Y", 1000L, 100L));
                ((FnDataReceiver)bundle.getTimerReceivers().get(KV.of((Object)var13_15.transformId(), (Object)var13_15.timerId()))).accept(this.timerForTest("Z", 2000L, 200L));
                ((FnDataReceiver)bundle.getTimerReceivers().get(KV.of((Object)onWindowExpirationSpec.transformId(), (Object)onWindowExpirationSpec.timerId()))).accept(this.timerForTest("key", 5001L, 5000L));
            }
            String mainOutputTransform = (String)Iterables.getOnlyElement(descriptor.getRemoteOutputCoders().keySet());
            MatcherAssert.assertThat((Object)((Collection)outputValues.get(mainOutputTransform)), (Matcher)Matchers.containsInAnyOrder((Object[])new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"mainX", (Object)"")), WindowedValue.timestampedValueInGlobalWindow((Object)KV.of((Object)"event", (Object)"Y"), (Instant)BoundedWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)100L))), WindowedValue.timestampedValueInGlobalWindow((Object)KV.of((Object)"processing", (Object)"Z"), (Instant)BoundedWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)200L))), WindowedValue.timestampedValueInGlobalWindow((Object)KV.of((Object)"onWindowExpiration", (Object)"key"), (Instant)BoundedWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)5000L)))}));
            MatcherAssert.assertThat((Object)((Collection)timerValues.get(KV.of((Object)eventTimerSpec.transformId(), (Object)eventTimerSpec.timerId()))), (Matcher)Matchers.containsInAnyOrder((Object[])new Timer[]{this.timerForTest("X", 1L, 0L), this.timerForTest("Y", 1011L, 100L), this.timerForTest("Z", 2021L, 200L)}));
            MatcherAssert.assertThat((Object)((Collection)timerValues.get(KV.of((Object)var13_15.transformId(), (Object)var13_15.timerId()))), (Matcher)Matchers.containsInAnyOrder((Object[])new Timer[]{this.timerForTest("X", 10002L, 0L), this.timerForTest("Y", 10012L, 100L), this.timerForTest("Z", 10022L, 200L)}));
        }
        finally {
            DateTimeUtils.setCurrentMillisSystem();
        }
    }

    @Test
    public void testExecutionWithMultipleStages() throws Exception {
        this.launchSdkHarness(PipelineOptionsFactory.create());
        Pipeline p = Pipeline.create();
        Function<String, PCollection> pCollectionGenerator = suffix -> (PCollection)((PCollection)((PCollection)p.apply("impulse" + suffix, (PTransform)Impulse.create())).apply("create" + suffix, (PTransform)ParDo.of((DoFn)new DoFn<byte[], String>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext c) {
                try {
                    c.output((Object)((String)CoderUtils.decodeFromByteArray((Coder)StringUtf8Coder.of(), (byte[])((byte[])c.element()))));
                }
                catch (CoderException e) {
                    throw new RuntimeException(e);
                }
            }
        }))).setCoder((Coder)StringUtf8Coder.of()).apply((PTransform)ParDo.of((DoFn)new DoFn<String, String>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                c.output((Object)("stream" + suffix + (String)c.element()));
            }
        }));
        PCollection input1 = pCollectionGenerator.apply("1");
        PCollection input2 = pCollectionGenerator.apply("2");
        PCollection outputMerged = (PCollection)PCollectionList.of((PCollection)input1).and(input2).apply((PTransform)Flatten.pCollections());
        ((PCollection)outputMerged.apply("createKV", (PTransform)ParDo.of((DoFn)new DoFn<String, KV<String, String>>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext c) {
                c.output((Object)KV.of((Object)((String)c.element()), (Object)""));
            }
        }))).setCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)StringUtf8Coder.of())).apply("gbk", (PTransform)GroupByKey.create());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)p);
        FusedPipeline fused = GreedyPipelineFuser.fuse((RunnerApi.Pipeline)pipelineProto);
        Set stages = fused.getFusedStages();
        MatcherAssert.assertThat((Object)stages.size(), (Matcher)Matchers.equalTo((Object)2));
        List outputValues = Collections.synchronizedList(new ArrayList());
        for (ExecutableStage stage : stages) {
            ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor = ProcessBundleDescriptors.fromExecutableStage((String)stage.toString(), (ExecutableStage)stage, (Endpoints.ApiServiceDescriptor)this.dataServer.getApiServiceDescriptor(), (Endpoints.ApiServiceDescriptor)this.stateServer.getApiServiceDescriptor());
            SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations(), (StateDelegator)this.stateDelegator);
            Map remoteOutputCoders = descriptor.getRemoteOutputCoders();
            HashMap<String, RemoteOutputReceiver> outputReceivers = new HashMap<String, RemoteOutputReceiver>();
            for (Map.Entry remoteOutputCoder : remoteOutputCoders.entrySet()) {
                outputReceivers.putIfAbsent((String)remoteOutputCoder.getKey(), RemoteOutputReceiver.of((Coder)((Coder)remoteOutputCoder.getValue()), outputValues::add));
            }
            SdkHarnessClient.BundleProcessor.ActiveBundle bundle = processor.newBundle(outputReceivers, StateRequestHandler.unsupported(), BundleProgressHandler.ignored());
            try {
                ((FnDataReceiver)Iterables.getOnlyElement(bundle.getInputReceivers().values())).accept((Object)WindowedValue.valueInGlobalWindow((Object)CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"X")));
            }
            finally {
                if (bundle == null) continue;
                bundle.close();
            }
        }
        MatcherAssert.assertThat(outputValues, (Matcher)Matchers.containsInAnyOrder((Object[])new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"stream1X", (Object)"")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"stream2X", (Object)""))}));
    }

    @Test(timeout=60000L)
    public void testSplit() throws Exception {
        ScheduledFuture<?> future;
        this.launchSdkHarness(PipelineOptionsFactory.create());
        Pipeline p = Pipeline.create();
        ((PCollection)((PCollection)((PCollection)((PCollection)p.apply("impulse", (PTransform)Impulse.create())).apply("create", (PTransform)ParDo.of((DoFn)new DoFn<byte[], String>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext ctxt) {
                ctxt.output((Object)"zero");
                ctxt.output((Object)"WaitTillSplit");
                ctxt.output((Object)"two");
            }
        }))).apply("forceSplit", (PTransform)ParDo.of((DoFn)new DoFn<String, String>(){

            @DoFn.GetInitialRestriction
            public String getInitialRestriction(@DoFn.Element String element) {
                return element;
            }

            @DoFn.NewTracker
            public WaitingTillSplitRestrictionTracker newTracker(@DoFn.Restriction String restriction) {
                return new WaitingTillSplitRestrictionTracker(restriction);
            }

            @DoFn.ProcessElement
            public void process(RestrictionTracker<String, Void> tracker, DoFn.ProcessContext context) {
                while (tracker.tryClaim(null)) {
                }
                context.output((Object)((String)tracker.currentRestriction()));
            }
        }))).apply("addKeys", (PTransform)WithKeys.of((Object)"foo"))).setCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)StringUtf8Coder.of())).apply("gbk", (PTransform)GroupByKey.create());
        RunnerApi.Pipeline pipeline = PipelineTranslation.toProto((Pipeline)p);
        RunnerApi.Pipeline pipelineWithSdfExpanded = ProtoOverrides.updateTransform((String)"beam:transform:pardo:v1", (RunnerApi.Pipeline)pipeline, (ProtoOverrides.TransformReplacement)SplittableParDoExpander.createSizedReplacement());
        FusedPipeline fused = GreedyPipelineFuser.fuse((RunnerApi.Pipeline)pipelineWithSdfExpanded);
        Optional optionalStage = Iterables.tryFind((Iterable)fused.getFusedStages(), stage -> Iterables.filter((Iterable)stage.getTransforms(), node -> "beam:transform:sdf_process_sized_element_and_restrictions:v1".equals(node.getTransform().getSpec().getUrn())).iterator().hasNext());
        Preconditions.checkState((boolean)optionalStage.isPresent(), (Object)"Expected a stage with SDF ProcessSizedElementAndRestriction.");
        ExecutableStage stage2 = (ExecutableStage)optionalStage.get();
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor = ProcessBundleDescriptors.fromExecutableStage((String)"my_stage", (ExecutableStage)stage2, (Endpoints.ApiServiceDescriptor)this.dataServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations());
        Map remoteOutputCoders = descriptor.getRemoteOutputCoders();
        HashMap outputValues = new HashMap();
        HashMap<String, RemoteOutputReceiver> outputReceivers = new HashMap<String, RemoteOutputReceiver>();
        for (Map.Entry remoteOutputCoder : remoteOutputCoders.entrySet()) {
            List outputContents = Collections.synchronizedList(new ArrayList());
            outputValues.put((String)remoteOutputCoder.getKey(), outputContents);
            outputReceivers.put((String)remoteOutputCoder.getKey(), RemoteOutputReceiver.of((Coder)((Coder)remoteOutputCoder.getValue()), outputContents::add));
        }
        ArrayList splitResponses = new ArrayList();
        ArrayList checkpointResponses = new ArrayList();
        ArrayList requestsFinalization = new ArrayList();
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try (SdkHarnessClient.BundleProcessor.ActiveBundle bundle = processor.newBundle(outputReceivers, Collections.emptyMap(), StateRequestHandler.unsupported(), BundleProgressHandler.ignored(), splitResponses::add, checkpointResponses::add, requestsFinalization::add);){
            ((FnDataReceiver)Iterables.getOnlyElement(bundle.getInputReceivers().values())).accept((Object)WindowedValue.valueInGlobalWindow(this.sdfSizedElementAndRestrictionForTest("WaitTillSplit")));
            future = executor.scheduleWithFixedDelay(() -> RemoteExecutionTest.lambda$testSplit$11((RemoteBundle)bundle), 0L, 100L, TimeUnit.MILLISECONDS);
        }
        future.cancel(false);
        executor.shutdown();
        Assert.assertTrue((boolean)requestsFinalization.isEmpty());
        Assert.assertTrue((boolean)checkpointResponses.isEmpty());
        Assert.assertFalse((boolean)splitResponses.isEmpty());
        BeamFnApi.ProcessBundleSplitResponse splitResponse = (BeamFnApi.ProcessBundleSplitResponse)splitResponses.get(splitResponses.size() - 1);
        BeamFnApi.ProcessBundleSplitResponse.ChannelSplit channelSplit = (BeamFnApi.ProcessBundleSplitResponse.ChannelSplit)Iterables.getOnlyElement((Iterable)splitResponse.getChannelSplitsList());
        Assert.assertEquals((long)-1L, (long)channelSplit.getLastPrimaryElement());
        Assert.assertEquals((long)1L, (long)channelSplit.getFirstResidualElement());
        Assert.assertEquals((long)1L, (long)splitResponse.getPrimaryRootsCount());
        Assert.assertEquals((long)1L, (long)splitResponse.getResidualRootsCount());
        MatcherAssert.assertThat((Object)((Collection)Iterables.getOnlyElement(outputValues.values())), (Matcher)Matchers.containsInAnyOrder((Object[])new Object[]{WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"foo", (Object)"Primary"))}));
    }

    private KV<KV<String, KV<String, byte[]>>, Double> sdfSizedElementAndRestrictionForTest(String element) {
        return KV.of((Object)KV.of((Object)element, (Object)KV.of((Object)element, (Object)new byte[0])), (Object)0.0);
    }

    private KV<String, byte[]> byteValueOf(String key, long value) throws CoderException {
        return KV.of((Object)key, (Object)CoderUtils.encodeToByteArray((Coder)BigEndianLongCoder.of(), (Object)value));
    }

    private Timer<String> timerForTest(String key, long fireTimestamp, long holdTimestamp) {
        return Timer.of((Object)key, (String)"", Collections.singletonList(GlobalWindow.INSTANCE), (Instant)BoundedWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)fireTimestamp)), (Instant)BoundedWindow.TIMESTAMP_MIN_VALUE.plus((ReadableDuration)Duration.millis((long)holdTimestamp)), (PaneInfo)PaneInfo.NO_FIRING);
    }

    private static /* synthetic */ void lambda$testSplit$11(RemoteBundle bundle) {
        bundle.split(0.5);
    }

    private static /* synthetic */ Void lambda$testMetrics$4(MetricsDoFn metricsDoFn, RemoteBundle bundle) throws Exception {
        Preconditions.checkState((boolean)((CountDownLatch)MetricsDoFn.AFTER_PROCESS.get(metricsDoFn.uuid)).await(60L, TimeUnit.SECONDS), (Object)"Runner waited too long for DoFn to get to AFTER_PROCESS.");
        bundle.requestProgress();
        return null;
    }

    private static class WaitingTillSplitRestrictionTracker
    extends RestrictionTracker<String, Void> {
        private static final String WAIT_TILL_SPLIT = "WaitTillSplit";
        private static final String PRIMARY = "Primary";
        private static final String RESIDUAL = "Residual";
        private String currentRestriction;

        private WaitingTillSplitRestrictionTracker(String restriction) {
            this.currentRestriction = restriction;
        }

        public boolean tryClaim(Void position) {
            return this.needsSplitting();
        }

        public String currentRestriction() {
            return this.currentRestriction;
        }

        public SplitResult<String> trySplit(double fractionOfRemainder) {
            if (!this.needsSplitting()) {
                return null;
            }
            this.currentRestriction = PRIMARY;
            return SplitResult.of((Object)this.currentRestriction, (Object)RESIDUAL);
        }

        private boolean needsSplitting() {
            return WAIT_TILL_SPLIT.equals(this.currentRestriction);
        }

        public void checkDone() throws IllegalStateException {
            Preconditions.checkState((!this.needsSplitting() ? 1 : 0) != 0, (Object)"Expected for this restriction to have been split.");
        }

        public RestrictionTracker.IsBounded isBounded() {
            return RestrictionTracker.IsBounded.BOUNDED;
        }
    }

    private static class StoringStateRequestHandler
    implements StateRequestHandler {
        private StateRequestHandler stateRequestHandler;
        private ArrayList<BeamFnApi.StateRequest> receivedRequests;
        private ArrayList<BeamFnApi.ProcessBundleRequest.CacheToken> cacheTokens;

        StoringStateRequestHandler(StateRequestHandler delegate) {
            this.stateRequestHandler = delegate;
            this.receivedRequests = new ArrayList();
            this.cacheTokens = new ArrayList();
        }

        public CompletionStage<BeamFnApi.StateResponse.Builder> handle(BeamFnApi.StateRequest request) throws Exception {
            this.receivedRequests.add(request);
            return this.stateRequestHandler.handle(request);
        }

        public Iterable<BeamFnApi.ProcessBundleRequest.CacheToken> getCacheTokens() {
            return Iterables.concat((Iterable)this.stateRequestHandler.getCacheTokens(), this.cacheTokens);
        }

        public int getRequestCount() {
            return this.receivedRequests.size();
        }

        public void addCacheToken(BeamFnApi.ProcessBundleRequest.CacheToken token) {
            this.cacheTokens.add(token);
        }
    }

    private static class MetricsDoFn
    extends DoFn<byte[], String> {
        private static final String PROCESS_USER_COUNTER_NAME = "processUserCounter";
        private static final String START_USER_COUNTER_NAME = "startUserCounter";
        private static final String FINISH_USER_COUNTER_NAME = "finishUserCounter";
        private static final String PROCESS_USER_DISTRIBUTION_NAME = "processUserDistribution";
        private static final String START_USER_DISTRIBUTION_NAME = "startUserDistribution";
        private static final String FINISH_USER_DISTRIBUTION_NAME = "finishUserDistribution";
        private static final ConcurrentMap<String, CountDownLatch> AFTER_PROCESS = new ConcurrentHashMap<String, CountDownLatch>();
        private static final ConcurrentMap<String, CountDownLatch> ALLOW_COMPLETION = new ConcurrentHashMap<String, CountDownLatch>();
        private final String uuid = UUID.randomUUID().toString();

        public MetricsDoFn() {
            AFTER_PROCESS.put(this.uuid, new CountDownLatch(1));
            ALLOW_COMPLETION.put(this.uuid, new CountDownLatch(1));
        }

        @DoFn.StartBundle
        public void startBundle() throws InterruptedException {
            Metrics.counter(RemoteExecutionTest.class, (String)START_USER_COUNTER_NAME).inc(10L);
            Metrics.distribution(RemoteExecutionTest.class, (String)START_USER_DISTRIBUTION_NAME).update(10L);
            Thread.sleep(500L);
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext ctxt) throws InterruptedException {
            ctxt.output((Object)"zero");
            ctxt.output((Object)"one");
            ctxt.output((Object)"two");
            Metrics.counter(RemoteExecutionTest.class, (String)PROCESS_USER_COUNTER_NAME).inc();
            Metrics.distribution(RemoteExecutionTest.class, (String)PROCESS_USER_DISTRIBUTION_NAME).update(1L);
            Thread.sleep(500L);
            ((CountDownLatch)AFTER_PROCESS.get(this.uuid)).countDown();
            Preconditions.checkState((boolean)((CountDownLatch)ALLOW_COMPLETION.get(this.uuid)).await(60L, TimeUnit.SECONDS), (Object)"Failed to wait for DoFn to be allowed to complete.");
        }

        @DoFn.FinishBundle
        public void finishBundle() throws InterruptedException {
            Metrics.counter(RemoteExecutionTest.class, (String)FINISH_USER_COUNTER_NAME).inc(100L);
            Metrics.distribution(RemoteExecutionTest.class, (String)FINISH_USER_DISTRIBUTION_NAME).update(100L);
            Thread.sleep(500L);
        }
    }

    private static class LogCapturer
    implements LogWriter {
        List<BeamFnApi.LogEntry> capturedLogs = Collections.synchronizedList(new ArrayList());

        private LogCapturer() {
        }

        public void log(BeamFnApi.LogEntry entry) {
            this.capturedLogs.add(entry);
        }
    }
}

