/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.stream;

import com.datatorrent.netlet.EventLoop;
import com.datatorrent.netlet.Listener;
import com.datatorrent.stram.engine.StreamContext;
import com.datatorrent.stram.stream.BufferServerSubscriber;
import java.net.InetSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FastSubscriber
extends BufferServerSubscriber {
    private static final Logger logger = LoggerFactory.getLogger(FastSubscriber.class);

    public FastSubscriber(String id, int queueCapacity) {
        super(id, queueCapacity);
    }

    @Override
    public void activate(StreamContext context) {
        InetSocketAddress address = context.getBufferServerAddress();
        this.eventloop = (EventLoop)context.get(StreamContext.EVENT_LOOP);
        this.eventloop.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, (Listener.ClientListener)this);
        logger.debug("registering subscriber: id={} upstreamId={} streamLogicalName={} windowId={} mask={} partitions={} server={}", new Object[]{context.getSinkId(), context.getSourceId(), context.getId(), context.getFinishedWindowId(), context.getPartitionMask(), context.getPartitions(), context.getBufferServerAddress()});
        this.activate("1.1", context.getId() + '/' + context.getSinkId(), context.getSourceId(), context.getPartitionMask(), context.getPartitions(), context.getFinishedWindowId(), this.freeFragments.capacity());
    }

    public int readSize() {
        if (this.writeOffset - this.readOffset < 2) {
            return -1;
        }
        short s = this.buffer[this.readOffset++];
        return s | this.buffer[this.readOffset++] << 8;
    }
}

