/*
 * Decompiled with CFR 0.152.
 */
package org.bsc.async;

import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;
import org.bsc.async.AsyncGenerator;

/**
 * Factory for {@link AsyncGenerator} instances whose elements are delivered
 * through a {@link BlockingQueue}.
 *
 * <p>A producer callback is run on an {@link Executor} and pushes
 * {@code AsyncGenerator.Data} items into the queue; the returned
 * {@link Generator} pulls them out one at a time via {@link Generator#next()}.
 */
public class AsyncGeneratorQueue {
    /**
     * Creates a queue-backed generator whose producer runs on
     * {@link ForkJoinPool#commonPool()}.
     *
     * @param queue    queue shared between the producer and the returned generator
     * @param consumer producer callback; receives {@code queue} and fills it
     * @param <E>      element type
     * @param <Q>      concrete queue type
     * @return a generator reading from {@code queue}
     * @throws NullPointerException if {@code queue} or {@code consumer} is null
     */
    public static <E, Q extends BlockingQueue<AsyncGenerator.Data<E>>> AsyncGenerator<E> of(Q queue, Consumer<Q> consumer) {
        return AsyncGeneratorQueue.of(queue, consumer, ForkJoinPool.commonPool());
    }

    /**
     * Creates a queue-backed generator whose producer runs on the given executor.
     *
     * <p>The producer task invokes {@code consumer.accept(queue)}. If that call
     * throws, the failure is wrapped in an exceptionally-completed future and
     * added to the queue. In every case a {@code Data.done()} marker is added
     * afterwards so that the reader eventually observes termination.
     *
     * <p>Markers are inserted with {@link BlockingQueue#add(Object)}, which does
     * not block; on a capacity-restricted queue that is full it throws
     * {@link IllegalStateException} on the executor thread.
     *
     * @param queue    queue shared between the producer and the returned generator
     * @param consumer producer callback; receives {@code queue} and fills it
     * @param executor executor on which the producer callback is run
     * @param <E>      element type
     * @param <Q>      concrete queue type
     * @return a generator reading from {@code queue}
     * @throws NullPointerException if any argument is null
     */
    public static <E, Q extends BlockingQueue<AsyncGenerator.Data<E>>> AsyncGenerator<E> of(Q queue, Consumer<Q> consumer, Executor executor) {
        Objects.requireNonNull(queue, "queue cannot be null");
        Objects.requireNonNull(executor, "executor cannot be null");
        Objects.requireNonNull(consumer, "consumer cannot be null");
        executor.execute(() -> {
            try {
                consumer.accept(queue);
            }
            catch (Throwable ex) {
                // Surface the producer failure to the reader as a failed future.
                CompletableFuture<E> error = new CompletableFuture<>();
                error.completeExceptionally(ex);
                queue.add(AsyncGenerator.Data.of(error));
            }
            finally {
                // Always terminate the stream, whether the producer succeeded or failed.
                queue.add(AsyncGenerator.Data.done());
            }
        });
        return new Generator<>(queue);
    }

    /**
     * Cancellable generator that takes its elements from a {@link BlockingQueue}.
     *
     * <p>Once a "done" element has been observed (or the generator has been
     * interrupted / misused concurrently) that terminal element is cached and
     * returned by every subsequent {@link #next()} call.
     *
     * @param <E> element type
     */
    public static class Generator<E>
    extends AsyncGenerator.BaseCancellable<E> {
        /** Thread currently blocked inside {@link #next()}, or {@code null} when idle. */
        private volatile Thread executorThread = null;
        /** Cached terminal element; non-null once the generator has ended. */
        private volatile AsyncGenerator.Data<E> endData = null;
        private final BlockingQueue<AsyncGenerator.Data<E>> queue;

        /**
         * @param queue queue from which elements are taken
         */
        public Generator(BlockingQueue<AsyncGenerator.Data<E>> queue) {
            this.queue = queue;
        }

        /**
         * @return the underlying queue this generator reads from
         */
        public BlockingQueue<AsyncGenerator.Data<E>> queue() {
            return this.queue;
        }

        /** @return {@code true} once a terminal element has been recorded */
        private boolean isEnded() {
            return this.endData != null;
        }

        /**
         * Blocks until the next element is available in the queue and returns it.
         *
         * <p>If another thread is already inside this method, the generator is
         * ended with an {@link IllegalStateException} error element. The check is
         * best-effort: it is a plain read followed by a write, not an atomic
         * operation.
         *
         * <p>If the waiting thread is interrupted, the generator is ended with
         * {@code Data.done(CANCELLED)}; the thread's interrupt status is not
         * restored.
         *
         * @return the next element, or the cached terminal element once ended
         */
        @Override
        public AsyncGenerator.Data<E> next() {
            if (this.isEnded()) {
                return this.endData;
            }
            if (this.executorThread != null) {
                this.endData = AsyncGenerator.Data.error(new IllegalStateException("illegal concurrent next() invocation"));
                return this.endData;
            }
            // Publish the waiting thread so cancel() can interrupt the blocking take().
            this.executorThread = Thread.currentThread();
            try {
                AsyncGenerator.Data<E> value = this.queue.take();
                if (value.isDone()) {
                    this.endData = value;
                }
                return value;
            }
            catch (InterruptedException e) {
                // Interruption is the cancellation signal sent by cancel().
                this.endData = AsyncGenerator.Data.done(CANCELLED);
                return this.endData;
            }
            finally {
                this.executorThread = null;
            }
        }

        /**
         * Cancels the generator and, if a thread is currently blocked in
         * {@link #next()}, interrupts it.
         *
         * <p>The flag is forwarded to {@code super.cancel}; the interrupt itself
         * is issued regardless of its value.
         *
         * @param mayInterruptIfRunning forwarded to the superclass
         * @return {@code true} if {@code super.cancel} reported success
         */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            if (super.cancel(mayInterruptIfRunning)) {
                // Snapshot the volatile field: next() may null it concurrently in its
                // finally block, so a second read could otherwise yield null and NPE.
                Thread waiting = this.executorThread;
                if (waiting != null) {
                    waiting.interrupt();
                }
                return true;
            }
            return false;
        }
    }
}

