/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.state;

import com.google.auto.value.AutoValue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.state.AutoValue_StateFetchingIterators_CachingStateIterable_Block;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.stream.DataStreams;
import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
import org.apache.beam.sdk.fn.stream.PrefetchableIterator;
import org.apache.beam.sdk.util.Weighted;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.checkerframework.dataflow.qual.Pure;

public class StateFetchingIterators {
    /** Private so that this holder of static factories and nested types cannot be instantiated from outside. */
    private StateFetchingIterators() {
    }

    /**
     * Creates a {@link CachingStateIterable} that reads values through {@code beamFnStateClient},
     * starting from {@code stateRequestForFirstChunk}, and keeps decoded blocks in {@code cache}.
     *
     * @param cache cache handed in with wildcard type arguments; it is narrowed here to the
     *     key/value types that {@link CachingStateIterable} reads and writes
     * @param beamFnStateClient client used to issue the state requests
     * @param stateRequestForFirstChunk request describing the first chunk to read
     * @param valueCoder coder passed to the iterable for decoding values
     * @return a new iterable backed by the given cache and state client
     */
    public static <T> @UnknownKeyFor @NonNull @Initialized CachingStateIterable<T> readAllAndDecodeStartingFrom(@UnknownKeyFor @NonNull @Initialized Cache<?, ?> cache, @UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient,  @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateRequest stateRequestForFirstChunk, @UnknownKeyFor @NonNull @Initialized Coder<T> valueCoder) {
        // The cast cannot be checked at runtime. CachingStateIterable only ever stores Blocks<T>
        // under IterableCacheKey.INSTANCE; the caller is trusted not to share this cache with
        // code that stores other value types under that key.
        @SuppressWarnings("unchecked")
        Cache<IterableCacheKey, CachingStateIterable.Blocks<T>> typedCache = (Cache<IterableCacheKey, CachingStateIterable.Blocks<T>>) cache;
        return new CachingStateIterable<>(typedCache, beamFnStateClient, stateRequestForFirstChunk, valueCoder);
    }

    /**
     * Iterates over the raw data chunks of a state "get" request, following continuation tokens.
     *
     * <p>A {@code null} continuation token means there is nothing left to fetch. At most one
     * response future is held at a time; {@link #next()} blocks on it.
     */
    @VisibleForTesting
    static class LazyBlockingStateFetchingIterator
    implements PrefetchableIterator<ByteString> {
        private final @UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient;
        private final  @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateRequest stateRequestForFirstChunk;
        private @UnknownKeyFor @NonNull @Initialized ByteString continuationToken;
        private @UnknownKeyFor @NonNull @Initialized CompletableFuture< @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateResponse> prefetchedResponse;

        /** Starts at the continuation token carried by {@code stateRequestForFirstChunk}. */
        LazyBlockingStateFetchingIterator(@UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient,  @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateRequest stateRequestForFirstChunk) {
            this.stateRequestForFirstChunk = stateRequestForFirstChunk;
            this.beamFnStateClient = beamFnStateClient;
            this.continuationToken = stateRequestForFirstChunk.getGet().getContinuationToken();
        }

        /** Returns the token of the next chunk to fetch, or {@code null} once exhausted. */
        @javax.annotation.Nullable
        public @UnknownKeyFor @Nullable @Initialized ByteString getContinuationToken() {
            return this.continuationToken;
        }

        /**
         * Repositions the iterator at {@code continuationToken}. Any response already requested
         * for a different token is dropped; seeking to the current token is a no-op.
         */
        public void seekToContinuationToken(@javax.annotation.Nullable @UnknownKeyFor @Nullable @Initialized ByteString continuationToken) {
            boolean alreadyThere = Objects.equals(this.continuationToken, continuationToken);
            if (!alreadyThere) {
                this.continuationToken = continuationToken;
                this.prefetchedResponse = null;
            }
        }

        /**
         * Ready when the outstanding response has completed, or, with no outstanding response,
         * when there is nothing left to fetch.
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized boolean isReady() {
            return this.prefetchedResponse != null
                    ? this.prefetchedResponse.isDone()
                    : this.continuationToken == null;
        }

        /** Requests the next chunk unless exhausted or a request is already outstanding. */
        @Override
        public void prefetch() {
            if (this.continuationToken == null || this.prefetchedResponse != null) {
                return;
            }
            this.prefetchedResponse = this.loadPrefetchedResponse(this.continuationToken);
        }

        /**
         * Issues a get request for {@code continuationToken}, built from a copy of the first-chunk
         * request, and returns the future handed back by the state client.
         */
        public @UnknownKeyFor @NonNull @Initialized CompletableFuture< @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateResponse> loadPrefetchedResponse(@UnknownKeyFor @NonNull @Initialized ByteString continuationToken) {
            return this.beamFnStateClient.handle(
                    this.stateRequestForFirstChunk
                            .toBuilder()
                            .setGet(BeamFnApi.StateGetRequest.newBuilder().setContinuationToken(continuationToken)));
        }

        @Override
        @Pure
        public @UnknownKeyFor @NonNull @Initialized boolean hasNext() {
            return this.continuationToken != null;
        }

        /**
         * Blocks for the next chunk and returns its data, then eagerly requests the following
         * chunk if the response carries a non-empty continuation token.
         *
         * @throws NoSuchElementException if the iterator is exhausted
         * @throws IllegalStateException if interrupted while waiting, or if the request failed with
         *     a checked cause (unchecked causes are rethrown as-is)
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized ByteString next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            this.prefetch();

            BeamFnApi.StateResponse stateResponse;
            try {
                stateResponse = this.prefetchedResponse.get();
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause == null) {
                    throw new IllegalStateException(e);
                }
                Throwables.throwIfUnchecked(cause);
                throw new IllegalStateException(cause);
            }
            this.prefetchedResponse = null;

            ByteString responseToken = stateResponse.getGet().getContinuationToken();
            if (ByteString.EMPTY.equals(responseToken)) {
                // An empty token in the response marks the final chunk.
                this.continuationToken = null;
            } else {
                this.continuationToken = responseToken;
                this.prefetch();
            }
            return stateResponse.getGet().getData();
        }
    }

    static class CachingStateIterable<@UnknownKeyFor T>
    extends PrefetchableIterables.Default<T> {
        private final @UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @NonNull @Initialized IterableCacheKey, @UnknownKeyFor @NonNull @Initialized Blocks<T>> cache;
        private final @UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient;
        private final  @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateRequest stateRequestForFirstChunk;
        private final @UnknownKeyFor @NonNull @Initialized Coder<T> valueCoder;

        /**
         * Adds up the weights of {@code blocks}, returning {@link Long#MAX_VALUE} as soon as the
         * running total overflows a {@code long}.
         */
        private static <T> @UnknownKeyFor @NonNull @Initialized long sumWeight(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Block<T>> blocks) {
            long total = 0L;
            try {
                for (Block<T> block : blocks) {
                    total = Math.addExact(total, block.getWeight());
                }
            }
            catch (ArithmeticException overflow) {
                return Long.MAX_VALUE;
            }
            return total;
        }

        /**
         * Returns {@code first + second}, or {@link Long#MAX_VALUE} when the sum does not fit in a
         * {@code long} (in either direction).
         */
        private static @UnknownKeyFor @NonNull @Initialized long addBoundByMax(@UnknownKeyFor @NonNull @Initialized long first, @UnknownKeyFor @NonNull @Initialized long second) {
            long sum = first + second;
            // Overflow happened iff both operands have a sign different from the wrapped result.
            boolean overflowed = ((first ^ sum) & (second ^ sum)) < 0;
            return overflowed ? Long.MAX_VALUE : sum;
        }

        /**
         * Stores the collaborators used by this iterable; no state request is issued here.
         *
         * @param cache holds the decoded blocks under {@code IterableCacheKey.INSTANCE}
         * @param beamFnStateClient client handed to the iterators created by this iterable
         * @param stateRequestForFirstChunk request describing the first chunk to read
         * @param valueCoder coder used for decoding and for computing structural values
         */
        public CachingStateIterable(@UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @NonNull @Initialized IterableCacheKey, @UnknownKeyFor @NonNull @Initialized Blocks<T>> cache, @UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient,  @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateRequest stateRequestForFirstChunk, @UnknownKeyFor @NonNull @Initialized Coder<T> valueCoder) {
            this.valueCoder = valueCoder;
            this.stateRequestForFirstChunk = stateRequestForFirstChunk;
            this.beamFnStateClient = beamFnStateClient;
            this.cache = cache;
        }

        /**
         * Updates the cached copy of this iterable so that it no longer contains any value whose
         * structural value (per {@code valueCoder}) is in {@code toRemoveStructuralValues}.
         *
         * <p>Nothing happens when the set is empty or nothing is cached. When only a prefix of the
         * iterable is cached (the last cached block still has a next token) the entry is evicted
         * and left absent, so the next iterator reloads through the state client. When the whole
         * iterable is cached, the surviving values are merged into a single mutated block that
         * replaces the cache entry.
         *
         * @param toRemoveStructuralValues structural values of the elements to drop
         */
        public void remove(@UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized Object> toRemoveStructuralValues) {
            if (toRemoveStructuralValues.isEmpty()) {
                return;
            }
            Blocks<T> existing = this.cache.peek(IterableCacheKey.INSTANCE);
            if (existing == null) {
                return;
            }
            List<Block<T>> blocks = existing.getBlocks();
            if (blocks.get(blocks.size() - 1).getNextToken() != null) {
                // Only a prefix is cached. Rebuilding from it would publish a block with no next
                // token, i.e. claim the truncated prefix is the complete iterable. Evict instead.
                this.cache.remove(IterableCacheKey.INSTANCE);
                return;
            }

            int totalSize = 0;
            for (Block<T> block : blocks) {
                totalSize += block.getValues().size();
            }
            List<T> allValues = new ArrayList<T>(totalSize);
            long totalWeight = 0L;
            for (Block<T> block : blocks) {
                int startIndex = allValues.size();
                for (T value : block.getValues()) {
                    if (!toRemoveStructuralValues.contains(this.valueCoder.structuralValue(value))) {
                        allValues.add(value);
                    }
                }
                if (startIndex + block.getValues().size() == allValues.size()) {
                    // Nothing was removed from this block, so its recorded weight is reused.
                    totalWeight = CachingStateIterable.addBoundByMax(totalWeight, block.getWeight());
                } else {
                    // Some values were dropped: weigh the survivors of this block one by one.
                    for (int j = startIndex; j < allValues.size(); ++j) {
                        totalWeight = CachingStateIterable.addBoundByMax(totalWeight, Caches.weigh(allValues.get(j)));
                    }
                }
            }
            this.cache.put(IterableCacheKey.INSTANCE, new MutatedBlocks<T>(Block.mutatedBlock(allValues, totalWeight)));
        }

        /**
         * Replaces whatever is cached for this iterable with a single mutated block holding exactly
         * {@code values}, weighed via {@code Caches.weigh}.
         */
        public void clearAndAppend(@UnknownKeyFor @NonNull @Initialized List<T> values) {
            long weight = Caches.weigh(values);
            Block<T> wholeBlock = Block.mutatedBlock(values, weight);
            MutatedBlocks<T> replacement = new MutatedBlocks<T>(wholeBlock);
            this.cache.put(IterableCacheKey.INSTANCE, replacement);
        }

        /** Returns a fresh iterator bound to this iterable's cache, state client and coder. */
        @Override
        public @UnknownKeyFor @NonNull @Initialized PrefetchableIterator<T> createIterator() {
            CachingStateIterator iterator = this.new CachingStateIterator();
            return iterator;
        }

        /**
         * Updates the cached copy of this iterable so that {@code values} follow the values already
         * cached.
         *
         * <p>Nothing happens when {@code values} is empty or nothing is cached. When only a prefix
         * of the iterable is cached (the last cached block still has a next token) the entry is
         * evicted and left absent, so the next iterator reloads through the state client. When the
         * whole iterable is cached, all cached values plus {@code values} are merged into a single
         * mutated block that replaces the cache entry.
         *
         * @param values the elements to append, in order
         */
        public void append(@UnknownKeyFor @NonNull @Initialized List<T> values) {
            if (values.isEmpty()) {
                return;
            }
            Blocks<T> existing = this.cache.peek(IterableCacheKey.INSTANCE);
            if (existing == null) {
                return;
            }
            List<Block<T>> blocks = existing.getBlocks();
            if (blocks.get(blocks.size() - 1).getNextToken() != null) {
                // Only a prefix is cached. Appending to it would publish a block with no next
                // token and silently skip the uncached tail on later reads. Evict instead.
                this.cache.remove(IterableCacheKey.INSTANCE);
                return;
            }

            long totalWeight = CachingStateIterable.addBoundByMax(Caches.weigh(values), CachingStateIterable.sumWeight(blocks));
            int totalSize = values.size();
            for (Block<T> block : blocks) {
                totalSize += block.getValues().size();
            }
            List<T> allValues = new ArrayList<T>(totalSize);
            for (Block<T> block : blocks) {
                allValues.addAll(block.getValues());
            }
            allValues.addAll(values);
            this.cache.put(IterableCacheKey.INSTANCE, new MutatedBlocks<T>(Block.mutatedBlock(allValues, totalWeight)));
        }

        /**
         * Iterator over the decoded values of the enclosing iterable. It serves values from the
         * cached blocks when they are present and otherwise loads the next block through the state
         * client, publishing newly loaded blocks to the cache when they directly extend the cached
         * prefix.
         *
         * <p>The iterator starts on a sentinel block that has no values and whose next token is the
         * continuation token of the first-chunk request; a next token equal to
         * {@code ByteString.EMPTY} is treated as "positioned before the first block".
         */
        class CachingStateIterator
        implements PrefetchableIterator<T> {
            private final @UnknownKeyFor @NonNull @Initialized LazyBlockingStateFetchingIterator underlyingStateFetchingIterator;
            private final  @UnknownKeyFor @NonNull @Initialized DataStreams.DataStreamDecoder<T> dataStreamDecoder;
            private @UnknownKeyFor @NonNull @Initialized Block<T> currentBlock;
            private @UnknownKeyFor @NonNull @Initialized int currentCachedBlockValueIndex;

            public CachingStateIterator() {
                this.underlyingStateFetchingIterator = new LazyBlockingStateFetchingIterator(CachingStateIterable.this.beamFnStateClient, CachingStateIterable.this.stateRequestForFirstChunk);
                this.dataStreamDecoder = new DataStreams.DataStreamDecoder<>(CachingStateIterable.this.valueCoder, this.underlyingStateFetchingIterator);
                this.currentBlock = Block.fromValues(Collections.emptyList(), CachingStateIterable.this.stateRequestForFirstChunk.getGet().getContinuationToken());
                this.currentCachedBlockValueIndex = 0;
            }

            /**
             * Returns the index of the first block in {@code blocks} whose next token equals the
             * current block's next token, or {@code blocks.size()} when there is none. Must only be
             * called while the current block's next token is non-null.
             */
            private int indexOfCurrentBlock(List<Block<T>> blocks) {
                ByteString currentToken = this.currentBlock.getNextToken();
                int index = 0;
                while (index < blocks.size() && !currentToken.equals(blocks.get(index).getNextToken())) {
                    ++index;
                }
                return index;
            }

            /**
             * Reports whether {@link #hasNext()} can be answered from the current block or the
             * cache alone. Advances across cached blocks while doing so, and returns {@code false}
             * when the following block is not in the cache.
             */
            @Override
            public @UnknownKeyFor @NonNull @Initialized boolean isReady() {
                while (true) {
                    if (this.currentBlock.getValues().size() > this.currentCachedBlockValueIndex) {
                        return true;
                    }
                    if (this.currentBlock.getNextToken() == null) {
                        // End of the iterable: hasNext() can answer false without loading.
                        return true;
                    }
                    Blocks<T> existing = CachingStateIterable.this.cache.peek(IterableCacheKey.INSTANCE);
                    if (existing == null) {
                        return false;
                    }
                    List<Block<T>> blocks = existing.getBlocks();
                    if (ByteString.EMPTY.equals(this.currentBlock.getNextToken())) {
                        // Still on the initial sentinel: move onto the first cached block.
                        this.currentBlock = blocks.get(0);
                        this.currentCachedBlockValueIndex = 0;
                        continue;
                    }
                    int currentBlockIndex = this.indexOfCurrentBlock(blocks);
                    if (currentBlockIndex + 1 >= blocks.size()) {
                        // The successor of the current block is not cached.
                        return false;
                    }
                    this.currentBlock = blocks.get(currentBlockIndex + 1);
                    this.currentCachedBlockValueIndex = 0;
                }
            }

            /** Starts fetching the block after the current one when it is not already available. */
            @Override
            public void prefetch() {
                if (!this.isReady()) {
                    this.underlyingStateFetchingIterator.seekToContinuationToken(this.currentBlock.getNextToken());
                    this.underlyingStateFetchingIterator.prefetch();
                }
            }

            /**
             * Advances to the next block with unread values, taking it from the cache when present
             * and loading it otherwise.
             *
             * <p>NOTE(review): annotated {@code @Pure} although it moves the iterator position and
             * may write to the cache — confirm the annotation is intended.
             *
             * @throws IllegalStateException if, past the first block, the cache entry is not a
             *     {@code BlocksPrefix}
             */
            @Override
            @Pure
            public @UnknownKeyFor @NonNull @Initialized boolean hasNext() {
                while (this.currentBlock.getValues().size() <= this.currentCachedBlockValueIndex) {
                    ByteString nextToken = this.currentBlock.getNextToken();
                    if (nextToken == null) {
                        return false;
                    }
                    Blocks<T> existing = CachingStateIterable.this.cache.peek(IterableCacheKey.INSTANCE);
                    boolean isFirstBlock = ByteString.EMPTY.equals(nextToken);
                    if (existing == null) {
                        this.currentBlock = this.loadNextBlock(nextToken);
                        if (isFirstBlock) {
                            // Only the very first block may seed an empty cache.
                            CachingStateIterable.this.cache.put(IterableCacheKey.INSTANCE, new BlocksPrefix<T>(Collections.singletonList(this.currentBlock)));
                        }
                    } else if (isFirstBlock) {
                        this.currentBlock = existing.getBlocks().get(0);
                    } else {
                        Preconditions.checkState(existing instanceof BlocksPrefix, "Unexpected blocks type %s, expected a %s.", existing.getClass(), BlocksPrefix.class);
                        List<Block<T>> blocks = existing.getBlocks();
                        int currentBlockIndex = this.indexOfCurrentBlock(blocks);
                        if (currentBlockIndex + 1 < blocks.size()) {
                            this.currentBlock = blocks.get(currentBlockIndex + 1);
                        } else {
                            this.currentBlock = this.loadNextBlock(nextToken);
                            if (currentBlockIndex == blocks.size() - 1) {
                                // The loaded block directly follows the last cached block, so the
                                // cached prefix can be extended by one.
                                List<Block<T>> newBlocks = new ArrayList<Block<T>>(blocks.size() + 1);
                                newBlocks.addAll(blocks);
                                newBlocks.add(this.currentBlock);
                                CachingStateIterable.this.cache.put(IterableCacheKey.INSTANCE, new BlocksPrefix<T>(newBlocks));
                            }
                        }
                    }
                    this.currentCachedBlockValueIndex = 0;
                }
                return true;
            }

            /**
             * Seeks the underlying fetching iterator to {@code continuationToken}, decodes one
             * chunk-aligned run of values, and wraps it in a block whose next token is the fetching
             * iterator's resulting continuation token (an empty token is normalised to
             * {@code null}).
             */
            @VisibleForTesting
            @UnknownKeyFor @NonNull @Initialized Block<T> loadNextBlock(@UnknownKeyFor @NonNull @Initialized ByteString continuationToken) {
                this.underlyingStateFetchingIterator.seekToContinuationToken(continuationToken);
                List<T> values = this.dataStreamDecoder.decodeFromChunkBoundaryToChunkBoundary();
                ByteString nextToken = this.underlyingStateFetchingIterator.getContinuationToken();
                if (ByteString.EMPTY.equals(nextToken)) {
                    nextToken = null;
                }
                return Block.fromValues(values, nextToken);
            }

            /**
             * Returns the next value and advances past it.
             *
             * @throws NoSuchElementException if there are no more values
             */
            @Override
            public T next() {
                if (!this.hasNext()) {
                    throw new NoSuchElementException();
                }
                return this.currentBlock.getValues().get(this.currentCachedBlockValueIndex++);
            }
        }

        /**
         * A run of decoded values together with the token of the block that follows it
         * ({@code null} when nothing follows) and the weight reported for the block.
         */
        @AutoValue
        static abstract class Block<@UnknownKeyFor T>
        implements Weighted {
            Block() {
            }

            /** Builds a block with no next token and the caller-supplied {@code weight}. */
            public static <T> @UnknownKeyFor @NonNull @Initialized Block<T> mutatedBlock(@UnknownKeyFor @NonNull @Initialized List<T> values, @UnknownKeyFor @NonNull @Initialized long weight) {
                ByteString noNextToken = null;
                return new AutoValue_StateFetchingIterators_CachingStateIterable_Block<T>(values, noNextToken, weight);
            }

            /**
             * Builds a block whose weight is the sum of {@code Caches.weigh(values)} and
             * {@code Caches.weigh(nextToken)}.
             */
            public static <T> @UnknownKeyFor @NonNull @Initialized Block<T> fromValues(@UnknownKeyFor @NonNull @Initialized List<T> values, @javax.annotation.Nullable @UnknownKeyFor @Nullable @Initialized ByteString nextToken) {
                long valuesWeight = Caches.weigh(values);
                long tokenWeight = Caches.weigh(nextToken);
                return new AutoValue_StateFetchingIterators_CachingStateIterable_Block<T>(values, nextToken, valuesWeight + tokenWeight);
            }

            /** The values held by this block. */
            abstract @UnknownKeyFor @NonNull @Initialized List<T> getValues();

            /** Token of the following block, or {@code null} if this is the last block. */
            @javax.annotation.Nullable
            abstract @UnknownKeyFor @Nullable @Initialized ByteString getNextToken();

            /** The weight recorded when the block was created. */
            public abstract @UnknownKeyFor @NonNull @Initialized long getWeight();
        }

        /**
         * A list of blocks in the order they were loaded. It can be shrunk by keeping only its
         * first half.
         */
        static class BlocksPrefix<@UnknownKeyFor T>
        extends Blocks<T>
        implements Cache.Shrinkable<BlocksPrefix<T>> {
            private final @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Block<T>> blocks;

            BlocksPrefix(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Block<T>> blocks) {
                this.blocks = blocks;
            }

            /** Sum of the block weights, saturating at {@link Long#MAX_VALUE}. */
            public @UnknownKeyFor @NonNull @Initialized long getWeight() {
                return CachingStateIterable.sumWeight(this.blocks);
            }

            /**
             * Returns a new prefix holding a copy of the first half (rounded down) of the blocks,
             * or {@code null} when that half would be empty.
             */
            @Override
            public @UnknownKeyFor @NonNull @Initialized BlocksPrefix<T> shrink() {
                List<Block<T>> current = this.getBlocks();
                int keep = current.size() / 2;
                if (keep == 0) {
                    return null;
                }
                List<Block<T>> firstHalf = new ArrayList<Block<T>>(current.subList(0, keep));
                return new BlocksPrefix<T>(firstHalf);
            }

            @Override
            public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Block<T>> getBlocks() {
                return this.blocks;
            }
        }

        /** Blocks implementation that wraps exactly one block and exposes it as a one-element list. */
        static class MutatedBlocks<@UnknownKeyFor T>
        extends Blocks<T> {
            private final @UnknownKeyFor @NonNull @Initialized Block<T> wholeBlock;

            MutatedBlocks(@UnknownKeyFor @NonNull @Initialized Block<T> wholeBlock) {
                this.wholeBlock = wholeBlock;
            }

            /** Weight of the wrapped block. */
            public @UnknownKeyFor @NonNull @Initialized long getWeight() {
                Block<T> only = this.wholeBlock;
                return only.getWeight();
            }

            /** Returns a single-element list containing the wrapped block. */
            @Override
            public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Block<T>> getBlocks() {
                List<Block<T>> single = Collections.singletonList(this.wholeBlock);
                return single;
            }
        }

        /**
         * Value type stored in the cache for an iterable: a weighted, ordered list of
         * {@link Block}s. Subclasses in this file are {@code BlocksPrefix} and
         * {@code MutatedBlocks}.
         */
        static abstract class Blocks<@UnknownKeyFor T>
        implements Weighted {
            Blocks() {
            }

            /** Returns the blocks held by this cache value, in iteration order. */
            public abstract @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Block<T>> getBlocks();
        }
    }

    /**
     * Singleton key under which {@code CachingStateIterable} stores its blocks in the cache.
     * The key itself reports a weight of zero.
     */
    @VisibleForTesting
    static class IterableCacheKey
    implements Weighted {
        /** The only instance; the constructor is private. */
        static final @UnknownKeyFor @NonNull @Initialized IterableCacheKey INSTANCE = new IterableCacheKey();

        private IterableCacheKey() {
        }

        /** Always {@code 0}. */
        public @UnknownKeyFor @NonNull @Initialized long getWeight() {
            return 0L;
        }
    }
}

