/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.direct_java.runners.fnexecution.state;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.apache.beam.repackaged.direct_java.runners.core.InMemoryStateInternals;
import org.apache.beam.repackaged.direct_java.runners.core.StateInternals;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespace;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespaces;
import org.apache.beam.repackaged.direct_java.runners.core.StateTag;
import org.apache.beam.repackaged.direct_java.runners.core.StateTags;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;

/**
 * Factory for bag user state handlers that keep their contents in memory.
 *
 * <p>Every handler created through {@link #forUserState} is remembered by this factory so that
 * {@link #resetForNewKey()} can reset all of them in one call.
 *
 * @param <K> type of the state key
 * @param <V> type of the values stored in the bag
 * @param <W> type of the window the state is scoped to
 */
public class InMemoryBagUserStateFactory<K, V, W extends BoundedWindow>
implements StateRequestHandlers.BagUserStateHandlerFactory<K, V, W> {
    /** All handlers created by this factory so far, in creation order. */
    private final List<InMemorySingleKeyBagState<K, V, W>> handlers = new ArrayList<>();

    /**
     * Creates a new in-memory handler for the given user state and registers it with this factory.
     *
     * @param pTransformId id of the transform requesting the state; not used by this implementation
     * @param userStateId id of the user state, used to build the bag state tag
     * @param keyCoder coder for the key; not used by this implementation
     * @param valueCoder coder for the values stored in the bag
     * @param windowCoder coder used to derive the per-window state namespace
     * @return the newly created handler
     */
    @Override
    public StateRequestHandlers.BagUserStateHandler<K, V, W> forUserState(String pTransformId, String userStateId, Coder<K> keyCoder, Coder<V> valueCoder, Coder<W> windowCoder) {
        InMemorySingleKeyBagState<K, V, W> bagUserStateHandler = new InMemorySingleKeyBagState<>(userStateId, valueCoder, windowCoder);
        handlers.add(bagUserStateHandler);
        return bagUserStateHandler;
    }

    /**
     * Resets every handler created by this factory, so that each one builds fresh state internals
     * the next time it is accessed.
     */
    public void resetForNewKey() {
        for (InMemorySingleKeyBagState<K, V, W> handler : handlers) {
            handler.reset();
        }
    }

    /**
     * Bag user state handler backed by a single lazily created {@link StateInternals} instance.
     *
     * <p>The state internals are created for the key of the first access after construction or
     * after {@link #reset()}. Later calls reuse them without looking at the key argument again, so
     * the handler holds state for one key at a time.
     *
     * <p>NOTE(review): {@code stateInternals} is volatile, but the lazy initialization is a plain
     * check-then-assign with no locking; confirm the callers' threading model before relying on
     * concurrent access.
     */
    static class InMemorySingleKeyBagState<K, V, W extends BoundedWindow>
    implements StateRequestHandlers.BagUserStateHandler<K, V, W> {
        private final StateTag<BagState<V>> stateTag;
        private final Coder<W> windowCoder;
        /** Random token generated once at construction and returned by {@link #getCacheToken()}. */
        private final ByteString cacheToken;
        /** Current state internals; {@code null} until first access and again after {@link #reset()}. */
        private volatile StateInternals stateInternals;

        /**
         * @param userStateId id used to build the bag state tag
         * @param valueCoder coder for the values stored in the bag
         * @param windowCoder coder used to derive the per-window state namespace
         */
        InMemorySingleKeyBagState(String userStateId, Coder<V> valueCoder, Coder<W> windowCoder) {
            this.windowCoder = windowCoder;
            this.stateTag = StateTags.bag(userStateId, valueCoder);
            // The token is the UTF-8 encoding of a random UUID string.
            this.cacheToken = ByteString.copyFrom(UUID.randomUUID().toString().getBytes(Charsets.UTF_8));
        }

        /**
         * Reads the bag stored for the given window.
         *
         * @param key key used to create the state internals if none exist yet
         * @param window window whose namespace is read
         * @return the result of reading the bag state for that window
         */
        @Override
        public Iterable<V> get(K key, W window) {
            return bagFor(key, window).read();
        }

        /**
         * Adds every remaining element of {@code values} to the bag stored for the given window.
         *
         * @param key key used to create the state internals if none exist yet
         * @param window window whose namespace is appended to
         * @param values values to add; the iterator is consumed completely
         */
        @Override
        public void append(K key, W window, Iterator<V> values) {
            BagState<V> bagState = bagFor(key, window);
            while (values.hasNext()) {
                bagState.add(values.next());
            }
        }

        /**
         * Clears the bag stored for the given window.
         *
         * @param key key used to create the state internals if none exist yet
         * @param window window whose namespace is cleared
         */
        @Override
        public void clear(K key, W window) {
            bagFor(key, window).clear();
        }

        /**
         * @return the token generated when this handler was constructed; always present
         */
        @Override
        public Optional<ByteString> getCacheToken() {
            return Optional.of(cacheToken);
        }

        /** Ensures state internals exist, then looks up the bag state in the window's namespace. */
        private BagState<V> bagFor(K key, W window) {
            initStateInternals(key);
            StateNamespace namespace = StateNamespaces.window(windowCoder, window);
            return stateInternals.state(namespace, stateTag);
        }

        /** Creates the state internals for {@code key} only if none are currently held. */
        private void initStateInternals(K key) {
            if (stateInternals == null) {
                stateInternals = InMemoryStateInternals.forKey(key);
            }
        }

        /** Drops the current state internals so the next access creates new ones. */
        void reset() {
            stateInternals = null;
        }
    }
}

