/*
 * Decompiled with CFR 0.152.
 */
package org.brackit.xquery.operator;

import java.util.ArrayList;
import org.brackit.xquery.QueryContext;
import org.brackit.xquery.QueryException;
import org.brackit.xquery.Tuple;
import org.brackit.xquery.operator.Cursor;
import org.brackit.xquery.operator.Operator;

/**
 * Operator that evaluates its input operator in a separate producer thread and
 * hands the produced tuples to the consuming thread through a small ring of
 * fixed-size tuple buffers.
 */
public class BlockingParallelizer implements Operator {
    private final Operator in;

    /**
     * @param in the operator whose cursor is drained by the producer thread
     */
    public BlockingParallelizer(Operator in) {
        this.in = in;
    }

    /**
     * Creates the input's cursor for {@code tuple} and wraps it in a
     * parallelizing cursor.
     */
    @Override
    public Cursor create(QueryContext ctx, Tuple tuple) throws QueryException {
        return new BlockingParallelizerCursor(this.in.create(ctx, tuple), ctx);
    }

    /**
     * Creates the input's cursor for the first {@code len} entries of
     * {@code buf} and wraps it in a parallelizing cursor.
     */
    @Override
    public Cursor create(QueryContext ctx, Tuple[] buf, int len) throws QueryException {
        return new BlockingParallelizerCursor(this.in.create(ctx, buf, len), ctx);
    }

    /** Delegates to the input operator; this operator does not change the tuple width. */
    @Override
    public int tupleWidth(int initSize) {
        return this.in.tupleWidth(initSize);
    }

    /**
     * Cursor that starts a producer thread on {@link #open} and serves tuples
     * from buffers the producer has published.
     *
     * <p>The buffers form a ring: {@code end} is the index of the buffer the
     * producer is currently filling, {@code start} the index of the buffer the
     * consumer is currently draining. Neither side may advance onto the
     * other's index. All ring-index updates happen while holding this
     * object's monitor, which also publishes the buffer contents between the
     * two threads.
     *
     * <p>End of stream is signalled by a {@code null} slot inside a published
     * buffer; the consumer clears every slot it reads so that recycled
     * buffers never expose stale tuples.
     */
    private static class BlockingParallelizerCursor implements Cursor {
        /** Number of buffers in the ring. */
        private static final int NO_OF_BUFFERS = 3;
        /** Number of tuples per buffer. */
        private static final int BUFFER_SIZE = 1000;

        private final Cursor c;
        private final QueryContext ctx;
        /** Set when the producer has terminated or the consumer has closed the cursor. */
        private volatile boolean finished;
        /** Query error raised by the producer, delivered to the consumer once. */
        private volatile QueryException error;
        /** Unchecked failure raised by the producer, delivered to the consumer once. */
        private volatile RuntimeException failure;
        private Tuple[] currentBuffer;
        private Tuple[][] queue;
        private int start;
        private int end;
        private int pos = 0;
        /** Accumulated milliseconds the producer spent waiting for a free buffer. */
        private long producerBlock;
        /** Accumulated milliseconds the consumer spent waiting for a filled buffer. */
        private long consumerBlock;

        BlockingParallelizerCursor(Cursor c, QueryContext ctx) {
            this.c = c;
            this.ctx = ctx;
        }

        /**
         * Allocates the buffer ring, resets the consumer position and starts
         * the producer thread. The wrapped cursor is opened by the producer
         * thread using the context passed to the constructor.
         */
        @Override
        public void open(QueryContext ctx) throws QueryException {
            this.queue = new Tuple[NO_OF_BUFFERS][BUFFER_SIZE];
            this.finished = false;
            // The consumer starts "behind" buffer 0, which the producer fills first.
            this.start = NO_OF_BUFFERS - 1;
            this.end = 0;
            this.currentBuffer = null;
            this.pos = 0;
            new Thread() {
                @Override
                public void run() {
                    // Qualified this: a plain "this" would refer to the Thread.
                    BlockingParallelizerCursor.this.fill();
                }
            }.start();
        }

        /**
         * Body of the producer thread: drains the wrapped cursor, records any
         * failure for the consumer, always closes the wrapped cursor and
         * finally wakes up a consumer that may be waiting.
         */
        private void fill() {
            try {
                this.produce();
            } catch (QueryException e) {
                this.error = e;
            } catch (RuntimeException e) {
                // Hand unchecked failures to the consumer instead of letting it wait forever.
                this.failure = e;
            } finally {
                try {
                    this.c.close(this.ctx);
                } finally {
                    this.markFinished();
                }
            }
        }

        /**
         * Opens the wrapped cursor and copies its tuples into the ring,
         * publishing each buffer once it is full and the last, possibly
         * partial, buffer at end of input. Stops early once the consumer has
         * closed this cursor.
         */
        private void produce() throws QueryException {
            this.c.open(this.ctx);
            Tuple[] buffer = this.queue[0];
            int filled = 0;
            Tuple t;
            while ((t = this.c.next(this.ctx)) != null) {
                buffer[filled++] = t;
                if (filled != buffer.length) {
                    continue;
                }
                if (this.finished) {
                    // Consumer closed the cursor; nobody will read further buffers.
                    return;
                }
                buffer = this.enqueue();
                if (buffer == null) {
                    return;
                }
                filled = 0;
            }
            // Publish the last buffer; its first unused slot is null and marks end of stream.
            this.enqueue();
        }

        /** Marks the stream as finished and wakes up any thread waiting on the ring. */
        private synchronized void markFinished() {
            this.finished = true;
            this.notifyAll();
        }

        /**
         * Publishes the buffer the producer has just filled and claims the next
         * one, waiting while the consumer still occupies it.
         *
         * @return the next buffer to fill, or {@code null} if the cursor was
         *         closed while no buffer was free
         */
        private synchronized Tuple[] enqueue() {
            int newQueueEnd = (this.end + 1) % this.queue.length;
            boolean interrupted = false;
            try {
                while (newQueueEnd == this.start) {
                    if (this.finished) {
                        return null;
                    }
                    long waitBegin = System.currentTimeMillis();
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        // Keep waiting, but remember the interrupt for the caller.
                        interrupted = true;
                    }
                    this.producerBlock += System.currentTimeMillis() - waitBegin;
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            this.end = newQueueEnd;
            this.notifyAll();
            return this.queue[newQueueEnd];
        }

        /**
         * Releases the buffer the consumer has drained and claims the next
         * published one, waiting while the producer is still filling it.
         *
         * @return the next buffer to drain, or {@code null} if the stream is
         *         finished and no published buffer is left
         * @throws QueryException if the producer recorded a query error
         */
        private synchronized Tuple[] dequeue() throws QueryException {
            int newQueueStart = (this.start + 1) % this.queue.length;
            boolean interrupted = false;
            try {
                while (true) {
                    // Re-checked after every wake-up so a failing producer cannot strand us.
                    this.throwPendingFailure();
                    if (newQueueStart != this.end) {
                        break;
                    }
                    if (this.finished) {
                        return null;
                    }
                    long waitBegin = System.currentTimeMillis();
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        // Keep waiting, but remember the interrupt for the caller.
                        interrupted = true;
                    }
                    this.consumerBlock += System.currentTimeMillis() - waitBegin;
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            this.start = newQueueStart;
            this.notifyAll();
            return this.queue[newQueueStart];
        }

        /** Rethrows, at most once, a failure recorded by the producer thread. */
        private void throwPendingFailure() throws QueryException {
            QueryException deliverError = this.error;
            if (deliverError != null) {
                this.error = null;
                throw deliverError;
            }
            RuntimeException deliverFailure = this.failure;
            if (deliverFailure != null) {
                this.failure = null;
                throw deliverFailure;
            }
        }

        /**
         * Signals the producer to stop and wakes it up if it is waiting for a
         * free buffer. The wrapped cursor is closed by the producer thread.
         */
        @Override
        public void close(QueryContext ctx) {
            this.markFinished();
        }

        /**
         * Returns the next tuple, or {@code null} at end of stream.
         *
         * @throws QueryException if the producer recorded a query error
         */
        @Override
        public Tuple next(QueryContext ctx) throws QueryException {
            if (this.currentBuffer == null || this.pos == this.currentBuffer.length) {
                Tuple[] nextBuffer = this.dequeue();
                if (nextBuffer == null) {
                    return null;
                }
                this.currentBuffer = nextBuffer;
                this.pos = 0;
            }
            Tuple deliver = this.currentBuffer[this.pos];
            // Clear the slot so the recycled buffer carries no stale tuples.
            this.currentBuffer[this.pos++] = null;
            return deliver;
        }
    }
}

