/*
 * Decompiled with CFR 0.152.
 */
package org.zeromq.jms.protocol;

import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.zeromq.ZMQ;
import org.zeromq.jms.ZmqException;
import org.zeromq.jms.ZmqMessage;
import org.zeromq.jms.protocol.ZmqAckEvent;
import org.zeromq.jms.protocol.ZmqEvent;
import org.zeromq.jms.protocol.ZmqGateway;
import org.zeromq.jms.protocol.ZmqGatewayListener;
import org.zeromq.jms.protocol.ZmqHeartbeatEvent;
import org.zeromq.jms.protocol.ZmqSendEvent;
import org.zeromq.jms.protocol.ZmqSocketListener;
import org.zeromq.jms.protocol.ZmqSocketMetrics;
import org.zeromq.jms.protocol.ZmqSocketSession;
import org.zeromq.jms.protocol.ZmqSocketStatus;
import org.zeromq.jms.protocol.ZmqSocketType;
import org.zeromq.jms.protocol.event.ZmqEventHandler;
import org.zeromq.jms.protocol.filter.ZmqFilterPolicy;
import org.zeromq.jms.protocol.redelivery.ZmqRedeliveryPolicy;
import org.zeromq.jms.selector.ZmqMessageSelector;
import org.zeromq.jms.util.Stopwatch;

public abstract class AbstractZmqGateway
implements ZmqGateway {
    private static final Logger LOGGER = Logger.getLogger(AbstractZmqGateway.class.getCanonicalName());
    private static final int HEARTBEAT_RATE_MILLI_SECOND = 1000;
    private static final int AUTO_PAUSE_IDLE_MILLI_SECOND = 3000;
    private AtomicBoolean active = new AtomicBoolean(false);
    private final String name;
    private final ZmqSocketType type;
    private final boolean bound;
    private final String addr;
    private final int flags;
    private final ZMQ.Context context;
    private final List<ZmqSocketMetrics> metrics;
    private final List<ZmqSocketSession> sessions;
    private final boolean transacted;
    private final boolean acknowledged;
    private final boolean heartbeat;
    private final ZmqGateway.Direction direction;
    private final Date startDateTime;
    private final ZmqRedeliveryPolicy redelivery;
    private final ZmqFilterPolicy filterPolicy;
    private final ZmqEventHandler eventHandler;
    private final ZmqMessageSelector messageSelector;
    private ZmqGatewayListener listener = null;
    private ExecutorService socketExecutor = null;
    private ExecutorService listenerExecutor = null;
    private static final int SOCKET_WAIT_MILLI_SECOND = 500;
    private static final int SOCKET_METRIC_BUCKET_COUNT = 360;
    private static final int SOCKET_METRIC_BUCKET_INTERVAL_MILLI_SECOND = 10000;
    private static final int LISTENER_THREAD_POOL = 1;
    private static final int LISTENER_WAIT_MILLI_SECOND = 500;
    private final TransferQueue<ZmqSendEvent> incomingQueue = new LinkedTransferQueue<ZmqSendEvent>();
    private final Queue<ZmqSendEvent> incomingSnapshot = new LinkedList<ZmqSendEvent>();
    private final TransferQueue<ZmqSendEvent> outgoingQueue = new LinkedTransferQueue<ZmqSendEvent>();
    private final Queue<ZmqSendEvent> outgoingSnapshot = new LinkedList<ZmqSendEvent>();
    private final ConcurrentMap<Object, TrackEvent> trackEventMap = new ConcurrentHashMap<Object, TrackEvent>();

    public AbstractZmqGateway(String name, ZMQ.Context context, ZmqSocketType type, boolean isBound, String addr, int flags, ZmqFilterPolicy filter, ZmqEventHandler handler, ZmqGatewayListener listener, ZmqMessageSelector selector, ZmqRedeliveryPolicy redelivery, boolean transacted, boolean acknowledged, boolean heartbeat, ZmqGateway.Direction direction) {
        this.name = name;
        this.context = context;
        this.type = type;
        this.bound = isBound;
        this.addr = addr;
        this.flags = flags;
        this.filterPolicy = filter;
        this.eventHandler = handler;
        this.listener = listener;
        this.messageSelector = selector;
        this.redelivery = redelivery;
        this.transacted = transacted;
        this.acknowledged = acknowledged;
        this.heartbeat = heartbeat;
        this.direction = direction;
        this.startDateTime = new Date();
        this.metrics = new LinkedList<ZmqSocketMetrics>();
        this.sessions = new LinkedList<ZmqSocketSession>();
    }

    protected String[] getSocketAddrs() {
        String[] addrs = this.addr.split(",");
        return addrs;
    }

    @Override
    public void open() {
        this.active.set(true);
        this.listenerExecutor = Executors.newFixedThreadPool(1);
        if (this.listener != null) {
            ListenerThread listenerThread = new ListenerThread();
            this.listenerExecutor.execute(listenerThread);
        }
        String[] socketAddrs = this.getSocketAddrs();
        this.socketExecutor = Executors.newFixedThreadPool(socketAddrs.length);
        boolean socketOutgoing = this.direction == ZmqGateway.Direction.OUTGOING || this.heartbeat;
        boolean socketIncoming = this.direction == ZmqGateway.Direction.INCOMING || this.acknowledged;
        for (String socketAddr : socketAddrs) {
            String[] filters;
            ZMQ.Socket socket = this.getSocket(this.context, this.type.getType());
            if (this.type == ZmqSocketType.SUB && this.filterPolicy != null && (filters = this.filterPolicy.getConsumerFilters()) != null) {
                for (String filter : filters) {
                    byte[] filterAsBytes = filter.getBytes();
                    socket.subscribe(filterAsBytes);
                }
            }
            ZmqSocketMetrics socketMetrics = new ZmqSocketMetrics(socketAddr, 360, 10000, socketOutgoing, socketIncoming);
            this.metrics.add(socketMetrics);
            ZmqSocketListener socketListener = this.getSocketListener(socketAddr, socketIncoming, socketOutgoing);
            ZmqSocketSession socketSession = new ZmqSocketSession(this.active, socket, this.type, socketAddr, this.bound, socketIncoming, socketOutgoing, this.flags, 500, this.heartbeat, socketListener, this.filterPolicy, this.eventHandler, socketMetrics);
            this.sessions.add(socketSession);
            this.socketExecutor.execute(socketSession);
        }
        LOGGER.info("Gateway openned: " + this.toString());
    }

    protected ZMQ.Socket getSocket(ZMQ.Context context, int socketType) {
        ZMQ.Socket socket = context.socket(socketType);
        socket.setSendTimeOut(0);
        return socket;
    }

    protected ZmqSocketListener getSocketListener(final String socketAddr, boolean socketIncoming, final boolean socketOutgoing) {
        ZmqSocketListener socketListener = new ZmqSocketListener(){

            @Override
            public void open(ZmqSocketSession session) {
            }

            @Override
            public ZmqEvent send(ZmqSocketSession source) {
                ZmqEvent sendEvent = null;
                try {
                    sendEvent = (ZmqEvent)AbstractZmqGateway.this.outgoingQueue.poll(500L, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException ex) {
                    LOGGER.log(Level.WARNING, "Polling of outgoing queue interrupted", ex);
                }
                if (AbstractZmqGateway.this.heartbeat && socketOutgoing && sendEvent == null) {
                    long lastReceiveTime = source.getLastReceiveTime();
                    long lastSendTime = source.getLastSendTime();
                    long currentTime = System.nanoTime();
                    long lastReceiveLaspedTime = (currentTime - lastReceiveTime) / 1000000L;
                    long lastSendLaspedTime = (currentTime - lastSendTime) / 1000000L;
                    ZmqSocketStatus status = source.getStatus();
                    if (lastReceiveLaspedTime > 1000L && lastSendLaspedTime > 1000L) {
                        if (lastReceiveLaspedTime > 3000L && status == ZmqSocketStatus.RUNNING) {
                            source.pause();
                        } else {
                            sendEvent = AbstractZmqGateway.this.eventHandler.createHeartbeatEvent();
                        }
                    }
                }
                if (sendEvent instanceof ZmqHeartbeatEvent && AbstractZmqGateway.this.acknowledged) {
                    long messageSent = System.currentTimeMillis();
                    Object messageId = sendEvent.getMessageId();
                    TrackEvent tackEvent = new TrackEvent(sendEvent, messageSent);
                    if (LOGGER.isLoggable(Level.FINEST)) {
                        LOGGER.log(Level.FINEST, "Socket " + socketAddr + " for gateway " + AbstractZmqGateway.this.name + " tacking event: " + sendEvent);
                    }
                    AbstractZmqGateway.this.trackEventMap.put(messageId, tackEvent);
                }
                if (LOGGER.isLoggable(Level.FINEST) && sendEvent != null) {
                    LOGGER.log(Level.FINEST, "Socket " + socketAddr + " for gateway " + AbstractZmqGateway.this.name + " send event: " + sendEvent);
                }
                return sendEvent;
            }

            @Override
            public void error(ZmqSocketSession source, ZmqEvent event) {
                if (event instanceof ZmqSendEvent) {
                    ZmqSendEvent sendEvent = (ZmqSendEvent)event;
                    try {
                        AbstractZmqGateway.this.outgoingQueue.put(sendEvent);
                        if (LOGGER.isLoggable(Level.FINEST) && sendEvent != null) {
                            LOGGER.log(Level.FINEST, "Socket " + socketAddr + " for gateway " + AbstractZmqGateway.this.name + " send event: " + sendEvent);
                        }
                    }
                    catch (InterruptedException ex) {
                        LOGGER.log(Level.SEVERE, "Unable to re-send event: " + event, ex);
                    }
                }
            }

            @Override
            public ZmqEvent receive(ZmqSocketSession session, ZmqEvent event) {
                if (LOGGER.isLoggable(Level.FINEST)) {
                    LOGGER.log(Level.FINEST, "Socket " + socketAddr + " for gateway " + AbstractZmqGateway.this.name + " consume event: " + event);
                }
                if (event instanceof ZmqSendEvent) {
                    try {
                        AbstractZmqGateway.this.incomingQueue.put((ZmqSendEvent)event);
                    }
                    catch (InterruptedException ex) {
                        LOGGER.log(Level.SEVERE, "Socket " + socketAddr + " for gateway " + AbstractZmqGateway.this.name + " cannot soncume messagew dues to intenral error: " + event, ex);
                        return null;
                    }
                }
                ZmqAckEvent replyEvent = null;
                if (event instanceof ZmqHeartbeatEvent) {
                    try {
                        if (session.isHeartbeat() && session.isOutgoing()) {
                            replyEvent = AbstractZmqGateway.this.eventHandler.createAckEvent(event);
                        }
                    }
                    catch (ZmqException ex) {
                        LOGGER.log(Level.SEVERE, "Socket " + socketAddr + " for gateway " + AbstractZmqGateway.this.name + " received corrupt event: " + event, (Throwable)((Object)ex));
                    }
                } else if (event instanceof ZmqAckEvent) {
                    ZmqAckEvent ackEvent = (ZmqAckEvent)event;
                    Object messageId = ackEvent.getMessageId();
                    if (messageId == null) {
                        LOGGER.log(Level.SEVERE, "Socket " + socketAddr + " for gateway " + AbstractZmqGateway.this.name + " received corrupt event: " + event);
                    } else {
                        TrackEvent trackedEvent = (TrackEvent)AbstractZmqGateway.this.trackEventMap.remove(messageId);
                        if (trackedEvent == null) {
                            LOGGER.log(Level.WARNING, "Socket " + socketAddr + " for gateway " + AbstractZmqGateway.this.name + " received ACK for untracked event: " + event);
                        }
                    }
                }
                if (replyEvent != null && LOGGER.isLoggable(Level.FINEST)) {
                    LOGGER.log(Level.FINEST, "Socket " + socketAddr + " for gateway " + AbstractZmqGateway.this.name + " reply event: " + event);
                }
                return replyEvent;
            }

            @Override
            public void close(ZmqSocketSession session) {
            }
        };
        return socketListener;
    }

    @Override
    public boolean isActive() {
        return this.active.get();
    }

    @Override
    public void close() {
        if (this.acknowledged && this.trackEventMap.size() > 0) {
            LOGGER.info("Gateway " + this.name + " waiting for acknowledgement message(s): " + this.trackEventMap.size());
            try {
                Thread.sleep(500L);
            }
            catch (InterruptedException ex) {
                LOGGER.throwing(AbstractZmqGateway.class.getCanonicalName(), "close()", ex);
            }
        }
        this.active.set(false);
        if (this.listenerExecutor != null) {
            try {
                this.listenerExecutor.shutdown();
                boolean success = this.listenerExecutor.awaitTermination(3L, TimeUnit.SECONDS);
                if (!success) {
                    LOGGER.severe("Listener threads failed to stop: " + this.toString());
                }
            }
            catch (InterruptedException ex) {
                LOGGER.log(Level.SEVERE, "Listener threads failed to stop: " + this.toString(), ex);
            }
        }
        if (this.socketExecutor != null) {
            try {
                this.socketExecutor.shutdown();
                boolean success = this.socketExecutor.awaitTermination(3L, TimeUnit.SECONDS);
                if (!success) {
                    LOGGER.severe("Socket threads failed to stop: " + this.toString());
                }
            }
            catch (InterruptedException ex) {
                LOGGER.log(Level.SEVERE, "Socket threads failed to stop: " + this.toString(), ex);
            }
        }
        if (this.acknowledged) {
            for (TrackEvent tackMessage : this.trackEventMap.values()) {
                LOGGER.warning("Gateway " + this.name + " has un-acknowledged message (LOST): " + tackMessage);
            }
            this.trackEventMap.clear();
        }
        LOGGER.info("Gateway closed: " + this.toString());
    }

    @Override
    public void start() {
    }

    @Override
    public void stop() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void commit() throws ZmqException {
        if (!this.active.get()) {
            throw new ZmqException("The gateway has been close: " + this.toString());
        }
        if (!this.transacted) {
            throw new ZmqException("No transacion started: " + this.toString());
        }
        Queue<ZmqSendEvent> queue = this.outgoingSnapshot;
        synchronized (queue) {
            ZmqSendEvent event = this.outgoingSnapshot.poll();
            while (event != null) {
                this.outgoingQueue.add(event);
                event = this.outgoingSnapshot.poll();
            }
            this.outgoingSnapshot.clear();
        }
        queue = this.incomingSnapshot;
        synchronized (queue) {
            if (this.redelivery != null) {
                this.redelivery.delivered(this.incomingSnapshot);
            }
            this.incomingSnapshot.clear();
        }
        LOGGER.fine("Transaction committed: " + this.toString());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void rollback() throws ZmqException {
        if (!this.active.get()) {
            throw new ZmqException("The gateway has been close: " + this.toString());
        }
        if (!this.transacted) {
            throw new ZmqException("No transacion started: " + this.toString());
        }
        Queue<ZmqSendEvent> queue = this.outgoingSnapshot;
        synchronized (queue) {
            this.outgoingSnapshot.clear();
        }
        queue = this.incomingSnapshot;
        synchronized (queue) {
            if (this.redelivery != null) {
                this.redelivery.redeliver(this.incomingSnapshot);
            }
            this.incomingSnapshot.clear();
        }
        LOGGER.fine("Transaction rolledback: " + this.toString());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void send(ZmqMessage message) throws ZmqException {
        ZmqSendEvent event = this.eventHandler.createSendEvent(message);
        if (this.transacted) {
            Queue<ZmqSendEvent> queue = this.outgoingSnapshot;
            synchronized (queue) {
                this.outgoingSnapshot.add(event);
            }
        } else {
            this.outgoingQueue.add(event);
        }
    }

    protected boolean isValidMessage(ZmqMessage message) {
        if (this.messageSelector != null) {
            Map<String, Object> variables = message.getProperties();
            boolean validMessage = this.messageSelector.evaluate(variables);
            return validMessage;
        }
        return true;
    }

    @Override
    public ZmqMessage receive() throws ZmqException {
        ZmqMessage message = this.receive(500);
        while (message == null && this.active.get()) {
            message = this.receive(500);
        }
        if (message == null) {
            throw new ZmqException("The gateway has been close: " + this.toString());
        }
        return message;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ZmqMessage receive(int timeout) throws ZmqException {
        long startTime;
        Stopwatch stopwatch;
        block19: {
            ZmqSendEvent event;
            stopwatch = null;
            if (LOGGER.isLoggable(Level.FINER)) {
                stopwatch = new Stopwatch();
            }
            if (!this.active.get()) {
                throw new ZmqException("The gateway has been close: " + this.toString());
            }
            if (this.redelivery != null && (event = this.redelivery.getNextRedeliver()) != null) {
                ZmqMessage message = event.getMessage();
                if (this.transacted) {
                    Queue<ZmqSendEvent> queue = this.incomingSnapshot;
                    synchronized (queue) {
                        this.incomingSnapshot.add(event);
                    }
                }
                if (stopwatch != null) {
                    LOGGER.log(Level.FINER, "Receive re-delivery message: " + stopwatch.elapsedTime() + " (msec)");
                }
                return message;
            }
            startTime = System.currentTimeMillis();
            try {
                ZmqMessage message;
                ZmqSendEvent event2 = (ZmqSendEvent)this.incomingQueue.poll(timeout, TimeUnit.MILLISECONDS);
                if (event2 == null || !this.isValidMessage(message = event2.getMessage())) break block19;
                if (this.transacted) {
                    Queue<ZmqSendEvent> queue = this.incomingSnapshot;
                    synchronized (queue) {
                        this.incomingSnapshot.add(event2);
                    }
                }
                if (stopwatch != null) {
                    LOGGER.log(Level.FINER, "Receive incoming message: " + stopwatch.elapsedTime() + " (msec)");
                }
                return message;
            }
            catch (InterruptedException ex) {
                throw new ZmqException("Unable to poll internal queue: " + this.toString(), ex);
            }
        }
        long endTime = System.currentTimeMillis();
        long lapsedTime = endTime - startTime;
        if (lapsedTime < (long)timeout) {
            int remainingTimeout = (int)((long)timeout - lapsedTime);
            ZmqMessage message = this.receive(remainingTimeout);
            if (stopwatch != null) {
                if (message == null) {
                    LOGGER.log(Level.FINER, "Receive incoming message (Wait): " + stopwatch.elapsedTime() + " (msec)");
                } else {
                    LOGGER.log(Level.FINER, "Receive no message (Timeout): " + stopwatch.elapsedTime() + " (msec)");
                }
            }
            return message;
        }
        return null;
    }

    @Override
    public void setListener(ZmqGatewayListener listener) {
        if (listener != null && this.listener == null) {
            ListenerThread listenerThread = new ListenerThread();
            this.listenerExecutor.execute(listenerThread);
        }
        this.listener = listener;
    }

    @Override
    public List<ZmqSocketMetrics> getMetrics() {
        return this.metrics;
    }

    @Override
    public String getName() {
        return this.name;
    }

    @Override
    public String getAddr() {
        return this.addr;
    }

    @Override
    public ZmqSocketType getType() {
        return this.type;
    }

    @Override
    public boolean isBound() {
        return this.bound;
    }

    @Override
    public boolean isTransacted() {
        return this.transacted;
    }

    @Override
    public boolean isAcknowledged() {
        return this.acknowledged;
    }

    @Override
    public boolean isHeartbeat() {
        return this.heartbeat;
    }

    @Override
    public ZmqGateway.Direction getDirection() {
        return this.direction;
    }

    @Override
    public Date getStartTime() {
        return this.startDateTime;
    }

    protected ZMQ.Context getContext() {
        return this.context;
    }

    protected ZmqFilterPolicy getFilterPolicy() {
        return this.filterPolicy;
    }

    protected ZmqEventHandler getEventHandler() {
        return this.eventHandler;
    }

    protected ZmqMessageSelector getMessageSelector() {
        return this.messageSelector;
    }

    public String toString() {
        return "AbstractZmqGateway [active=" + this.active + ", name=" + this.name + ", type=" + (Object)((Object)this.type) + ", isBound=" + this.bound + ", addr=" + this.addr + ", transacted=" + this.transacted + ", acknowleged=" + this.acknowledged + ", heartbeat=" + this.heartbeat + ", direction=" + (Object)((Object)this.direction) + "]";
    }

    private class ListenerThread
    implements Runnable {
        private ListenerThread() {
        }

        @Override
        public void run() {
            while (AbstractZmqGateway.this.active.get() && AbstractZmqGateway.this.listener != null) {
                try {
                    ZmqMessage message = AbstractZmqGateway.this.receive(500);
                    if (message == null) continue;
                    AbstractZmqGateway.this.listener.onMessage(message);
                }
                catch (ZmqException ex) {
                    AbstractZmqGateway.this.listener.onException(ex);
                }
            }
        }
    }

    private static final class TrackEvent {
        private final ZmqEvent event;
        private final long eventSent;

        TrackEvent(ZmqEvent event, long eventSent) {
            this.event = event;
            this.eventSent = eventSent;
        }

        public String toString() {
            return "TrackEvent [eventSent=" + this.eventSent + ", event=" + this.event + "]";
        }
    }
}

