/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.StateFetchingIterators;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * A locally buffered view over one multimap user state cell that is read and written through a
 * {@link BeamFnStateClient}.
 *
 * <p>Mutations ({@link #put}, {@link #remove}, {@link #clear}) are only recorded in memory; they
 * are sent to the state client when {@link #asyncClose()} is called. Reads merge values fetched
 * through the state client with the mutations recorded so far.
 *
 * <p>Once {@link #asyncClose()} has been called, every public method throws
 * {@link IllegalStateException}.
 *
 * <p>NOTE(review): this class uses no synchronization; presumably each instance is confined to a
 * single thread - confirm against callers.
 *
 * @param <K> type of the map keys, encoded with the supplied map key coder
 * @param <V> type of the values, encoded with the supplied value coder
 */
public class MultimapUserState<K, V> {
    private final BeamFnStateClient beamFnStateClient;
    private final Coder<K> mapKeyCoder;
    private final Coder<V> valueCoder;
    private final String stateId;
    /** Request template addressing the set of map keys of this state cell. */
    private final BeamFnApi.StateRequest keysStateRequest;
    /** Request template addressing one map key; the map key itself is filled in per request. */
    private final BeamFnApi.StateRequest userStateRequest;
    private boolean isClosed;
    /** True once {@link #clear()} was called; persisted content is then ignored by reads. */
    private boolean isCleared;
    /** Keys removed locally whose persisted values must be cleared on close. */
    private HashSet<K> pendingRemoves = Sets.newHashSet();
    /** Values added locally, per key, that must be appended on close. */
    private Map<K, List<V>> pendingAdds = Maps.newHashMap();
    /** Keys for which a fetch returned no persisted values. */
    private HashSet<K> negativeCache = Sets.newHashSet();
    /** Cache of persisted values that were already fetched, per key. */
    private Multimap<K, V> persistedValues = ArrayListMultimap.create();
    /** Lazily fetched persisted keys; {@code null} until first requested or after a clear. */
    private @Nullable Iterable<K> persistedKeys = null;

    /**
     * Builds the two request templates (keys and per-key values) used for all later state calls.
     *
     * @param beamFnStateClient client that handles the state requests
     * @param instructionId instruction id set on every request
     * @param pTransformId transform id set on the state key of every request
     * @param stateId user state id set on the state key of every request
     * @param encodedWindow encoded window set on the state key of every request
     * @param encodedKey encoded key set on the state key of every request
     * @param mapKeyCoder coder used to encode and decode map keys
     * @param valueCoder coder used to encode and decode values
     */
    public MultimapUserState(BeamFnStateClient beamFnStateClient, String instructionId, String pTransformId, String stateId, ByteString encodedWindow, ByteString encodedKey, Coder<K> mapKeyCoder, Coder<V> valueCoder) {
        this.beamFnStateClient = beamFnStateClient;
        this.mapKeyCoder = mapKeyCoder;
        this.valueCoder = valueCoder;
        this.stateId = stateId;

        BeamFnApi.StateRequest.Builder keysStateRequestBuilder = BeamFnApi.StateRequest.newBuilder();
        keysStateRequestBuilder
            .setInstructionId(instructionId)
            .getStateKeyBuilder()
            .getMultimapKeysUserStateBuilder()
            .setTransformId(pTransformId)
            .setUserStateId(stateId)
            .setKey(encodedKey)
            .setWindow(encodedWindow);
        this.keysStateRequest = keysStateRequestBuilder.build();

        BeamFnApi.StateRequest.Builder userStateRequestBuilder = BeamFnApi.StateRequest.newBuilder();
        userStateRequestBuilder
            .setInstructionId(instructionId)
            .getStateKeyBuilder()
            .getMultimapUserStateBuilder()
            .setTransformId(pTransformId)
            .setUserStateId(stateId)
            .setWindow(encodedWindow)
            .setKey(encodedKey);
        this.userStateRequest = userStateRequestBuilder.build();
    }

    /**
     * Marks the whole multimap as cleared and discards every locally buffered mutation and
     * cached read.
     *
     * @throws IllegalStateException if this state is already closed
     */
    public void clear() {
        checkNotClosed();
        isCleared = true;
        persistedValues = ArrayListMultimap.create();
        persistedKeys = null;
        pendingRemoves = Sets.newHashSet();
        pendingAdds = Maps.newHashMap();
        negativeCache = Sets.newHashSet();
    }

    /**
     * Returns the values currently associated with {@code key}: persisted values (unless the key
     * or the whole state was removed locally) followed by locally added values.
     *
     * <p>The locally added part is a snapshot taken at call time, so the returned iterable stays
     * usable after later {@link #put} calls for the same key.
     *
     * @param key map key to look up
     * @return an iterable over the values for {@code key}; empty if there are none
     * @throws IllegalStateException if this state is already closed
     */
    public Iterable<V> get(K key) {
        checkNotClosed();
        List<V> buffered = pendingAdds.getOrDefault(key, Collections.<V>emptyList());
        // Copy instead of using subList(): a subList view fails with
        // ConcurrentModificationException once the backing list is appended to by put().
        Collection<V> pendingValues = Collections.unmodifiableCollection(new ArrayList<V>(buffered));
        if (isCleared || pendingRemoves.contains(key)) {
            return pendingValues;
        }
        Iterable<V> persisted = getPersistedValues(key);
        return Iterables.concat(persisted, pendingValues);
    }

    /**
     * Returns a snapshot of the keys currently present: persisted keys minus locally removed
     * keys, plus keys with locally added values. After {@link #clear()} only locally added keys
     * are reported.
     *
     * @return an unmodifiable collection of keys
     * @throws IllegalStateException if this state is already closed
     */
    public Iterable<K> keys() {
        checkNotClosed();
        if (isCleared) {
            return Collections.unmodifiableCollection(Lists.newArrayList(pendingAdds.keySet()));
        }
        HashSet<K> keys = Sets.newHashSet(getPersistedKeys());
        keys.removeAll(pendingRemoves);
        keys.addAll(pendingAdds.keySet());
        return Collections.unmodifiableCollection(keys);
    }

    /**
     * Buffers {@code value} to be appended under {@code key} on close.
     *
     * @param key map key to add to
     * @param value value to add
     * @throws IllegalStateException if this state is already closed
     */
    public void put(K key, V value) {
        checkNotClosed();
        pendingAdds.computeIfAbsent(key, unused -> new ArrayList<>()).add(value);
    }

    /**
     * Drops all locally added values for {@code key} and, unless the whole state was already
     * cleared, records that the persisted values for {@code key} must be cleared on close.
     *
     * @param key map key to remove
     * @throws IllegalStateException if this state is already closed
     */
    public void remove(K key) {
        checkNotClosed();
        pendingAdds.remove(key);
        if (!isCleared) {
            pendingRemoves.add(key);
        }
    }

    /**
     * Closes this state and sends the buffered mutations to the state client: first the clear
     * (of everything, or of each removed key), then one append per key with added values.
     *
     * <p>The result of every {@code handle} call is awaited with {@code get()} before the next
     * request is issued. Nothing is sent when no mutation was recorded.
     *
     * @throws IllegalStateException if this state is already closed
     * @throws Exception if awaiting a request result fails
     */
    public void asyncClose() throws Exception {
        checkNotClosed();
        isClosed = true;
        if (!isCleared && pendingRemoves.isEmpty() && pendingAdds.isEmpty()) {
            // No mutation was recorded, so there is nothing to send.
            return;
        }
        if (isCleared) {
            beamFnStateClient
                .handle(keysStateRequest.toBuilder().setClear(BeamFnApi.StateClearRequest.getDefaultInstance()))
                .get();
        } else {
            for (K removedKey : pendingRemoves) {
                beamFnStateClient
                    .handle(createUserStateRequest(removedKey).toBuilder().setClear(BeamFnApi.StateClearRequest.getDefaultInstance()))
                    .get();
            }
        }
        for (Map.Entry<K, List<V>> entry : pendingAdds.entrySet()) {
            beamFnStateClient
                .handle(createUserStateRequest(entry.getKey()).toBuilder().setAppend(BeamFnApi.StateAppendRequest.newBuilder().setData(encodeValues(entry.getValue()))))
                .get();
        }
    }

    /** Fails with the shared "closed" message when {@link #asyncClose()} was already called. */
    private void checkNotClosed() {
        Preconditions.checkState(!isClosed, "Multimap user state is no longer usable because it is closed for %s", keysStateRequest.getStateKey());
    }

    /**
     * Encodes {@code values} back to back with the value coder.
     *
     * @throws IllegalStateException wrapping the {@link IOException} if encoding fails
     */
    private ByteString encodeValues(Iterable<V> values) {
        try {
            ByteString.Output output = ByteString.newOutput();
            for (V value : values) {
                valueCoder.encode(value, output);
            }
            return output.toByteString();
        } catch (IOException e) {
            throw new IllegalStateException(String.format("Failed to encode values for multimap user state id %s.", stateId), e);
        }
    }

    /**
     * Builds a request addressing {@code key} by copying the per-key template and setting the
     * encoded map key on it.
     *
     * @throws IllegalStateException wrapping the {@link IOException} if encoding the key fails
     */
    private BeamFnApi.StateRequest createUserStateRequest(K key) {
        try {
            ByteString.Output output = ByteString.newOutput();
            mapKeyCoder.encode(key, output);
            BeamFnApi.StateRequest.Builder request = userStateRequest.toBuilder();
            request.getStateKeyBuilder().getMultimapUserStateBuilder().setMapKey(output.toByteString());
            return request.build();
        } catch (IOException e) {
            throw new IllegalStateException(String.format("Failed to encode key for multimap user state id %s.", stateId), e);
        }
    }

    /**
     * Returns the persisted values for {@code key}, fetching them through the state client on
     * first use and caching the outcome (including "no values") afterwards.
     */
    private Iterable<V> getPersistedValues(K key) {
        if (negativeCache.contains(key)) {
            return Collections.emptyList();
        }
        if (persistedValues.get(key).isEmpty()) {
            // Materialize once so the emptiness check and the caching share a single traversal
            // of the fetched iterable.
            List<V> fetched = Lists.newArrayList(StateFetchingIterators.readAllAndDecodeStartingFrom(beamFnStateClient, createUserStateRequest(key), valueCoder));
            if (fetched.isEmpty()) {
                negativeCache.add(key);
            } else {
                persistedValues.putAll(key, fetched);
            }
        }
        return Iterables.unmodifiableIterable(persistedValues.get(key));
    }

    /**
     * Returns the persisted keys, requesting them through the state client on first use.
     *
     * @throws IllegalStateException if the state was cleared (persisted keys are then irrelevant)
     */
    private Iterable<K> getPersistedKeys() {
        Preconditions.checkState(!isCleared);
        if (persistedKeys == null) {
            PrefetchableIterable<K> keys = StateFetchingIterators.readAllAndDecodeStartingFrom(beamFnStateClient, keysStateRequest, mapKeyCoder);
            persistedKeys = Iterables.unmodifiableIterable(keys);
        }
        return persistedKeys;
    }
}

