/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.state;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.state.BagUserState;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.IterableSideInput;
import org.apache.beam.fn.harness.state.MultimapSideInput;
import org.apache.beam.fn.harness.state.MultimapUserState;
import org.apache.beam.fn.harness.state.SideInputSpec;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.MultimapState;
import org.apache.beam.sdk.state.OrderedListState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.ReadableStates;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.StateBinder;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ProtocolMessageEnum;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyForBottom;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

public class FnApiStateAccessor<@UnknownKeyFor K>
implements SideInputReader,
StateBinder {
    private final @UnknownKeyFor @NonNull @Initialized PipelineOptions pipelineOptions;
    private final @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> runnerCapabilites;
    private final @UnknownKeyFor @NonNull @Initialized Map< @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateKey, @UnknownKeyFor @NonNull @Initialized Object> stateKeyObjectCache;
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, @UnknownKeyFor @NonNull @Initialized SideInputSpec> sideInputSpecMap;
    private final @UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient;
    private final @UnknownKeyFor @NonNull @Initialized String ptransformId;
    private final @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized String> processBundleInstructionId;
    private final @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized List< @UnknownKeyFor @NonNull @Initialized BeamFnApi.ProcessBundleRequest.CacheToken>> cacheTokens;
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> bundleCache;
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> processWideCache;
    private final @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized ThrowingRunnable> stateFinalizers;
    private final @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized BoundedWindow> currentWindowSupplier;
    private final @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized ByteString> encodedCurrentKeySupplier;
    private final @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized ByteString> encodedCurrentWindowSupplier;

    public FnApiStateAccessor(@UnknownKeyFor @NonNull @Initialized PipelineOptions pipelineOptions, @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> runnerCapabilites, @UnknownKeyFor @NonNull @Initialized String ptransformId, @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized String> processBundleInstructionId, @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized List< @UnknownKeyFor @NonNull @Initialized BeamFnApi.ProcessBundleRequest.CacheToken>> cacheTokens, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> bundleCache, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> processWideCache, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, @UnknownKeyFor @NonNull @Initialized SideInputSpec> sideInputSpecMap, @UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient, @UnknownKeyFor @NonNull @Initialized Coder<K> keyCoder, @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized BoundedWindow> windowCoder, @UnknownKeyFor @NonNull @Initialized Supplier<K> currentKeySupplier, @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized BoundedWindow> currentWindowSupplier) {
        this.pipelineOptions = pipelineOptions;
        this.runnerCapabilites = runnerCapabilites;
        this.stateKeyObjectCache = Maps.newHashMap();
        this.sideInputSpecMap = sideInputSpecMap;
        this.beamFnStateClient = beamFnStateClient;
        this.ptransformId = ptransformId;
        this.processBundleInstructionId = processBundleInstructionId;
        this.cacheTokens = cacheTokens;
        this.bundleCache = bundleCache;
        this.processWideCache = processWideCache;
        this.stateFinalizers = new ArrayList<ThrowingRunnable>();
        this.currentWindowSupplier = currentWindowSupplier;
        this.encodedCurrentKeySupplier = FnApiStateAccessor.memoizeFunction(currentKeySupplier, key -> {
            Preconditions.checkState((keyCoder != null ? 1 : 0) != 0, (Object)"Accessing state in unkeyed context, no key coder available");
            ByteStringOutputStream encodedKeyOut = new ByteStringOutputStream();
            try {
                keyCoder.encode(key, (OutputStream)encodedKeyOut, Coder.Context.NESTED);
            }
            catch (IOException e) {
                throw new IllegalStateException(e);
            }
            return encodedKeyOut.toByteString();
        });
        this.encodedCurrentWindowSupplier = FnApiStateAccessor.memoizeFunction(currentWindowSupplier, window -> {
            ByteStringOutputStream encodedWindowOut = new ByteStringOutputStream();
            try {
                windowCoder.encode(window, (OutputStream)encodedWindowOut);
            }
            catch (IOException e) {
                throw new IllegalStateException(e);
            }
            return encodedWindowOut.toByteString();
        });
    }

    private static <ArgT, ResultT> @UnknownKeyFor @NonNull @Initialized Supplier<ResultT> memoizeFunction(final @UnknownKeyFor @NonNull @Initialized Supplier<ArgT> arg, final @UnknownKeyFor @NonNull @Initialized Function<ArgT, ResultT> f) {
        return new Supplier<ResultT>(){
            private ArgT memoizedArg;
            private ResultT memoizedResult;
            private @UnknownKeyFor @NonNull @Initialized boolean initialized;

            @Override
            public ResultT get() {
                Object currentArg = arg.get();
                if (currentArg != this.memoizedArg || !this.initialized) {
                    this.memoizedArg = currentArg;
                    this.memoizedResult = f.apply(this.memoizedArg);
                    this.initialized = true;
                }
                return this.memoizedResult;
            }
        };
    }

    @Override
    public <T> @Nullable T get(@UnknownKeyFor @NonNull @Initialized PCollectionView<T> view, @UnknownKeyFor @NonNull @Initialized BoundedWindow window) {
        TupleTag tag = view.getTagInternal();
        SideInputSpec sideInputSpec = this.sideInputSpecMap.get(tag);
        Preconditions.checkArgument((sideInputSpec != null ? 1 : 0) != 0, (String)"Attempting to access unknown side input %s.", view);
        ByteStringOutputStream encodedWindowOut = new ByteStringOutputStream();
        try {
            sideInputSpec.getWindowCoder().encode((Object)sideInputSpec.getWindowMappingFn().getSideInputWindow(window), (OutputStream)encodedWindowOut);
        }
        catch (IOException e) {
            throw new IllegalStateException(e);
        }
        ByteString encodedWindow = encodedWindowOut.toByteString();
        BeamFnApi.StateKey.Builder cacheKeyBuilder = BeamFnApi.StateKey.newBuilder();
        switch (sideInputSpec.getAccessPattern()) {
            case "beam:side_input:iterable:v1": {
                cacheKeyBuilder.getIterableSideInputBuilder().setTransformId(this.ptransformId).setSideInputId(tag.getId()).setWindow(encodedWindow);
                break;
            }
            case "beam:side_input:multimap:v1": {
                Preconditions.checkState((boolean)(sideInputSpec.getCoder() instanceof KvCoder), (String)"Expected %s but received %s.", KvCoder.class, sideInputSpec.getCoder().getClass());
                cacheKeyBuilder.getMultimapKeysSideInputBuilder().setTransformId(this.ptransformId).setSideInputId(tag.getId()).setWindow(encodedWindow);
                break;
            }
            default: {
                throw new IllegalStateException(String.format("This SDK is only capable of dealing with %s materializations but was asked to handle %s for PCollectionView with tag %s.", ImmutableList.of((Object)"beam:side_input:iterable:v1", (Object)"beam:side_input:multimap:v1"), sideInputSpec.getAccessPattern(), tag));
            }
        }
        return (T)this.stateKeyObjectCache.computeIfAbsent(cacheKeyBuilder.build(), key -> {
            switch (sideInputSpec.getAccessPattern()) {
                case "beam:side_input:iterable:v1": {
                    return sideInputSpec.getViewFn().apply(new IterableSideInput(this.getCacheFor((BeamFnApi.StateKey)key), this.beamFnStateClient, this.processBundleInstructionId.get(), (BeamFnApi.StateKey)key, sideInputSpec.getCoder()));
                }
                case "beam:side_input:multimap:v1": {
                    return sideInputSpec.getViewFn().apply(new MultimapSideInput(this.getCacheFor((BeamFnApi.StateKey)key), this.beamFnStateClient, this.processBundleInstructionId.get(), (BeamFnApi.StateKey)key, ((KvCoder)sideInputSpec.getCoder()).getKeyCoder(), ((KvCoder)sideInputSpec.getCoder()).getValueCoder(), this.runnerCapabilites.contains(BeamUrns.getUrn((ProtocolMessageEnum)RunnerApi.StandardRunnerProtocols.Enum.MULTIMAP_KEYS_VALUES_SIDE_INPUT))));
                }
            }
            throw new IllegalStateException(String.format("This SDK is only capable of dealing with %s materializations but was asked to handle %s for PCollectionView with tag %s.", ImmutableList.of((Object)"beam:side_input:iterable:v1", (Object)"beam:side_input:multimap:v1"), sideInputSpec.getAccessPattern(), tag));
        });
    }

    @Override
    public <T> @UnknownKeyFor @NonNull @Initialized boolean contains(@UnknownKeyFor @NonNull @Initialized PCollectionView<T> view) {
        return this.sideInputSpecMap.containsKey(view.getTagInternal());
    }

    @Override
    public @UnknownKeyFor @NonNull @Initialized boolean isEmpty() {
        return this.sideInputSpecMap.isEmpty();
    }

    public <T> @UnknownKeyFor @NonNull @Initialized ValueState<T> bindValue(@UnknownKeyFor @NonNull @Initialized String id, @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized ValueState<T>> spec, final @UnknownKeyFor @NonNull @Initialized Coder<T> coder) {
        return (ValueState)this.stateKeyObjectCache.computeIfAbsent(this.createBagUserStateKey(id), new Function<BeamFnApi.StateKey, Object>(){

            @Override
            public @UnknownKeyFor @NonNull @Initialized Object apply(final  @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateKey key) {
                return new ValueState<T>(){
                    private final @UnknownKeyFor @NonNull @Initialized BagUserState<T> impl;
                    {
                        this.impl = FnApiStateAccessor.this.createBagUserState(key, coder);
                    }

                    public void clear() {
                        this.impl.clear();
                    }

                    public void write(T input) {
                        this.impl.clear();
                        this.impl.append(input);
                    }

                    public T read() {
                        Iterator value = this.impl.get().iterator();
                        if (value.hasNext()) {
                            return value.next();
                        }
                        return null;
                    }

                    public @UnknownKeyFor @NonNull @Initialized ValueState<T> readLater() {
                        this.impl.get().prefetch();
                        return this;
                    }
                };
            }
        });
    }

    public <T> @UnknownKeyFor @NonNull @Initialized BagState<T> bindBag(@UnknownKeyFor @NonNull @Initialized String id, @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized BagState<T>> spec, final @UnknownKeyFor @NonNull @Initialized Coder<T> elemCoder) {
        return (BagState)this.stateKeyObjectCache.computeIfAbsent(this.createBagUserStateKey(id), new Function<BeamFnApi.StateKey, Object>(){

            @Override
            public @UnknownKeyFor @NonNull @Initialized Object apply(final  @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateKey key) {
                return new BagState<T>(){
                    private final @UnknownKeyFor @NonNull @Initialized BagUserState<T> impl;
                    {
                        this.impl = FnApiStateAccessor.this.createBagUserState(key, elemCoder);
                    }

                    public void add(T value) {
                        this.impl.append(value);
                    }

                    public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> isEmpty() {
                        return new ReadableState<Boolean>(){

                            public @Nullable @UnknownKeyFor @Initialized Boolean read() {
                                return !impl.get().iterator().hasNext();
                            }

                            public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> readLater() {
                                return this;
                            }
                        };
                    }

                    public @UnknownKeyFor @NonNull @Initialized Iterable<T> read() {
                        return this.impl.get();
                    }

                    public @UnknownKeyFor @NonNull @Initialized BagState<T> readLater() {
                        this.impl.get().prefetch();
                        return this;
                    }

                    public void clear() {
                        this.impl.clear();
                    }
                };
            }
        });
    }

    public <T> @UnknownKeyFor @NonNull @Initialized SetState<T> bindSet(@UnknownKeyFor @NonNull @Initialized String id, @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized SetState<T>> spec, final @UnknownKeyFor @NonNull @Initialized Coder<T> elemCoder) {
        return (SetState)this.stateKeyObjectCache.computeIfAbsent(this.createMultimapKeysUserStateKey(id), new Function<BeamFnApi.StateKey, Object>(){

            @Override
            public @UnknownKeyFor @NonNull @Initialized Object apply(final  @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateKey key) {
                return new SetState<T>(){
                    private final @UnknownKeyFor @NonNull @Initialized MultimapUserState<T, @UnknownKeyFor @Nullable @Initialized Void> impl;
                    {
                        this.impl = FnApiStateAccessor.this.createMultimapUserState(key, elemCoder, (Coder)VoidCoder.of());
                    }

                    public void clear() {
                        this.impl.clear();
                    }

                    public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> contains(final T t2) {
                        return new ReadableState<Boolean>(){

                            public @UnknownKeyFor @NonNull @Initialized Boolean read() {
                                return !Iterables.isEmpty(impl.get(t2));
                            }

                            public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> readLater() {
                                impl.get(t2).prefetch();
                                return this;
                            }
                        };
                    }

                    public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> addIfAbsent(T t2) {
                        boolean isEmpty = Iterables.isEmpty(this.impl.get(t2));
                        if (isEmpty) {
                            this.impl.put(t2, null);
                        }
                        return ReadableStates.immediate((Object)isEmpty);
                    }

                    public void remove(T t2) {
                        this.impl.remove(t2);
                    }

                    public void add(T value) {
                        this.impl.remove(value);
                        this.impl.put(value, null);
                    }

                    public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> isEmpty() {
                        return new ReadableState<Boolean>(){

                            public @UnknownKeyFor @NonNull @Initialized Boolean read() {
                                return Iterables.isEmpty(impl.keys());
                            }

                            public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> readLater() {
                                impl.keys().prefetch();
                                return this;
                            }
                        };
                    }

                    public @UnknownKeyFor @NonNull @Initialized Iterable<T> read() {
                        return this.impl.keys();
                    }

                    public @UnknownKeyFor @NonNull @Initialized SetState<T> readLater() {
                        this.impl.keys().prefetch();
                        return this;
                    }
                };
            }
        });
    }

    public <KeyT, ValueT> @UnknownKeyFor @NonNull @Initialized MapState<KeyT, ValueT> bindMap(@UnknownKeyFor @NonNull @Initialized String id, @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized MapState<KeyT, ValueT>> spec, final @UnknownKeyFor @NonNull @Initialized Coder<KeyT> mapKeyCoder, final @UnknownKeyFor @NonNull @Initialized Coder<ValueT> mapValueCoder) {
        return (MapState)this.stateKeyObjectCache.computeIfAbsent(this.createMultimapKeysUserStateKey(id), new Function<BeamFnApi.StateKey, Object>(){

            @Override
            public @UnknownKeyFor @NonNull @Initialized Object apply(final  @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateKey key) {
                return new MapState<KeyT, ValueT>(){
                    private final @UnknownKeyFor @NonNull @Initialized MultimapUserState<KeyT, ValueT> impl;
                    {
                        this.impl = FnApiStateAccessor.this.createMultimapUserState(key, mapKeyCoder, mapValueCoder);
                    }

                    public void clear() {
                        this.impl.clear();
                    }

                    public void put(KeyT key2, ValueT value) {
                        this.impl.remove(key2);
                        this.impl.put(key2, value);
                    }

                    public @UnknownKeyFor @NonNull @Initialized ReadableState<ValueT> computeIfAbsent(KeyT key2, @UnknownKeyFor @NonNull @Initialized Function<@UnknownKeyFor @Nullable @Initialized ? super KeyT, @KeyForBottom @NonNull @Initialized ? extends ValueT> mappingFunction) {
                        PrefetchableIterable values = this.impl.get(key2);
                        if (Iterables.isEmpty(values)) {
                            this.impl.put(key2, mappingFunction.apply(key2));
                        }
                        return ReadableStates.immediate((Object)Iterables.getOnlyElement(values, null));
                    }

                    public void remove(KeyT key2) {
                        this.impl.remove(key2);
                    }

                    public @UnknownKeyFor @NonNull @Initialized ReadableState<ValueT> get(KeyT key2) {
                        return this.getOrDefault(key2, null);
                    }

                    public @UnknownKeyFor @NonNull @Initialized ReadableState<ValueT> getOrDefault(final KeyT key2, final @Nullable ValueT defaultValue) {
                        return new ReadableState<ValueT>(){

                            public @Nullable ValueT read() {
                                PrefetchableIterable values = impl.get(key2);
                                return Iterables.getOnlyElement(values, (Object)defaultValue);
                            }

                            public @UnknownKeyFor @NonNull @Initialized ReadableState<ValueT> readLater() {
                                impl.get(key2).prefetch();
                                return this;
                            }
                        };
                    }

                    public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Iterable<KeyT>> keys() {
                        return new ReadableState<Iterable<KeyT>>(){

                            public @UnknownKeyFor @NonNull @Initialized Iterable<KeyT> read() {
                                return impl.keys();
                            }

                            public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Iterable<KeyT>> readLater() {
                                impl.keys().prefetch();
                                return this;
                            }
                        };
                    }

                    public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Iterable<ValueT>> values() {
                        return new ReadableState<Iterable<ValueT>>(){

                            public @UnknownKeyFor @NonNull @Initialized Iterable<ValueT> read() {
                                return Iterables.transform((Iterable)((Iterable)this.entries().read()), e -> e.getValue());
                            }

                            public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Iterable<ValueT>> readLater() {
                                this.entries().readLater();
                                return this;
                            }
                        };
                    }

                    public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized Map.Entry<KeyT, ValueT>>> entries() {
                        return new ReadableState<Iterable<Map.Entry<KeyT, ValueT>>>(){

                            public @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized Map.Entry<KeyT, ValueT>> read() {
                                Iterable keys = (Iterable)this.keys().read();
                                return Iterables.transform((Iterable)keys, key -> Maps.immutableEntry((Object)key, (Object)this.get(key).read()));
                            }

                            public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized Map.Entry<KeyT, ValueT>>> readLater() {
                                this.keys().readLater();
                                return this;
                            }
                        };
                    }

                    public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> isEmpty() {
                        return new ReadableState<Boolean>(){

                            public @UnknownKeyFor @NonNull @Initialized Boolean read() {
                                return Iterables.isEmpty((Iterable)((Iterable)this.keys().read()));
                            }

                            public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> readLater() {
                                this.keys().readLater();
                                return this;
                            }
                        };
                    }
                };
            }
        });
    }

    public <KeyT, ValueT> @UnknownKeyFor @NonNull @Initialized MultimapState<KeyT, ValueT> bindMultimap(@UnknownKeyFor @NonNull @Initialized String id, @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized MultimapState<KeyT, ValueT>> spec, @UnknownKeyFor @NonNull @Initialized Coder<KeyT> keyCoder, @UnknownKeyFor @NonNull @Initialized Coder<ValueT> valueCoder) {
        throw new UnsupportedOperationException("Multimap is not currently supported with Fn API.");
    }

    public <T> @UnknownKeyFor @NonNull @Initialized OrderedListState<T> bindOrderedList(@UnknownKeyFor @NonNull @Initialized String id, @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized OrderedListState<T>> spec, @UnknownKeyFor @NonNull @Initialized Coder<T> elemCoder) {
        throw new UnsupportedOperationException("TODO: Add support for a sorted-list state to the Fn API.");
    }

    public <ElementT, AccumT, ResultT> @UnknownKeyFor @NonNull @Initialized CombiningState<ElementT, AccumT, ResultT> bindCombining(@UnknownKeyFor @NonNull @Initialized String id, @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized CombiningState<ElementT, AccumT, ResultT>> spec, final @UnknownKeyFor @NonNull @Initialized Coder<AccumT> accumCoder, final // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized Combine.CombineFn<ElementT, AccumT, ResultT> combineFn) {
        return (CombiningState)this.stateKeyObjectCache.computeIfAbsent(this.createBagUserStateKey(id), new Function<BeamFnApi.StateKey, Object>(){

            @Override
            public @UnknownKeyFor @NonNull @Initialized Object apply(final  @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateKey key) {
                return new CombiningState<ElementT, AccumT, ResultT>(){
                    private final @UnknownKeyFor @NonNull @Initialized BagUserState<AccumT> impl;
                    {
                        this.impl = FnApiStateAccessor.this.createBagUserState(key, accumCoder);
                    }

                    public AccumT getAccum() {
                        Iterator iterator = this.impl.get().iterator();
                        if (iterator.hasNext()) {
                            return iterator.next();
                        }
                        return combineFn.createAccumulator();
                    }

                    public void addAccum(AccumT accum) {
                        Iterator iterator = this.impl.get().iterator();
                        if (iterator.hasNext()) {
                            accum = combineFn.mergeAccumulators((Iterable)ImmutableList.of(iterator.next(), accum));
                            this.impl.clear();
                        }
                        this.impl.append(accum);
                    }

                    public AccumT mergeAccumulators(@UnknownKeyFor @NonNull @Initialized Iterable<AccumT> accumulators) {
                        return combineFn.mergeAccumulators(accumulators);
                    }

                    public @UnknownKeyFor @NonNull @Initialized CombiningState<ElementT, AccumT, ResultT> readLater() {
                        this.impl.get().prefetch();
                        return this;
                    }

                    public ResultT read() {
                        Iterator iterator = this.impl.get().iterator();
                        if (iterator.hasNext()) {
                            return combineFn.extractOutput(iterator.next());
                        }
                        return combineFn.defaultValue();
                    }

                    public void add(ElementT value) {
                        Object newAccumulator = combineFn.addInput(this.getAccum(), value);
                        this.impl.clear();
                        this.impl.append(newAccumulator);
                    }

                    public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> isEmpty() {
                        return new ReadableState<Boolean>(){

                            public @Nullable @UnknownKeyFor @Initialized Boolean read() {
                                return !impl.get().iterator().hasNext();
                            }

                            public @UnknownKeyFor @NonNull @Initialized ReadableState<@UnknownKeyFor @NonNull @Initialized Boolean> readLater() {
                                impl.get().prefetch();
                                return this;
                            }
                        };
                    }

                    public void clear() {
                        this.impl.clear();
                    }
                };
            }
        });
    }

    public <ElementT, AccumT, ResultT> @UnknownKeyFor @NonNull @Initialized CombiningState<ElementT, AccumT, ResultT> bindCombiningWithContext(@UnknownKeyFor @NonNull @Initialized String id, @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized CombiningState<ElementT, AccumT, ResultT>> spec, @UnknownKeyFor @NonNull @Initialized Coder<AccumT> accumCoder, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized CombineWithContext.CombineFnWithContext<ElementT, AccumT, ResultT> combineFn) {
        return (CombiningState)this.stateKeyObjectCache.computeIfAbsent(this.createBagUserStateKey(id), key -> this.bindCombining(id, spec, accumCoder, CombineFnUtil.bindContext((CombineWithContext.CombineFnWithContext)combineFn, (StateContext)new StateContext<BoundedWindow>(){

            public @UnknownKeyFor @NonNull @Initialized PipelineOptions getPipelineOptions() {
                return FnApiStateAccessor.this.pipelineOptions;
            }

            public <T> T sideInput(@UnknownKeyFor @NonNull @Initialized PCollectionView<T> view) {
                return FnApiStateAccessor.this.get(view, (BoundedWindow)FnApiStateAccessor.this.currentWindowSupplier.get());
            }

            public @UnknownKeyFor @NonNull @Initialized BoundedWindow window() {
                return (BoundedWindow)FnApiStateAccessor.this.currentWindowSupplier.get();
            }
        })));
    }

    @Deprecated
    public @UnknownKeyFor @NonNull @Initialized WatermarkHoldState bindWatermark(@UnknownKeyFor @NonNull @Initialized String id, @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized WatermarkHoldState> spec, @UnknownKeyFor @NonNull @Initialized TimestampCombiner timestampCombiner) {
        throw new UnsupportedOperationException("WatermarkHoldState is unsupported by the Fn API.");
    }

    private /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> getCacheFor( @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateKey stateKey) {
        switch (stateKey.getTypeCase()) {
            case BAG_USER_STATE: {
                for (BeamFnApi.ProcessBundleRequest.CacheToken token : this.cacheTokens.get()) {
                    if (!token.hasUserState()) continue;
                    return Caches.subCache(this.processWideCache, token, stateKey);
                }
                break;
            }
            case MULTIMAP_KEYS_USER_STATE: {
                for (BeamFnApi.ProcessBundleRequest.CacheToken token : this.cacheTokens.get()) {
                    if (!token.hasUserState()) continue;
                    return Caches.subCache(this.processWideCache, token, stateKey);
                }
                break;
            }
            case ITERABLE_SIDE_INPUT: {
                for (BeamFnApi.ProcessBundleRequest.CacheToken token : this.cacheTokens.get()) {
                    if (!token.hasSideInput() || !stateKey.getIterableSideInput().getTransformId().equals(token.getSideInput().getTransformId()) || !stateKey.getIterableSideInput().getSideInputId().equals(token.getSideInput().getSideInputId())) continue;
                    return Caches.subCache(this.processWideCache, token, stateKey);
                }
                break;
            }
            case MULTIMAP_KEYS_SIDE_INPUT: {
                for (BeamFnApi.ProcessBundleRequest.CacheToken token : this.cacheTokens.get()) {
                    if (!token.hasSideInput() || !stateKey.getMultimapKeysSideInput().getTransformId().equals(token.getSideInput().getTransformId()) || !stateKey.getMultimapKeysSideInput().getSideInputId().equals(token.getSideInput().getSideInputId())) continue;
                    return Caches.subCache(this.processWideCache, token, stateKey);
                }
                break;
            }
            default: {
                throw new IllegalStateException(String.format("Unknown state key type requested %s.", stateKey));
            }
        }
        return Caches.subCache(this.bundleCache.get(), stateKey, new Object[0]);
    }

    private <T> @UnknownKeyFor @NonNull @Initialized BagUserState<T> createBagUserState( @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateKey stateKey, @UnknownKeyFor @NonNull @Initialized Coder<T> valueCoder) {
        BagUserState<T> rval = new BagUserState<T>(this.getCacheFor(stateKey), this.beamFnStateClient, this.processBundleInstructionId.get(), stateKey, valueCoder);
        this.stateFinalizers.add(rval::asyncClose);
        return rval;
    }

    private  @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateKey createBagUserStateKey(@UnknownKeyFor @NonNull @Initialized String stateId) {
        BeamFnApi.StateKey.Builder builder = BeamFnApi.StateKey.newBuilder();
        builder.getBagUserStateBuilder().setWindow(this.encodedCurrentWindowSupplier.get()).setKey(this.encodedCurrentKeySupplier.get()).setTransformId(this.ptransformId).setUserStateId(stateId);
        return builder.build();
    }

    private <KeyT, ValueT> @UnknownKeyFor @NonNull @Initialized MultimapUserState<KeyT, ValueT> createMultimapUserState( @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateKey stateKey, @UnknownKeyFor @NonNull @Initialized Coder<KeyT> keyCoder, @UnknownKeyFor @NonNull @Initialized Coder<ValueT> valueCoder) {
        MultimapUserState<KeyT, ValueT> rval = new MultimapUserState<KeyT, ValueT>(this.getCacheFor(stateKey), this.beamFnStateClient, this.processBundleInstructionId.get(), stateKey, keyCoder, valueCoder);
        this.stateFinalizers.add(rval::asyncClose);
        return rval;
    }

    private  @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateKey createMultimapKeysUserStateKey(@UnknownKeyFor @NonNull @Initialized String stateId) {
        BeamFnApi.StateKey.Builder builder = BeamFnApi.StateKey.newBuilder();
        builder.getMultimapKeysUserStateBuilder().setWindow(this.encodedCurrentWindowSupplier.get()).setTransformId(this.ptransformId).setUserStateId(stateId);
        return builder.build();
    }

    public void finalizeState() {
        try {
            for (ThrowingRunnable runnable : this.stateFinalizers) {
                runnable.run();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
        this.stateFinalizers.clear();
        this.stateKeyObjectCache.clear();
    }
}

