/*
 * Decompiled with CFR 0.152.
 */
package org.zeromq.jms.protocol;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import org.zeromq.jms.ZmqException;
import org.zeromq.jms.protocol.ZmqEvent;
import org.zeromq.jms.protocol.ZmqSocketListener;
import org.zeromq.jms.protocol.ZmqSocketMetrics;
import org.zeromq.jms.protocol.ZmqSocketStatus;
import org.zeromq.jms.protocol.ZmqSocketType;
import org.zeromq.jms.protocol.event.ZmqEventHandler;
import org.zeromq.jms.protocol.filter.ZmqFilterPolicy;

/**
 * Worker that owns a single ZeroMQ socket for its whole life-cycle: it binds or connects the
 * socket, repeatedly pumps outgoing and/or incoming traffic while the shared {@code process}
 * flag is set, and finally detaches and closes the socket.
 *
 * <p>Events are exchanged with the owning component through the {@link ZmqSocketListener};
 * conversion between events and wire messages is delegated to the {@link ZmqEventHandler}.
 */
public class ZmqSocketSession
implements Runnable {
    private static final Logger LOGGER = Logger.getLogger(ZmqSocketSession.class.getCanonicalName());
    private volatile ZmqSocketStatus status = ZmqSocketStatus.STOPPED;
    /** {@link System#nanoTime()} reading taken at construction and after every received message. */
    private volatile long lastReceiveTime = System.nanoTime();
    /** {@link System#nanoTime()} reading taken at construction and after every successful send. */
    private volatile long lastSendTime = System.nanoTime();
    private final AtomicBoolean process;
    private final ZMQ.Socket socket;
    private final ZmqSocketType socketType;
    private final String socketAddr;
    private final boolean socketBound;
    private final boolean socketIncoming;
    private final boolean socketOutgoing;
    private final int socketFlags;
    private final int socketWaitTime;
    private final boolean socketHeartbeat;
    private final ZmqSocketListener socketListener;
    private final ZmqSocketMetrics metrics;
    private final ZmqEventHandler handler;
    private final ZmqFilterPolicy filter;

    /**
     * Creates a session around an already created (but not yet bound/connected) socket.
     *
     * @param process         shared flag; {@link #run()} keeps looping while it is {@code true}
     * @param socket          the ZeroMQ socket driven by this session
     * @param socketType      the socket type handed to the event handler
     * @param socketAddr      the address to bind or connect to
     * @param socketBound     {@code true} to bind the address, {@code false} to connect to it
     * @param socketIncoming  {@code true} when the session should receive messages
     * @param socketOutgoing  {@code true} when the session should send messages
     * @param socketFlags     flags passed to the first receive call of every receive cycle
     * @param socketWaitTime  receive time-out applied to the socket when it is not outgoing
     * @param socketHeartbeat {@code true} when the session takes part in heart-beating
     * @param socketListener  callback used to fetch events to send and to deliver received events;
     *                        may be {@code null}, in which case nothing is sent or delivered
     * @param filter          filter policy handed to the event handler when building messages
     * @param handler         converter between events and ZeroMQ messages
     * @param metrics         metrics sink updated by this session
     */
    public ZmqSocketSession(AtomicBoolean process, ZMQ.Socket socket, ZmqSocketType socketType, String socketAddr, boolean socketBound, boolean socketIncoming, boolean socketOutgoing, int socketFlags, int socketWaitTime, boolean socketHeartbeat, ZmqSocketListener socketListener, ZmqFilterPolicy filter, ZmqEventHandler handler, ZmqSocketMetrics metrics) {
        this.process = process;
        this.socket = socket;
        this.socketType = socketType;
        this.socketAddr = socketAddr;
        this.socketBound = socketBound;
        this.socketIncoming = socketIncoming;
        this.socketOutgoing = socketOutgoing;
        this.socketFlags = socketFlags;
        this.socketWaitTime = socketWaitTime;
        this.socketHeartbeat = socketHeartbeat;
        this.socketListener = socketListener;
        this.filter = filter;
        this.handler = handler;
        this.metrics = metrics;
    }

    /** @return the address this session binds or connects to */
    public String getAddr() {
        return this.socketAddr;
    }

    /** @return {@code true} when the socket is bound, {@code false} when it is connected */
    public boolean isBound() {
        return this.socketBound;
    }

    /** @return {@code true} when this session receives messages */
    public boolean isIncoming() {
        return this.socketIncoming;
    }

    /** @return {@code true} when this session sends messages */
    public boolean isOutgoing() {
        return this.socketOutgoing;
    }

    /** @return {@code true} when heart-beating is enabled for this session */
    public boolean isHeartbeat() {
        return this.socketHeartbeat;
    }

    /** @return the current life-cycle status of the session */
    public ZmqSocketStatus getStatus() {
        return this.status;
    }

    /**
     * Moves a {@code RUNNING} session to {@code WAITING}; does nothing in any other state.
     */
    public void pause() {
        if (this.status == ZmqSocketStatus.RUNNING) {
            this.status = ZmqSocketStatus.WAITING;
            LOGGER.warning("Socket paused: " + this);
        }
    }

    /** @return the {@link System#nanoTime()} reading of the last receive (or of construction) */
    public long getLastReceiveTime() {
        return this.lastReceiveTime;
    }

    /** @return the {@link System#nanoTime()} reading of the last successful send (or of construction) */
    public long getLastSendTime() {
        return this.lastSendTime;
    }

    /**
     * Opens the socket, pumps traffic until the {@code process} flag is cleared, performs one
     * final send pass for bidirectional heart-beating sessions, then closes the socket.
     *
     * <p>If the pump fails with an unchecked exception the socket is still closed before the
     * failure is propagated; a failure while closing is attached as a suppressed exception so the
     * original cause is not lost.
     */
    @Override
    public void run() {
        this.openSocket(this);
        try {
            // An outgoing session must not block in receive, otherwise pending sends would stall.
            if (this.socketOutgoing) {
                this.socket.setReceiveTimeOut(0);
            } else {
                this.socket.setReceiveTimeOut(this.socketWaitTime);
            }
            LOGGER.info("Started socket: " + this);
            while (this.process.get()) {
                if (this.socketOutgoing) {
                    this.sendSocket(this);
                }
                if (this.socketIncoming) {
                    this.receiveSocket(this);
                }
                this.metrics.setStatus(this.status);
            }
            LOGGER.info("Stopped socket: " + this);
            if (this.socketHeartbeat && this.socketOutgoing && this.socketIncoming) {
                // One last send pass after the loop has ended, with the regular wait time restored.
                this.socket.setReceiveTimeOut(this.socketWaitTime);
                this.sendSocket(this);
            }
        }
        catch (RuntimeException | Error ex) {
            // Release the socket even on failure so it does not stay open behind a dead worker.
            try {
                this.closeSocket(this);
            }
            catch (RuntimeException closeEx) {
                ex.addSuppressed(closeEx);
            }
            throw ex;
        }
        this.closeSocket(this);
    }

    /**
     * Binds or connects the session's socket, notifies the listener (when present) and marks the
     * session {@code RUNNING}.
     *
     * @param socketSession the session whose socket and address are used
     * @throws RuntimeException whatever the underlying bind/connect call throws, after logging it
     */
    protected void openSocket(ZmqSocketSession socketSession) {
        String addr = socketSession.socketAddr;
        ZMQ.Socket target = socketSession.socket;
        if (this.socketBound) {
            try {
                target.bind(addr);
            }
            catch (Exception ex) {
                // Pass the exception to the logger so the cause of the bind failure is recorded.
                LOGGER.log(Level.SEVERE, "Socket binding failure: " + this, ex);
                throw ex;
            }
            LOGGER.info("Bind socket successful: " + this);
        } else {
            try {
                target.connect(addr);
            }
            catch (Exception ex) {
                LOGGER.log(Level.SEVERE, "Socket connect failure: " + this, ex);
                throw ex;
            }
            LOGGER.info("Connect socket successful: " + this);
        }
        if (this.socketListener != null) {
            this.socketListener.open(this);
        }
        this.status = ZmqSocketStatus.RUNNING;
    }

    /**
     * Unbinds or disconnects the session's socket and closes it, then notifies the listener (when
     * present) and marks the session {@code STOPPED}.
     *
     * <p>The socket is closed even when unbind/disconnect fails; in that case the failure is
     * logged and rethrown, and neither the listener notification nor the status change happens.
     *
     * @param socketSession the session whose socket and address are used
     * @throws RuntimeException whatever the underlying unbind/disconnect/close call throws
     */
    protected void closeSocket(ZmqSocketSession socketSession) {
        String addr = socketSession.socketAddr;
        ZMQ.Socket target = socketSession.socket;
        try {
            this.detachAndClose(target, addr);
        }
        catch (Exception ex) {
            String failure = this.socketBound ? "Socketing unbind failure: " : "Socket disconnect failure: ";
            LOGGER.log(Level.SEVERE, failure + this, ex);
            throw ex;
        }
        LOGGER.info((this.socketBound ? "Unbind socket successful: " : "Disconnect socket successful: ") + this);
        if (this.socketListener != null) {
            this.socketListener.close(this);
        }
        this.status = ZmqSocketStatus.STOPPED;
    }

    /** Unbinds (bound session) or disconnects (connected session) the address, then always closes the socket. */
    private void detachAndClose(ZMQ.Socket target, String addr) {
        try {
            if (this.socketBound) {
                target.unbind(addr);
            } else {
                target.disconnect(addr);
            }
        }
        finally {
            target.close();
        }
    }

    /**
     * Pulls events from the listener and sends them until the listener has nothing more to send
     * or a send fails. Does nothing when no listener is configured.
     *
     * <p>The {@code RUNNING} check is evaluated once on entry: when the session is not running at
     * that point, at most one event is sent per call. A failed send moves the session to
     * {@code WAITING} and reports the event through {@link ZmqSocketListener#error}.
     *
     * @param socketSession unused; the method operates on {@code this}
     */
    protected void sendSocket(ZmqSocketSession socketSession) {
        if (this.socketListener == null) {
            return;
        }
        boolean active = this.status == ZmqSocketStatus.RUNNING;
        ZmqEvent socketEvent = null;
        try {
            do {
                socketEvent = this.socketListener.send(this);
                if (socketEvent == null) {
                    break;
                }
                ZMsg msg = this.handler.createMsg(this.socketType, this.filter, socketEvent);
                if (!msg.send(this.socket, true)) {
                    LOGGER.log(Level.SEVERE, "Unable to send message: " + this);
                    this.status = ZmqSocketStatus.WAITING;
                    this.socketListener.error(this, socketEvent);
                    break;
                }
                this.metrics.incrementSend();
                this.lastSendTime = System.nanoTime();
            } while (active);
        }
        catch (ZmqException ex) {
            LOGGER.log(Level.SEVERE, "Unable to send message due to internal error: " + this, ex);
            // socketEvent is null here when the listener itself failed before yielding an event.
            this.socketListener.error(this, socketEvent);
        }
    }

    /**
     * Receives messages until a receive call yields {@code null}, converting each message to an
     * event and handing it to the listener. A non-null reply from the listener is sent back on the
     * same socket when the session is outgoing; otherwise the reply is dropped with an error log.
     *
     * <p>Send metrics and the last-send time are only updated when the reply was actually sent.
     *
     * @param socketSession unused; the method operates on {@code this}
     */
    protected void receiveSocket(ZmqSocketSession socketSession) {
        if (LOGGER.isLoggable(Level.FINEST)) {
            LOGGER.log(Level.FINEST, "Receive and wait (" + this.socket.getReceiveTimeOut() + ") : " + this);
        }
        ZMsg msg = ZMsg.recvMsg(this.socket, this.socketFlags);
        while (msg != null) {
            this.metrics.incrementReceive();
            this.lastReceiveTime = System.nanoTime();
            try {
                ZmqEvent event = this.handler.createEvent(this.socketType, msg);
                ZmqEvent replyEvent = null;
                if (event != null && this.socketListener != null) {
                    replyEvent = this.socketListener.receive(this, event);
                }
                if (replyEvent != null) {
                    if (this.socketOutgoing) {
                        ZMsg replyMsg = this.handler.createMsg(this.socketType, this.filter, replyEvent);
                        if (replyMsg.send(this.socket, true)) {
                            this.metrics.incrementSend();
                            this.lastSendTime = System.nanoTime();
                        } else {
                            LOGGER.log(Level.SEVERE, "Unable to send reply message: " + this);
                        }
                    } else {
                        LOGGER.log(Level.SEVERE, "Socketing has not outgoing state: " + this);
                    }
                }
            }
            catch (ZmqException ex) {
                LOGGER.log(Level.SEVERE, "Socketing incoming failure: " + this, ex);
            }
            finally {
                msg.destroy();
            }
            // NOTE(review): follow-up receives do not pass socketFlags, unlike the first receive
            // above; left as-is because the intent cannot be determined from this file - confirm.
            msg = ZMsg.recvMsg(this.socket);
        }
    }

    /** @return a description listing the session's immutable socket settings */
    @Override
    public String toString() {
        return "ZmqSocketSession [socketType=" + this.socketType + ", socketAddr=" + this.socketAddr + ", socketBound=" + this.socketBound + ", socketIncoming=" + this.socketIncoming + ", socketOutgoing=" + this.socketOutgoing + ", socketFlags=" + this.socketFlags + ", socketWaitTime=" + this.socketWaitTime + ", socketHeartbeat=" + this.socketHeartbeat + "]";
    }
}

