/*
 * Decompiled with CFR 0.152.
 */
package com.cosylab.epics.caj.impl;

import com.cosylab.epics.caj.CAJContext;
import com.cosylab.epics.caj.impl.CAContext;
import com.cosylab.epics.caj.impl.CachedByteBufferAllocator;
import com.cosylab.epics.caj.impl.Request;
import com.cosylab.epics.caj.impl.ResponseHandler;
import com.cosylab.epics.caj.impl.Transport;
import com.cosylab.epics.caj.impl.TransportClient;
import com.cosylab.epics.caj.impl.reactor.ReactorHandler;
import com.cosylab.epics.caj.impl.reactor.lf.LeaderFollowersThreadPool;
import com.cosylab.epics.caj.impl.requests.EchoRequest;
import com.cosylab.epics.caj.impl.requests.EventsOffRequest;
import com.cosylab.epics.caj.impl.requests.EventsOnRequest;
import com.cosylab.epics.caj.impl.requests.UserNameRequest;
import com.cosylab.epics.caj.util.Timer;
import gov.aps.jca.CAStatus;
import gov.aps.jca.event.ContextExceptionListener;
import gov.aps.jca.event.ContextVirtualCircuitExceptionEvent;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CATransport
implements Transport,
ReactorHandler,
Timer.TimerRunnable {
    /** Logger named after this class; used for errors raised by callbacks and failed requests. */
    private static final Logger logger = Logger.getLogger(CATransport.class.getName());
    /** Set to true (once) by close(boolean); read without synchronization in several methods. */
    private volatile boolean closed = false;
    /** Owning context; supplies the logger, timer, reactor, transport registry and buffer allocator. */
    private CAJContext context;
    /** Receives every complete message assembled by processRead(ByteBuffer). */
    protected ResponseHandler responseHandler = null;
    /** Socket channel this transport reads from and writes to. */
    private SocketChannel channel;
    /** Remote address of the channel, captured in the constructor; key used with the transport registry. */
    private InetSocketAddress socketAddress;
    /** Receive buffers: index 0 holds the message header, index 1 the payload. */
    private ByteBuffer[] receiveBuffer;
    /** Number of consecutive completely-filled socket reads at which flow control is switched on. */
    private static final int FLOW_CONTROL_BUFFER_FULL_COUNT_LIMIT = 4;
    /** True after enableFlowControl() succeeded and until disableFlowControl() succeeds. */
    private boolean flowControlActive = false;
    /** Scratch buffer that raw channel reads land in before being split into messages. */
    private ByteBuffer socketBuffer;
    /** FIFO of filled send buffers awaiting transmission; also the monitor guarding the send-buffer fields. */
    private LinkedList sendQueue;
    /** Minor protocol revision of the remote side; echo replies are only awaited when it is >= 3. */
    private short remoteTransportRevision;
    /** Clients currently holding this transport; keys are TransportClient instances, values are null. */
    private Map owners;
    /** Held around noSyncSend() so that only one thread writes to the channel at a time. */
    private ReentrantLock sendLock = new ReentrantLock();
    /** True from the moment flush() schedules a flush until flushInternal() finishes. */
    private volatile boolean flushPending = false;
    /** Buffer that submit() currently appends to; null once the send buffers have been freed. */
    private ByteBuffer sendBuffer;
    /** Source of (and sink for) send buffers of the standard cached size. */
    private CachedByteBufferAllocator bufferAllocator;
    /** Most recently queued send buffer that has not been dequeued yet; flush() appends to it when data fits. */
    private ByteBuffer lastActiveSendBuffer = null;
    /** Priority of this transport; returned by getPriority() and passed to the registry on removal. */
    protected short priority;
    /** Initial capacity, in bytes, of the payload receive buffer. */
    private static final int INITIAL_RX_BUFFER_SIZE = 1024;
    /** Task handed to the leader/followers thread pool by spawnFlushing(); simply runs flushInternal(). */
    private Runnable flushTask = new Runnable(){

        @Override
        public void run() {
            CATransport.this.flushInternal();
        }
    };
    /** True while an echo request has been sent and its reply is still awaited. */
    private boolean probeResponsePending = false;
    /** Set by timeout() when the timer fires while an echo reply is still pending. */
    private boolean probeTimeoutDetected = false;
    /** Monitor guarding the echo-probe state machine (timeout, echoNotify, sendEcho). */
    private Object probeLock = new Object();
    /** Connection timeout passed to the timer; derived from context.getConnectionTimeout() * 1000 (presumably ms). */
    private long connectionTimeout;
    /** True between an "unresponsive" notification and the following "responsive" notification. */
    private boolean unresponsiveTransport = false;
    /** Handle of the currently scheduled timer task, as returned by executeAfterDelay(). */
    private Object taskID;

    public CATransport(CAJContext context, TransportClient client, ResponseHandler responseHandler, SocketChannel channel, short remoteTransportRevision, short priority) {
        this.context = context;
        this.responseHandler = responseHandler;
        this.channel = channel;
        this.remoteTransportRevision = remoteTransportRevision;
        this.priority = priority;
        this.socketAddress = (InetSocketAddress)channel.socket().getRemoteSocketAddress();
        this.receiveBuffer = new ByteBuffer[]{ByteBuffer.allocateDirect(24), ByteBuffer.allocateDirect(1024)};
        this.receiveBuffer[0].limit(16);
        this.socketBuffer = ByteBuffer.allocateDirect(16408);
        this.owners = new HashMap();
        this.acquire(client);
        this.sendQueue = new LinkedList();
        this.bufferAllocator = context.getCachedBufferAllocator();
        this.sendBuffer = this.bufferAllocator.get();
        this.connectionTimeout = (long)(context.getConnectionTimeout() * 1000.0f);
        this.taskID = context.getTimer().executeAfterDelay(this.connectionTimeout, this);
        context.getTransportRegistry().put(this.socketAddress, this);
    }

    /**
     * Closes this transport; only the first call has any effect.
     *
     * <p>Under the instance monitor the transport is marked closed, its timer task is
     * cancelled and it is removed from the transport registry. Afterwards, outside the
     * monitor, queued data is flushed (graceful close only), send buffers are released,
     * listeners/clients are notified and the channel is unregistered and closed.
     *
     * @param forced {@code true} for an abnormal close: nothing is flushed and the
     *               context's exception listeners are notified; {@code false} for a
     *               graceful close that first tries to flush pending data
     */
    public void close(boolean forced) {
        // Cheap unsynchronized pre-check; the decisive check is repeated under the monitor.
        if (this.closed) {
            return;
        }
        synchronized (this) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            Timer.cancel(this.taskID);
            this.context.getTransportRegistry().remove(this.socketAddress, this.priority);
        }

        if (forced) {
            this.freeSendBuffers();
            this.closedNotifyContext();
        } else {
            this.flushInternal();
            this.freeSendBuffers();
        }
        this.closedNotifyClients();

        this.context.getLogger().finer("Connection to " + this.socketAddress + " closed.");
        this.context.getReactor().unregisterAndClose(this.channel);
    }

    /**
     * Drops the current send buffer and empties the send queue while holding the
     * queue's monitor. Queued buffers whose capacity equals the allocator's standard
     * buffer size are handed back to the allocator; others are simply discarded.
     */
    private void freeSendBuffers() {
        synchronized (this.sendQueue) {
            this.sendBuffer = null;
            this.lastActiveSendBuffer = null;
            while (!this.sendQueue.isEmpty()) {
                ByteBuffer queued = (ByteBuffer)this.sendQueue.removeFirst();
                boolean cacheable = queued.capacity() == CachedByteBufferAllocator.bufferSize;
                if (cacheable) {
                    this.bufferAllocator.put(queued);
                }
            }
        }
    }

    /**
     * Reports the disconnect ({@code CAStatus.DISCONN}) to every context exception
     * listener. A listener that throws is logged and does not prevent the remaining
     * listeners from being called.
     */
    private void closedNotifyContext() {
        ContextVirtualCircuitExceptionEvent event = new ContextVirtualCircuitExceptionEvent(this.context, this.socketAddress.getAddress(), CAStatus.DISCONN);
        for (ContextExceptionListener listener : this.context.getContextExceptionListeners()) {
            try {
                listener.contextVirtualCircuitException(event);
            }
            catch (Throwable th) {
                logger.log(Level.SEVERE, "", th);
            }
        }
    }

    /**
     * Detaches all remaining clients and tells each of them that the transport closed.
     *
     * <p>The client set is copied and cleared under the owners monitor; the callbacks
     * run after the monitor has been released. A client that throws is logged and the
     * remaining clients are still notified. Does nothing when there are no clients.
     */
    private void closedNotifyClients() {
        TransportClient[] snapshot;
        synchronized (this.owners) {
            int remaining = this.owners.size();
            if (remaining == 0) {
                return;
            }
            this.context.getLogger().fine("Transport to " + this.socketAddress + " still has " + remaining + " client(s) active and closing...");
            snapshot = new TransportClient[remaining];
            this.owners.keySet().toArray(snapshot);
            this.owners.clear();
        }
        for (TransportClient client : snapshot) {
            try {
                client.transportClosed();
            }
            catch (Throwable th) {
                logger.log(Level.SEVERE, "", th);
            }
        }
    }

    /**
     * Registers a client as an owner of this transport.
     *
     * @param client client that wants to use the transport
     * @return {@code true} if the client was registered, {@code false} if the
     *         transport is already closed
     */
    public synchronized boolean acquire(TransportClient client) {
        if (this.closed) {
            return false;
        }
        this.context.getLogger().finer("Acquiring transport to " + this.socketAddress + ".");
        synchronized (this.owners) {
            // Re-checked once the owners monitor is held.
            if (!this.closed) {
                this.owners.put(client, null);
                return true;
            }
            return false;
        }
    }

    /**
     * Removes a client from the owners of this transport; when the last owner is
     * removed the transport is closed gracefully. Does nothing if the transport is
     * already closed.
     *
     * <p>Note that {@code close(false)} is invoked while both the instance monitor and
     * the owners monitor are held.
     *
     * @param client client that no longer needs the transport
     */
    public synchronized void release(TransportClient client) {
        if (this.closed) {
            return;
        }
        this.context.getLogger().finer("Releasing transport to " + this.socketAddress + ".");
        synchronized (this.owners) {
            this.owners.remove(client);
            boolean lastOwnerGone = this.owners.size() == 0;
            if (lastOwnerGone) {
                this.close(false);
            }
        }
    }

    /**
     * Returns the minor protocol revision of the remote side.
     *
     * @return the value given to the constructor or last set via setMinorRevision(short)
     */
    @Override
    public short getMinorRevision() {
        return this.remoteTransportRevision;
    }

    /**
     * Replaces the stored minor protocol revision of the remote side.
     *
     * @param rev new revision; sendEcho() awaits an echo reply only when this is >= 3
     */
    public void setMinorRevision(short rev) {
        this.remoteTransportRevision = rev;
    }

    /**
     * Reactor callback: dispatches a selection-key event to the read and/or write
     * processing of this transport.
     *
     * @param key selection key that became ready
     */
    @Override
    public void handleEvent(SelectionKey key) {
        if (key.isValid() && key.isReadable()) {
            this.processRead();
        }
        // Validity is checked again: processRead() can close the transport (and its
        // channel), which presumably invalidates the key.
        if (key.isValid() && key.isWritable()) {
            this.processWrite();
        }
    }

    /**
     * Drains the channel: repeatedly reads into the socket buffer and hands the bytes
     * to {@link #processRead(ByteBuffer)} until no more data is available or the
     * transport is closed.
     *
     * <p>Flow control: once the socket buffer has been filled completely on more than
     * four consecutive reads, flow control is enabled; any read that leaves room in the
     * buffer (or returns no data) disables it again. End-of-stream or an
     * {@code IOException} forces the transport closed.
     */
    protected void processRead() {
        try {
            int consecutiveFullReads = 0;
            while (!this.closed) {
                this.socketBuffer.clear();
                int received = this.channel.read(this.socketBuffer);

                if (received < 0) {
                    // End of stream: the peer closed the connection.
                    this.close(true);
                    return;
                }
                if (received == 0) {
                    // Nothing more to read right now.
                    if (this.flowControlActive) {
                        this.disableFlowControl();
                    }
                    return;
                }

                boolean bufferFilled = !this.socketBuffer.hasRemaining();
                if (!bufferFilled) {
                    consecutiveFullReads = 0;
                    if (this.flowControlActive) {
                        this.disableFlowControl();
                    }
                } else if (consecutiveFullReads < 4) {
                    ++consecutiveFullReads;
                } else if (!this.flowControlActive) {
                    this.enableFlowControl();
                }

                this.socketBuffer.flip();
                this.processRead(this.socketBuffer);
            }
        }
        catch (IOException ioex) {
            this.close(true);
        }
    }

    /**
     * Splits the bytes in {@code socketBuffer} into messages and passes each complete
     * message (header plus payload) to the response handler. Partial headers/payloads
     * are kept in {@code receiveBuffer} and completed by later calls.
     *
     * <p>The payload size is the unsigned 16-bit value at header offset 2. The value
     * 65535 announces an extended header: the header buffer is widened from 16 to 24
     * bytes and the size is then read as a 32-bit value at offset 16.
     *
     * <p>A payload size above the configured maximum array size forces the transport
     * closed. This includes 32-bit sizes with the top bit set, which appear negative as
     * a Java {@code int}; previously such a value slipped past the size check and made
     * {@code ByteBuffer.limit(int)} throw an unchecked exception out of the read path.
     *
     * @param socketBuffer buffer, already flipped for reading, holding freshly received bytes
     */
    protected void processRead(ByteBuffer socketBuffer) {
        final int pageSize = 4096;
        while (true) {
            ByteBuffer headerBuffer = this.receiveBuffer[0];
            ByteBuffer payloadBuffer = this.receiveBuffer[1];
            if (headerBuffer.hasRemaining()) {
                CATransport.readFromByteBuffer(socketBuffer, headerBuffer);
                if (headerBuffer.hasRemaining()) {
                    // Header still incomplete; wait for more data.
                    return;
                }
                int payloadSize = headerBuffer.getShort(2) & 0xFFFF;
                if (payloadSize == 65535) {
                    if (headerBuffer.limit() != 24) {
                        // Extended header announced: go and read the 8 extra header bytes first.
                        headerBuffer.limit(24);
                        continue;
                    }
                    payloadSize = headerBuffer.getInt(16);
                }
                // A negative value is an unsigned 32-bit size >= 2^31, which can never fit.
                boolean sizeOverflowsInt = payloadSize < 0;
                if (sizeOverflowsInt || payloadSize > payloadBuffer.capacity()) {
                    int maxPayloadSize = this.context.getMaxArrayBytes();
                    if (sizeOverflowsInt || payloadSize > maxPayloadSize) {
                        long announcedSize = payloadSize & 0xFFFFFFFFL;
                        logger.log(Level.SEVERE, "Received payload size (" + announcedSize + ") is larger than configured maximum array size (" + maxPayloadSize + "), disconnecting...");
                        this.close(true);
                        return;
                    }
                    // Grow to the next page boundary above the payload, capped at the
                    // maximum; computed in long so the rounding cannot overflow.
                    long roundedUp = (payloadSize & 0xFFFFF000L) + pageSize;
                    int newSize = (int)Math.min((long)maxPayloadSize, roundedUp);
                    this.receiveBuffer[1] = ByteBuffer.allocateDirect(newSize);
                    payloadBuffer = this.receiveBuffer[1];
                }
                payloadBuffer.clear();
                payloadBuffer.limit(payloadSize);
            }
            if (payloadBuffer.limit() == 0) {
                // Header-only message.
                headerBuffer.flip();
                this.deliverReceivedMessage(headerBuffer);
                continue;
            }
            if (!payloadBuffer.hasRemaining()) {
                continue;
            }
            CATransport.readFromByteBuffer(socketBuffer, payloadBuffer);
            if (payloadBuffer.hasRemaining()) {
                // Payload still incomplete; wait for more data.
                return;
            }
            headerBuffer.flip();
            payloadBuffer.flip();
            this.deliverReceivedMessage(headerBuffer);
        }
    }

    /**
     * Hands the assembled receive buffers to the response handler, logging (not
     * propagating) anything it throws, then resets the header buffer to expect a
     * standard 16-byte header.
     */
    private void deliverReceivedMessage(ByteBuffer headerBuffer) {
        try {
            this.responseHandler.handleResponse(this.socketAddress, this, this.receiveBuffer);
        }
        catch (Throwable th) {
            logger.log(Level.SEVERE, "", th);
        }
        headerBuffer.clear();
        headerBuffer.limit(16);
    }

    /**
     * Moves as many bytes as possible from {@code srcBuffer} to {@code destBuffer}:
     * the smaller of the two buffers' remaining counts. Both positions advance by the
     * number of bytes moved; the limits are left untouched.
     *
     * @param srcBuffer  buffer to read from, starting at its current position
     * @param destBuffer buffer to write to, starting at its current position
     */
    private static final void readFromByteBuffer(ByteBuffer srcBuffer, ByteBuffer destBuffer) {
        int count = Math.min(destBuffer.remaining(), srcBuffer.remaining());
        // A bounded view of the source lets a single bulk put() copy exactly 'count'
        // bytes; put() itself advances the destination position.
        ByteBuffer window = srcBuffer.slice();
        window.limit(count);
        destBuffer.put(window);
        srcBuffer.position(srcBuffer.position() + count);
    }

    /**
     * Handles a "writable" event by sending whatever is waiting in the send queue
     * (delegates to flushInternal()).
     */
    protected void processWrite() {
        this.flushInternal();
    }

    /**
     * Submits a UserNameRequest over this transport. An IOException raised while
     * submitting is logged and not propagated.
     */
    public void updateUserName() {
        try {
            new UserNameRequest(this).submit();
        }
        catch (IOException e) {
            logger.log(Level.SEVERE, "", e);
        }
    }

    /**
     * Switches flow control off by submitting an EventsOnRequest. The flag is cleared
     * only if submitting succeeded; an IOException is logged and not propagated.
     */
    protected void disableFlowControl() {
        try {
            new EventsOnRequest(this).submit();
            this.flowControlActive = false;
        }
        catch (IOException e) {
            logger.log(Level.SEVERE, "", e);
        }
    }

    /**
     * Switches flow control on by submitting an EventsOffRequest. The flag is set only
     * if submitting succeeded; an IOException is logged and not propagated.
     */
    protected void enableFlowControl() {
        try {
            new EventsOffRequest(this).submit();
            this.flowControlActive = true;
        }
        catch (IOException e) {
            logger.log(Level.SEVERE, "", e);
        }
    }

    /**
     * Writes a buffer to the channel while holding the send lock, so that concurrent
     * senders do not interleave their data.
     *
     * @param buffer            buffer in "fill" state; it is flipped by noSyncSend() before writing
     * @param asyncCloseOnError passed through to noSyncSend(), which does not read it
     * @throws IOException if writing fails; noSyncSend() has then already force-closed the transport
     */
    public void send(ByteBuffer buffer, boolean asyncCloseOnError) throws IOException {
        // Blocks until the lock is available; always released in the finally block.
        this.sendLock.lock();
        try {
            this.noSyncSend(buffer, asyncCloseOnError);
        }
        finally {
            this.sendLock.unlock();
        }
    }

    /**
     * Flips the buffer and writes all of it to the channel; callers are expected to
     * hold the send lock.
     *
     * <p>Data is sent in parts of at most 16000 bytes. Whenever the channel does not
     * accept a whole part, the method sleeps (10 ms plus 100 ms per previous attempt,
     * capped at 15 s) and tries again, without an upper bound on the number of
     * attempts; from the tenth retry on a warning is logged each time. Any
     * {@code IOException} force-closes the transport and is then rethrown.
     *
     * @param buffer            buffer in "fill" state containing the bytes to send
     * @param asyncCloseOnError not used by this implementation
     * @throws IOException if the channel reports an error or the transport gets
     *                     closed while data is still pending
     */
    private void noSyncSend(ByteBuffer buffer, boolean asyncCloseOnError) throws IOException {
        final int partSize = 16000;
        final int warnAfterTries = 10;
        try {
            buffer.flip();
            final int totalBytes = buffer.limit();
            this.context.getLogger().finest("Sending " + totalBytes + " bytes to " + this.socketAddress + ".");

            final int parts = (totalBytes - 1) / partSize + 1;
            for (int part = 1; part <= parts; ++part) {
                if (parts > 1) {
                    // Expose only the current part to the channel.
                    buffer.limit(Math.min(part * partSize, totalBytes));
                    this.context.getLogger().finest("[Parted] Sending (part " + part + "/" + parts + ") " + buffer.remaining() + " bytes to " + this.socketAddress + ".");
                }

                int tries = 0;
                while (true) {
                    int written = this.channel.write(buffer);
                    if (written < 0) {
                        throw new IOException("bytesSent < 0");
                    }
                    if (!buffer.hasRemaining()) {
                        // Current part fully written.
                        break;
                    }
                    if (this.closed) {
                        throw new IOException("transport closed on the client side");
                    }
                    if (tries >= warnAfterTries) {
                        this.context.getLogger().warning("Failed to send message to " + this.socketAddress + " - buffer full, will retry.");
                    }
                    this.context.getLogger().finest("Send buffer full for " + this.socketAddress + ", waiting...");
                    this.channel.socket().getOutputStream().flush();
                    try {
                        Thread.sleep(Math.min(15000, 10 + tries * 100));
                    }
                    catch (InterruptedException ignored) {
                        // Interruption only cuts the back-off short; sending continues.
                    }
                    ++tries;
                }
            }
        }
        catch (IOException ioex) {
            this.close(true);
            throw ioex;
        }
    }

    /**
     * Moves the data collected in the current send buffer towards the channel.
     *
     * <p>If the most recently queued buffer is still waiting and has room, the data is
     * appended to it. Otherwise the send buffer itself is queued, a fresh one is taken
     * from the allocator and, unless a flush is already pending, a flush is scheduled.
     *
     * @return {@code false} if the transport is closed or has no send buffer,
     *         {@code true} otherwise (including when there was nothing to flush)
     */
    @Override
    public synchronized boolean flush() {
        synchronized (this.sendQueue) {
            if (this.closed || this.sendBuffer == null) {
                return false;
            }
            final int pendingBytes = this.sendBuffer.position();
            if (pendingBytes == 0) {
                return true;
            }

            ByteBuffer tail = this.lastActiveSendBuffer;
            boolean fitsIntoTail = tail != null && tail.position() + pendingBytes <= tail.capacity();
            if (fitsIntoTail) {
                // Piggy-back on the buffer that is already queued.
                this.sendBuffer.flip();
                tail.put(this.sendBuffer);
                this.sendBuffer.clear();
                return true;
            }

            this.sendQueue.add(this.sendBuffer);
            this.lastActiveSendBuffer = this.sendBuffer;
            this.sendBuffer = this.bufferAllocator.get();

            if (this.flushPending) {
                // Somebody else has already scheduled the flush.
                return true;
            }
            this.flushPending = true;
        }
        // Scheduling happens after the queue monitor has been released.
        return this.spawnFlushing();
    }

    /**
     * Schedules the actual sending of queued buffers: on the leader/followers thread
     * pool when the context has one, otherwise by registering read+write interest for
     * the channel with the reactor.
     *
     * @return always {@code true}
     */
    private boolean spawnFlushing() {
        LeaderFollowersThreadPool pool = this.context.getLeaderFollowersThreadPool();
        if (pool == null) {
            this.context.getReactor().setInterestOps(this.channel, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        } else {
            pool.execute(this.flushTask);
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    public boolean flushInternal() {
        if (this.sendBuffer == null) {
            return false;
        }
lbl3:
        // 3 sources

        try {
            while (this.sendQueue.size() > 0) {
                var2_4 = this.sendQueue;
                synchronized (var2_4) {
                    block29: {
                        if (this.sendQueue.size() != 0) break block29;
                        var3_6 = true;
                        return var3_6;
                    }
                    buf = (ByteBuffer)this.sendQueue.removeFirst();
                    if (buf == this.lastActiveSendBuffer) {
                        this.lastActiveSendBuffer = null;
                    }
                }
                try {
                    this.send(buf, false);
                }
                finally {
                    if (buf.capacity() != CachedByteBufferAllocator.bufferSize) ** GOTO lbl3
                    this.bufferAllocator.put(buf);
                }
            }
            buf = true;
            return buf;
        }
        catch (IOException ioex) {
            this.close(true);
            var2_5 = false;
            return var2_5;
        }
        finally {
            var3_7 = this.sendQueue;
            synchronized (var3_7) {
                this.flushPending = false;
                if (!this.closed && this.sendQueue.size() > 0) {
                    this.spawnFlushing();
                }
            }
        }
    }

    /**
     * Submits a request for sending.
     *
     * <p>Requests with priority 100 are written to the channel immediately if the send
     * lock can be obtained within 100 ms. All other requests, and priority-100
     * requests that could not get the lock, are appended to the current send buffer.
     * The buffer is flushed first when the message would fill it, and replaced by a
     * larger direct buffer when the message still does not fit.
     *
     * @param requestMessage request whose message buffer is to be sent; a message
     *                       buffer of capacity 0 is ignored
     * @throws IOException           if an immediate send fails
     * @throws IllegalStateException if the send buffers have already been released
     * @throws RuntimeException      if the send buffer could not be enlarged
     */
    @Override
    public void submit(Request requestMessage) throws IOException {
        ByteBuffer message = requestMessage.getRequestMessage();
        if (message.capacity() == 0) {
            return;
        }
        if (requestMessage.getPriority() == 100) {
            try {
                if (this.sendLock.tryLock(100L, TimeUnit.MILLISECONDS)) {
                    try {
                        this.noSyncSend(message, true);
                        return;
                    }
                    finally {
                        this.sendLock.unlock();
                    }
                }
            }
            catch (InterruptedException ignored) {
                // Interrupted while waiting for the lock: fall through and queue the
                // message like a normal-priority request.
                // NOTE(review): the interrupt status is not restored here - confirm
                // whether callers rely on that.
            }
        }
        message.flip();
        // NOTE(review): flush() is a synchronized method and is called below while the
        // sendQueue monitor is held, whereas flush() itself takes the instance monitor
        // first and sendQueue second - the opposite order. Worth confirming that this
        // cannot deadlock with concurrent flush()/release() callers.
        synchronized (this.sendQueue) {
            if (this.sendBuffer == null) {
                throw new IllegalStateException("transport closed");
            }
            if (message.limit() + this.sendBuffer.position() >= this.sendBuffer.capacity()) {
                this.flush();
            }
            try {
                this.sendBuffer.put(message);
            }
            catch (BufferOverflowException ex) {
                // The message does not fit even after flushing: switch to a buffer that
                // holds the already collected data plus this message.
                try {
                    logger.fine("Expending sendBuffer for " + Integer.toString(message.limit()));
                    this.sendBuffer.flip();
                    ByteBuffer bigbuf = ByteBuffer.allocateDirect(this.sendBuffer.remaining() + message.limit());
                    bigbuf.put(this.sendBuffer);
                    bigbuf.put(message);
                    if (this.sendBuffer.capacity() == CachedByteBufferAllocator.bufferSize) {
                        this.bufferAllocator.put(this.sendBuffer);
                    }
                    this.sendBuffer = bigbuf;
                }
                catch (Exception ex2) {
                    // Reported through the logger instead of a debug print on stdout.
                    logger.log(Level.SEVERE, "Failed to expand send buffer (buffered bytes: " + this.sendBuffer.remaining() + ", message bytes: " + message.limit() + ")", ex2);
                    throw new RuntimeException("Message exceeds write buffer size (com.cosylab.epics.caj.impl.CachedByteBufferAllocator.buffer_size)", ex2);
                }
            }
        }
    }

    /**
     * Returns the context this transport belongs to.
     *
     * @return the context passed to the constructor
     */
    @Override
    public CAContext getContext() {
        return this.context;
    }

    /**
     * Returns the address of the remote end of this transport.
     *
     * @return the channel's remote socket address as captured in the constructor
     */
    @Override
    public InetSocketAddress getRemoteAddress() {
        return this.socketAddress;
    }

    /**
     * Returns the priority of this transport.
     *
     * @return the priority passed to the constructor
     */
    @Override
    public short getPriority() {
        return this.priority;
    }

    /**
     * Notification that a beacon arrived: restarts the connection-timeout timer,
     * unless an echo reply is currently being awaited.
     */
    public void beaconArrivalNotify() {
        // NOTE(review): probeResponsePending is read here without holding probeLock.
        if (!this.probeResponsePending) {
            this.rescheduleTimer(this.connectionTimeout);
        }
    }

    /**
     * Cancels the currently scheduled timer task and, if the transport is still open,
     * schedules a new one.
     *
     * @param timeout delay handed to the timer's executeAfterDelay()
     */
    private void rescheduleTimer(long timeout) {
        Timer.cancel(this.taskID);
        if (!this.closed) {
            this.taskID = this.context.getTimer().executeAfterDelay(timeout, this);
        }
    }

    /**
     * Timer callback. If no echo reply is outstanding, an echo request is sent to
     * probe the connection. If a reply is still outstanding, the probe has timed out
     * and the transport is reported as unresponsive.
     *
     * @param timeToRun not used by this implementation
     */
    @Override
    public void timeout(long timeToRun) {
        synchronized (this.probeLock) {
            if (!this.probeResponsePending) {
                this.sendEcho();
                return;
            }
            this.probeTimeoutDetected = true;
            this.unresponsiveTransport();
        }
    }

    /**
     * Notification that an echo reply arrived. Ignored when no reply was awaited.
     * If the probe had already timed out, another echo is sent; otherwise the probe
     * state is reset, the transport is reported as responsive and the
     * connection-timeout timer is restarted.
     */
    public void echoNotify() {
        synchronized (this.probeLock) {
            if (!this.probeResponsePending) {
                return;
            }
            if (this.probeTimeoutDetected) {
                // Late reply: probe again.
                this.sendEcho();
                return;
            }
            this.probeTimeoutDetected = false;
            this.probeResponsePending = false;
            this.responsiveTransport();
            this.rescheduleTimer(this.connectionTimeout);
        }
    }

    /**
     * Sends an echo request and restarts the timer with the context's echo timeout.
     * A reply is awaited only when the remote revision is at least 3 and submitting
     * the request did not fail.
     */
    private void sendEcho() {
        synchronized (this.probeLock) {
            this.probeTimeoutDetected = false;
            boolean replyExpected = this.remoteTransportRevision >= 3;
            this.probeResponsePending = replyExpected;
            try {
                EchoRequest echo = new EchoRequest(this);
                echo.submit();
            }
            catch (IOException ex) {
                // Nothing was sent, so no reply can arrive.
                this.probeResponsePending = false;
            }
            this.rescheduleTimer(this.context.getEchoTimeout());
        }
    }

    /**
     * If the transport is currently flagged unresponsive, clears the flag and tells
     * every owning client that the transport is responsive again. The client set is
     * copied under the owners monitor and the callbacks run outside of it; a client
     * that throws is logged and the remaining clients are still notified.
     */
    private void responsiveTransport() {
        if (!this.unresponsiveTransport) {
            return;
        }
        this.unresponsiveTransport = false;

        TransportClient[] snapshot;
        synchronized (this.owners) {
            snapshot = new TransportClient[this.owners.size()];
            this.owners.keySet().toArray(snapshot);
        }
        for (TransportClient client : snapshot) {
            try {
                client.transportResponsive(this);
            }
            catch (Throwable th) {
                logger.log(Level.SEVERE, "", th);
            }
        }
    }

    /**
     * If the transport is not yet flagged unresponsive, sets the flag, reports
     * {@code CAStatus.UNRESPTMO} to the context's exception listeners and then tells
     * every owning client that the transport is unresponsive. The client set is copied
     * under the owners monitor and the callbacks run outside of it; a listener or
     * client that throws is logged and the remaining ones are still notified.
     */
    private void unresponsiveTransport() {
        if (this.unresponsiveTransport) {
            return;
        }
        this.unresponsiveTransport = true;

        ContextVirtualCircuitExceptionEvent event = new ContextVirtualCircuitExceptionEvent(this.context, this.socketAddress.getAddress(), CAStatus.UNRESPTMO);
        for (ContextExceptionListener listener : this.context.getContextExceptionListeners()) {
            try {
                listener.contextVirtualCircuitException(event);
            }
            catch (Throwable th) {
                logger.log(Level.SEVERE, "", th);
            }
        }

        TransportClient[] snapshot;
        synchronized (this.owners) {
            snapshot = new TransportClient[this.owners.size()];
            this.owners.keySet().toArray(snapshot);
        }
        for (TransportClient client : snapshot) {
            try {
                client.transportUnresponsive();
            }
            catch (Throwable th) {
                logger.log(Level.SEVERE, "", th);
            }
        }
    }

    /**
     * Tells every owning client that the transport changed. The client set is copied
     * under the owners monitor and the callbacks run outside of it; a client that
     * throws is logged and the remaining clients are still notified.
     */
    public void changedTransport() {
        TransportClient[] snapshot;
        synchronized (this.owners) {
            snapshot = new TransportClient[this.owners.size()];
            this.owners.keySet().toArray(snapshot);
        }
        for (TransportClient client : snapshot) {
            try {
                client.transportChanged();
            }
            catch (Throwable th) {
                logger.log(Level.SEVERE, "", th);
            }
        }
    }
}

