/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.state;

import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.LazyCachingIteratorToIterable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.stream.DataStreams;
import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
import org.apache.beam.sdk.fn.stream.PrefetchableIterator;
import org.apache.beam.sdk.fn.stream.PrefetchableIterators;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;

public class StateFetchingIterators {
    private StateFetchingIterators() {
    }

    public static PrefetchableIterator<ByteString> readAllStartingFrom(BeamFnStateClient beamFnStateClient, BeamFnApi.StateRequest stateRequestForFirstChunk) {
        return new LazyBlockingStateFetchingIterator(beamFnStateClient, stateRequestForFirstChunk);
    }

    public static <T> PrefetchableIterable<T> readAllAndDecodeStartingFrom(BeamFnStateClient beamFnStateClient, BeamFnApi.StateRequest stateRequestForFirstChunk, Coder<T> valueCoder) {
        return new FirstPageAndRemainder<T>(beamFnStateClient, stateRequestForFirstChunk, valueCoder);
    }

    @VisibleForTesting
    static class LazyBlockingStateFetchingIterator
    implements PrefetchableIterator<ByteString> {
        private final BeamFnStateClient beamFnStateClient;
        private final BeamFnApi.StateRequest stateRequestForFirstChunk;
        private State currentState = State.READ_REQUIRED;
        private ByteString continuationToken;
        private ByteString next;
        private CompletableFuture<BeamFnApi.StateResponse> prefetchedResponse;

        LazyBlockingStateFetchingIterator(BeamFnStateClient beamFnStateClient, BeamFnApi.StateRequest stateRequestForFirstChunk) {
            this.beamFnStateClient = beamFnStateClient;
            this.stateRequestForFirstChunk = stateRequestForFirstChunk;
            this.continuationToken = stateRequestForFirstChunk.getGet().getContinuationToken();
        }

        @Override
        public boolean isReady() {
            if (this.prefetchedResponse == null) {
                return this.currentState != State.READ_REQUIRED;
            }
            return this.prefetchedResponse.isDone();
        }

        @Override
        public void prefetch() {
            if (this.currentState == State.READ_REQUIRED && this.prefetchedResponse == null) {
                this.prefetchedResponse = this.beamFnStateClient.handle(this.stateRequestForFirstChunk.toBuilder().setGet(BeamFnApi.StateGetRequest.newBuilder().setContinuationToken(this.continuationToken)));
            }
        }

        @Override
        public boolean hasNext() {
            switch (this.currentState) {
                case EOF: {
                    return false;
                }
                case READ_REQUIRED: {
                    BeamFnApi.StateResponse stateResponse;
                    this.prefetch();
                    try {
                        stateResponse = this.prefetchedResponse.get();
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException(e);
                    }
                    catch (ExecutionException e) {
                        if (e.getCause() == null) {
                            throw new IllegalStateException(e);
                        }
                        Throwables.throwIfUnchecked(e.getCause());
                        throw new IllegalStateException(e.getCause());
                    }
                    this.prefetchedResponse = null;
                    this.continuationToken = stateResponse.getGet().getContinuationToken();
                    this.next = stateResponse.getGet().getData();
                    this.currentState = State.HAS_NEXT;
                    return true;
                }
                case HAS_NEXT: {
                    return true;
                }
            }
            throw new IllegalStateException(String.format("Unknown state %s", new Object[]{this.currentState}));
        }

        @Override
        public ByteString next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            if (ByteString.EMPTY.equals(this.continuationToken)) {
                this.currentState = State.EOF;
            } else {
                this.currentState = State.READ_REQUIRED;
                this.prefetch();
            }
            return this.next;
        }

        private static enum State {
            READ_REQUIRED,
            HAS_NEXT,
            EOF;

        }
    }

    @VisibleForTesting
    static class FirstPageAndRemainder<T>
    implements PrefetchableIterable<T> {
        private final BeamFnStateClient beamFnStateClient;
        private final BeamFnApi.StateRequest stateRequestForFirstChunk;
        private final Coder<T> valueCoder;
        private LazyCachingIteratorToIterable<T> firstPage;
        private CompletableFuture<BeamFnApi.StateResponse> firstPageResponseFuture;
        private ByteString continuationToken;

        FirstPageAndRemainder(BeamFnStateClient beamFnStateClient, BeamFnApi.StateRequest stateRequestForFirstChunk, Coder<T> valueCoder) {
            this.beamFnStateClient = beamFnStateClient;
            this.stateRequestForFirstChunk = stateRequestForFirstChunk;
            this.valueCoder = valueCoder;
        }

        @Override
        public PrefetchableIterator<T> iterator() {
            return new PrefetchableIterator<T>(){
                PrefetchableIterator<T> delegate;

                private void ensureDelegateExists() {
                    if (this.delegate == null) {
                        this.prefetchFirstPage();
                        if (firstPage == null) {
                            BeamFnApi.StateResponse stateResponse;
                            try {
                                stateResponse = (BeamFnApi.StateResponse)firstPageResponseFuture.get();
                            }
                            catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                throw new IllegalStateException(e);
                            }
                            catch (ExecutionException e) {
                                if (e.getCause() == null) {
                                    throw new IllegalStateException(e);
                                }
                                Throwables.throwIfUnchecked(e.getCause());
                                throw new IllegalStateException(e.getCause());
                            }
                            continuationToken = stateResponse.getGet().getContinuationToken();
                            firstPage = new LazyCachingIteratorToIterable(new DataStreams.DataStreamDecoder(valueCoder, PrefetchableIterators.fromArray(stateResponse.getGet().getData())));
                        }
                        this.delegate = ByteString.EMPTY.equals(continuationToken) ? firstPage.iterator() : PrefetchableIterators.concat(firstPage.iterator(), new DataStreams.DataStreamDecoder(valueCoder, new LazyBlockingStateFetchingIterator(beamFnStateClient, stateRequestForFirstChunk.toBuilder().setGet(BeamFnApi.StateGetRequest.newBuilder().setContinuationToken(continuationToken)).build())));
                    }
                }

                @Override
                public boolean isReady() {
                    if (this.delegate == null) {
                        if (firstPageResponseFuture != null) {
                            return firstPageResponseFuture.isDone();
                        }
                        return false;
                    }
                    return this.delegate.isReady();
                }

                @Override
                public void prefetch() {
                    if (firstPageResponseFuture == null) {
                        this.prefetchFirstPage();
                    } else if (this.delegate != null && !this.delegate.isReady()) {
                        this.delegate.prefetch();
                    }
                }

                @Override
                public boolean hasNext() {
                    if (this.delegate == null) {
                        this.ensureDelegateExists();
                        boolean rval = this.delegate.hasNext();
                        this.delegate.prefetch();
                        return rval;
                    }
                    return this.delegate.hasNext();
                }

                @Override
                public T next() {
                    if (this.delegate == null) {
                        this.ensureDelegateExists();
                        Object rval = this.delegate.next();
                        this.delegate.prefetch();
                        return rval;
                    }
                    return this.delegate.next();
                }
            };
        }

        private void prefetchFirstPage() {
            if (this.firstPageResponseFuture == null) {
                this.firstPageResponseFuture = this.beamFnStateClient.handle(this.stateRequestForFirstChunk.toBuilder().setGet(this.stateRequestForFirstChunk.getGet()));
            }
        }
    }
}

