/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.stream;

import com.datatorrent.api.Sink;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.bufferserver.client.Subscriber;
import com.datatorrent.bufferserver.packet.Tuple;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.netlet.EventLoop;
import com.datatorrent.netlet.Listener;
import com.datatorrent.netlet.util.CircularBuffer;
import com.datatorrent.netlet.util.Slice;
import com.datatorrent.stram.codec.StatefulStreamCodec;
import com.datatorrent.stram.engine.ByteCounterStream;
import com.datatorrent.stram.engine.StreamContext;
import com.datatorrent.stram.engine.SweepableReservoir;
import com.datatorrent.stram.plan.logical.StreamCodecWrapperForPersistance;
import com.datatorrent.stram.tuple.CheckpointTuple;
import com.datatorrent.stram.tuple.EndStreamTuple;
import com.datatorrent.stram.tuple.EndWindowTuple;
import com.datatorrent.stram.tuple.ResetWindowTuple;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BufferServerSubscriber
extends Subscriber
implements ByteCounterStream {
    private boolean suspended;
    private long baseSeconds;
    protected StreamCodec<Object> serde;
    protected StatefulStreamCodec<Object> statefulSerde;
    protected EventLoop eventloop;
    private final StatefulStreamCodec.DataStatePair dsp;
    CircularBuffer<Slice> offeredFragments;
    CircularBuffer<Slice> polledFragments;
    CircularBuffer<Slice> freeFragments;
    private final ArrayDeque<CircularBuffer<Slice>> backlog;
    private int lastWindowId = 15999;
    private final AtomicLong readByteCount;
    private volatile BufferReservoir[] reservoirs = new BufferReservoir[0];
    private final HashMap<String, BufferReservoir> reservoirMap = new HashMap();
    private static final Logger logger = LoggerFactory.getLogger(BufferServerSubscriber.class);

    public BufferServerSubscriber(String id, int queueCapacity) {
        super(id);
        this.readByteCount = new AtomicLong(0L);
        this.dsp = new StatefulStreamCodec.DataStatePair();
        this.polledFragments = this.offeredFragments = new CircularBuffer(queueCapacity);
        this.freeFragments = new CircularBuffer(queueCapacity);
        this.backlog = new ArrayDeque();
    }

    public void read(int len) {
        super.read(len);
        this.readByteCount.addAndGet(len);
    }

    public void activate(StreamContext context) {
        this.setToken((byte[])context.get(StreamContext.BUFFER_SERVER_TOKEN));
        InetSocketAddress address = context.getBufferServerAddress();
        this.eventloop = (EventLoop)context.get(StreamContext.EVENT_LOOP);
        this.eventloop.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, (Listener.ClientListener)this);
        logger.debug("Registering subscriber: id={} upstreamId={} streamLogicalName={} windowId={} mask={} partitions={} server={}", new Object[]{context.getSinkId(), context.getSourceId(), context.getId(), Codec.getStringWindowId((long)context.getFinishedWindowId()), context.getPartitionMask(), context.getPartitions(), context.getBufferServerAddress()});
        this.activate(null, context.getId() + '/' + context.getSinkId(), context.getSourceId(), context.getPartitionMask(), context.getPartitions(), context.getFinishedWindowId(), this.freeFragments.capacity());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onMessage(byte[] buffer, int offset, int length) {
        Slice f;
        if (this.freeFragments.isEmpty()) {
            f = new Slice(buffer, offset, length);
        } else {
            f = (Slice)this.freeFragments.pollUnsafe();
            f.buffer = buffer;
            f.offset = offset;
            f.length = length;
        }
        if (!this.offeredFragments.offer((Object)f)) {
            ArrayDeque<CircularBuffer<Slice>> arrayDeque = this.backlog;
            synchronized (arrayDeque) {
                if (!this.suspended) {
                    this.suspendRead();
                    this.suspended = true;
                }
                int newsize = this.offeredFragments.capacity() == MAX_SENDBUFFER_SIZE ? this.offeredFragments.capacity() : this.offeredFragments.capacity() << 1;
                this.offeredFragments = new CircularBuffer(newsize);
                this.backlog.add((CircularBuffer<Slice>)this.offeredFragments);
                this.offeredFragments.add((Object)f);
            }
        }
    }

    public void setup(StreamContext context) {
        StreamCodec codec = (StreamCodec)context.get(StreamContext.CODEC);
        if (codec == null) {
            this.statefulSerde = ((StatefulStreamCodec)StreamContext.CODEC.defaultValue).newInstance();
        } else if (codec instanceof StatefulStreamCodec) {
            this.statefulSerde = ((StatefulStreamCodec)codec).newInstance();
        } else {
            this.serde = codec;
        }
        this.baseSeconds = context.getFinishedWindowId() & 0xFFFFFFFF00000000L;
    }

    public void deactivate() {
        this.eventloop.disconnect((Listener.ClientListener)this);
        this.setToken(null);
    }

    public void teardown() {
    }

    public SweepableReservoir acquireReservoir(String id, int capacity) {
        BufferReservoir r = this.reservoirMap.get(id);
        if (r == null) {
            r = new BufferReservoir(capacity);
            this.reservoirMap.put(id, r);
            BufferReservoir[] newReservoirs = new BufferReservoir[this.reservoirs.length + 1];
            newReservoirs[this.reservoirs.length] = r;
            int i = this.reservoirs.length;
            while (i-- > 0) {
                newReservoirs[i] = this.reservoirs[i];
            }
            this.reservoirs = newReservoirs;
        }
        return r;
    }

    public SweepableReservoir acquireReservoirForPersistStream(String id, int capacity, StreamCodec<?> streamCodec) {
        BufferReservoir r = this.reservoirMap.get(id);
        if (r == null) {
            r = new BufferReservoirForPersistStream(capacity, (StreamCodecWrapperForPersistance)streamCodec);
            this.reservoirMap.put(id, r);
            BufferReservoir[] newReservoirs = new BufferReservoir[this.reservoirs.length + 1];
            newReservoirs[this.reservoirs.length] = r;
            int i = this.reservoirs.length;
            while (i-- > 0) {
                newReservoirs[i] = this.reservoirs[i];
            }
            this.reservoirs = newReservoirs;
        }
        return r;
    }

    public void put(Object tuple) {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    public SweepableReservoir releaseReservoir(String sinkId) {
        BufferReservoir r = this.reservoirMap.remove(sinkId);
        if (r != null) {
            BufferReservoir[] newReservoirs = new BufferReservoir[this.reservoirs.length - 1];
            int j = 0;
            for (BufferReservoir reservoir : this.reservoirs) {
                if (reservoir == r) continue;
                newReservoirs[j++] = reservoir;
            }
            this.reservoirs = newReservoirs;
        }
        return r;
    }

    public int getCount(boolean reset) {
        return 0;
    }

    @Override
    public long getByteCount(boolean reset) {
        if (reset) {
            return this.readByteCount.getAndSet(0L);
        }
        return this.readByteCount.get();
    }

    public class BufferReservoirForPersistStream
    extends BufferReservoir {
        StreamCodecWrapperForPersistance wrapperStreamCodec;

        BufferReservoirForPersistStream(int capacity, StreamCodecWrapperForPersistance<Object> streamCodec) {
            super(capacity);
            this.wrapperStreamCodec = streamCodec;
        }

        @Override
        protected Object processPayload(Tuple data) {
            Object o = this.wrapperStreamCodec.fromByteArray(data.getData());
            if (!this.wrapperStreamCodec.shouldCaptureEvent(o)) {
                this.skipObject = true;
            }
            return o;
        }
    }

    class BufferReservoir
    extends CircularBuffer<Object>
    implements SweepableReservoir {
        protected boolean skipObject;
        private Sink<Object> sink;
        int count;

        BufferReservoir(int capacity) {
            super(capacity);
            this.skipObject = false;
        }

        @Override
        public int size(boolean dataTupleAware) {
            int size = this.size();
            if (dataTupleAware) {
                Iterator iterator = this.getFrozenIterator();
                while (iterator.hasNext()) {
                    if (!(iterator.next() instanceof com.datatorrent.stram.tuple.Tuple)) continue;
                    --size;
                }
            }
            return size;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Sink<Object> setSink(Sink<Object> sink) {
            try {
                Sink<Object> sink2 = this.sink;
                return sink2;
            }
            finally {
                this.sink = sink;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public com.datatorrent.stram.tuple.Tuple sweep() {
            int size = this.size();
            if (size > 0) {
                for (int i = 0; i < size; ++i) {
                    if (this.peekUnsafe() instanceof com.datatorrent.stram.tuple.Tuple) {
                        this.count += i;
                        return (com.datatorrent.stram.tuple.Tuple)this.peekUnsafe();
                    }
                    this.sink.put(this.pollUnsafe());
                }
                this.count += size;
            }
            ArrayDeque arrayDeque = BufferServerSubscriber.this.backlog;
            synchronized (arrayDeque) {
                int min = BufferServerSubscriber.this.polledFragments.size();
                if (min == 0) {
                    if (BufferServerSubscriber.this.offeredFragments == BufferServerSubscriber.this.polledFragments) {
                        if (BufferServerSubscriber.this.suspended) {
                            BufferServerSubscriber.this.resumeRead();
                            BufferServerSubscriber.this.suspended = false;
                        }
                        return null;
                    }
                    BufferServerSubscriber.this.polledFragments = (CircularBuffer)BufferServerSubscriber.this.backlog.remove();
                    min = BufferServerSubscriber.this.polledFragments.size();
                }
                int i = BufferServerSubscriber.this.reservoirs.length;
                while (i-- > 0) {
                    if (BufferServerSubscriber.this.reservoirs[i].remainingCapacity() >= min) continue;
                    min = BufferServerSubscriber.this.reservoirs[i].remainingCapacity();
                }
                block15: while (min-- > 0) {
                    Object o;
                    Slice fm = (Slice)BufferServerSubscriber.this.polledFragments.pollUnsafe();
                    Tuple data = Tuple.getTuple((byte[])fm.buffer, (int)fm.offset, (int)fm.length);
                    switch (data.getType()) {
                        case NO_MESSAGE: {
                            BufferServerSubscriber.this.freeFragments.offer((Object)fm);
                            continue block15;
                        }
                        case CODEC_STATE: {
                            ((BufferServerSubscriber)BufferServerSubscriber.this).dsp.state = data.getData();
                            BufferServerSubscriber.this.freeFragments.offer((Object)fm);
                            continue block15;
                        }
                        case RESET_WINDOW: {
                            BufferServerSubscriber.this.baseSeconds = (long)data.getBaseSeconds() << 32;
                            if (BufferServerSubscriber.this.lastWindowId < 15999) {
                                BufferServerSubscriber.this.freeFragments.offer((Object)fm);
                                continue block15;
                            }
                            o = new ResetWindowTuple(BufferServerSubscriber.this.baseSeconds | (long)data.getWindowWidth());
                            break;
                        }
                        case PAYLOAD: {
                            o = this.processPayload(data);
                            break;
                        }
                        case CHECKPOINT: {
                            if (BufferServerSubscriber.this.statefulSerde != null) {
                                BufferServerSubscriber.this.statefulSerde.resetState();
                            }
                            o = new CheckpointTuple(BufferServerSubscriber.this.baseSeconds | (long)data.getWindowId());
                            break;
                        }
                        case END_WINDOW: {
                            o = new EndWindowTuple(BufferServerSubscriber.this.baseSeconds | (long)(BufferServerSubscriber.this.lastWindowId = data.getWindowId()));
                            break;
                        }
                        case END_STREAM: {
                            o = new EndStreamTuple(BufferServerSubscriber.this.baseSeconds | (long)data.getWindowId());
                            break;
                        }
                        case BEGIN_WINDOW: {
                            o = new com.datatorrent.stram.tuple.Tuple(data.getType(), BufferServerSubscriber.this.baseSeconds | (long)data.getWindowId());
                            break;
                        }
                        default: {
                            throw new IllegalArgumentException("Unhandled Message Type " + data.getType());
                        }
                    }
                    BufferServerSubscriber.this.freeFragments.offer((Object)fm);
                    if (this.skipObject) {
                        this.skipObject = false;
                        continue;
                    }
                    int i2 = BufferServerSubscriber.this.reservoirs.length;
                    while (i2-- > 0) {
                        BufferServerSubscriber.this.reservoirs[i2].add(o);
                    }
                }
            }
            return null;
        }

        protected Object processPayload(Tuple data) {
            Object o;
            if (BufferServerSubscriber.this.statefulSerde == null) {
                o = BufferServerSubscriber.this.serde.fromByteArray(data.getData());
            } else {
                ((BufferServerSubscriber)BufferServerSubscriber.this).dsp.data = data.getData();
                o = BufferServerSubscriber.this.statefulSerde.fromDataStatePair(BufferServerSubscriber.this.dsp);
            }
            return o;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public int getCount(boolean reset) {
            try {
                int n = this.count;
                return n;
            }
            finally {
                if (reset) {
                    this.count = 0;
                }
            }
        }
    }
}

