/*
 * Decompiled with CFR 0.152.
 */
package hu.akarnokd.asyncenum;

import hu.akarnokd.asyncenum.AsyncEnumerable;
import hu.akarnokd.asyncenum.AsyncEnumerator;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;

/**
 * Maps every upstream item to an inner {@link AsyncEnumerable} and relays the
 * items of all inner sequences through a single enumerator.
 *
 * <p>Inner sequences are started as soon as their upstream item arrives and the
 * next upstream item is requested right away, so the number of concurrently
 * running inner sequences is not limited by this class.
 *
 * @param <T> the upstream item type
 * @param <R> the item type produced by the inner sequences
 */
final class AsyncFlatMap<T, R>
implements AsyncEnumerable<R> {
    /** The source of the items to be mapped. */
    final AsyncEnumerable<T> upstream;
    /** Turns an upstream item into the inner sequence to be relayed. */
    final Function<? super T, ? extends AsyncEnumerable<? extends R>> mapper;

    /**
     * @param upstream the source of the items to be mapped
     * @param mapper   turns an upstream item into the inner sequence to be relayed
     */
    AsyncFlatMap(AsyncEnumerable<T> upstream, Function<? super T, ? extends AsyncEnumerable<? extends R>> mapper) {
        this.upstream = upstream;
        this.mapper = mapper;
    }

    /**
     * Creates a new enumerator over the upstream and immediately requests the
     * first upstream item, i.e. before the consumer calls {@code moveNext()}.
     *
     * @return the enumerator merging the inner sequences
     */
    @Override
    public AsyncEnumerator<R> enumerator() {
        FlatMapEnumerator<T, R> en = new FlatMapEnumerator<>(this.upstream.enumerator(), this.mapper);
        en.moveNextUpstream();
        return en;
    }

    /**
     * Coordinates the upstream enumerator and the inner enumerators. It is also
     * the completion callback registered on every {@code upstream.moveNext()}.
     *
     * @param <T> the upstream item type
     * @param <R> the item type produced by the inner sequences
     */
    static final class FlatMapEnumerator<T, R>
    implements AsyncEnumerator<R>,
    BiConsumer<Boolean, Throwable> {
        final AsyncEnumerator<T> upstream;
        final Function<? super T, ? extends AsyncEnumerable<? extends R>> mapper;
        /** Inner enumerators that reported an available item not yet handed to the consumer. */
        final Queue<InnerAsyncEnumerator<R>> queue;
        /** The future of the pending {@link #moveNext()} call, or null if none is pending. */
        final AtomicReference<CompletableFuture<Boolean>> next;
        /** Work-in-progress counter; only the caller moving it from 0 runs the drain loop. */
        final AtomicInteger wip;
        /** Number of sources not yet terminated; starts at 1 to account for the upstream. */
        final AtomicInteger active;
        /** The inner enumerators tracked so that they can be cancelled. */
        final ConcurrentMap<InnerAsyncEnumerator<R>, Object> inners;
        /** Work-in-progress counter guarding {@link #moveNextUpstream()}. */
        final AtomicInteger upstreamWip;
        /** The first failure reported by the upstream, the mapper or an inner sequence. */
        final AtomicReference<Throwable> error;
        /** The item most recently handed to the consumer. */
        R current;
        /** Set by {@link #cancel()}; inner enumerators created afterwards are cancelled at once. */
        volatile boolean cancelled;

        FlatMapEnumerator(AsyncEnumerator<T> upstream, Function<? super T, ? extends AsyncEnumerable<? extends R>> mapper) {
            this.upstream = upstream;
            this.mapper = mapper;
            this.queue = new ConcurrentLinkedQueue<>();
            this.next = new AtomicReference<>();
            this.wip = new AtomicInteger();
            // the upstream itself counts as one active source
            this.active = new AtomicInteger(1);
            this.inners = new ConcurrentHashMap<>();
            this.upstreamWip = new AtomicInteger();
            this.error = new AtomicReference<>();
        }

        /**
         * Registers a fresh future as the pending request and tries to fulfil it.
         *
         * @return a stage completing with true when {@link #current()} holds a new
         *         item, with false when all sources have finished, or exceptionally
         *         with the first recorded failure
         */
        @Override
        public CompletionStage<Boolean> moveNext() {
            CompletableFuture<Boolean> nx = new CompletableFuture<>();
            this.next.set(nx);
            this.drain();
            return nx;
        }

        @Override
        public R current() {
            return this.current;
        }

        /**
         * Serialized loop matching the pending request, if any, with a recorded
         * failure, with overall completion, or with the next queued inner item.
         *
         * <p>Note that the two terminal branches return without decrementing
         * {@code wip}, so every later call to this method returns immediately.
         */
        void drain() {
            if (this.wip.getAndIncrement() != 0) {
                // another caller is inside the loop and will observe the increment
                return;
            }
            do {
                CompletableFuture<Boolean> nx = this.next.get();
                if (nx != null) {
                    // a recorded failure takes precedence over items still queued
                    Throwable ex = this.error.get();
                    if (ex != null) {
                        nx.completeExceptionally(ex);
                        return;
                    }
                    // read the counter before peeking the queue, as the original does
                    int n = this.active.get();
                    InnerAsyncEnumerator<R> inner = this.queue.peek();
                    if (n == 0 && inner == null) {
                        nx.complete(false);
                        return;
                    }
                    if (inner != null) {
                        this.queue.poll();
                        // clear the request before completing it so a moveNext() issued
                        // from a completion callback is not overwritten
                        this.next.set(null);
                        this.current = inner.current();
                        nx.complete(true);
                        // the inner's item has been taken, ask it for its next one
                        inner.moveNext();
                    }
                }
            } while (this.wip.decrementAndGet() != 0);
        }

        /** Called by an inner enumerator that has an item available via its {@code current()}. */
        void hasNext(InnerAsyncEnumerator<R> inner) {
            this.queue.offer(inner);
            this.drain();
        }

        /** Called by an inner enumerator whose source reported no more items. */
        void finish(InnerAsyncEnumerator<R> inner) {
            this.inners.remove(inner);
            this.active.decrementAndGet();
            this.drain();
        }

        /** Called by an inner enumerator whose source failed; cancels every other source. */
        void error(InnerAsyncEnumerator<R> inner, Throwable ex) {
            this.error.compareAndSet(null, ex);
            this.cancel();
            this.active.decrementAndGet();
            this.drain();
        }

        /**
         * Requests the next upstream item and registers this object as the
         * completion callback. A call arriving while another caller is inside the
         * loop only increments the counter, and the loop owner then issues the
         * additional request.
         */
        void moveNextUpstream() {
            if (this.upstreamWip.getAndIncrement() == 0) {
                do {
                    this.upstream.moveNext().whenComplete(this);
                } while (this.upstreamWip.decrementAndGet() != 0);
            }
        }

        /** Cancels every inner enumerator currently tracked. */
        void cancelAllInner() {
            for (InnerAsyncEnumerator<R> inner : this.inners.keySet()) {
                inner.cancel();
            }
        }

        /**
         * Completion callback of {@code upstream.moveNext()}.
         *
         * @param aBoolean  true if the upstream has a current item, false if it finished
         * @param throwable the upstream failure, or null
         */
        @Override
        public void accept(Boolean aBoolean, Throwable throwable) {
            if (throwable != null) {
                this.error.compareAndSet(null, throwable);
                this.cancelAllInner();
                this.active.decrementAndGet();
                this.drain();
                return;
            }
            if (!aBoolean.booleanValue()) {
                // upstream finished: release the slot reserved for it in the constructor
                this.active.decrementAndGet();
                this.drain();
                return;
            }
            AsyncEnumerator<? extends R> ae;
            try {
                T t = this.upstream.current();
                ae = this.mapper.apply(t).enumerator();
            } catch (Throwable ex) {
                // This method runs inside a whenComplete callback whose resulting stage
                // is discarded, so an escaping exception would vanish and leave the
                // consumer waiting forever. Report it like an inner failure instead:
                // no further upstream item is requested, hence the upstream's slot in
                // 'active' is released here.
                this.error.compareAndSet(null, ex);
                this.cancel();
                this.active.decrementAndGet();
                this.drain();
                return;
            }
            InnerAsyncEnumerator<R> inner = new InnerAsyncEnumerator<R>(ae, this);
            // register first, then re-check the flag, so that either cancel() sees the
            // new inner in the map or this method sees the flag
            this.inners.put(inner, inner);
            if (this.cancelled) {
                this.inners.remove(inner);
                inner.cancel();
            } else {
                this.active.getAndIncrement();
                inner.moveNext();
                this.moveNextUpstream();
            }
        }

        /** Marks this enumerator as cancelled and cancels the upstream and all tracked inners. */
        @Override
        public void cancel() {
            this.cancelled = true;
            this.upstream.cancel();
            this.cancelAllInner();
        }

        /**
         * Wraps one inner enumerator and forwards its signals to the parent. The
         * inherited {@link AtomicInteger} value serves as the work-in-progress
         * counter of {@link #moveNext()}.
         *
         * @param <R> the item type
         */
        static final class InnerAsyncEnumerator<R>
        extends AtomicInteger
        implements BiConsumer<Boolean, Throwable> {
            final AsyncEnumerator<? extends R> source;
            final FlatMapEnumerator<?, R> parent;

            InnerAsyncEnumerator(AsyncEnumerator<? extends R> source, FlatMapEnumerator<?, R> parent) {
                this.source = source;
                this.parent = parent;
            }

            /** Returns the current item of the wrapped source. */
            R current() {
                return this.source.current();
            }

            /**
             * Requests the next item of the wrapped source with this object as the
             * completion callback; calls arriving while the loop is running are
             * replayed by the loop owner.
             */
            void moveNext() {
                if (this.getAndIncrement() == 0) {
                    do {
                        this.source.moveNext().whenComplete(this);
                    } while (this.decrementAndGet() != 0);
                }
            }

            /** Routes the outcome of {@code source.moveNext()} to the matching parent method. */
            @Override
            public void accept(Boolean hasMore, Throwable throwable) {
                if (throwable != null) {
                    this.parent.error(this, throwable);
                    return;
                }
                if (hasMore.booleanValue()) {
                    this.parent.hasNext(this);
                } else {
                    this.parent.finish(this);
                }
            }

            /** Cancels the wrapped source. */
            void cancel() {
                this.source.cancel();
            }
        }
    }
}

