/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.netlet;

import com.datatorrent.netlet.AbstractClientListener;
import com.datatorrent.netlet.EventLoop;
import com.datatorrent.netlet.NetletThrowable;
import com.datatorrent.netlet.util.Slice;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.AbstractQueue;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WriteOnlyClient
extends AbstractClientListener {
    private static final Logger logger = LoggerFactory.getLogger(WriteOnlyClient.class);
    protected final ByteBuffer writeBuffer;
    protected Queue<Slice> sendQueue;
    protected final SpscArrayQueue<Slice> freeQueue;
    protected final Queue<NetletThrowable> throwables = new ConcurrentLinkedQueue<NetletThrowable>();
    protected boolean isWriteEnabled = true;
    protected Lock lock = new ReentrantLock();

    public WriteOnlyClient() {
        this(65536, 1024);
    }

    public WriteOnlyClient(int writeBufferCapacity, int sendQueueCapacity) {
        this(ByteBuffer.allocateDirect(writeBufferCapacity), sendQueueCapacity);
    }

    public WriteOnlyClient(ByteBuffer writeBuffer, int sendQueueCapacity) {
        this.writeBuffer = writeBuffer;
        this.sendQueue = new SpscArrayQueue(sendQueueCapacity);
        this.freeQueue = new SpscArrayQueue(sendQueueCapacity);
    }

    @Override
    public void connected() {
        super.connected();
        this.shutdownIO(true);
        this.suspendReadIfResumed();
        try {
            this.lock.lock();
            this.isWriteEnabled = !this.isWriteSuspended();
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public void unregistered(SelectionKey key) {
        try {
            this.lock.lock();
            if (this.sendQueue.peek() == null && !this.isWriteEnabled) {
                super.unregistered(key);
            } else {
                this.sendQueue = new ForwardingQueue(this.sendQueue){

                    @Override
                    public boolean offer(Slice f) {
                        throw new RuntimeException("Client " + this + " does not accept new data.");
                    }
                };
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public final void read() throws IOException {
        if (this.suspendReadIfResumed()) {
            logger.warn("{} OP_READ should be disabled", (Object)this);
        } else {
            logger.error("{} read is not expected", (Object)this);
        }
    }

    @Override
    public boolean resumeReadIfSuspended() {
        throw new UnsupportedOperationException();
    }

    @Override
    public void write() throws IOException {
        Slice slice;
        int remaining = this.writeBuffer.remaining();
        while ((slice = this.sendQueue.peek()) != null) {
            while (remaining < slice.length) {
                if (remaining > 0) {
                    this.writeBuffer.put(slice.buffer, slice.offset, remaining);
                    slice.offset += remaining;
                    slice.length -= remaining;
                }
                if (this.channelWrite() == 0) {
                    return;
                }
                remaining = this.writeBuffer.remaining();
            }
            this.writeBuffer.put(slice.buffer, slice.offset, slice.length);
            slice.buffer = null;
            remaining -= slice.length;
            this.freeQueue.offer((Object)this.sendQueue.poll());
        }
        this.channelWrite();
    }

    protected int channelWrite() throws IOException {
        if (this.writeBuffer.flip().remaining() > 0) {
            SocketChannel channel = (SocketChannel)this.key.channel();
            int write = channel.write(this.writeBuffer);
            this.writeBuffer.compact();
            return write;
        }
        this.writeBuffer.clear();
        try {
            this.lock.lock();
            if (this.sendQueue.peek() == null) {
                this.suspendWriteIfResumed();
                this.isWriteEnabled = false;
            }
        }
        finally {
            this.lock.unlock();
        }
        return 0;
    }

    public boolean send(byte[] array) {
        return this.send(array, 0, array.length);
    }

    public boolean send(byte[] array, int offset, int len) {
        Slice f = (Slice)this.freeQueue.poll();
        if (f == null) {
            f = new Slice(array, offset, len);
        } else {
            if (f.buffer != null) {
                throw new RuntimeException("Unexpected slice " + f.toString());
            }
            f.buffer = array;
            f.offset = offset;
            f.length = len;
        }
        return this.send(f);
    }

    public boolean send(Slice f) {
        NetletThrowable throwable = this.throwables.poll();
        if (throwable != null) {
            NetletThrowable.Util.throwRuntime(throwable);
        }
        if (this.sendQueue.offer(f)) {
            try {
                this.lock.lock();
                if (!this.isWriteEnabled) {
                    this.resumeWriteIfSuspended();
                    this.isWriteEnabled = true;
                }
            }
            finally {
                this.lock.unlock();
            }
            return true;
        }
        return false;
    }

    @Override
    public void handleException(Exception e, EventLoop el) {
        if (!this.key.isValid()) {
            super.handleException(e, el);
            this.throwables.offer(NetletThrowable.Util.rewrap(e, el));
        } else if (this.key.attachment() == this) {
            if (e instanceof IOException) {
                logger.error("Disconnecting {} because of an exception.", (Object)this, (Object)e);
                if (this.isConnected()) {
                    el.disconnect(this);
                }
            } else {
                super.handleException(e, el);
                this.throwables.offer(NetletThrowable.Util.rewrap(e, el));
            }
        } else {
            logger.error("Ignoring exception received after discarding the connection.", (Throwable)e);
        }
    }

    private static class ForwardingQueue
    extends AbstractQueue<Slice> {
        private final Queue<Slice> queue;

        private ForwardingQueue(Queue<Slice> queue) {
            this.queue = queue;
        }

        @Override
        public Iterator<Slice> iterator() {
            return this.queue.iterator();
        }

        @Override
        public int size() {
            return this.queue.size();
        }

        @Override
        public boolean offer(Slice slice) {
            return this.queue.offer(slice);
        }

        @Override
        public Slice poll() {
            return this.queue.poll();
        }

        @Override
        public Slice peek() {
            return this.queue.peek();
        }
    }
}

