/*
 * Decompiled with CFR 0.152.
 */
package org.brackit.xquery.node.stream;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.brackit.xquery.xdm.DocumentException;
import org.brackit.xquery.xdm.Stream;

/**
 * Stream wrapper that reads the wrapped stream on a background thread and
 * hands its elements to the consumer through an unbounded
 * {@link ConcurrentLinkedQueue}.
 *
 * <p>The producer thread is started by the constructor. It stops when the
 * wrapped stream returns {@code null}, when it throws, or when
 * {@link #close()} has been called, and it closes the wrapped stream on
 * every one of those paths.
 *
 * <p>NOTE(review): {@link #next()} and {@link #close()} update plain
 * (non-volatile) counters, so they are assumed to be called from a single
 * consumer thread - confirm against callers.
 *
 * @param <E> element type delivered to the consumer
 */
public class ParallelCLQStream<E>
implements Stream<E> {
    /** Wrapped source; read and closed only by the producer thread. */
    private final Stream<? extends E> stream;
    /** Hand-over buffer between the producer thread and the consumer. */
    private final ConcurrentLinkedQueue<E> queue;
    /** Set when the producer has stopped or the consumer has called close(). */
    private volatile boolean finished;
    /** Checked failure of the wrapped stream, waiting to be rethrown by next(). */
    private volatile DocumentException error;
    /** Unchecked failure of the wrapped stream, waiting to be rethrown by next(). */
    private volatile RuntimeException failure;
    /** Number of times next() found the queue empty. */
    private int outerRetry;
    /** Number of rejected offers; written by the producer, read in close(). */
    private volatile int innerRetry;
    /** Number of elements handed out by next(). */
    private int takes;

    /**
     * Wraps {@code stream} and immediately starts a thread that drains it
     * into the internal queue.
     *
     * @param stream the source stream; a {@code null} result from its
     *               {@code next()} is treated as end of stream
     */
    public ParallelCLQStream(Stream<? extends E> stream) {
        this.stream = stream;
        this.queue = new ConcurrentLinkedQueue<E>();
        this.finished = false;
        new Thread(){

            @Override
            public void run() {
                ParallelCLQStream.this.produce();
            }
        }.start();
    }

    /**
     * Producer loop: moves elements from the wrapped stream into the queue
     * until the source is exhausted, fails, or this stream is closed.
     */
    private void produce() {
        System.out.println("internal starting");
        try {
            E next;
            // Checking 'finished' first lets close() stop the producer instead
            // of letting it buffer the rest of the source in the unbounded queue.
            transfer:
            while (!this.finished && (next = this.stream.next()) != null) {
                while (!this.queue.offer(next)) {
                    ++this.innerRetry;
                    if (this.finished) {
                        break transfer;
                    }
                }
            }
        }
        catch (DocumentException e) {
            this.error = e;
        }
        catch (RuntimeException e) {
            // Hand unchecked failures to the consumer as well; otherwise the
            // consumer would wait forever or see a silently truncated stream.
            this.failure = e;
        }
        finally {
            // Publish 'finished' only after the failure fields and the last
            // element, so a consumer that observes it also observes those.
            this.finished = true;
            this.stream.close();
        }
        System.out.println("internal stopping");
    }

    /**
     * Marks this stream as finished, which makes the producer thread stop
     * after its current element, and prints the retry/take counters.
     */
    @Override
    public void close() {
        this.finished = true;
        System.out.println("Inner retry " + this.innerRetry);
        System.out.println("Outer retry " + this.outerRetry);
        System.out.println("Takes: " + this.takes);
    }

    /**
     * Returns the next buffered element, waiting for the producer while the
     * queue is empty and the stream is not yet finished.
     *
     * @return the next element, or {@code null} once the stream is finished
     *         and the queue has been drained
     * @throws DocumentException if the wrapped stream failed with one; the
     *         pending failure is delivered once and then cleared
     */
    @Override
    public E next() throws DocumentException {
        while (true) {
            this.throwPendingFailure();
            E current = this.queue.poll();
            if (current == null) {
                ++this.outerRetry;
                if (!this.finished) {
                    // Nothing buffered yet; let the producer make progress.
                    Thread.yield();
                    continue;
                }
                // A failure or a last element may have been published between
                // the checks above and the read of 'finished', so look again
                // before reporting end of stream.
                this.throwPendingFailure();
                current = this.queue.poll();
                if (current == null) {
                    return null;
                }
            }
            ++this.takes;
            return current;
        }
    }

    /** Rethrows, once, a failure recorded by the producer thread. */
    private void throwPendingFailure() throws DocumentException {
        DocumentException deliverError = this.error;
        if (deliverError != null) {
            this.error = null;
            throw deliverError;
        }
        RuntimeException deliverFailure = this.failure;
        if (deliverFailure != null) {
            this.failure = null;
            throw deliverFailure;
        }
    }
}

