/*
 * Decompiled with CFR 0.152.
 */
package com.bigdata.relation.accesspath;

import cern.colt.Arrays;
import com.bigdata.bop.engine.QueryTimeoutException;
import com.bigdata.relation.accesspath.BufferClosedException;
import com.bigdata.relation.accesspath.ChunkMergeSortHelper;
import com.bigdata.relation.accesspath.IAsynchronousIterator;
import com.bigdata.relation.accesspath.IBlockingBuffer;
import com.bigdata.util.InnerCause;
import java.lang.reflect.Array;
import java.nio.channels.ClosedByInterruptException;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;

public class BlockingBuffer<E>
implements IBlockingBuffer<E> {
    /** Logger shared by the buffer and by its {@link BlockingIterator}. */
    protected static final Logger log = Logger.getLogger(BlockingBuffer.class);
    /** 2000ms expressed in nanoseconds; the starting value of the {@code logTimeout} back-off used by the retry loops. */
    private static final long initialLogTimeout = TimeUnit.NANOSECONDS.convert(2000L, TimeUnit.MILLISECONDS);
    /** 10000ms expressed in nanoseconds; the largest single increment applied to {@code logTimeout}. */
    private static final long maxLogTimeout = TimeUnit.NANOSECONDS.convert(10000L, TimeUnit.MILLISECONDS);
    /** Number of non-blocking {@code offer} attempts made by {@code add} before it switches to timed offers (system property {@code <class name>.NSPIN.ADD}, default 100). */
    private static final int NSPIN_ADD = Integer.valueOf(System.getProperty(BlockingBuffer.class.getName() + ".NSPIN.ADD", "100"));
    /** Number of non-blocking {@code poll} attempts made by the iterator before it switches to timed polls (system property {@code <class name>.NSPIN.READ}, default 100). */
    private static final int NSPIN_READ = Integer.valueOf(System.getProperty(BlockingBuffer.class.getName() + ".NSPIN.READ", "100"));
    /** Cleared by {@link #close()} and {@link #abort(Throwable)}; never set back to {@code true} in this class. */
    private volatile boolean open = true;
    /** Acquired by {@code add} around each offer attempt and by the iterator when it drains a closed buffer. */
    private final ReentrantLock lock = new ReentrantLock();
    /** The first cause passed to {@link #abort(Throwable)}, or {@code null} if the buffer was never aborted. */
    private volatile Throwable cause = null;
    /** The backing queue supplied to (or created by) the constructor. */
    private final BlockingQueue<E> queue;
    /** The single iterator instance returned by every call to {@link #iterator()}. */
    private final IAsynchronousIterator<E> iterator;
    /** Queue capacity used by the no-argument constructor. */
    public static final transient int DEFAULT_PRODUCER_QUEUE_CAPACITY = 100;
    /** Minimum chunk size used by the one-argument constructor. */
    public static final transient int DEFAULT_MINIMUM_CHUNK_SIZE = 100;
    /** Chunk timeout used by the one-argument constructor, in {@link #DEFAULT_CONSUMER_CHUNK_TIMEOUT_UNIT}. */
    public static final transient long DEFAULT_CONSUMER_CHUNK_TIMEOUT = 20L;
    /** Unit in which {@link #DEFAULT_CONSUMER_CHUNK_TIMEOUT} is expressed. */
    public static final transient TimeUnit DEFAULT_CONSUMER_CHUNK_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
    /** Chunks are combined (on add and on read) while the combined length stays within / below this size; 0 disables combining on add. */
    private final int minimumChunkSize;
    /** Chunk timeout in nanoseconds (the constructor converts the caller's value). */
    private final long chunkTimeout;
    /** When {@code true}, combined chunks are re-sorted with {@code ChunkMergeSortHelper.mergeSort}. */
    private final boolean ordered;
    /** Future registered through {@link #setFuture(Future)}; may be assigned only once. */
    private volatile Future future;
    /** Not referenced anywhere else in the visible code. */
    private static final boolean producerConsumerWarnings = false;
    /** Message of the exception captured in {@link #openStackFrame}. */
    private static final transient String MSG_ALLOCATION_STACK_FRAME = "Buffer Allocation Stack Frame";
    /** Message of the exception captured in {@link #closeStackFrame}. */
    private static final transient String MSG_CLOSED_STACK_FRAME = "Buffer Closed Stack Frame";
    /** Not referenced anywhere else in the visible code. */
    private static final transient String MSG_PRODUCER_STACK_FRAME = "Buffer Producer Stack Frame";
    /** Not referenced anywhere else in the visible code. */
    private static final transient String MSG_CONSUMER_STACK_FRAME = "Buffer Consumer Stack Frame";
    /** Stack trace of the allocation site; only captured when INFO logging is enabled at construction. */
    private RuntimeException openStackFrame;
    /** Stack trace of the {@link #close()} call; only captured when INFO logging is enabled at that time. */
    private RuntimeException closeStackFrame;
    /** Incremented for each chunk (array element) accepted by {@code add}, including chunks merged into the tail chunk. */
    private long chunksAddedCount = 0L;
    /** Number of elements accepted by {@code add}; a chunk contributes its array length. */
    private long elementsAddedCount = 0L;
    /** Elements added minus elements handed out by the iterator; not adjusted by {@link #clear()} or {@link #reset()}. */
    private AtomicLong elementsOnQueueCount = new AtomicLong(0L);
    /** Incremented for each chunk (array element) handed out by the iterator. */
    private long chunksDrainedCount = 0L;
    /** Number of elements handed out by the iterator; a chunk contributes its array length. */
    private long elementsDrainedCount = 0L;

    /**
     * Returns the blocking timeout, in milliseconds, to use for one retry of
     * an offer/poll loop. The timeout grows with the number of failed tries.
     *
     * @param ntries the number of attempts made so far
     * @return 10 for fewer than 500 tries, 100 for fewer than 1000 tries,
     *         otherwise 250
     */
    private static final long getTimeoutMillis(int ntries) {
        if (ntries < 500) {
            return 10L;
        }
        if (ntries < 1000) {
            return 100L;
        }
        return 250L;
    }

    /**
     * @return the minimum chunk size given to the constructor
     */
    public final int getMinimumChunkSize() {
        return minimumChunkSize;
    }

    /**
     * @return the chunk timeout in nanoseconds (the constructor converts the
     *         caller-supplied value to nanoseconds before storing it)
     */
    public final long getChunkTimeout() {
        return chunkTimeout;
    }

    /**
     * Registers the future associated with this buffer. The future may be set
     * only once.
     *
     * @param future the future; must not be {@code null}
     * @throws IllegalArgumentException if {@code future} is {@code null}
     * @throws IllegalStateException if a future has already been set
     */
    @Override
    public void setFuture(Future future) {
        synchronized (this) {
            if (future == null) {
                throw new IllegalArgumentException();
            }
            final boolean alreadySet = this.future != null;
            if (alreadySet) {
                throw new IllegalStateException();
            }
            this.future = future;
        }
    }

    /**
     * @return the future registered via {@link #setFuture(Future)}, or
     *         {@code null} if none has been set
     */
    @Override
    public Future getFuture() {
        return future;
    }

    /**
     * Creates a buffer with the default queue capacity
     * ({@link #DEFAULT_PRODUCER_QUEUE_CAPACITY}).
     */
    public BlockingBuffer() {
        this(DEFAULT_PRODUCER_QUEUE_CAPACITY);
    }

    /**
     * Creates a buffer with the given queue capacity and the default minimum
     * chunk size and chunk timeout.
     *
     * @param capacity the queue capacity; 0 selects a {@link SynchronousQueue}
     */
    public BlockingBuffer(int capacity) {
        this(capacity, DEFAULT_MINIMUM_CHUNK_SIZE, DEFAULT_CONSUMER_CHUNK_TIMEOUT, DEFAULT_CONSUMER_CHUNK_TIMEOUT_UNIT);
    }

    /**
     * Creates an unordered buffer backed by a {@link SynchronousQueue} when
     * {@code capacity} is 0 and by a {@link LinkedBlockingDeque} of that
     * capacity otherwise.
     *
     * @param capacity the queue capacity
     * @param minimumChunkSize the minimum chunk size; must be non-negative
     * @param chunkTimeout the chunk timeout; must be non-negative
     * @param chunkTimeoutUnit the unit of {@code chunkTimeout}
     */
    public BlockingBuffer(int capacity, int minimumChunkSize, long chunkTimeout, TimeUnit chunkTimeoutUnit) {
        this(
                (BlockingQueue<E>) (capacity == 0
                        ? new SynchronousQueue<E>()
                        : new LinkedBlockingDeque<E>(capacity)),
                minimumChunkSize,
                chunkTimeout,
                chunkTimeoutUnit,
                false);
    }

    public BlockingBuffer(BlockingQueue<E> queue, int minimumChunkSize, long chunkTimeout, TimeUnit chunkTimeoutUnit, boolean ordered) {
        if (queue == null) {
            throw new IllegalArgumentException();
        }
        if (minimumChunkSize < 0) {
            throw new IllegalArgumentException();
        }
        if (chunkTimeout < 0L) {
            throw new IllegalArgumentException();
        }
        if (chunkTimeoutUnit == null) {
            throw new IllegalArgumentException();
        }
        this.queue = queue;
        this.minimumChunkSize = minimumChunkSize;
        this.chunkTimeout = TimeUnit.NANOSECONDS.convert(chunkTimeout, chunkTimeoutUnit);
        this.ordered = ordered;
        this.iterator = new BlockingIterator();
        if (log.isInfoEnabled()) {
            this.openStackFrame = new RuntimeException(MSG_ALLOCATION_STACK_FRAME);
        }
    }

    /**
     * Returns a human-readable summary of the buffer state: open flag,
     * whether a future is set, the counters, the approximate queue size and
     * remaining capacity, and the abort cause when present.
     */
    public String toString() {
        final StringBuilder out = new StringBuilder(super.toString());
        out.append("{ open=").append(open);
        out.append(", hasFuture=").append(future != null);
        out.append(", elementsAddedCount=").append(elementsAddedCount);
        out.append(", chunksAddedCount=").append(chunksAddedCount);
        out.append(", chunksDrainedCount=").append(chunksDrainedCount);
        out.append(", elementsDrainedCount=").append(elementsDrainedCount);
        out.append(", size~=").append(queue.size());
        out.append(", remainingCapacity~=").append(queue.remainingCapacity());
        final Throwable abortCause = cause;
        if (abortCause != null) {
            out.append(", cause=").append(abortCause);
        }
        return out.append("}").toString();
    }

    /**
     * Verifies that the buffer is still open.
     *
     * @throws BufferClosedException if the buffer is closed; the exception
     *         wraps the abort cause when one was recorded. The captured
     *         allocation/close stack frames, if any, are logged first.
     */
    private void assertOpen() {
        if (open) {
            return;
        }
        if (openStackFrame != null) {
            log.warn(openStackFrame);
        }
        if (closeStackFrame != null) {
            log.warn(closeStackFrame);
        }
        if (cause == null) {
            throw new BufferClosedException();
        }
        throw new BufferClosedException(cause);
    }

    /**
     * @return {@code true} if the backing queue currently holds no elements
     */
    @Override
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    /**
     * @return the number of entries currently in the backing queue (a chunk
     *         counts as one entry)
     */
    @Override
    public int size() {
        return queue.size();
    }

    /**
     * @return {@code true} until {@link #close()} or {@link #abort(Throwable)}
     *         has been called
     */
    @Override
    public boolean isOpen() {
        return open;
    }

    /**
     * @return the {@code ordered} flag given to the constructor
     */
    public boolean isOrdered() {
        return ordered;
    }

    /**
     * Closes the buffer. Calling this on a buffer that is already closed has
     * no effect. When INFO logging is enabled the stack trace of the first
     * close is captured for later diagnostics.
     */
    @Override
    public synchronized void close() {
        if (!open) {
            return;
        }
        try {
            if (log.isInfoEnabled()) {
                closeStackFrame = new RuntimeException(MSG_CLOSED_STACK_FRAME);
            }
        } finally {
            // The buffer is marked closed even if capturing the frame fails.
            open = false;
        }
    }

    /**
     * Discards everything remaining in the backing queue. Only permitted once
     * the buffer has been closed.
     *
     * @throws IllegalStateException if the buffer is still open
     */
    public void clear() throws IllegalStateException {
        final boolean stillOpen = open;
        if (stillOpen) {
            throw new IllegalStateException();
        }
        if (log.isInfoEnabled()) {
            log.info("");
        }
        queue.clear();
    }

    /**
     * Closes the buffer and records why. Only the first cause is kept; later
     * calls log a warning and leave the state untouched.
     *
     * @param cause the reason for the abort; must not be {@code null}
     * @throws IllegalArgumentException if {@code cause} is {@code null}
     */
    @Override
    public void abort(Throwable cause) {
        if (cause == null) {
            throw new IllegalArgumentException();
        }
        if (log.isInfoEnabled()) {
            log.info("cause=" + cause, cause);
        }
        synchronized (this) {
            if (this.cause == null) {
                this.cause = cause;
                this.open = false;
            } else {
                log.warn("Already aborted with cause=" + this.cause);
            }
        }
    }

    /**
     * @return the number of chunks accepted by {@code add} so far
     */
    public long getChunksAddedCount() {
        return chunksAddedCount;
    }

    /**
     * @return the number of elements accepted by {@code add} so far (a chunk
     *         contributes its array length)
     */
    public long getElementsAddedCount() {
        return elementsAddedCount;
    }

    /**
     * @return the current value of the elements-on-queue counter (elements
     *         added minus elements handed out by the iterator)
     */
    public long getElementsOnQueueCount() {
        final long onQueue = elementsOnQueueCount.get();
        return onQueue;
    }

    /**
     * Adds an element (or chunk), waiting as long as necessary for space.
     *
     * @param e the element or chunk to add
     * @throws RuntimeException wrapping the {@link InterruptedException} if
     *         the calling thread is interrupted while waiting
     * @throws AssertionError if the underlying timed add reports a timeout
     */
    @Override
    public void add(E e) {
        boolean accepted;
        try {
            accepted = add(e, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
        if (!accepted) {
            throw new AssertionError();
        }
    }

    /**
     * Tries to merge the chunk {@code e} into the chunk at the tail of the
     * deque, provided the combined length does not exceed
     * {@link #minimumChunkSize}. The caller must hold {@link #lock} and must
     * have verified that the queue is a {@link BlockingDeque} and that
     * {@code e} is an array.
     *
     * @param e the chunk being added
     * @return {@code true} if {@code e} was merged into the tail chunk (the
     *         counters are updated), {@code false} if the caller still has to
     *         enqueue {@code e} itself
     */
    private boolean combineChunkOnAdd(E e) {
        assert (lock.isHeldByCurrentThread());
        @SuppressWarnings("unchecked")
        final BlockingDeque<E> deque = (BlockingDeque<E>) (BlockingQueue<E>) queue;
        final E peeked = deque.peekLast();
        if (peeked == null) {
            return false;
        }
        if (((Object[]) peeked).length + ((Object[]) e).length > minimumChunkSize) {
            return false;
        }
        // The consumer may have drained the tail between the peek and the poll.
        final E tail = deque.pollLast();
        if (tail == null) {
            return false;
        }
        final int addedLength = ((Object[]) e).length;
        if (((Object[]) tail).length + addedLength > minimumChunkSize) {
            // A different (larger) chunk was polled: put it back untouched.
            deque.add(tail);
            return false;
        }
        final E merged = BlockingBuffer.combineChunks(tail, e);
        if (ordered) {
            ChunkMergeSortHelper.mergeSort((Object[]) merged);
        }
        deque.add(merged);
        ++chunksAddedCount;
        elementsAddedCount += (long) addedLength;
        elementsOnQueueCount.addAndGet(addedLength);
        if (log.isDebugEnabled()) {
            log.debug("add/combined chunk: len=" + addedLength + ", combined length=" + ((Object[]) merged).length);
        }
        return true;
    }

    /**
     * Adds an element (or chunk) to the buffer, waiting up to the given
     * timeout for space to become available.
     * <p>
     * An empty chunk is accepted without touching the queue. For a non-empty
     * chunk, one attempt is made to merge it into the chunk at the tail of
     * the queue (see {@link #combineChunkOnAdd(Object)}); otherwise the
     * element is offered to the queue, first with non-blocking offers
     * ({@link #NSPIN_ADD} tries) and then with short timed offers.
     *
     * @param e the element or chunk to add; must not be {@code null}
     * @param timeout the maximum time to wait
     * @param unit the unit of {@code timeout}
     * @return {@code true} if the element was added (or merged, or was an
     *         empty chunk); {@code false} if the timeout expired first
     * @throws IllegalArgumentException if {@code e} is {@code null}
     * @throws BufferClosedException if the buffer is (or becomes) closed
     * @throws InterruptedException if interrupted while waiting for the lock
     * @throws RuntimeException wrapping the {@link InterruptedException} if
     *         interrupted while blocked in the timed offer; the buffer is
     *         aborted with that exception first
     */
    public boolean add(E e, long timeout, TimeUnit unit) throws InterruptedException {
        if (e == null) {
            throw new IllegalArgumentException();
        }
        final boolean isChunk = e.getClass().getComponentType() != null;
        if (isChunk && ((Object[]) e).length == 0) {
            if (log.isInfoEnabled()) {
                log.info("Empty chunk.");
            }
            return true;
        }
        long lastTime = System.nanoTime();
        long nanos = unit.toNanos(timeout);
        int ntries = 0;
        boolean didCombine = false;
        while (nanos > 0L) {
            if (lock.tryLock(nanos, TimeUnit.NANOSECONDS)) {
                try {
                    assertOpen();
                    // Merging into the tail chunk is attempted at most once per add.
                    if (!didCombine && minimumChunkSize > 0 && queue instanceof BlockingDeque && isChunk) {
                        if (combineChunkOnAdd(e)) {
                            return true;
                        }
                        didCombine = true;
                    }
                    boolean added;
                    if (ntries < NSPIN_ADD) {
                        // Spin phase: non-blocking offer.
                        added = queue.offer(e);
                    } else {
                        final long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                        final long offerTimeoutNanos = Math.min(nanos, TimeUnit.MILLISECONDS.toNanos(getTimeoutMillis(ntries)));
                        try {
                            added = queue.offer(e, offerTimeoutNanos, TimeUnit.NANOSECONDS);
                        } catch (InterruptedException ex) {
                            abort(ex);
                            throw new RuntimeException("Buffer closed by interrupt", ex);
                        }
                    }
                    if (added) {
                        if (isChunk) {
                            final int len = ((Object[]) e).length;
                            ++chunksAddedCount;
                            elementsAddedCount += (long) len;
                            elementsOnQueueCount.addAndGet(len);
                            if (log.isDebugEnabled()) {
                                log.debug("added chunk: len=" + len);
                            }
                        } else {
                            ++elementsAddedCount;
                            elementsOnQueueCount.incrementAndGet();
                            if (log.isDebugEnabled()) {
                                log.debug("added: " + e.toString());
                            }
                        }
                        return true;
                    }
                    ++ntries;
                } finally {
                    // Release only; the finally block must not alter control
                    // flow or the "return true" paths above would be lost.
                    lock.unlock();
                }
            }
            // Charge the time spent in this pass against the caller's timeout.
            final long now = System.nanoTime();
            nanos -= now - lastTime;
            lastTime = now;
        }
        return false;
    }

    /**
     * No-op for this buffer; elements are placed on the queue by
     * {@code add} directly.
     *
     * @return always 0
     */
    @Override
    public long flush() {
        final long flushed = 0L;
        return flushed;
    }

    /**
     * Removes every entry from the backing queue. The open flag and the
     * counters are left untouched.
     */
    @Override
    public void reset() {
        queue.clear();
    }

    /**
     * @return the iterator created with this buffer; every call returns the
     *         same instance
     */
    @Override
    public IAsynchronousIterator<E> iterator() {
        return iterator;
    }

    /**
     * Concatenates two chunks (arrays) into a new array whose component type
     * is that of the first chunk. If either chunk is empty the other one is
     * returned as-is.
     *
     * @param chunk1 the first chunk; must be an array
     * @param chunk2 the second chunk; must be an array
     * @return the combined chunk, with the elements of {@code chunk1}
     *         followed by those of {@code chunk2}
     */
    public static <E> E combineChunks(E chunk1, E chunk2) {
        final Object[] head = (Object[]) chunk1;
        if (head.length == 0) {
            return chunk2;
        }
        final Object[] tail = (Object[]) chunk2;
        if (tail.length == 0) {
            return chunk1;
        }
        assert (head[0] != null);
        assert (tail[0] != null);
        if (log.isDebugEnabled()) {
            log.debug("Combining chunks: e.length=" + head.length + ", t.length=" + tail.length);
        }
        final Class<?> componentType = head.getClass().getComponentType();
        final Object[] combined = (Object[]) Array.newInstance(componentType, head.length + tail.length);
        System.arraycopy(head, 0, combined, 0, head.length);
        System.arraycopy(tail, 0, combined, head.length, tail.length);
        assert (combined[0] != null);
        return (E) combined;
    }

    protected class BlockingIterator
    implements IAsynchronousIterator<E> {
        // Iterator-level open flag; cleared by _close(). Distinct from the buffer's own open flag.
        private boolean open = true;
        // Element already polled from the queue but not yet handed out by _next(); null when none is pending.
        private E nextE = null;
        // Set by checkFuture() the first time the buffer's future is observed to be done, so the future is examined only once.
        private boolean futureIsDone = false;

        /**
         * Returns a summary of the iterator state: its own open flag, whether
         * the future was seen done, the buffer's open flag and whether an
         * element is pending.
         */
        public String toString() {
            return new StringBuilder("BlockingIterator")
                    .append("{ open=").append(this.open)
                    .append(", futureIsDone=").append(this.futureIsDone)
                    .append(", bufferIsOpen=").append(BlockingBuffer.this.open)
                    .append(", nextE=").append(this.nextE != null)
                    .append("}")
                    .toString();
        }

        private BlockingIterator() {
            if (log.isInfoEnabled()) {
                log.info((Object)"Starting iterator.");
            }
        }

        /**
         * Marks the iterator as closed without touching the buffer's future.
         */
        private void _close() {
            if (log.isDebugEnabled()) {
                log.debug("");
            }
            if (this.open) {
                this.open = false;
            }
        }

        /**
         * Closes the iterator and, when the buffer's future is set and not
         * yet done, cancels it with interruption. When no future has been set
         * this is only reported (if INFO logging is enabled).
         */
        @Override
        public void close() {
            _close();
            final Future f = BlockingBuffer.this.future;
            final boolean info = log.isInfoEnabled();
            if (f == null) {
                if (info) {
                    final String msg = "Future not set: " + this;
                    log.warn(msg, new RuntimeException());
                    final RuntimeException allocation = BlockingBuffer.this.openStackFrame;
                    if (allocation != null) {
                        log.warn(msg, allocation);
                    }
                }
                return;
            }
            if (f.isDone()) {
                return;
            }
            if (info) {
                log.warn(this, new RuntimeException("Cancelling future: " + f));
            }
            f.cancel(true);
        }

        /**
         * Examines the buffer's future the first time it is observed to be
         * done: the buffer is closed and the future's outcome is inspected.
         * <ul>
         * <li>Interrupted while getting the result: the iterator is
         * closed.</li>
         * <li>Failure caused by a {@code QueryTimeoutException}: the iterator
         * is closed and the failure is rethrown wrapped.</li>
         * <li>Failure caused by an interrupt
         * ({@code ClosedByInterruptException} or
         * {@code InterruptedException}): the iterator is closed
         * silently.</li>
         * <li>Any other failure: logged at ERROR, the iterator is closed and
         * the failure is rethrown wrapped.</li>
         * </ul>
         *
         * @throws RuntimeException wrapping the {@link ExecutionException}
         *         for the rethrown cases above
         */
        private final void checkFuture() {
            if (this.futureIsDone) {
                return;
            }
            final Future f = BlockingBuffer.this.future;
            if (f == null || !f.isDone()) {
                return;
            }
            if (log.isInfoEnabled()) {
                log.info("Future is done");
            }
            this.futureIsDone = true;
            BlockingBuffer.this.close();
            try {
                f.get();
            } catch (InterruptedException e) {
                if (log.isInfoEnabled()) {
                    log.info(e.getMessage());
                }
                _close();
            } catch (ExecutionException e) {
                final boolean queryTimeout = InnerCause.isInnerCause(e, QueryTimeoutException.class);
                if (queryTimeout) {
                    if (log.isInfoEnabled()) {
                        log.info(e.getMessage());
                    }
                    _close();
                    throw new RuntimeException(e);
                }
                final boolean interrupted = InnerCause.isInnerCause(e, ClosedByInterruptException.class)
                        || InnerCause.isInnerCause(e, InterruptedException.class);
                if (!interrupted) {
                    log.error(e, e);
                    _close();
                    throw new RuntimeException(e);
                }
                if (log.isInfoEnabled()) {
                    log.info(e.getMessage());
                }
                _close();
            }
        }

        /**
         * @return {@code true} when the buffer is closed, no element is
         *         pending in the iterator and the queue is empty
         */
        @Override
        public boolean isExhausted() {
            if (BlockingBuffer.this.open) {
                return false;
            }
            if (this.nextE != null) {
                return false;
            }
            return BlockingBuffer.this.queue.isEmpty();
        }

        /**
         * Blocks until an element is available or the iterator is known to
         * be exhausted. If the thread is interrupted while waiting, the
         * iterator is closed, the future is checked, the interrupt status is
         * restored and {@code false} is returned.
         *
         * @return {@code true} if {@link #next()} can return an element
         */
        public boolean hasNext() {
            boolean result;
            try {
                result = _hasNext(Long.MAX_VALUE);
            } catch (InterruptedException ex) {
                _close();
                checkFuture();
                // The stack trace is only included when DEBUG is enabled.
                if (log.isDebugEnabled()) {
                    log.info("Interrupted: " + this, ex);
                } else if (log.isInfoEnabled()) {
                    log.info("Interrupted: " + this);
                }
                Thread.currentThread().interrupt();
                result = false;
            }
            return result;
        }

        /**
         * Waits up to the given timeout for an element to become available.
         *
         * @param timeout the maximum time to wait
         * @param unit the unit of {@code timeout}
         * @return {@code true} if an element is available
         * @throws InterruptedException if interrupted while waiting
         */
        @Override
        public boolean hasNext(long timeout, TimeUnit unit) throws InterruptedException {
            return _hasNext(unit.toNanos(timeout));
        }

        /**
         * Core of the {@code hasNext} variants: waits up to {@code nanos}
         * nanoseconds for an element to become available and stores it in
         * {@link #nextE}.
         * <p>
         * While the buffer is open the queue is polled, first without
         * blocking ({@link #NSPIN_READ} tries) and then with short timed
         * polls. Once the buffer is closed the queue is drained under the
         * buffer's lock; when it is empty the iterator is exhausted.
         *
         * @param nanos the maximum time to wait, in nanoseconds
         * @return {@code true} if an element is pending; {@code false} if the
         *         iterator is exhausted or closed, or the timeout expired
         * @throws InterruptedException if interrupted while waiting
         * @throws IllegalStateException if the buffer was aborted (wrapping
         *         the abort cause) and this is observed before the closed
         *         state is
         */
        private boolean _hasNext(long nanos) throws InterruptedException {
            if (this.nextE != null) {
                return true;
            }
            final long begin = System.nanoTime();
            long lastTime = begin;
            int ntries = 0;
            long logTimeout = initialLogTimeout;
            while (true) {
                if (this.nextE == null && !BlockingBuffer.this.open) {
                    // Buffer is closed: drain what is left while holding the
                    // lock that producers take in add().
                    final long lockTimeout = Math.min(nanos, TimeUnit.MILLISECONDS.toNanos(5L));
                    if (!BlockingBuffer.this.lock.tryLock(lockTimeout, TimeUnit.NANOSECONDS)) {
                        final long now = System.nanoTime();
                        // Subtract the elapsed time BEFORE advancing lastTime;
                        // folding both into one expression always subtracts 0.
                        nanos -= now - lastTime;
                        lastTime = now;
                        if (nanos <= 0L) {
                            if (log.isDebugEnabled()) {
                                log.debug("Timeout");
                            }
                            return false;
                        }
                        if (log.isDebugEnabled()) {
                            log.debug("Lock timeout - will retry.");
                        }
                        continue;
                    }
                    try {
                        this.nextE = BlockingBuffer.this.queue.poll();
                        if (this.nextE != null) {
                            return true;
                        }
                        if (log.isInfoEnabled()) {
                            log.info("Exhausted");
                        }
                        assert (this.isExhausted());
                        this.checkFuture();
                        return false;
                    } finally {
                        BlockingBuffer.this.lock.unlock();
                    }
                }
                if (this.nextE == null) {
                    this.nextE = BlockingBuffer.this.queue.poll();
                }
                if (this.nextE != null) {
                    return true;
                }
                if (!this.open) {
                    if (log.isDebugEnabled()) {
                        log.debug("iterator is closed");
                    }
                    this.checkFuture();
                    return false;
                }
                if (BlockingBuffer.this.cause != null) {
                    throw new IllegalStateException(BlockingBuffer.this.cause);
                }
                final long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
                if (nanos <= 0L) {
                    if (log.isDebugEnabled()) {
                        log.debug("Timeout");
                    }
                    return false;
                }
                if (ntries < NSPIN_READ) {
                    // Spin phase: non-blocking poll.
                    this.nextE = BlockingBuffer.this.queue.poll();
                    if (this.nextE != null) {
                        if (log.isDebugEnabled()) {
                            log.debug("next: " + this.nextE);
                        }
                        return true;
                    }
                } else {
                    final long timeout = Math.min(nanos, TimeUnit.MILLISECONDS.toNanos(BlockingBuffer.getTimeoutMillis(ntries)));
                    this.nextE = BlockingBuffer.this.queue.poll(timeout, TimeUnit.NANOSECONDS);
                    if (this.nextE != null) {
                        if (log.isDebugEnabled()) {
                            log.debug("next: " + (this.nextE.getClass().getComponentType() != null ? Arrays.toString((Object[]) this.nextE) : this.nextE));
                        }
                        return true;
                    }
                }
                assert (this.nextE == null);
                ++ntries;
                // The future is only examined at a backed-off interval.
                final long elapsedNanos = now - begin;
                if (elapsedNanos < logTimeout) {
                    continue;
                }
                logTimeout += Math.min(maxLogTimeout, logTimeout);
                if (BlockingBuffer.this.future == null) {
                    if (log.isInfoEnabled()) {
                        log.info("Future not set on buffer.");
                    }
                    continue;
                }
                this.checkFuture();
            }
        }

        /**
         * Returns the next element, waiting up to the given timeout for one
         * to become available. When the element is a chunk and time remains,
         * further chunks are combined into it (see
         * {@code combineChunks(E, int, long, long)}); for an ordered buffer a
         * chunk that grew by combining is re-sorted, as {@link #next()} does.
         *
         * @param timeout the maximum time to wait
         * @param unit the unit of {@code timeout}
         * @return the next element or combined chunk, or {@code null} if the
         *         timeout expired before an element became available
         * @throws NoSuchElementException if no element is available and the
         *         timeout has not expired (iterator exhausted or closed)
         * @throws InterruptedException if interrupted while waiting
         */
        @Override
        public E next(long timeout, TimeUnit unit) throws InterruptedException {
            final long startTime = System.nanoTime();
            final long timeoutNanos = unit.toNanos(timeout);
            if (!this.hasNext(timeoutNanos, TimeUnit.NANOSECONDS)) {
                final long remaining = timeoutNanos - (System.nanoTime() - startTime);
                if (remaining <= 0L) {
                    // Timed out: report "nothing yet" rather than exhaustion.
                    return null;
                }
                throw new NoSuchElementException();
            }
            final E e = this._next();
            if (e.getClass().getComponentType() == null) {
                return e;
            }
            // Only try to combine while part of the caller's timeout is left.
            final long remaining = timeoutNanos - (System.nanoTime() - startTime);
            if (remaining <= 0L) {
                return e;
            }
            final E chunk = this.combineChunks(e, 1, startTime, timeoutNanos);
            if (BlockingBuffer.this.ordered && ((Object[]) chunk).length != ((Object[]) e).length) {
                ChunkMergeSortHelper.mergeSort((Object[]) chunk);
            }
            return chunk;
        }

        /**
         * Returns the next element, blocking until one is available. When the
         * element is a chunk and the buffer has a positive chunk timeout,
         * further chunks are combined into it for up to that timeout; for an
         * ordered buffer a chunk that grew by combining is re-sorted.
         *
         * @return the next element or combined chunk
         * @throws NoSuchElementException if the iterator is exhausted
         */
        public E next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            final E first = this._next();
            final boolean combinable = BlockingBuffer.this.chunkTimeout > 0L
                    && first.getClass().getComponentType() != null;
            if (!combinable) {
                return first;
            }
            final E combined = this.combineChunks(first, 1, System.nanoTime(), BlockingBuffer.this.chunkTimeout);
            if (BlockingBuffer.this.ordered) {
                final int combinedLength = ((Object[]) combined).length;
                final int originalLength = ((Object[]) first).length;
                if (combinedLength != originalLength) {
                    ChunkMergeSortHelper.mergeSort((Object[]) combined);
                }
            }
            return combined;
        }

        /**
         * Hands out the pending element and updates the drained counters.
         *
         * @return the element that was pending in {@link #nextE}
         * @throws NoSuchElementException if no element is pending
         */
        private E _next() {
            final E pending = this.nextE;
            if (pending == null) {
                throw new NoSuchElementException();
            }
            this.nextE = null;
            final boolean isChunk = pending.getClass().getComponentType() != null;
            if (isChunk) {
                final int len = ((Object[]) pending).length;
                BlockingBuffer.this.chunksDrainedCount++;
                BlockingBuffer.this.elementsDrainedCount += len;
                BlockingBuffer.this.elementsOnQueueCount.addAndGet(-len);
            } else {
                BlockingBuffer.this.elementsDrainedCount++;
                BlockingBuffer.this.elementsOnQueueCount.decrementAndGet();
            }
            return pending;
        }

        /**
         * Recursively combines the chunk {@code e} with following chunks
         * until it reaches the minimum chunk size, the timeout (measured from
         * {@code startTime}) expires, the wait is interrupted, or nothing
         * more is immediately available.
         *
         * @param e the chunk accumulated so far
         * @param nchunks the number of chunks combined into {@code e} so far
         * @param startTime the {@link System#nanoTime()} at which combining
         *        started
         * @param timeout the combining timeout in nanoseconds
         * @return the combined chunk
         */
        private E combineChunks(E e, int nchunks, long startTime, long timeout) {
            final Object[] current = (Object[]) e;
            final long elapsed = System.nanoTime() - startTime;
            final boolean isTimeout = elapsed >= timeout;
            // NOTE(review): the wait below is bounded by the buffer's
            // chunkTimeout, not by the timeout parameter - confirm intended.
            final long waitNanos = BlockingBuffer.this.chunkTimeout - elapsed;
            final boolean wantMore = current.length < BlockingBuffer.this.minimumChunkSize && !isTimeout;
            boolean isInterrupt = false;
            if (wantMore) {
                try {
                    if (this.hasNext(waitNanos, TimeUnit.NANOSECONDS)) {
                        return this.combineChunks(this.combineNextChunk(e), nchunks + 1, startTime, timeout);
                    }
                } catch (InterruptedException ex) {
                    isInterrupt = true;
                }
            }
            // The wait produced nothing; still combine if something is queued.
            if (wantMore && !isInterrupt && !BlockingBuffer.this.queue.isEmpty()) {
                this.hasNext();
                return this.combineChunks(this.combineNextChunk(e), nchunks + 1, startTime, timeout);
            }
            if (log.isInfoEnabled()) {
                log.info("done:\n>>> #combined=" + nchunks + ", #elements=" + current.length + ", minimumChunkCapacity=" + BlockingBuffer.this.minimumChunkSize + ", elapsed=" + elapsed + "ns, isTimeout=" + isTimeout + ", queueEmpty=" + BlockingBuffer.this.queue.isEmpty() + ", open=" + BlockingBuffer.this.open);
            }
            return e;
        }

        /**
         * Appends the pending chunk to {@code e}.
         *
         * @param e the chunk accumulated so far
         * @return {@code e} followed by the chunk returned by {@link #_next()}
         */
        private E combineNextChunk(E e) {
            final E following = this._next();
            return BlockingBuffer.combineChunks(e, following);
        }

        /**
         * Not supported by this iterator.
         *
         * @throws UnsupportedOperationException always
         */
        public void remove() {
            final UnsupportedOperationException unsupported = new UnsupportedOperationException();
            throw unsupported;
        }
    }
}

