/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.state;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.OrderedListUserState;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.junit.Assert;

public class FakeBeamFnStateClient
implements BeamFnStateClient {
    private static final int DEFAULT_CHUNK_SIZE = 6;
    private final Map<BeamFnApi.StateKey, List<ByteString>> data;
    private int currentId;
    private final Map<BeamFnApi.StateKey, NavigableSet<Long>> orderedListSortKeysFromStateKey;

    public <V> FakeBeamFnStateClient(Coder<V> valueCoder, Map<BeamFnApi.StateKey, List<V>> initialData) {
        this(valueCoder, initialData, 6);
    }

    public <V> FakeBeamFnStateClient(Coder<V> valueCoder, Map<BeamFnApi.StateKey, List<V>> initialData, int chunkSize) {
        this(Maps.transformValues(initialData, value -> KV.of((Object)valueCoder, (Object)value)), chunkSize);
    }

    public FakeBeamFnStateClient(Map<BeamFnApi.StateKey, KV<Coder<?>, List<?>>> initialData) {
        this(initialData, 6);
    }

    public FakeBeamFnStateClient(Map<BeamFnApi.StateKey, KV<Coder<?>, List<?>>> initialData, int chunkSize) {
        HashMap encodedData = new HashMap(Maps.transformValues(initialData, coderAndValues -> {
            ArrayList<ByteString> chunks = new ArrayList<ByteString>();
            ByteStringOutputStream output = new ByteStringOutputStream();
            for (Object value : (List)coderAndValues.getValue()) {
                try {
                    ((Coder)coderAndValues.getKey()).encode(value, (OutputStream)output);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
                if (output.size() < chunkSize) continue;
                ByteString chunk = output.toByteStringAndReset();
                int i = 0;
                while (i + chunkSize <= chunk.size()) {
                    chunks.add(ByteString.copyFrom((byte[])chunk.substring(i, i + chunkSize).toByteArray()));
                    i += chunkSize;
                }
                if (i >= chunk.size()) continue;
                chunks.add(ByteString.copyFrom((byte[])chunk.substring(i, chunk.size()).toByteArray()));
            }
            if (output.size() > 0) {
                chunks.add(output.toByteString());
            }
            return chunks;
        }));
        List orderedListStateKeys = initialData.keySet().stream().filter(k -> k.getTypeCase() == BeamFnApi.StateKey.TypeCase.ORDERED_LIST_USER_STATE).collect(Collectors.toList());
        this.orderedListSortKeysFromStateKey = new HashMap<BeamFnApi.StateKey, NavigableSet<Long>>();
        for (BeamFnApi.StateKey key : orderedListStateKeys) {
            long sortKey = key.getOrderedListUserState().getRange().getStart();
            BeamFnApi.StateKey.Builder keyBuilder = key.toBuilder();
            keyBuilder.getOrderedListUserStateBuilder().clearRange();
            this.orderedListSortKeysFromStateKey.computeIfAbsent(keyBuilder.build(), unused -> new TreeSet()).add(sortKey);
        }
        this.data = new ConcurrentHashMap<BeamFnApi.StateKey, List<ByteString>>(Maps.filterValues(encodedData, byteStrings -> !byteStrings.isEmpty()));
    }

    public Map<BeamFnApi.StateKey, ByteString> getData() {
        return Maps.transformValues(this.data, bs -> {
            ByteString all = ByteString.EMPTY;
            for (ByteString b : bs) {
                all = all.concat(b);
            }
            return all;
        });
    }

    public Map<BeamFnApi.StateKey, List<ByteString>> getRawData() {
        return this.data;
    }

    public CompletableFuture<BeamFnApi.StateResponse> handle(BeamFnApi.StateRequest.Builder requestBuilder) {
        BeamFnApi.StateResponse.Builder response;
        Assert.assertEquals("", requestBuilder.getId());
        requestBuilder.setId(this.generateId());
        BeamFnApi.StateRequest request = requestBuilder.build();
        BeamFnApi.StateKey key = request.getStateKey();
        Assert.assertNotEquals(BeamFnApi.StateRequest.RequestCase.REQUEST_NOT_SET, request.getRequestCase());
        Assert.assertNotEquals(BeamFnApi.StateKey.TypeCase.TYPE_NOT_SET, key.getTypeCase());
        if (key.getTypeCase() == BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT || key.getTypeCase() == BeamFnApi.StateKey.TypeCase.RUNNER) {
            Assert.assertEquals(BeamFnApi.StateRequest.RequestCase.GET, request.getRequestCase());
        }
        if (key.getTypeCase() == BeamFnApi.StateKey.TypeCase.MULTIMAP_KEYS_VALUES_SIDE_INPUT && !this.data.containsKey(key)) {
            throw new UnsupportedOperationException("No multimap keys values states provided.");
        }
        switch (request.getRequestCase()) {
            case GET: {
                if (key.getTypeCase() == BeamFnApi.StateKey.TypeCase.ORDERED_LIST_USER_STATE) {
                    ByteString continuationToken;
                    long start = key.getOrderedListUserState().getRange().getStart();
                    long end = key.getOrderedListUserState().getRange().getEnd();
                    KvCoder coder = KvCoder.of((Coder)VarLongCoder.of(), (Coder)VarIntCoder.of());
                    long sortKey = start;
                    int index = 0;
                    if (!request.getGet().getContinuationToken().isEmpty()) {
                        try {
                            KV cursor = coder.decode(request.getGet().getContinuationToken().newInput());
                            sortKey = (Long)cursor.getKey();
                            index = (Integer)cursor.getValue();
                        }
                        catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    }
                    ByteString returnBlock = ByteString.EMPTY;
                    try {
                        if (sortKey < start || sortKey >= end) {
                            throw new IndexOutOfBoundsException("sort key out of range");
                        }
                        BeamFnApi.StateKey.Builder stateKeyWithoutRange = request.getStateKey().toBuilder();
                        stateKeyWithoutRange.getOrderedListUserStateBuilder().clearRange();
                        NavigableSet<Long> subset = ((NavigableSet)this.orderedListSortKeysFromStateKey.getOrDefault(stateKeyWithoutRange.build(), new TreeSet())).subSet(sortKey, true, end, false);
                        Long nextSortKey = (Long)subset.first();
                        BeamFnApi.StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
                        keyBuilder.getOrderedListUserStateBuilder().getRangeBuilder().setStart(nextSortKey.longValue()).setEnd(nextSortKey + 1L);
                        List<ByteString> byteStrings = this.data.getOrDefault(keyBuilder.build(), Collections.singletonList(ByteString.EMPTY));
                        returnBlock = byteStrings.get(index);
                        if (byteStrings.size() > index + 1) {
                            ++index;
                        } else {
                            nextSortKey = (Long)subset.tailSet(nextSortKey, false).first();
                            index = 0;
                        }
                        ByteStringOutputStream outputStream = new ByteStringOutputStream();
                        try {
                            KV cursor = KV.of((Object)nextSortKey, (Object)index);
                            coder.encode(cursor, (OutputStream)outputStream);
                        }
                        catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                        continuationToken = outputStream.toByteString();
                    }
                    catch (IndexOutOfBoundsException | NoSuchElementException e) {
                        continuationToken = ByteString.EMPTY;
                    }
                    response = BeamFnApi.StateResponse.newBuilder().setGet(BeamFnApi.StateGetResponse.newBuilder().setData(returnBlock).setContinuationToken(continuationToken));
                    break;
                }
                List<ByteString> byteStrings = this.data.getOrDefault(request.getStateKey(), Collections.singletonList(ByteString.EMPTY));
                int block = 0;
                if (!request.getGet().getContinuationToken().isEmpty()) {
                    block = Integer.parseInt(request.getGet().getContinuationToken().toStringUtf8());
                }
                ByteString returnBlock = byteStrings.get(block);
                ByteString continuationToken = ByteString.EMPTY;
                if (byteStrings.size() > block + 1) {
                    continuationToken = ByteString.copyFromUtf8((String)Integer.toString(block + 1));
                }
                response = BeamFnApi.StateResponse.newBuilder().setGet(BeamFnApi.StateGetResponse.newBuilder().setData(returnBlock).setContinuationToken(continuationToken));
                break;
            }
            case CLEAR: {
                if (key.getTypeCase() == BeamFnApi.StateKey.TypeCase.ORDERED_LIST_USER_STATE) {
                    BeamFnApi.OrderedListRange r = request.getStateKey().getOrderedListUserState().getRange();
                    BeamFnApi.StateKey.Builder stateKeyWithoutRange = request.getStateKey().toBuilder();
                    stateKeyWithoutRange.getOrderedListUserStateBuilder().clearRange();
                    ArrayList<Long> keysToRemove = new ArrayList<Long>(((NavigableSet)this.orderedListSortKeysFromStateKey.getOrDefault(stateKeyWithoutRange.build(), new TreeSet())).subSet(r.getStart(), true, r.getEnd(), false));
                    for (Long l : keysToRemove) {
                        BeamFnApi.StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
                        keyBuilder.getOrderedListUserStateBuilder().getRangeBuilder().setStart(l.longValue()).setEnd(l + 1L);
                        this.data.remove(keyBuilder.build());
                        this.orderedListSortKeysFromStateKey.get(stateKeyWithoutRange.build()).remove(l);
                    }
                } else {
                    this.data.remove(request.getStateKey());
                }
                response = BeamFnApi.StateResponse.newBuilder().setClear(BeamFnApi.StateClearResponse.getDefaultInstance());
                break;
            }
            case APPEND: {
                if (key.getTypeCase() == BeamFnApi.StateKey.TypeCase.ORDERED_LIST_USER_STATE) {
                    InputStream inStream = request.getAppend().getData().newInput();
                    OrderedListUserState.TimestampedValueCoder coder = OrderedListUserState.TimestampedValueCoder.of((Coder)ByteArrayCoder.of());
                    try {
                        while (inStream.available() > 0) {
                            TimestampedValue tv = coder.decode(inStream);
                            ByteStringOutputStream outStream = new ByteStringOutputStream();
                            coder.encode(tv, (OutputStream)outStream);
                            ByteString output = outStream.toByteString();
                            BeamFnApi.StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
                            long sortKey = tv.getTimestamp().getMillis();
                            keyBuilder.getOrderedListUserStateBuilder().getRangeBuilder().setStart(sortKey).setEnd(sortKey + 1L);
                            List previousValues = this.data.computeIfAbsent(keyBuilder.build(), unused -> new ArrayList());
                            previousValues.add(output);
                            BeamFnApi.StateKey.Builder stateKeyWithoutRange = request.getStateKey().toBuilder();
                            stateKeyWithoutRange.getOrderedListUserStateBuilder().clearRange();
                            this.orderedListSortKeysFromStateKey.computeIfAbsent(stateKeyWithoutRange.build(), unused -> new TreeSet()).add(sortKey);
                        }
                    }
                    catch (IOException ex) {
                        throw new RuntimeException(ex);
                    }
                } else {
                    List previousValue = this.data.computeIfAbsent(request.getStateKey(), unused -> new ArrayList());
                    previousValue.add(request.getAppend().getData());
                }
                response = BeamFnApi.StateResponse.newBuilder().setAppend(BeamFnApi.StateAppendResponse.getDefaultInstance());
                break;
            }
            default: {
                throw new IllegalStateException(String.format("Unknown request type %s", request.getRequestCase()));
            }
        }
        return CompletableFuture.completedFuture(response.setId(requestBuilder.getId()).build());
    }

    private String generateId() {
        return Integer.toString(++this.currentId);
    }

    public int getCallCount() {
        return this.currentId;
    }
}

