/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.control;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.TreeSet;
import java.util.WeakHashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.BeamFnDataReadRunner;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.AutoValue_ProcessBundleHandler_BundleProcessor;
import org.apache.beam.fn.harness.control.BundleProgressReporter;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.fn.harness.control.FinalizeBundleHandler;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.fn.harness.debug.DataSampler;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.DataEndpoint;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.TimerEndpoint;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.util.construction.BeamUrns;
import org.apache.beam.sdk.util.construction.Timer;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.MessageOrBuilder;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ProtocolMessageEnum;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.TextFormat;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.SetMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.checkerframework.dataflow.qual.Pure;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessBundleHandler {
    // URN string constants declared by this handler.
    private static final @UnknownKeyFor @NonNull @Initialized String DATA_INPUT_URN = "beam:runner:source:v1";
    private static final @UnknownKeyFor @NonNull @Initialized String DATA_OUTPUT_URN = "beam:runner:sink:v1";
    public static final @UnknownKeyFor @NonNull @Initialized String JAVA_SOURCE_URN = "beam:source:java:0.1";
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(ProcessBundleHandler.class);
    // URN -> runner factory map that the public constructor passes along as the factory map.
    // It has no initializer on this line, so it is presumably assigned in a static initializer
    // elsewhere in the class - confirm.
    @VisibleForTesting
    static final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized PTransformRunnerFactory> REGISTERED_RUNNER_FACTORIES;
    // Pipeline options exposed to every runner factory through its Context.
    private final @UnknownKeyFor @NonNull @Initialized PipelineOptions options;
    // Lookup function from a String id to a ProcessBundleDescriptor.
    private final @UnknownKeyFor @NonNull @Initialized Function<@UnknownKeyFor @NonNull @Initialized String, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.ProcessBundleDescriptor> fnApiRegistry;
    // Data-plane client used by processBundle to register/unregister receivers and to poison
    // an instruction id after a failure.
    private final @UnknownKeyFor @NonNull @Initialized BeamFnDataClient beamFnDataClient;
    private final @UnknownKeyFor @NonNull @Initialized BeamFnStateGrpcClientCache beamFnStateGrpcClientCache;
    // Receives bundle finalization callbacks registered at the end of processBundle.
    private final @UnknownKeyFor @NonNull @Initialized FinalizeBundleHandler finalizeBundleHandler;
    // Short-id map handed to runner factories; also used by processBundle to look up the full
    // monitoring info for a key when the runner does not accept short ids.
    private final @UnknownKeyFor @NonNull @Initialized ShortIdMap shortIds;
    // True when runnerCapabilities contains the MONITORING_INFO_SHORT_IDS protocol URN
    // (computed once in the constructor).
    private final @UnknownKeyFor @NonNull @Initialized boolean runnerAcceptsShortIds;
    private final @UnknownKeyFor @NonNull @Initialized ExecutionStateSampler executionStateSampler;
    // Factories keyed by the URN of a transform's spec.
    private final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized PTransformRunnerFactory> urnToPTransformRunnerFactoryMap;
    // Fallback factory used when a transform's URN has no entry in the map above.
    private final @UnknownKeyFor @NonNull @Initialized PTransformRunnerFactory defaultPTransformRunnerFactory;
    // Cache exposed to runner factories via Context.getProcessWideCache().
    private final @UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @NonNull @Initialized Object, @UnknownKeyFor @NonNull @Initialized Object> processWideCache;
    // Source of BundleProcessor instances for processBundle, which releases them back on
    // success and discards them on failure.
    @VisibleForTesting
    final @UnknownKeyFor @NonNull @Initialized BundleProcessorCache bundleProcessorCache;
    // Capability URNs advertised by the runner.
    private final @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> runnerCapabilities;
    // Optional data sampler; declared @Nullable, so it may be null.
    private final @Nullable @UnknownKeyFor @Initialized DataSampler dataSampler;

    /**
     * Creates a handler backed by {@code REGISTERED_RUNNER_FACTORIES} and a newly constructed
     * {@code BundleProcessorCache}; every other argument is forwarded unchanged to the
     * package-private constructor.
     *
     * <p>NOTE(review): the decompiler flagged the annotation placement on the nested
     * {@code BeamFnApi.ProcessBundleDescriptor} type argument as possibly incorrect.
     *
     * @param options pipeline options for the harness
     * @param runnerCapabilities capability URNs advertised by the runner
     * @param fnApiRegistry lookup from a String id to a process bundle descriptor
     * @param beamFnDataClient data-plane client
     * @param beamFnStateGrpcClientCache cache of state-API clients
     * @param finalizeBundleHandler receiver of bundle finalization callbacks
     * @param shortIds short-id map for monitoring infos
     * @param executionStateSampler execution state sampler
     * @param processWideCache process-wide cache
     * @param dataSampler optional data sampler, may be {@code null}
     */
    public ProcessBundleHandler(
            @UnknownKeyFor @NonNull @Initialized PipelineOptions options,
            @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> runnerCapabilities,
            @UnknownKeyFor @NonNull @Initialized Function<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized BeamFnApi.ProcessBundleDescriptor> fnApiRegistry,
            @UnknownKeyFor @NonNull @Initialized BeamFnDataClient beamFnDataClient,
            @UnknownKeyFor @NonNull @Initialized BeamFnStateGrpcClientCache beamFnStateGrpcClientCache,
            @UnknownKeyFor @NonNull @Initialized FinalizeBundleHandler finalizeBundleHandler,
            @UnknownKeyFor @NonNull @Initialized ShortIdMap shortIds,
            @UnknownKeyFor @NonNull @Initialized ExecutionStateSampler executionStateSampler,
            @UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @NonNull @Initialized Object, @UnknownKeyFor @NonNull @Initialized Object> processWideCache,
            @Nullable @UnknownKeyFor @Initialized DataSampler dataSampler) {
        this(
                options,
                runnerCapabilities,
                fnApiRegistry,
                beamFnDataClient,
                beamFnStateGrpcClientCache,
                finalizeBundleHandler,
                shortIds,
                executionStateSampler,
                REGISTERED_RUNNER_FACTORIES,
                processWideCache,
                new BundleProcessorCache(),
                dataSampler);
    }

    @VisibleForTesting
    ProcessBundleHandler(@UnknownKeyFor @NonNull @Initialized PipelineOptions options, @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> runnerCapabilities, @UnknownKeyFor @NonNull @Initialized Function<@UnknownKeyFor @NonNull @Initialized String, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.ProcessBundleDescriptor> fnApiRegistry, @UnknownKeyFor @NonNull @Initialized BeamFnDataClient beamFnDataClient, @UnknownKeyFor @NonNull @Initialized BeamFnStateGrpcClientCache beamFnStateGrpcClientCache, @UnknownKeyFor @NonNull @Initialized FinalizeBundleHandler finalizeBundleHandler, @UnknownKeyFor @NonNull @Initialized ShortIdMap shortIds, @UnknownKeyFor @NonNull @Initialized ExecutionStateSampler executionStateSampler, @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized PTransformRunnerFactory> urnToPTransformRunnerFactoryMap, @UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @NonNull @Initialized Object, @UnknownKeyFor @NonNull @Initialized Object> processWideCache, @UnknownKeyFor @NonNull @Initialized BundleProcessorCache bundleProcessorCache, @Nullable @UnknownKeyFor @Initialized DataSampler dataSampler) {
        this.options = options;
        this.fnApiRegistry = fnApiRegistry;
        this.beamFnDataClient = beamFnDataClient;
        this.beamFnStateGrpcClientCache = beamFnStateGrpcClientCache;
        this.finalizeBundleHandler = finalizeBundleHandler;
        this.shortIds = shortIds;
        this.runnerCapabilities = runnerCapabilities;
        this.runnerAcceptsShortIds = runnerCapabilities.contains(BeamUrns.getUrn((ProtocolMessageEnum)RunnerApi.StandardRunnerProtocols.Enum.MONITORING_INFO_SHORT_IDS));
        this.executionStateSampler = executionStateSampler;
        this.urnToPTransformRunnerFactoryMap = urnToPTransformRunnerFactoryMap;
        this.defaultPTransformRunnerFactory = new UnknownPTransformRunnerFactory(urnToPTransformRunnerFactoryMap.keySet());
        this.processWideCache = processWideCache;
        this.bundleProcessorCache = bundleProcessorCache;
        this.dataSampler = dataSampler;
    }

    private void createRunnerAndConsumersForPTransformRecursively(final @UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient, final @UnknownKeyFor @NonNull @Initialized BeamFnDataClient queueingClient, final @UnknownKeyFor @NonNull @Initialized String pTransformId, final // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized RunnerApi.PTransform pTransform, final @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized String> processBundleInstructionId, final @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized List<// Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.ProcessBundleRequest.CacheToken>> cacheTokens, final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> bundleCache, final // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, @UnknownKeyFor @NonNull @Initialized SetMultimap<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> pCollectionIdsToConsumingPTransforms, final @UnknownKeyFor @NonNull @Initialized PCollectionConsumerRegistry pCollectionConsumerRegistry, @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> processedPTransformIds, final @UnknownKeyFor @NonNull @Initialized PTransformFunctionRegistry startFunctionRegistry, final @UnknownKeyFor @NonNull @Initialized PTransformFunctionRegistry finishFunctionRegistry, final @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized ThrowingRunnable> addResetFunction, final @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized ThrowingRunnable> addTearDownFunction, final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized BiConsumer<// Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor, @UnknownKeyFor @NonNull @Initialized DataEndpoint<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> addDataEndpoint, final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized TimerEndpoint<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> addTimerEndpoint, final @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized BundleProgressReporter> addBundleProgressReporter, final @UnknownKeyFor @NonNull @Initialized BundleSplitListener splitListener, final // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized DoFn.BundleFinalizer bundleFinalizer, @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized BeamFnDataReadRunner> channelRoots, final @UnknownKeyFor @NonNull @Initialized Map<// Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor, @UnknownKeyFor @NonNull @Initialized BeamFnDataOutboundAggregator> outboundAggregatorMap, final @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> runnerCapabilities) throws @UnknownKeyFor @NonNull @Initialized IOException {
        for (String pCollectionId : pTransform.getOutputsMap().values()) {
            for (String consumingPTransformId : pCollectionIdsToConsumingPTransforms.get((Object)pCollectionId)) {
                this.createRunnerAndConsumersForPTransformRecursively(beamFnStateClient, queueingClient, consumingPTransformId, (RunnerApi.PTransform)processBundleDescriptor.getTransformsMap().get(consumingPTransformId), processBundleInstructionId, cacheTokens, bundleCache, processBundleDescriptor, pCollectionIdsToConsumingPTransforms, pCollectionConsumerRegistry, processedPTransformIds, startFunctionRegistry, finishFunctionRegistry, addResetFunction, addTearDownFunction, addDataEndpoint, addTimerEndpoint, addBundleProgressReporter, splitListener, bundleFinalizer, channelRoots, outboundAggregatorMap, runnerCapabilities);
            }
        }
        if (!pTransform.hasSpec()) {
            throw new IllegalArgumentException(String.format("Cannot process transform with no spec: %s", TextFormat.printer().printToString((MessageOrBuilder)pTransform)));
        }
        if (pTransform.getSubtransformsCount() > 0) {
            throw new IllegalArgumentException(String.format("Cannot process composite transform: %s", TextFormat.printer().printToString((MessageOrBuilder)pTransform)));
        }
        if (!processedPTransformIds.contains(pTransformId)) {
            Object runner = this.urnToPTransformRunnerFactoryMap.getOrDefault(pTransform.getSpec().getUrn(), this.defaultPTransformRunnerFactory).createRunnerForPTransform(new PTransformRunnerFactory.Context(){

                @Override
                public @UnknownKeyFor @NonNull @Initialized PipelineOptions getPipelineOptions() {
                    return ProcessBundleHandler.this.options;
                }

                @Override
                public @UnknownKeyFor @NonNull @Initialized ShortIdMap getShortIdMap() {
                    return ProcessBundleHandler.this.shortIds;
                }

                @Override
                public @UnknownKeyFor @NonNull @Initialized BeamFnDataClient getBeamFnDataClient() {
                    return queueingClient;
                }

                @Override
                public @UnknownKeyFor @NonNull @Initialized BeamFnStateClient getBeamFnStateClient() {
                    return beamFnStateClient;
                }

                @Override
                public @UnknownKeyFor @NonNull @Initialized String getPTransformId() {
                    return pTransformId;
                }

                @Override
                public // Could not load outer class - annotation placement on inner may be incorrect
                 @UnknownKeyFor @NonNull @Initialized RunnerApi.PTransform getPTransform() {
                    return pTransform;
                }

                @Override
                public @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized String> getProcessBundleInstructionIdSupplier() {
                    return processBundleInstructionId;
                }

                @Override
                public @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized List<// Could not load outer class - annotation placement on inner may be incorrect
                 @UnknownKeyFor @NonNull @Initialized BeamFnApi.ProcessBundleRequest.CacheToken>> getCacheTokensSupplier() {
                    return cacheTokens;
                }

                @Override
                public /*
                 * Issues handling annotations - annotations may be inaccurate
                 */
                @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> getBundleCacheSupplier() {
                    return bundleCache;
                }

                @Override
                public /*
                 * Issues handling annotations - annotations may be inaccurate
                 */
                @UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> getProcessWideCache() {
                    return ProcessBundleHandler.this.processWideCache;
                }

                @Override
                public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, // Could not load outer class - annotation placement on inner may be incorrect
                 @UnknownKeyFor @NonNull @Initialized RunnerApi.PCollection> getPCollections() {
                    return processBundleDescriptor.getPcollectionsMap();
                }

                @Override
                public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, // Could not load outer class - annotation placement on inner may be incorrect
                 @UnknownKeyFor @NonNull @Initialized RunnerApi.Coder> getCoders() {
                    return processBundleDescriptor.getCodersMap();
                }

                @Override
                public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, // Could not load outer class - annotation placement on inner may be incorrect
                 @UnknownKeyFor @NonNull @Initialized RunnerApi.WindowingStrategy> getWindowingStrategies() {
                    return processBundleDescriptor.getWindowingStrategiesMap();
                }

                @Override
                public @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> getRunnerCapabilities() {
                    return runnerCapabilities;
                }

                @Override
                public <T> void addPCollectionConsumer(@UnknownKeyFor @NonNull @Initialized String pCollectionId, @UnknownKeyFor @NonNull @Initialized FnDataReceiver<@UnknownKeyFor @NonNull @Initialized WindowedValue<T>> consumer) {
                    pCollectionConsumerRegistry.register(pCollectionId, pTransformId, pTransform.getUniqueName(), consumer);
                }

                @Override
                public <T> @UnknownKeyFor @NonNull @Initialized FnDataReceiver<T> addOutgoingDataEndpoint(// Could not load outer class - annotation placement on inner may be incorrect
                 @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor apiServiceDescriptor, @UnknownKeyFor @NonNull @Initialized Coder<T> coder) {
                    BeamFnDataOutboundAggregator aggregator = outboundAggregatorMap.computeIfAbsent(apiServiceDescriptor, arg_0 -> 1.lambda$addOutgoingDataEndpoint$0(queueingClient, (Supplier)processBundleInstructionId, runnerCapabilities, arg_0));
                    return aggregator.registerOutputDataLocation(pTransformId, coder);
                }

                @Override
                public <T> @UnknownKeyFor @NonNull @Initialized FnDataReceiver<@UnknownKeyFor @NonNull @Initialized Timer<T>> addOutgoingTimersEndpoint(@UnknownKeyFor @NonNull @Initialized String timerFamilyId, @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized Timer<T>> coder) {
                    if (!processBundleDescriptor.hasTimerApiServiceDescriptor()) {
                        throw new IllegalStateException(String.format("Timers are unsupported because the ProcessBundleRequest %s does not provide a timer ApiServiceDescriptor.", processBundleInstructionId.get()));
                    }
                    BeamFnDataOutboundAggregator aggregator = outboundAggregatorMap.computeIfAbsent(processBundleDescriptor.getTimerApiServiceDescriptor(), arg_0 -> 1.lambda$addOutgoingTimersEndpoint$1(queueingClient, (Supplier)processBundleInstructionId, runnerCapabilities, arg_0));
                    return aggregator.registerOutputTimersLocation(pTransformId, timerFamilyId, coder);
                }

                public /*
                 * Issues handling annotations - annotations may be inaccurate
                 */
                @UnknownKeyFor @NonNull @Initialized FnDataReceiver<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> getPCollectionConsumer(@UnknownKeyFor @NonNull @Initialized String pCollectionId) {
                    return pCollectionConsumerRegistry.getMultiplexingConsumer(pCollectionId);
                }

                @Override
                public void addStartBundleFunction(@UnknownKeyFor @NonNull @Initialized ThrowingRunnable startFunction) {
                    startFunctionRegistry.register(pTransformId, pTransform.getUniqueName(), startFunction);
                }

                @Override
                public void addFinishBundleFunction(@UnknownKeyFor @NonNull @Initialized ThrowingRunnable finishFunction) {
                    finishFunctionRegistry.register(pTransformId, pTransform.getUniqueName(), finishFunction);
                }

                @Override
                public <T> void addIncomingDataEndpoint(// Could not load outer class - annotation placement on inner may be incorrect
                 @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor apiServiceDescriptor, @UnknownKeyFor @NonNull @Initialized Coder<T> coder, @UnknownKeyFor @NonNull @Initialized FnDataReceiver<T> receiver) {
                    addDataEndpoint.accept(apiServiceDescriptor, DataEndpoint.create((String)pTransformId, coder, receiver));
                }

                @Override
                public <T> void addIncomingTimerEndpoint(@UnknownKeyFor @NonNull @Initialized String timerFamilyId, @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized Timer<T>> coder, @UnknownKeyFor @NonNull @Initialized FnDataReceiver<@UnknownKeyFor @NonNull @Initialized Timer<T>> receiver) {
                    addTimerEndpoint.accept(TimerEndpoint.create((String)pTransformId, (String)timerFamilyId, coder, receiver));
                }

                @Override
                public void addResetFunction(@UnknownKeyFor @NonNull @Initialized ThrowingRunnable resetFunction) {
                    addResetFunction.accept(resetFunction);
                }

                @Override
                public void addTearDownFunction(@UnknownKeyFor @NonNull @Initialized ThrowingRunnable tearDownFunction) {
                    addTearDownFunction.accept(tearDownFunction);
                }

                @Override
                public void addBundleProgressReporter(@UnknownKeyFor @NonNull @Initialized BundleProgressReporter bundleProgressReporter) {
                    addBundleProgressReporter.accept(bundleProgressReporter);
                }

                @Override
                public @UnknownKeyFor @NonNull @Initialized BundleSplitListener getSplitListener() {
                    return splitListener;
                }

                @Override
                public // Could not load outer class - annotation placement on inner may be incorrect
                 @UnknownKeyFor @NonNull @Initialized DoFn.BundleFinalizer getBundleFinalizer() {
                    return bundleFinalizer;
                }

                private static /* synthetic */ BeamFnDataOutboundAggregator lambda$addOutgoingTimersEndpoint$1(BeamFnDataClient queueingClient2, Supplier processBundleInstructionId2, Set runnerCapabilities2, Endpoints.ApiServiceDescriptor asd) {
                    return queueingClient2.createOutboundAggregator(asd, processBundleInstructionId2, runnerCapabilities2.contains(BeamUrns.getUrn((ProtocolMessageEnum)RunnerApi.StandardRunnerProtocols.Enum.CONTROL_RESPONSE_ELEMENTS_EMBEDDING)));
                }

                private static /* synthetic */ BeamFnDataOutboundAggregator lambda$addOutgoingDataEndpoint$0(BeamFnDataClient queueingClient2, Supplier processBundleInstructionId2, Set runnerCapabilities2, Endpoints.ApiServiceDescriptor asd) {
                    return queueingClient2.createOutboundAggregator(asd, processBundleInstructionId2, runnerCapabilities2.contains(BeamUrns.getUrn((ProtocolMessageEnum)RunnerApi.StandardRunnerProtocols.Enum.CONTROL_RESPONSE_ELEMENTS_EMBEDDING)));
                }
            });
            if (runner instanceof BeamFnDataReadRunner) {
                channelRoots.add((BeamFnDataReadRunner)runner);
            }
            processedPTransformIds.add(pTransformId);
        }
    }

    /**
     * Executes the process-bundle instruction carried by {@code request} and returns a response
     * builder wrapping the resulting {@code ProcessBundleResponse}.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Obtain a {@code BundleProcessor} from {@code bundleProcessorCache}, passing a
     *       supplier that calls {@code createBundleProcessor}; an {@code IOException} from that
     *       supplier is rethrown wrapped in a {@code RuntimeException}.</li>
     *   <li>Start the state tracker, then run every start function.</li>
     *   <li>Consume input: either the elements embedded in the request, or, when inbound
     *       endpoints exist, elements received through {@code beamFnDataClient}.</li>
     *   <li>Run the finish functions in reverse order.</li>
     *   <li>Fill the response: outbound elements, residual roots, monitoring data and the
     *       requires-finalization flag.</li>
     *   <li>Reset the state tracker (always), close the state client, release the processor
     *       back to the cache and return.</li>
     * </ol>
     *
     * <p>On any exception the processor (if one was obtained) is discarded rather than
     * released, the instruction id is poisoned on the data client, and the exception is
     * rethrown.
     *
     * <p>WARNING (from the decompiler): "Removed try catching itself - possible behaviour
     * change." The structure below may therefore differ from the original source.
     *
     * @param request instruction request whose {@code getProcessBundle()} payload is executed
     * @return an {@code InstructionResponse.Builder} with the process-bundle response set
     * @throws Exception any failure raised while creating the processor or processing the bundle
     */
    public // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionResponse.Builder processBundle(// Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionRequest request) throws @UnknownKeyFor @NonNull @Initialized Exception {
        // Kept outside the try so the catch block can discard it if processing fails.
        @Nullable BundleProcessor bundleProcessor = null;
        try {
            // The supplier is presumably only invoked when the cache has no reusable processor
            // for this descriptor - confirm against BundleProcessorCache.get.
            bundleProcessor = (BundleProcessor)Preconditions.checkNotNull((Object)this.bundleProcessorCache.get(request, () -> {
                try {
                    return this.createBundleProcessor(request.getProcessBundle().getProcessBundleDescriptorId(), request.getProcessBundle());
                }
                catch (IOException e) {
                    // The supplier cannot throw checked exceptions, so wrap and keep the cause.
                    throw new RuntimeException(e);
                }
            }));
            PTransformFunctionRegistry startFunctionRegistry = bundleProcessor.getStartFunctionRegistry();
            PTransformFunctionRegistry finishFunctionRegistry = bundleProcessor.getFinishFunctionRegistry();
            ExecutionStateSampler.ExecutionStateTracker stateTracker = bundleProcessor.getStateTracker();
            BeamFnApi.ProcessBundleResponse.Builder response = BeamFnApi.ProcessBundleResponse.newBuilder();
            // The state client is closed when this try-with-resources block exits, after the
            // inner finally has reset the state tracker.
            try (HandleStateCallsForBundle beamFnStateClient = bundleProcessor.getBeamFnStateClient();){
                stateTracker.start(request.getInstructionId());
                try {
                    // Start functions run in the order the registry returns them.
                    for (ThrowingRunnable startFunction : startFunctionRegistry.getFunctions()) {
                        LOG.debug("Starting function {}", (Object)startFunction);
                        startFunction.run();
                    }
                    if (request.getProcessBundle().hasElements()) {
                        // Input is embedded in the request; it must terminate every data and
                        // timer input, otherwise the bundle cannot complete.
                        boolean inputFinished = bundleProcessor.getInboundObserver().multiplexElements(request.getProcessBundle().getElements());
                        if (!inputFinished) {
                            throw new RuntimeException("Elements embedded in ProcessBundleRequest do not contain stream terminators for all data and timer inputs. Unterminated endpoints: " + bundleProcessor.getInboundObserver().getUnfinishedEndpoints());
                        }
                    } else if (!bundleProcessor.getInboundEndpointApiServiceDescriptors().isEmpty()) {
                        // Input arrives over the data client: register, wait until the observer
                        // reports completion, then unregister. If awaitCompletion throws, the
                        // unregister call is skipped and the outer catch poisons the
                        // instruction id instead.
                        BeamFnDataInboundObserver observer = bundleProcessor.getInboundObserver();
                        this.beamFnDataClient.registerReceiver(request.getInstructionId(), bundleProcessor.getInboundEndpointApiServiceDescriptors(), (CloseableFnDataReceiver<BeamFnApi.Elements>)observer);
                        observer.awaitCompletion();
                        this.beamFnDataClient.unregisterReceiver(request.getInstructionId(), bundleProcessor.getInboundEndpointApiServiceDescriptors());
                    }
                    // Finish functions run in the reverse of the registry's order.
                    for (ThrowingRunnable finishFunction : Lists.reverse(finishFunctionRegistry.getFunctions())) {
                        LOG.debug("Finishing function {}", (Object)finishFunction);
                        finishFunction.run();
                    }
                    this.embedOutboundElementsIfApplicable(response, bundleProcessor);
                    response.addAllResidualRoots(bundleProcessor.getSplitListener().getResidualRoots());
                    // NOTE(review): this lock is acquired here and never unlocked in this
                    // method. Presumably it is released elsewhere (e.g. when the processor is
                    // reset or discarded) - confirm before changing anything around it.
                    bundleProcessor.getProgressRequestLock().lock();
                    Map<String, ByteString> monitoringData = this.finalMonitoringData(bundleProcessor);
                    if (this.runnerAcceptsShortIds) {
                        // Runner understands short ids: send the keyed payloads as they are.
                        response.putAllMonitoringData(monitoringData);
                    } else {
                        // Otherwise expand each key to its full monitoring info via shortIds and
                        // attach the payload.
                        for (Map.Entry<String, ByteString> metric : monitoringData.entrySet()) {
                            response.addMonitoringInfos(this.shortIds.get(metric.getKey()).toBuilder().setPayload(metric.getValue()));
                        }
                    }
                    if (!bundleProcessor.getBundleFinalizationCallbackRegistrations().isEmpty()) {
                        // Hand an immutable copy of the callbacks to the finalize handler and
                        // flag the response as requiring finalization.
                        this.finalizeBundleHandler.registerCallbacks(bundleProcessor.getInstructionId(), (Collection<FinalizeBundleHandler.CallbackRegistration>)ImmutableList.copyOf(bundleProcessor.getBundleFinalizationCallbackRegistrations()));
                        response.setRequiresFinalization(true);
                    }
                }
                finally {
                    // Runs on success and on failure, paired with stateTracker.start above.
                    stateTracker.reset();
                }
            }
            // Success path only: give the processor back to the cache.
            this.bundleProcessorCache.release(request.getProcessBundle().getProcessBundleDescriptorId(), bundleProcessor);
            return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response);
        }
        catch (Exception e) {
            // Only the exception message is logged here (at debug level); the exception itself
            // is rethrown below.
            LOG.debug("Error processing bundle {} with bundleProcessor for {} after exception: {}", new Object[]{request.getInstructionId(), request.getProcessBundle().getProcessBundleDescriptorId(), e.getMessage()});
            if (bundleProcessor != null) {
                // A processor that failed mid-bundle is discarded instead of released.
                this.bundleProcessorCache.discard(bundleProcessor);
            }
            this.beamFnDataClient.poisonInstructionId(request.getInstructionId());
            throw e;
        }
    }

    /**
     * Finishes every outbound aggregator of the bundle and, when possible, embeds the buffered
     * output directly into the process bundle response.
     *
     * <p>Each aggregator is asked to send or collect its buffered data. An aggregator that returns
     * {@code null} is treated as having already flushed its data. Only when no aggregator flushed are
     * all collected elements merged into {@code response}; otherwise every non-null collection is
     * sent back through the aggregator that produced it.
     *
     * @param response builder of the response that may receive the merged elements
     * @param bundleProcessor processor whose outbound aggregators are finished
     */
    private void embedOutboundElementsIfApplicable(@UnknownKeyFor @NonNull @Initialized BeamFnApi.ProcessBundleResponse.Builder response, @UnknownKeyFor @NonNull @Initialized BundleProcessor bundleProcessor) {
        Map<Endpoints.ApiServiceDescriptor, BeamFnDataOutboundAggregator> aggregators = bundleProcessor.getOutboundAggregators();
        if (aggregators.isEmpty()) {
            return;
        }

        // Keep one slot per aggregator (null included) so positions line up with iteration order.
        List<BeamFnApi.Elements> buffered = new ArrayList<>(aggregators.size());
        boolean anyAggregatorFlushed = false;
        for (BeamFnDataOutboundAggregator aggregator : aggregators.values()) {
            BeamFnApi.Elements elements = aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
            anyAggregatorFlushed |= elements == null;
            buffered.add(elements);
        }

        if (anyAggregatorFlushed) {
            // At least one aggregator already emitted on its own stream, so nothing is embedded:
            // whatever the other aggregators collected goes out through those same aggregators.
            int position = 0;
            for (BeamFnDataOutboundAggregator aggregator : aggregators.values()) {
                BeamFnApi.Elements pending = buffered.get(position);
                position++;
                if (pending != null) {
                    aggregator.sendElements(pending);
                }
            }
            return;
        }

        BeamFnApi.Elements.Builder merged = BeamFnApi.Elements.newBuilder();
        for (BeamFnApi.Elements part : buffered) {
            merged.mergeFrom(part);
        }
        response.setElements(merged.build());
    }

    /**
     * Answers a progress request for an in-flight bundle.
     *
     * <p>A default (empty) progress response is returned when the bundle is not active, when the
     * progress lock cannot be acquired without blocking, or when the bundle stopped being active
     * between the lookup and the lock acquisition. Otherwise the intermediate monitoring data is
     * captured under the progress lock and reported either as short-id payloads or as full
     * monitoring infos, depending on {@code runnerAcceptsShortIds}.
     *
     * @param request instruction request carrying the progress request
     * @return builder of the instruction response holding the progress response
     * @throws Exception if collecting the intermediate monitoring data fails
     */
    public @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionResponse.Builder progress(@UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionRequest request) throws @UnknownKeyFor @NonNull @Initialized Exception {
        String instructionId = request.getProcessBundleProgress().getInstructionId();
        BundleProcessor bundleProcessor = this.bundleProcessorCache.find(instructionId);
        // Unknown bundle, or the lock is currently held elsewhere: report nothing rather than block.
        if (bundleProcessor == null || !bundleProcessor.getProgressRequestLock().tryLock()) {
            return BeamFnApi.InstructionResponse.newBuilder().setProcessBundleProgress(BeamFnApi.ProcessBundleProgressResponse.getDefaultInstance());
        }

        Lock progressLock = bundleProcessor.getProgressRequestLock();
        Map<String, ByteString> monitoringData;
        try {
            // Re-check under the lock: the bundle may have been removed since the first lookup.
            if (this.bundleProcessorCache.find(instructionId) == null) {
                return BeamFnApi.InstructionResponse.newBuilder().setProcessBundleProgress(BeamFnApi.ProcessBundleProgressResponse.getDefaultInstance());
            }
            monitoringData = this.intermediateMonitoringData(bundleProcessor);
        }
        finally {
            progressLock.unlock();
        }

        BeamFnApi.ProcessBundleProgressResponse.Builder progressResponse = BeamFnApi.ProcessBundleProgressResponse.newBuilder();
        if (!this.runnerAcceptsShortIds) {
            // Expand each short id back into its full monitoring info and attach the payload.
            for (Map.Entry<String, ByteString> metric : monitoringData.entrySet()) {
                progressResponse.addMonitoringInfos(this.shortIds.get(metric.getKey()).toBuilder().setPayload(metric.getValue()));
            }
        } else {
            progressResponse.putAllMonitoringData(monitoringData);
        }
        boolean consuming = bundleProcessor.getInboundObserver().isConsumingReceivedData();
        progressResponse.setConsumingReceivedData(consuming);
        return BeamFnApi.InstructionResponse.newBuilder().setProcessBundleProgress(progressResponse);
    }

    /**
     * Collects the monitoring data describing a bundle that is still being processed.
     *
     * <p>The result starts as a copy of the monitoring data of the state tracker's metrics container
     * registry; the bundle progress reporter then applies its intermediate updates to that copy.
     *
     * @param bundleProcessor processor of the bundle being reported on
     * @return a newly allocated map of monitoring payloads keyed by id
     * @throws Exception if gathering or updating the monitoring data fails
     */
    private @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized ByteString> intermediateMonitoringData(@UnknownKeyFor @NonNull @Initialized BundleProcessor bundleProcessor) throws @UnknownKeyFor @NonNull @Initialized Exception {
        HashMap<String, ByteString> snapshot = new HashMap<String, ByteString>(bundleProcessor.getStateTracker().getMetricsContainerRegistry().getMonitoringData(this.shortIds));
        bundleProcessor.getBundleProgressReporterAndRegistrar().updateIntermediateMonitoringData(snapshot);
        return snapshot;
    }

    /**
     * Collects the monitoring data reported once a bundle has finished processing.
     *
     * <p>The result starts as a copy of the monitoring data of the state tracker's metrics container
     * registry; the bundle progress reporter then applies its final updates to that copy.
     *
     * @param bundleProcessor processor of the bundle being reported on
     * @return a newly allocated map of monitoring payloads keyed by id
     * @throws Exception if gathering or updating the monitoring data fails
     */
    private @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized ByteString> finalMonitoringData(@UnknownKeyFor @NonNull @Initialized BundleProcessor bundleProcessor) throws @UnknownKeyFor @NonNull @Initialized Exception {
        HashMap<String, ByteString> snapshot = new HashMap<String, ByteString>(bundleProcessor.getStateTracker().getMetricsContainerRegistry().getMonitoringData(this.shortIds));
        bundleProcessor.getBundleProgressReporterAndRegistrar().updateFinalMonitoringData(snapshot);
        return snapshot;
    }

    /**
     * Forwards a split request to every channel root of the targeted in-flight bundle.
     *
     * <p>When no active bundle matches the requested instruction id, a default (empty) split
     * response is returned.
     *
     * @param request instruction request carrying the split request
     * @return builder of the instruction response holding the split response
     */
    public @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionResponse.Builder trySplit(@UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionRequest request) {
        BundleProcessor bundleProcessor = this.bundleProcessorCache.find(request.getProcessBundleSplit().getInstructionId());
        if (bundleProcessor == null) {
            return BeamFnApi.InstructionResponse.newBuilder().setProcessBundleSplit(BeamFnApi.ProcessBundleSplitResponse.getDefaultInstance());
        }

        // Every channel root contributes its part of the split to the same response builder.
        BeamFnApi.ProcessBundleSplitResponse.Builder splitResponse = BeamFnApi.ProcessBundleSplitResponse.newBuilder();
        bundleProcessor.getChannelRoots().forEach(root -> root.trySplit(request.getProcessBundleSplit(), splitResponse));
        return BeamFnApi.InstructionResponse.newBuilder().setProcessBundleSplit(splitResponse);
    }

    /**
     * Shuts this handler down by shutting down its bundle processor cache.
     *
     * @throws Exception if the cache shutdown fails
     */
    public void shutdown() throws @UnknownKeyFor @NonNull @Initialized Exception {
        BundleProcessorCache cache = this.bundleProcessorCache;
        cache.shutdown();
    }

    /**
     * Builds a fully wired {@link BundleProcessor} for the given bundle descriptor id.
     *
     * <p>The descriptor is looked up through {@code fnApiRegistry}. The method then creates the
     * per-bundle registries and trackers, selects the state client, creates the processor and walks
     * the descriptor's transforms starting from the data input, data output, Java source and
     * {@code beam:transform:read:v1} transforms, before calling {@link BundleProcessor#finish()}.
     *
     * @param bundleId id passed to {@code fnApiRegistry} to obtain the bundle descriptor
     * @param processBundleRequest request used to build the failing state client when the descriptor
     *     has no state API service descriptor
     * @return the newly created processor, after {@code finish()} has been called on it
     * @throws IOException declared by this method; NOTE(review): the throwing call is not evident
     *     from this method alone - confirm against the helpers it invokes
     */
    private @UnknownKeyFor @NonNull @Initialized BundleProcessor createBundleProcessor(@UnknownKeyFor @NonNull @Initialized String bundleId, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.ProcessBundleRequest processBundleRequest) throws @UnknownKeyFor @NonNull @Initialized IOException {
        HandleStateCallsForBundle beamFnStateClient;
        BeamFnApi.ProcessBundleDescriptor bundleDescriptor = this.fnApiRegistry.apply(bundleId);
        HashMultimap pCollectionIdsToConsumingPTransforms = HashMultimap.create();
        BundleProgressReporter.InMemory bundleProgressReporterAndRegistrar = new BundleProgressReporter.InMemory();
        MetricsEnvironmentStateForBundle metricsEnvironmentStateForBundle = new MetricsEnvironmentStateForBundle();
        ExecutionStateSampler.ExecutionStateTracker stateTracker = this.executionStateSampler.create();
        // The state tracker is itself registered with the progress reporter.
        bundleProgressReporterAndRegistrar.register(stateTracker);
        PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(stateTracker, this.shortIds, bundleProgressReporterAndRegistrar, bundleDescriptor, this.dataSampler);
        HashSet<String> processedPTransformIds = new HashSet<String>();
        PTransformFunctionRegistry startFunctionRegistry = new PTransformFunctionRegistry(this.shortIds, stateTracker, "beam:metric:pardo_execution_time:start_bundle_msecs:v1");
        PTransformFunctionRegistry finishFunctionRegistry = new PTransformFunctionRegistry(this.shortIds, stateTracker, "beam:metric:pardo_execution_time:finish_bundle_msecs:v1");
        ArrayList<ThrowingRunnable> resetFunctions = new ArrayList<ThrowingRunnable>();
        ArrayList<ThrowingRunnable> tearDownFunctions = new ArrayList<ThrowingRunnable>();
        // Index: input PCollection id -> ids of the transforms that consume it.
        for (Map.Entry entry : bundleDescriptor.getTransformsMap().entrySet()) {
            for (String pCollectionId : ((RunnerApi.PTransform)entry.getValue()).getInputsMap().values()) {
                pCollectionIdsToConsumingPTransforms.put((Object)pCollectionId, (Object)((String)entry.getKey()));
            }
        }
        // With a state API endpoint, wrap the shared client so close() can wait for outstanding
        // calls; without one, install a client that fails every state call.
        if (bundleDescriptor.hasStateApiServiceDescriptor()) {
            BeamFnStateClient underlyingClient = this.beamFnStateGrpcClientCache.forApiServiceDescriptor(bundleDescriptor.getStateApiServiceDescriptor());
            beamFnStateClient = new BlockTillStateCallsFinish(underlyingClient);
        } else {
            beamFnStateClient = new FailAllStateCallsForBundle(processBundleRequest);
        }
        BundleSplitListener.InMemory splitListener = BundleSplitListener.InMemory.create();
        // This list is shared: the finalizer below appends to it and the BundleProcessor exposes it.
        final ArrayList<FinalizeBundleHandler.CallbackRegistration> bundleFinalizationCallbackRegistrations = new ArrayList<FinalizeBundleHandler.CallbackRegistration>();
        DoFn.BundleFinalizer bundleFinalizer = new DoFn.BundleFinalizer(){

            // Records the callback together with its expiry as a registration.
            public void afterBundleCommit(@UnknownKeyFor @NonNull @Initialized Instant callbackExpiry, // Could not load outer class - annotation placement on inner may be incorrect
             @UnknownKeyFor @NonNull @Initialized DoFn.BundleFinalizer.Callback callback) {
                bundleFinalizationCallbackRegistrations.add(FinalizeBundleHandler.CallbackRegistration.create(callbackExpiry, callback));
            }
        };
        BundleProcessor bundleProcessor = BundleProcessor.create(this.processWideCache, bundleProgressReporterAndRegistrar, bundleDescriptor, startFunctionRegistry, finishFunctionRegistry, resetFunctions, tearDownFunctions, splitListener, pCollectionConsumerRegistry, metricsEnvironmentStateForBundle, stateTracker, beamFnStateClient, bundleFinalizationCallbackRegistrations, this.runnerCapabilities);
        for (Map.Entry entry : bundleDescriptor.getTransformsMap().entrySet()) {
            // Only transforms with one of these four URNs are used as starting points of the walk.
            if (!DATA_INPUT_URN.equals(((RunnerApi.PTransform)entry.getValue()).getSpec().getUrn()) && !DATA_OUTPUT_URN.equals(((RunnerApi.PTransform)entry.getValue()).getSpec().getUrn()) && !JAVA_SOURCE_URN.equals(((RunnerApi.PTransform)entry.getValue()).getSpec().getUrn()) && !"beam:transform:read:v1".equals(((RunnerApi.PTransform)entry.getValue()).getSpec().getUrn())) continue;
            // The two lambdas record inbound data endpoints (deduplicating their service descriptors)
            // and timer endpoints on the processor; timers are rejected when the descriptor has no
            // timer API service descriptor.
            this.createRunnerAndConsumersForPTransformRecursively(beamFnStateClient, this.beamFnDataClient, (String)entry.getKey(), (RunnerApi.PTransform)entry.getValue(), bundleProcessor::getInstructionId, bundleProcessor::getCacheTokens, bundleProcessor::getBundleCache, bundleDescriptor, (SetMultimap<String, String>)pCollectionIdsToConsumingPTransforms, pCollectionConsumerRegistry, processedPTransformIds, startFunctionRegistry, finishFunctionRegistry, resetFunctions::add, tearDownFunctions::add, (apiServiceDescriptor, dataEndpoint) -> {
                if (!bundleProcessor.getInboundEndpointApiServiceDescriptors().contains(apiServiceDescriptor)) {
                    bundleProcessor.getInboundEndpointApiServiceDescriptors().add((Endpoints.ApiServiceDescriptor)apiServiceDescriptor);
                }
                bundleProcessor.getInboundDataEndpoints().add((DataEndpoint<?>)dataEndpoint);
            }, timerEndpoint -> {
                if (!bundleDescriptor.hasTimerApiServiceDescriptor()) {
                    throw new IllegalStateException(String.format("Timers are unsupported because the ProcessBundleRequest %s does not provide a timer ApiServiceDescriptor.", bundleId));
                }
                bundleProcessor.getTimerEndpoints().add((TimerEndpoint<?>)timerEndpoint);
            }, bundleProgressReporterAndRegistrar::register, splitListener, bundleFinalizer, bundleProcessor.getChannelRoots(), bundleProcessor.getOutboundAggregators(), bundleProcessor.getRunnerCapabilities());
        }
        // finish() builds the inbound observer from the endpoints gathered above and starts the
        // outbound aggregators.
        bundleProcessor.finish();
        return bundleProcessor;
    }

    /**
     * Returns the cache of bundle processors owned by this handler.
     *
     * @return the bundle processor cache
     */
    public @UnknownKeyFor @NonNull @Initialized BundleProcessorCache getBundleProcessorCache() {
        return bundleProcessorCache;
    }

    // Class initialization: discovers every PTransformRunnerFactory.Registrar through ServiceLoader
    // and merges the factories they provide into REGISTERED_RUNNER_FACTORIES.
    static {
        // A TreeSet with the class-based comparator gives the registrars a fixed iteration order and
        // keeps a single registrar among those the comparator considers equal.
        TreeSet pipelineRunnerRegistrars = Sets.newTreeSet((Comparator)ReflectHelpers.ObjectsClassComparator.INSTANCE);
        pipelineRunnerRegistrars.addAll(Lists.newArrayList(ServiceLoader.load(PTransformRunnerFactory.Registrar.class, ReflectHelpers.findClassLoader())));
        ImmutableMap.Builder builder = ImmutableMap.builder();
        for (PTransformRunnerFactory.Registrar registrar : pipelineRunnerRegistrars) {
            builder.putAll(registrar.getPTransformRunnerFactories());
        }
        // NOTE(review): ImmutableMap.Builder.build() rejects duplicate keys, so two registrars
        // providing the same key would presumably fail class initialization - confirm.
        REGISTERED_RUNNER_FACTORIES = builder.build();
    }

    private static class UnknownPTransformRunnerFactory
    implements PTransformRunnerFactory<Object> {
        private final @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> knownUrns;

        private UnknownPTransformRunnerFactory(@UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> knownUrns) {
            this.knownUrns = knownUrns;
        }

        @Override
        public @UnknownKeyFor @NonNull @Initialized Object createRunnerForPTransform(@UnknownKeyFor @NonNull @Initialized PTransformRunnerFactory.Context context) {
            String message = String.format("No factory registered for %s, known factories %s", context.getPTransform().getSpec().getUrn(), this.knownUrns);
            LOG.error(message);
            throw new IllegalStateException(message);
        }
    }

    /**
     * Base type of the per-bundle state clients: a {@link BeamFnStateClient} that is also
     * {@link AutoCloseable}.
     */
    abstract static class HandleStateCallsForBundle
    implements AutoCloseable,
    BeamFnStateClient {
        /** Package-private constructor; this base class holds no state of its own. */
        HandleStateCallsForBundle() {
        }
    }

    /**
     * State client that rejects every state call with an {@link IllegalStateException} naming the
     * process bundle request it was created for.
     */
    private static class FailAllStateCallsForBundle
    extends HandleStateCallsForBundle {
        /** Request embedded in the failure message. */
        private final @UnknownKeyFor @NonNull @Initialized BeamFnApi.ProcessBundleRequest request;

        private FailAllStateCallsForBundle(@UnknownKeyFor @NonNull @Initialized BeamFnApi.ProcessBundleRequest request) {
            this.request = request;
        }

        /** Does nothing: this client holds no resources. */
        @Override
        public void close() throws @UnknownKeyFor @NonNull @Initialized Exception {
            // Intentionally empty.
        }

        /**
         * Always throws.
         *
         * @param requestBuilder ignored
         * @return never returns normally
         * @throws IllegalStateException always
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized CompletableFuture<@UnknownKeyFor @NonNull @Initialized BeamFnApi.StateResponse> handle(@UnknownKeyFor @NonNull @Initialized BeamFnApi.StateRequest.Builder requestBuilder) {
            String reason = String.format("State API calls are unsupported because the ProcessBundleRequest %s does not support state.", this.request);
            throw new IllegalStateException(reason);
        }
    }

    /**
     * State client wrapper whose {@link #close()} waits until every state call issued through it
     * has completed.
     *
     * <p>A {@link Phaser} tracks outstanding work: it starts with one registered party, each call to
     * {@link #handle} registers one more party, and that party arrives and deregisters when the
     * call's future completes (normally or exceptionally).
     */
    private static class BlockTillStateCallsFinish
    extends HandleStateCallsForBundle {
        /** Client that actually performs the state calls. */
        private final @UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient;
        /** Tracks the initial party plus one party per in-flight state call. */
        private final @UnknownKeyFor @NonNull @Initialized Phaser phaser;
        /** Phase number last observed; only used in the debug log message. */
        private @UnknownKeyFor @NonNull @Initialized int currentPhase;

        private BlockTillStateCallsFinish(@UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient) {
            this.beamFnStateClient = beamFnStateClient;
            this.phaser = new Phaser(1);
            this.currentPhase = this.phaser.getPhase();
        }

        /**
         * Arrives at the phaser as the initial party and blocks until all other registered parties
         * have arrived, then records the new phase number.
         */
        @Override
        public void close() throws @UnknownKeyFor @NonNull @Initialized Exception {
            int stillOutstanding = this.phaser.getUnarrivedParties();
            if (stillOutstanding > 0) {
                LOG.debug("Waiting for {} parties to arrive before closing, current phase {}.", stillOutstanding, this.currentPhase);
            }
            this.currentPhase = this.phaser.arriveAndAwaitAdvance();
        }

        /**
         * Delegates the call to the wrapped client and tracks its completion on the phaser.
         *
         * @param requestBuilder state request to send
         * @return the future returned by the wrapped client
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized CompletableFuture<@UnknownKeyFor @NonNull @Initialized BeamFnApi.StateResponse> handle(@UnknownKeyFor @NonNull @Initialized BeamFnApi.StateRequest.Builder requestBuilder) {
            CompletableFuture<BeamFnApi.StateResponse> pendingResponse = this.beamFnStateClient.handle(requestBuilder);
            // Register only after the delegate accepted the call, then release the party whenever the
            // future completes, regardless of outcome.
            this.phaser.register();
            pendingResponse.whenComplete((ignoredResponse, ignoredError) -> this.phaser.arriveAndDeregister());
            return pendingResponse;
        }
    }

    /**
     * Holds everything needed to process bundles for one bundle descriptor: registries, listeners,
     * endpoints, the state client and the per-request state (instruction id, cache tokens, bundle
     * cache).
     *
     * <p>The abstract getters are implemented by the generated
     * {@code AutoValue_ProcessBundleHandler_BundleProcessor}. The mutable per-request fields are read
     * and written while holding this object's monitor, except for {@code inboundObserver2}, which is
     * assigned in {@link #finish()}.
     */
    @AutoValue
    @AutoValue.CopyAnnotations
    public static abstract class BundleProcessor {
        // Per-request state, assigned by setupForProcessBundleRequest and cleared by reset/discard.
        private @UnknownKeyFor @NonNull @Initialized String instructionId;
        private @UnknownKeyFor @NonNull @Initialized List<// Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized BeamFnApi.ProcessBundleRequest.CacheToken> cacheTokens;
        // Lazily created by getBundleCache(); cleared and dropped by reset().
        private @UnknownKeyFor @NonNull @Initialized Caches.ClearableCache<@UnknownKeyFor @NonNull @Initialized Object, @UnknownKeyFor @NonNull @Initialized Object> bundleCache;
        // Assigned once in finish(); null until then.
        private @UnknownKeyFor @NonNull @Initialized BeamFnDataInboundObserver inboundObserver2;

        /**
         * Creates a processor from the supplied collaborators. The inbound endpoint lists, channel
         * roots, outbound aggregator map and progress request lock are newly allocated here and start
         * out empty/unlocked.
         */
        public static @UnknownKeyFor @NonNull @Initialized BundleProcessor create(@UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @NonNull @Initialized Object, @UnknownKeyFor @NonNull @Initialized Object> processWideCache, @UnknownKeyFor @NonNull @Initialized BundleProgressReporter.InMemory bundleProgressReporterAndRegistrar, // Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, @UnknownKeyFor @NonNull @Initialized PTransformFunctionRegistry startFunctionRegistry, @UnknownKeyFor @NonNull @Initialized PTransformFunctionRegistry finishFunctionRegistry, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized ThrowingRunnable> resetFunctions, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized ThrowingRunnable> tearDownFunctions, @UnknownKeyFor @NonNull @Initialized BundleSplitListener.InMemory splitListener, @UnknownKeyFor @NonNull @Initialized PCollectionConsumerRegistry pCollectionConsumerRegistry, @UnknownKeyFor @NonNull @Initialized MetricsEnvironmentStateForBundle metricsEnvironmentStateForBundle, @UnknownKeyFor @NonNull @Initialized ExecutionStateSampler. @UnknownKeyFor @NonNull @Initialized ExecutionStateTracker stateTracker, @UnknownKeyFor @NonNull @Initialized HandleStateCallsForBundle beamFnStateClient, @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized FinalizeBundleHandler.CallbackRegistration> bundleFinalizationCallbackRegistrations, @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> runnerCapabilities) {
            return new AutoValue_ProcessBundleHandler_BundleProcessor(processWideCache, bundleProgressReporterAndRegistrar, processBundleDescriptor, startFunctionRegistry, finishFunctionRegistry, resetFunctions, tearDownFunctions, splitListener, pCollectionConsumerRegistry, metricsEnvironmentStateForBundle, stateTracker, beamFnStateClient, new ArrayList<Endpoints.ApiServiceDescriptor>(), new ArrayList(), new ArrayList(), bundleFinalizationCallbackRegistrations, new ArrayList<BeamFnDataReadRunner>(), new LinkedHashMap<Endpoints.ApiServiceDescriptor, BeamFnDataOutboundAggregator>(), runnerCapabilities, new ReentrantLock());
        }

        abstract /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> getProcessWideCache();

        abstract @UnknownKeyFor @NonNull @Initialized BundleProgressReporter.InMemory getBundleProgressReporterAndRegistrar();

        abstract // Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized BeamFnApi.ProcessBundleDescriptor getProcessBundleDescriptor();

        abstract @UnknownKeyFor @NonNull @Initialized PTransformFunctionRegistry getStartFunctionRegistry();

        abstract @UnknownKeyFor @NonNull @Initialized PTransformFunctionRegistry getFinishFunctionRegistry();

        abstract @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized ThrowingRunnable> getResetFunctions();

        abstract @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized ThrowingRunnable> getTearDownFunctions();

        abstract @UnknownKeyFor @NonNull @Initialized BundleSplitListener.InMemory getSplitListener();

        abstract @UnknownKeyFor @NonNull @Initialized PCollectionConsumerRegistry getpCollectionConsumerRegistry();

        abstract @UnknownKeyFor @NonNull @Initialized MetricsEnvironmentStateForBundle getMetricsEnvironmentStateForBundle();

        public abstract @UnknownKeyFor @NonNull @Initialized ExecutionStateSampler. @UnknownKeyFor @NonNull @Initialized ExecutionStateTracker getStateTracker();

        abstract @UnknownKeyFor @NonNull @Initialized HandleStateCallsForBundle getBeamFnStateClient();

        abstract @UnknownKeyFor @NonNull @Initialized List<// Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor> getInboundEndpointApiServiceDescriptors();

        abstract /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized DataEndpoint<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> getInboundDataEndpoints();

        abstract /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized TimerEndpoint<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> getTimerEndpoints();

        abstract @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized FinalizeBundleHandler.CallbackRegistration> getBundleFinalizationCallbackRegistrations();

        abstract @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized BeamFnDataReadRunner> getChannelRoots();

        abstract @UnknownKeyFor @NonNull @Initialized Map<// Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor, @UnknownKeyFor @NonNull @Initialized BeamFnDataOutboundAggregator> getOutboundAggregators();

        abstract @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> getRunnerCapabilities();

        abstract @UnknownKeyFor @NonNull @Initialized Lock getProgressRequestLock();

        /** Returns the instruction id of the current request, or null after reset()/discard(). */
        synchronized @UnknownKeyFor @NonNull @Initialized String getInstructionId() {
            return this.instructionId;
        }

        /** Returns the cache tokens of the current request, or null after reset()/discard(). */
        synchronized @UnknownKeyFor @NonNull @Initialized List<// Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized BeamFnApi.ProcessBundleRequest.CacheToken> getCacheTokens() {
            return this.cacheTokens;
        }

        /**
         * Returns the bundle-scoped cache, creating it on first use as a clearable sub-cache of the
         * process-wide cache keyed by "Bundle" and the current instruction id.
         */
        synchronized @UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @NonNull @Initialized Object, @UnknownKeyFor @NonNull @Initialized Object> getBundleCache() {
            if (this.bundleCache == null) {
                this.bundleCache = new Caches.ClearableCache(Caches.subCache(this.getProcessWideCache(), "Bundle", this.instructionId));
            }
            return this.bundleCache;
        }

        /** Returns the inbound observer built by finish(); not synchronized. */
        @UnknownKeyFor @NonNull @Initialized BeamFnDataInboundObserver getInboundObserver() {
            return this.inboundObserver2;
        }

        /**
         * Completes construction: builds the inbound observer from the registered data and timer
         * endpoints and starts every outbound aggregator.
         */
        void finish() {
            this.inboundObserver2 = BeamFnDataInboundObserver.forConsumers(this.getInboundDataEndpoints(), this.getTimerEndpoints());
            for (BeamFnDataOutboundAggregator aggregator : this.getOutboundAggregators().values()) {
                aggregator.start();
            }
        }

        /**
         * Binds this processor to a request: stores its instruction id and cache tokens and starts
         * the metrics environment state with the state tracker's metrics container.
         */
        synchronized void setupForProcessBundleRequest(// Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionRequest request) {
            this.instructionId = request.getInstructionId();
            this.cacheTokens = request.getProcessBundle().getCacheTokensList();
            this.getMetricsEnvironmentStateForBundle().start(this.getStateTracker().getMetricsContainer());
        }

        /**
         * Returns this processor to a reusable state.
         *
         * <p>Clears the per-request fields under this object's monitor, then clears/resets the split
         * listener, metrics environment state, state tracker, finalization callbacks, registered
         * reset functions, inbound observer and progress reporter, and finally unlocks the progress
         * request lock. If any step throws, the remaining steps - including the unlock - are skipped.
         *
         * @throws Exception if a reset step fails
         */
        void reset() throws @UnknownKeyFor @NonNull @Initialized Exception {
            BundleProcessor bundleProcessor = this;
            synchronized (bundleProcessor) {
                this.instructionId = null;
                this.cacheTokens = null;
                if (this.bundleCache != null) {
                    this.bundleCache.clear();
                    this.bundleCache = null;
                }
            }
            this.getSplitListener().clear();
            this.getMetricsEnvironmentStateForBundle().reset();
            this.getStateTracker().reset();
            this.getBundleFinalizationCallbackRegistrations().clear();
            for (ThrowingRunnable resetFunction : this.getResetFunctions()) {
                resetFunction.run();
            }
            this.getInboundObserver().reset();
            this.getBundleProgressReporterAndRegistrar().reset();
            // Unlocking assumes the calling thread currently holds the progress request lock.
            this.getProgressRequestLock().unlock();
        }

        /**
         * Tears this processor down when it will not be reused.
         *
         * <p>Under this object's monitor: clears the per-request fields (so getInstructionId()
         * returns null afterwards), clears the bundle cache, runs the tear-down functions in reverse
         * registration order (logging and continuing on any Throwable), discards the metrics
         * environment state and discards every outbound aggregator. The progress request lock is not
         * touched here.
         */
        void discard() {
            BundleProcessor bundleProcessor = this;
            synchronized (bundleProcessor) {
                this.instructionId = null;
                this.cacheTokens = null;
                if (this.bundleCache != null) {
                    this.bundleCache.clear();
                }
                for (ThrowingRunnable teardownFunction : Lists.reverse(this.getTearDownFunctions())) {
                    try {
                        teardownFunction.run();
                    }
                    catch (Throwable e) {
                        LOG.warn("Exceptions are thrown from DoFn.teardown method when trying to discard ProcessBundleHandler", e);
                    }
                }
                this.getMetricsEnvironmentStateForBundle().discard();
                for (BeamFnDataOutboundAggregator aggregator : this.getOutboundAggregators().values()) {
                    aggregator.discard();
                }
            }
        }

        /**
         * Runs every tear-down function, logging and continuing when one throws an Exception.
         *
         * <p>NOTE(review): unlike discard(), this iterates in registration order and catches only
         * Exception rather than Throwable - confirm whether the difference is intentional.
         */
        void shutdown() {
            for (ThrowingRunnable tearDownFunction : this.getTearDownFunctions()) {
                LOG.debug("Tearing down function {}", (Object)tearDownFunction);
                try {
                    tearDownFunction.run();
                }
                catch (Exception e) {
                    LOG.error("Exceptions are thrown from DoFn.teardown method. Note that it will not fail the pipeline execution,", (Throwable)e);
                }
            }
        }
    }

    /**
     * Tracks {@link BundleProcessor}s in two places: idle processors that can be reused, grouped by
     * bundle descriptor id, and active processors, keyed by the instruction id they are serving.
     */
    public static class BundleProcessorCache {
        // Idle processors per bundle descriptor id. An entry expires one minute after its last
        // access; on removal every processor in the removed queue is shut down.
        private final @UnknownKeyFor @NonNull @Initialized LoadingCache<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized ConcurrentLinkedQueue<@UnknownKeyFor @NonNull @Initialized BundleProcessor>> cachedBundleProcessors = CacheBuilder.newBuilder().expireAfterAccess(Duration.ofMinutes(1L)).removalListener(removalNotification -> ((ConcurrentLinkedQueue)removalNotification.getValue()).forEach(bundleProcessor -> bundleProcessor.shutdown())).build((CacheLoader)new CacheLoader<String, ConcurrentLinkedQueue<BundleProcessor>>(){

            public @UnknownKeyFor @NonNull @Initialized ConcurrentLinkedQueue<@UnknownKeyFor @NonNull @Initialized BundleProcessor> load(@UnknownKeyFor @NonNull @Initialized String s) throws @UnknownKeyFor @NonNull @Initialized Exception {
                return new ConcurrentLinkedQueue<BundleProcessor>();
            }
        });
        // Active processors keyed by instruction id, held in a synchronized weak-keyed map.
        private final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized BundleProcessor> activeBundleProcessors = Collections.synchronizedMap(new WeakHashMap());

        @Pure
        public @UnknownKeyFor @NonNull @Initialized int hashCode() {
            return super.hashCode();
        }

        BundleProcessorCache() {
        }

        /** Returns an immutable snapshot of the idle processors per bundle descriptor id. */
        @VisibleForTesting
        @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized ConcurrentLinkedQueue<@UnknownKeyFor @NonNull @Initialized BundleProcessor>> getCachedBundleProcessors() {
            return ImmutableMap.copyOf((Map)this.cachedBundleProcessors.asMap());
        }

        /** Returns an immutable snapshot of the active processors keyed by instruction id. */
        public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized BundleProcessor> getActiveBundleProcessors() {
            return ImmutableMap.copyOf(this.activeBundleProcessors);
        }

        /**
         * Obtains a processor for the request and marks it active.
         *
         * <p>An idle processor for the request's bundle descriptor id is reused when one is queued;
         * otherwise {@code bundleProcessorSupplier} creates a new one. The processor is then set up
         * for the request and registered under the request's instruction id.
         *
         * @param processBundleRequest request the processor will serve
         * @param bundleProcessorSupplier creates a processor when none is idle
         * @return the processor, set up for the request
         */
        @UnknownKeyFor @NonNull @Initialized BundleProcessor get(// Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionRequest processBundleRequest, @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized BundleProcessor> bundleProcessorSupplier) {
            ConcurrentLinkedQueue bundleProcessors = (ConcurrentLinkedQueue)this.cachedBundleProcessors.getUnchecked((Object)processBundleRequest.getProcessBundle().getProcessBundleDescriptorId());
            BundleProcessor bundleProcessor = (BundleProcessor)bundleProcessors.poll();
            if (bundleProcessor == null) {
                bundleProcessor = bundleProcessorSupplier.get();
            }
            bundleProcessor.setupForProcessBundleRequest(processBundleRequest);
            this.activeBundleProcessors.put(processBundleRequest.getInstructionId(), bundleProcessor);
            return bundleProcessor;
        }

        /**
         * Looks up the active processor serving the given instruction id.
         *
         * @param instructionId instruction id of a process bundle request
         * @return the active processor, or {@code null} when none is registered under that id
         */
        public @UnknownKeyFor @NonNull @Initialized BundleProcessor find(@UnknownKeyFor @NonNull @Initialized String instructionId) {
            return this.activeBundleProcessors.get(instructionId);
        }

        /**
         * Deactivates a processor and returns it to the idle queue of its bundle descriptor.
         *
         * <p>The processor is removed from the active map before it is reset, because resetting
         * clears its instruction id. If the reset or the re-queueing fails, a warning is logged and
         * the processor is not returned to the idle queue.
         *
         * @param bundleDescriptorId descriptor id whose idle queue receives the processor
         * @param bundleProcessor processor to release
         */
        void release(@UnknownKeyFor @NonNull @Initialized String bundleDescriptorId, @UnknownKeyFor @NonNull @Initialized BundleProcessor bundleProcessor) {
            this.activeBundleProcessors.remove(bundleProcessor.getInstructionId());
            try {
                bundleProcessor.reset();
                ((ConcurrentLinkedQueue)this.cachedBundleProcessors.get((Object)bundleDescriptorId)).add(bundleProcessor);
            }
            catch (Exception e) {
                LOG.warn("Was unable to reset bundle processor safely. Bundle processor will be discarded and re-instantiated on next bundle for descriptor {}.", (Object)bundleDescriptorId, (Object)e);
            }
        }

        /**
         * Discards an active processor instead of reusing it and removes it from the active map.
         *
         * @param bundleProcessor processor to discard
         */
        void discard(@UnknownKeyFor @NonNull @Initialized BundleProcessor bundleProcessor) {
            // BundleProcessor.discard() nulls the instruction id, so it must be read first. Reading
            // it afterwards turned the removal into remove(null) and left the discarded processor
            // reachable through find().
            String instructionId = bundleProcessor.getInstructionId();
            bundleProcessor.discard();
            this.activeBundleProcessors.remove(instructionId);
        }

        /**
         * Invalidates every idle-queue entry, which triggers the removal listener and thereby shuts
         * down the idle processors.
         *
         * @throws Exception declared for callers; invalidation itself declares no checked exception
         */
        void shutdown() throws @UnknownKeyFor @NonNull @Initialized Exception {
            this.cachedBundleProcessors.invalidateAll();
        }
    }

    /**
     * Remembers the metrics environment state of the thread that started a bundle so the metrics
     * container can later be deactivated on that same state object.
     */
    @VisibleForTesting
    static class MetricsEnvironmentStateForBundle {
        /** State captured by {@link #start}; {@code null} before start and after {@link #reset()}. */
        private @Nullable @UnknownKeyFor @Initialized MetricsEnvironment.MetricsEnvironmentState currentThreadState;

        MetricsEnvironmentStateForBundle() {
        }

        /**
         * Captures the calling thread's metrics environment state and activates the container on it.
         *
         * @param container metrics container to activate
         */
        public void start(@UnknownKeyFor @NonNull @Initialized MetricsContainer container) {
            MetricsEnvironment.MetricsEnvironmentState threadState = MetricsEnvironment.getMetricsEnvironmentStateForCurrentThread();
            this.currentThreadState = threadState;
            threadState.activate(container);
        }

        /** Deactivates the container on the captured state and forgets that state. */
        public void reset() {
            MetricsEnvironment.MetricsEnvironmentState threadState = this.currentThreadState;
            threadState.activate(null);
            this.currentThreadState = null;
        }

        /** Deactivates the container on the captured state but keeps the reference to it. */
        public void discard() {
            MetricsEnvironment.MetricsEnvironmentState threadState = this.currentThreadState;
            threadState.activate(null);
        }
    }
}

