/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.engine;

import com.datatorrent.api.Sink;
import com.datatorrent.netlet.util.CircularBuffer;
import com.datatorrent.netlet.util.UnsafeBlockingQueue;
import com.datatorrent.stram.engine.SweepableReservoir;
import com.datatorrent.stram.tuple.Tuple;
import java.lang.reflect.Constructor;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractReservoir
implements SweepableReservoir,
BlockingQueue<Object> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractReservoir.class);
    static final String reservoirClassNameProperty = "com.datatorrent.stram.engine.Reservoir";
    private static final int SPSC_ARRAY_BLOCKING_QUEUE_CAPACITY_THRESHOLD = 65536;
    private Sink<Object> sink;
    private String id;
    protected int count;

    public static AbstractReservoir newReservoir(String id, int capacity) {
        String reservoirClassName = System.getProperty(reservoirClassNameProperty);
        if (reservoirClassName == null) {
            if (capacity >= 65536) {
                return new SpscArrayQueueReservoir(id, capacity);
            }
            return new SpscArrayBlockingQueueReservoir(id, capacity);
        }
        if (reservoirClassName.equals(SpscArrayQueueReservoir.class.getName())) {
            return new SpscArrayQueueReservoir(id, capacity);
        }
        if (reservoirClassName.equals(SpscArrayBlockingQueueReservoir.class.getName())) {
            return new SpscArrayBlockingQueueReservoir(id, capacity);
        }
        if (reservoirClassName.equals(CircularBufferReservoir.class.getName())) {
            return new CircularBufferReservoir(id, capacity);
        }
        if (reservoirClassName.equals(ArrayBlockingQueueReservoir.class.getName())) {
            return new ArrayBlockingQueueReservoir(id, capacity);
        }
        try {
            Constructor<?> constructor = Class.forName(reservoirClassName).getConstructor(String.class, Integer.TYPE);
            return (AbstractReservoir)constructor.newInstance(id, capacity);
        }
        catch (ReflectiveOperationException e) {
            logger.debug("Fail to construct reservoir {}", (Object)reservoirClassName, (Object)e);
            throw new RuntimeException("Fail to construct reservoir " + reservoirClassName, e);
        }
    }

    protected AbstractReservoir(String id) {
        this.id = id;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Sink<Object> setSink(Sink<Object> sink) {
        try {
            Sink<Object> sink2 = this.sink;
            return sink2;
        }
        finally {
            this.sink = sink;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int getCount(boolean reset) {
        try {
            int n = this.count;
            return n;
        }
        finally {
            if (reset) {
                this.count = 0;
            }
        }
    }

    public abstract int capacity();

    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    protected Sink<Object> getSink() {
        return this.sink;
    }

    public String toString() {
        return this.getClass().getName() + '@' + Integer.toHexString(this.hashCode()) + "{sink=" + this.sink + ", id=" + this.id + ", count=" + this.count + '}';
    }

    private static class CircularBufferReservoir
    extends AbstractReservoir
    implements UnsafeBlockingQueue<Object> {
        private final CircularBuffer<Object> circularBuffer;

        private CircularBufferReservoir(String id, int capacity) {
            super(id);
            this.circularBuffer = new CircularBuffer(capacity);
        }

        @Override
        public Tuple sweep() {
            CircularBuffer<Object> circularBuffer = this.circularBuffer;
            Sink<Object> sink = this.getSink();
            int size = circularBuffer.size();
            for (int i = 0; i < size; ++i) {
                if (circularBuffer.peekUnsafe() instanceof Tuple) {
                    this.count += i;
                    return (Tuple)this.peekUnsafe();
                }
                sink.put(this.pollUnsafe());
            }
            this.count += size;
            return null;
        }

        @Override
        public boolean add(Object o) {
            return this.circularBuffer.add(o);
        }

        @Override
        public Object remove() {
            return this.circularBuffer.remove();
        }

        @Override
        public Object peek() {
            return this.circularBuffer.peek();
        }

        @Override
        public int size(boolean dataTupleAware) {
            int size = this.circularBuffer.size();
            if (dataTupleAware) {
                Iterator iterator = this.circularBuffer.getFrozenIterator();
                while (iterator.hasNext()) {
                    if (!(iterator.next() instanceof Tuple)) continue;
                    --size;
                }
            }
            return size;
        }

        @Override
        public int capacity() {
            return this.circularBuffer.capacity();
        }

        @Override
        public int drainTo(Collection<? super Object> container) {
            return this.circularBuffer.drainTo(container);
        }

        @Override
        public boolean offer(Object o) {
            return this.circularBuffer.offer(o);
        }

        @Override
        public void put(Object o) throws InterruptedException {
            this.circularBuffer.put(o);
        }

        @Override
        public boolean offer(Object o, long timeout, TimeUnit unit) throws InterruptedException {
            return this.circularBuffer.offer(o, timeout, unit);
        }

        @Override
        public Object take() throws InterruptedException {
            return this.circularBuffer.take();
        }

        @Override
        public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
            return this.circularBuffer.poll(timeout, unit);
        }

        @Override
        public int remainingCapacity() {
            return this.circularBuffer.remainingCapacity();
        }

        @Override
        public boolean remove(Object o) {
            return this.circularBuffer.remove(o);
        }

        @Override
        public boolean contains(Object o) {
            return this.circularBuffer.contains(o);
        }

        @Override
        public int drainTo(Collection<? super Object> collection, int maxElements) {
            return this.circularBuffer.drainTo(collection, maxElements);
        }

        @Override
        public Object poll() {
            return this.circularBuffer.poll();
        }

        public Object pollUnsafe() {
            return this.circularBuffer.pollUnsafe();
        }

        @Override
        public Object element() {
            return this.circularBuffer.element();
        }

        @Override
        public boolean isEmpty() {
            return this.circularBuffer.isEmpty();
        }

        public Iterator<Object> getFrozenIterator() {
            return this.circularBuffer.getFrozenIterator();
        }

        public Iterable<Object> getFrozenIterable() {
            return this.circularBuffer.getFrozenIterable();
        }

        @Override
        public Iterator<Object> iterator() {
            return this.circularBuffer.iterator();
        }

        @Override
        public Object[] toArray() {
            return this.circularBuffer.toArray();
        }

        @Override
        public <T> T[] toArray(T[] a) {
            return this.circularBuffer.toArray((Object[])a);
        }

        @Override
        public boolean containsAll(Collection<?> c) {
            return this.circularBuffer.containsAll(c);
        }

        @Override
        public boolean addAll(Collection<?> c) {
            return this.circularBuffer.addAll(c);
        }

        @Override
        public boolean removeAll(Collection<?> c) {
            return this.circularBuffer.removeAll(c);
        }

        @Override
        public boolean retainAll(Collection<?> c) {
            return this.circularBuffer.retainAll(c);
        }

        @Override
        public int size() {
            return this.circularBuffer.size();
        }

        @Override
        public void clear() {
            this.circularBuffer.clear();
        }

        public Object peekUnsafe() {
            return this.circularBuffer.peekUnsafe();
        }

        public CircularBuffer<Object> getWhitehole(String exceptionMessage) {
            return this.circularBuffer.getWhitehole(exceptionMessage);
        }
    }

    private static class ArrayBlockingQueueReservoir
    extends AbstractReservoir {
        private final ArrayBlockingQueue<Object> queue;

        private ArrayBlockingQueueReservoir(String id, int capacity) {
            super(id);
            this.queue = new ArrayBlockingQueue(capacity);
        }

        @Override
        public Tuple sweep() {
            Object o;
            ArrayBlockingQueue<Object> queue = this.queue;
            Sink<Object> sink = this.getSink();
            while ((o = queue.peek()) != null) {
                if (o instanceof Tuple) {
                    return (Tuple)o;
                }
                ++this.count;
                sink.put(queue.poll());
            }
            return null;
        }

        @Override
        public boolean add(Object o) {
            return this.queue.add(o);
        }

        @Override
        public boolean offer(Object o) {
            return this.queue.offer(o);
        }

        @Override
        public void put(Object o) throws InterruptedException {
            this.queue.put(o);
        }

        @Override
        public boolean offer(Object o, long timeout, TimeUnit unit) throws InterruptedException {
            return this.queue.offer(o, timeout, unit);
        }

        @Override
        public Object poll() {
            return this.queue.poll();
        }

        @Override
        public Object take() throws InterruptedException {
            return this.queue.take();
        }

        @Override
        public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
            return this.queue.poll(timeout, unit);
        }

        @Override
        public Object peek() {
            return this.queue.peek();
        }

        @Override
        public int size() {
            return this.queue.size();
        }

        @Override
        public int size(boolean dataTupleAware) {
            return this.queue.size();
        }

        @Override
        public int capacity() {
            throw new UnsupportedOperationException();
        }

        @Override
        public int remainingCapacity() {
            return this.queue.remainingCapacity();
        }

        @Override
        public boolean remove(Object o) {
            return this.queue.remove(o);
        }

        @Override
        public boolean contains(Object o) {
            return this.queue.contains(o);
        }

        @Override
        public Object[] toArray() {
            return this.queue.toArray();
        }

        @Override
        public <T> T[] toArray(T[] a) {
            return this.queue.toArray(a);
        }

        @Override
        public String toString() {
            return this.queue.toString();
        }

        @Override
        public void clear() {
            this.queue.clear();
        }

        @Override
        public int drainTo(Collection<? super Object> c) {
            return this.queue.drainTo(c);
        }

        @Override
        public int drainTo(Collection<? super Object> c, int maxElements) {
            return this.queue.drainTo(c, maxElements);
        }

        @Override
        public Iterator<Object> iterator() {
            return this.queue.iterator();
        }

        @Override
        public Object remove() {
            return this.queue.remove();
        }

        @Override
        public Object element() {
            return this.queue.element();
        }

        @Override
        public boolean addAll(Collection<?> c) {
            return this.queue.addAll(c);
        }

        @Override
        public boolean isEmpty() {
            return this.queue.isEmpty();
        }

        @Override
        public boolean containsAll(Collection<?> c) {
            return this.queue.containsAll(c);
        }

        @Override
        public boolean removeAll(Collection<?> c) {
            return this.queue.removeAll(c);
        }

        @Override
        public boolean retainAll(Collection<?> c) {
            return this.queue.retainAll(c);
        }
    }

    private static class SpscArrayBlockingQueueReservoir
    extends SpscArrayQueueReservoir {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition notFull = this.lock.newCondition();

        private SpscArrayBlockingQueueReservoir(String id, int capacity) {
            super(id, capacity);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Tuple sweep() {
            ReentrantLock lock = this.lock;
            SpscArrayQueue<Object> queue = this.getQueue();
            Sink<Object> sink = this.getSink();
            lock.lock();
            try {
                Object o;
                while ((o = queue.peek()) != null) {
                    if (o instanceof Tuple) {
                        Tuple tuple = (Tuple)o;
                        return tuple;
                    }
                    ++this.count;
                    sink.put(queue.poll());
                    this.notFull.signal();
                    if (!lock.hasQueuedThreads()) continue;
                    Tuple tuple = null;
                    return tuple;
                }
                Tuple tuple = null;
                return tuple;
            }
            finally {
                lock.unlock();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void put(Object o) throws InterruptedException {
            SpscArrayQueue<Object> queue = this.getQueue();
            if (!queue.offer(o)) {
                ReentrantLock lock = this.lock;
                lock.lockInterruptibly();
                try {
                    while (!queue.offer(o)) {
                        this.notFull.await();
                    }
                }
                finally {
                    lock.unlock();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Object remove() {
            SpscArrayQueue<Object> queue = this.getQueue();
            ReentrantLock lock = this.lock;
            lock.lock();
            try {
                Object o = queue.remove();
                if (o != null) {
                    this.notFull.signal();
                }
                Object object = o;
                return object;
            }
            finally {
                lock.unlock();
            }
        }
    }

    private static class SpscArrayQueueReservoir
    extends AbstractReservoir {
        private final int maxSpinMillis = 10;
        private final SpscArrayQueue<Object> queue;

        private SpscArrayQueueReservoir(String id, int capacity) {
            super(id);
            this.queue = new SpscArrayQueue(capacity);
        }

        @Override
        public Tuple sweep() {
            Object o;
            SpscArrayQueue<Object> queue = this.queue;
            Sink<Object> sink = this.getSink();
            while ((o = queue.peek()) != null) {
                if (o instanceof Tuple) {
                    return (Tuple)o;
                }
                ++this.count;
                sink.put(queue.poll());
            }
            return null;
        }

        @Override
        public boolean add(Object o) {
            return this.queue.add(o);
        }

        @Override
        public Object remove() {
            return this.queue.remove();
        }

        @Override
        public Object peek() {
            return this.queue.peek();
        }

        @Override
        public int size(boolean dataTupleAware) {
            return this.queue.size();
        }

        @Override
        public int capacity() {
            return this.queue.capacity();
        }

        @Override
        public int drainTo(final Collection<? super Object> container) {
            return this.queue.drain((MessagePassingQueue.Consumer)new MessagePassingQueue.Consumer<Object>(){

                public void accept(Object o) {
                    container.add(o);
                }
            });
        }

        @Override
        public boolean offer(Object o) {
            return this.queue.offer(o);
        }

        @Override
        public void put(Object o) throws InterruptedException {
            long spinMillis = 0L;
            SpscArrayQueue<Object> queue = this.queue;
            while (!queue.offer(o)) {
                Thread.sleep(spinMillis);
                spinMillis = Math.min(10L, spinMillis + 1L);
            }
        }

        @Override
        public boolean offer(Object o, long timeout, TimeUnit unit) throws InterruptedException {
            throw new UnsupportedOperationException();
        }

        @Override
        public Object take() throws InterruptedException {
            throw new UnsupportedOperationException();
        }

        @Override
        public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
            throw new UnsupportedOperationException();
        }

        @Override
        public int remainingCapacity() {
            SpscArrayQueue<Object> queue = this.queue;
            return queue.capacity() - queue.size();
        }

        @Override
        public boolean remove(Object o) {
            return this.queue.remove(o);
        }

        @Override
        public boolean contains(Object o) {
            return this.queue.contains(o);
        }

        @Override
        public int drainTo(final Collection<? super Object> collection, int maxElements) {
            return this.queue.drain((MessagePassingQueue.Consumer)new MessagePassingQueue.Consumer<Object>(){

                public void accept(Object o) {
                    collection.add(o);
                }
            }, maxElements);
        }

        @Override
        public Object poll() {
            return this.queue.poll();
        }

        @Override
        public Object element() {
            return this.queue.element();
        }

        @Override
        public boolean isEmpty() {
            return this.queue.peek() == null;
        }

        @Override
        public Iterator<Object> iterator() {
            return this.queue.iterator();
        }

        @Override
        public Object[] toArray() {
            return this.queue.toArray();
        }

        @Override
        public <T> T[] toArray(T[] a) {
            return this.queue.toArray((Object[])a);
        }

        @Override
        public boolean containsAll(Collection<?> c) {
            return this.queue.containsAll(c);
        }

        @Override
        public boolean addAll(Collection<?> c) {
            return this.queue.addAll(c);
        }

        @Override
        public boolean removeAll(Collection<?> c) {
            return this.queue.removeAll(c);
        }

        @Override
        public boolean retainAll(Collection<?> c) {
            return this.queue.retainAll(c);
        }

        @Override
        public int size() {
            return this.queue.size();
        }

        @Override
        public void clear() {
            this.queue.clear();
        }

        protected SpscArrayQueue<Object> getQueue() {
            return this.queue;
        }
    }
}

