/*
 * Decompiled with CFR 0.152.
 */
package net.tascalate.concurrent;

import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import net.tascalate.concurrent.SharedFunctions;
import net.tascalate.concurrent.Try;

/**
 * Iterator over the results of a sequence of {@link CompletionStage}s that yields
 * values in the order the stages settle rather than in source order.
 *
 * <p>Source promises are pulled lazily from the supplied iterator and "enlisted"
 * (subscribed to) in batches: when {@code chunkSize > 0} at most {@code chunkSize}
 * promises are enlisted per batch and the next batch is only enlisted once every
 * result of the current one has been handed to the caller; when
 * {@code chunkSize <= 0} all remaining promises are enlisted at once.
 *
 * <p>Settled outcomes are wrapped in {@link Try} and unwrapped with
 * {@code Try.done()} in {@link #next()} (presumably rethrowing for failed
 * promises - confirm against {@code Try}).
 *
 * <p>NOTE(review): completion callbacks may run on arbitrary threads, so the
 * internal state uses concurrent structures; the iterator itself appears to be
 * designed for a single consuming thread.
 *
 * @param <T> type of the values produced by the promises
 */
public class AsyncCompletions<T>
implements Iterator<T>,
AutoCloseable {
    /** Value stored in {@link #outstanding} once the sequence has been closed. */
    private static final int CLOSED = Integer.MIN_VALUE;

    private final int chunkSize;
    private final Cancel cancelStrategy;
    private final Iterator<? extends CompletionStage<? extends T>> pendingPromises;
    private final BlockingQueue<Try<T>> settledResults;
    private final Set<CompletionStage<?>> enlistedPromises;
    /**
     * Number of enlisted promises whose result has not yet been returned from
     * {@link #next()}; negative once the sequence is closed. Counting until
     * delivery (instead of until settlement) means there is no window in which a
     * result is neither counted nor visible in the queue.
     */
    private final AtomicInteger outstanding = new AtomicInteger(0);

    /**
     * Creates a sequence that cancels nothing on {@link #close()}.
     *
     * @param pendingValues source of promises to await
     * @param chunkSize     maximum batch size, or {@code <= 0} for no limit
     */
    AsyncCompletions(Iterator<? extends CompletionStage<? extends T>> pendingValues, int chunkSize) {
        this(pendingValues, chunkSize, Cancel.NONE);
    }

    /**
     * Creates a sequence with an explicit cancellation strategy.
     *
     * @param pendingValues  source of promises to await
     * @param chunkSize      maximum batch size, or {@code <= 0} for no limit
     * @param cancelStrategy what to cancel on {@link #close()}; {@code null} is
     *                       treated as {@link Cancel#NONE}
     */
    protected AsyncCompletions(Iterator<? extends CompletionStage<? extends T>> pendingValues, int chunkSize, Cancel cancelStrategy) {
        this.chunkSize = chunkSize;
        this.cancelStrategy = cancelStrategy == null ? Cancel.NONE : cancelStrategy;
        this.pendingPromises = pendingValues;
        // A bounded queue is sufficient when batching: at most chunkSize results
        // of the current batch can be waiting for delivery at any time.
        this.settledResults = chunkSize > 0
            ? new LinkedBlockingQueue<Try<T>>(chunkSize)
            : new LinkedBlockingQueue<Try<T>>();
        this.enlistedPromises = Collections.newSetFromMap(new ConcurrentHashMap<CompletionStage<?>, Boolean>());
    }

    /**
     * Reports whether another result can be obtained, enlisting the next batch of
     * promises if everything enlisted so far has already been delivered.
     *
     * @return {@code false} when the sequence is closed or exhausted
     */
    @Override
    public boolean hasNext() {
        int undelivered = this.outstanding.get();
        if (undelivered < 0) {
            return false;
        }
        if (undelivered > 0) {
            return true;
        }
        return this.enlistPending();
    }

    /**
     * Returns the next settled result, blocking until one is available.
     *
     * @return the value unwrapped from the next settled {@link Try}
     * @throws NoSuchElementException if the sequence is closed or exhausted, or
     *         if the calling thread is interrupted while waiting (the interrupt
     *         flag is restored and the {@link InterruptedException} is attached
     *         as the cause)
     */
    @Override
    public T next() {
        while (true) {
            int undelivered = this.outstanding.get();
            if (undelivered < 0) {
                throw new NoSuchElementException("This sequence was closed");
            }
            if (undelivered > 0) {
                Try<T> settled;
                try {
                    settled = this.settledResults.take();
                }
                catch (InterruptedException ex) {
                    // Keep the interruption visible to the caller and preserve the cause.
                    Thread.currentThread().interrupt();
                    NoSuchElementException failure = new NoSuchElementException(ex.getMessage());
                    failure.initCause(ex);
                    throw failure;
                }
                // Release the slot before unwrapping: done() may throw.
                this.markDelivered();
                return settled.done();
            }
            if (!this.enlistPending()) {
                throw new NoSuchElementException();
            }
        }
    }

    /**
     * Removal is not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }

    /**
     * Closes the sequence: marks it closed, discards results that were not yet
     * consumed and applies the configured {@link Cancel} strategy.
     */
    @Override
    public void close() {
        this.outstanding.set(CLOSED);
        this.settledResults.clear();
        this.cancelStrategy.apply(this.enlistedPromises, this.pendingPromises);
    }

    /**
     * Iterates the results of the promises in the stream in settlement order.
     *
     * @param pendingPromises promises to await
     * @param chunkSize       maximum batch size, or {@code <= 0} for no limit
     * @param <T>             result type
     * @return iterator over the results
     */
    public static <T> Iterator<T> iterate(Stream<? extends CompletionStage<? extends T>> pendingPromises, int chunkSize) {
        return AsyncCompletions.iterate(pendingPromises.iterator(), chunkSize);
    }

    /**
     * Iterates the results of the promises in the iterable in settlement order.
     *
     * @param pendingPromises promises to await
     * @param chunkSize       maximum batch size, or {@code <= 0} for no limit
     * @param <T>             result type
     * @return iterator over the results
     */
    public static <T> Iterator<T> iterate(Iterable<? extends CompletionStage<? extends T>> pendingPromises, int chunkSize) {
        return AsyncCompletions.iterate(pendingPromises.iterator(), chunkSize);
    }

    private static <T> Iterator<T> iterate(Iterator<? extends CompletionStage<? extends T>> pendingPromises, int chunkSize) {
        return new AsyncCompletions<T>(pendingPromises, chunkSize);
    }

    /**
     * Streams the results of the promises in settlement order, cancelling the
     * enlisted promises ({@link Cancel#ENLISTED}) when the stream is closed.
     *
     * @param pendingPromises promises to await
     * @param chunkSize       maximum batch size, or {@code <= 0} for no limit
     * @param <T>             result type
     * @return sequential stream of results
     */
    public static <T> Stream<T> stream(Stream<? extends CompletionStage<? extends T>> pendingPromises, int chunkSize) {
        return AsyncCompletions.stream(pendingPromises, chunkSize, Cancel.ENLISTED);
    }

    /**
     * Streams the results of the promises in settlement order.
     *
     * @param pendingPromises promises to await
     * @param chunkSize       maximum batch size, or {@code <= 0} for no limit
     * @param cancelOption    what to cancel when the stream is closed
     * @param <T>             result type
     * @return sequential stream of results
     */
    public static <T> Stream<T> stream(Stream<? extends CompletionStage<? extends T>> pendingPromises, int chunkSize, Cancel cancelOption) {
        return AsyncCompletions.stream(pendingPromises.iterator(), chunkSize, cancelOption);
    }

    /**
     * Streams the results of the promises in settlement order, cancelling the
     * enlisted promises ({@link Cancel#ENLISTED}) when the stream is closed.
     *
     * @param pendingPromises promises to await
     * @param chunkSize       maximum batch size, or {@code <= 0} for no limit
     * @param <T>             result type
     * @return sequential stream of results
     */
    public static <T> Stream<T> stream(Iterable<? extends CompletionStage<? extends T>> pendingPromises, int chunkSize) {
        return AsyncCompletions.stream(pendingPromises, chunkSize, Cancel.ENLISTED);
    }

    /**
     * Streams the results of the promises in settlement order.
     *
     * @param pendingPromises promises to await
     * @param chunkSize       maximum batch size, or {@code <= 0} for no limit
     * @param cancelOption    what to cancel when the stream is closed
     * @param <T>             result type
     * @return sequential stream of results
     */
    public static <T> Stream<T> stream(Iterable<? extends CompletionStage<? extends T>> pendingPromises, int chunkSize, Cancel cancelOption) {
        return AsyncCompletions.stream(pendingPromises.iterator(), chunkSize, cancelOption);
    }

    private static <T> Stream<T> stream(Iterator<? extends CompletionStage<? extends T>> pendingPromises, int chunkSize, Cancel cancelOption) {
        return AsyncCompletions.toStream(new AsyncCompletions<T>(pendingPromises, chunkSize, cancelOption));
    }

    /** Wraps the iterator in a sequential stream whose close handler closes the iterator. */
    private static <T> Stream<T> toStream(AsyncCompletions<T> iterator) {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false).onClose(iterator::close);
    }

    /**
     * Subscribes to the next batch of source promises.
     *
     * @return {@code true} if at least one promise was enlisted
     */
    private boolean enlistPending() {
        boolean enlisted = false;
        int batched = 0;
        while (this.pendingPromises.hasNext() && this.tryReserveSlot()) {
            CompletionStage<? extends T> nextPromise = this.pendingPromises.next();
            this.enlistedPromises.add(nextPromise);
            nextPromise.whenComplete(this.enlistResolved(nextPromise));
            enlisted = true;
            if (this.chunkSize > 0 && ++batched >= this.chunkSize) {
                break;
            }
        }
        return enlisted;
    }

    /**
     * Atomically counts one more enlisted promise unless the sequence is closed.
     * A compare-and-set is used so the closed marker is never modified.
     *
     * @return {@code false} if the sequence is closed
     */
    private boolean tryReserveSlot() {
        while (true) {
            int current = this.outstanding.get();
            if (current < 0) {
                return false;
            }
            if (this.outstanding.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    /**
     * Records that one result has been handed to the caller. The counter is left
     * untouched when the sequence is closed so that the closed marker cannot wrap
     * around into a positive value.
     */
    private void markDelivered() {
        this.outstanding.updateAndGet(current -> current <= 0 ? current : current - 1);
    }

    /** Builds the completion callback for a single enlisted promise. */
    private BiConsumer<T, Throwable> enlistResolved(CompletionStage<? extends T> promise) {
        return (value, ex) -> {
            this.enlistedPromises.remove(promise);
            this.enlistResolved(value, ex);
        };
    }

    /**
     * Queues the outcome of a settled promise for the consumer. Outcomes arriving
     * after {@link #close()} are still queued so that a consumer blocked in
     * {@link #next()} is released; they are never served by later calls because
     * the closed check comes first.
     */
    private void enlistResolved(T resolvedValue, Throwable ex) {
        if (ex == null) {
            this.putUninterruptibly(Try.success(resolvedValue));
        } else {
            this.putUninterruptibly(Try.failure(ex));
        }
    }

    /**
     * Puts the outcome into the queue even if the completing thread is (or
     * becomes) interrupted, restoring the interrupt flag afterwards. Losing the
     * outcome would leave the consumer waiting forever for it.
     */
    private void putUninterruptibly(Try<T> outcome) {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    this.settledResults.put(outcome);
                    return;
                }
                catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Strategy deciding which promises get cancelled when the sequence is closed.
     * Cancellation is delegated to {@code SharedFunctions.cancelPromise(promise, true)}
     * (the flag presumably means "may interrupt if running" - confirm).
     */
    public static enum Cancel {
        /** Cancels nothing. */
        NONE{

            @Override
            void apply(Set<CompletionStage<?>> enlistedPromises, Iterator<? extends CompletionStage<?>> pendingPromises) {
            }
        }
        ,
        /** Cancels only the promises that are currently enlisted. */
        ENLISTED{

            @Override
            void apply(Set<CompletionStage<?>> enlistedPromises, Iterator<? extends CompletionStage<?>> pendingPromises) {
                enlistedPromises.forEach(p -> SharedFunctions.cancelPromise(p, true));
            }
        }
        ,
        /** Cancels the enlisted promises and every promise still left in the source iterator. */
        ALL{

            @Override
            void apply(Set<CompletionStage<?>> enlistedPromises, Iterator<? extends CompletionStage<?>> pendingPromises) {
                ENLISTED.apply(enlistedPromises, pendingPromises);
                while (pendingPromises.hasNext()) {
                    CompletionStage<?> nextPromise = pendingPromises.next();
                    SharedFunctions.cancelPromise(nextPromise, true);
                }
            }
        };


        /**
         * Applies the strategy.
         *
         * @param enlistedPromises promises that were enlisted and have not settled yet
         * @param pendingPromises  source iterator positioned at the first promise not yet enlisted
         */
        abstract void apply(Set<CompletionStage<?>> enlistedPromises, Iterator<? extends CompletionStage<?>> pendingPromises);
    }
}

