/*
 * Decompiled with CFR 0.152.
 */
package org.zeromq.jms.protocol;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.zeromq.ZMQ;
import org.zeromq.jms.ZmqException;
import org.zeromq.jms.ZmqMessage;
import org.zeromq.jms.protocol.ZmqAckEvent;
import org.zeromq.jms.protocol.ZmqEvent;
import org.zeromq.jms.protocol.ZmqGateway;
import org.zeromq.jms.protocol.ZmqGatewayListener;
import org.zeromq.jms.protocol.ZmqHeartbeatEvent;
import org.zeromq.jms.protocol.ZmqProxySession;
import org.zeromq.jms.protocol.ZmqSendEvent;
import org.zeromq.jms.protocol.ZmqSocketContext;
import org.zeromq.jms.protocol.ZmqSocketListener;
import org.zeromq.jms.protocol.ZmqSocketMetrics;
import org.zeromq.jms.protocol.ZmqSocketSession;
import org.zeromq.jms.protocol.ZmqSocketStatus;
import org.zeromq.jms.protocol.ZmqSocketType;
import org.zeromq.jms.protocol.event.ZmqEventHandler;
import org.zeromq.jms.protocol.filter.ZmqFilterPolicy;
import org.zeromq.jms.protocol.redelivery.ZmqRedeliveryPolicy;
import org.zeromq.jms.protocol.store.ZmqJournalEntry;
import org.zeromq.jms.protocol.store.ZmqJournalStore;
import org.zeromq.jms.selector.ZmqMessageSelector;
import org.zeromq.jms.util.Stopwatch;

public abstract class AbstractZmqGateway
implements ZmqGateway {
    /** Logger shared by every gateway instance. */
    private static final Logger LOGGER = Logger.getLogger(AbstractZmqGateway.class.getCanonicalName());

    // Timing and sizing constants (milliseconds unless the name says otherwise).
    // NOTE(review): the decompiler inlined these values as literals in the method bodies,
    // so the pairing of constant and use-site is inferred from name and value — confirm
    // against the original source.
    private static final int HEARTBEAT_RATE_MILLI_SECOND = 1000;
    private static final int AUTO_PAUSE_IDLE_MILLI_SECOND = 3000;
    private static final int SOCKET_STATUS_TIMEOUT_MILLI_SECOND = 5000;
    private static final int SOCKET_WAIT_MILLI_SECOND = 500;
    private static final int SOCKET_METRIC_BUCKET_COUNT = 360;
    private static final int SOCKET_METRIC_BUCKET_INTERVAL_MILLI_SECOND = 10000;
    private static final int LISTENER_THREAD_POOL = 1;
    private static final int LISTENER_WAIT_MILLI_SECOND = 500;

    /** Open/closed flag; the same instance is handed to every socket and proxy session. */
    private final AtomicBoolean active = new AtomicBoolean(false);

    // Values captured from the constructor arguments / socket context.
    private final String name;
    private final ZmqSocketType type;
    private final boolean bound;
    /** Comma separated list of socket addresses (see getSocketAddrs()). */
    private final String addr;
    private final int flags;

    // ZMQ contexts are created in open().
    private ZMQ.Context context;
    private ZMQ.Context proxyContext;

    /** Synchronized list of the metrics created for the socket sessions. */
    private final List<ZmqSocketMetrics> metrics;
    /** Synchronized map of socket sessions keyed by their individual socket address. */
    private final Map<String, ZmqSocketSession> socketSessions;
    private ZmqProxySession proxySession = null;

    private final boolean transacted;
    private final boolean acknowledge;
    private final boolean heartbeat;
    private final ZmqGateway.Direction direction;
    private final Date startDateTime;
    /** Private copy of the socket context passed to the constructor. */
    private final ZmqSocketContext socketContext;
    private final ZmqRedeliveryPolicy redelivery;
    private final ZmqFilterPolicy filterPolicy;
    private final ZmqEventHandler eventHandler;
    private final ZmqMessageSelector messageSelector;
    private final ZmqJournalStore journalStore;

    /**
     * Asynchronous message listener. Written by setListener() on the caller's thread and
     * polled by the listener thread, hence volatile.
     */
    private volatile ZmqGatewayListener listener = null;

    // Executors are created in open() and shut down in close().
    private ExecutorService socketExecutor = null;
    private ExecutorService listenerExecutor = null;
    private ExecutorService proxyExecutor = null;

    /** Events received from the sockets, waiting to be consumed by receive(). */
    private final TransferQueue<ZmqSendEvent> incomingQueue = new LinkedTransferQueue<ZmqSendEvent>();
    /** Events handed out inside the current transaction; guarded by synchronizing on the queue itself. */
    private final Queue<ZmqSendEvent> incomingSnapshot = new LinkedList<ZmqSendEvent>();
    /** Events waiting to be picked up by a socket session for sending. */
    private final TransferQueue<ZmqSendEvent> outgoingQueue = new LinkedTransferQueue<ZmqSendEvent>();
    /** Events sent inside the current transaction, released on commit; guarded by synchronizing on the queue itself. */
    private final Queue<ZmqSendEvent> outgoingSnapshot = new LinkedList<ZmqSendEvent>();

    /**
     * Creates a gateway; no sockets or threads are created until {@link #open(int)} is called.
     *
     * @param name          the gateway name (used in log messages and session names)
     * @param socketContext the socket settings; a private copy is kept, and the type, bind flag,
     *                      address and receive flags are read from it
     * @param filter        the filter policy (may be {@code null})
     * @param handler       the event handler used to build send/heartbeat/ACK events
     * @param listener      the optional asynchronous message listener (may be {@code null})
     * @param store         the optional journal store (may be {@code null})
     * @param selector      the optional message selector (may be {@code null})
     * @param redelivery    the optional re-delivery policy (may be {@code null})
     * @param transacted    {@code true} when send/receive are grouped by commit/rollback
     * @param acknowledge   {@code true} when sent events are tracked until acknowledged
     * @param heartbeat     {@code true} when heartbeat events are generated
     * @param direction     the message direction of the gateway
     */
    public AbstractZmqGateway(String name, ZmqSocketContext socketContext, ZmqFilterPolicy filter, ZmqEventHandler handler, ZmqGatewayListener listener, ZmqJournalStore store, ZmqMessageSelector selector, ZmqRedeliveryPolicy redelivery, boolean transacted, boolean acknowledge, boolean heartbeat, ZmqGateway.Direction direction) {
        this.name = name;
        this.type = socketContext.getType();
        this.socketContext = new ZmqSocketContext(socketContext);
        this.bound = socketContext.isBindFlag();
        this.addr = socketContext.getAddr();
        this.flags = socketContext.getRecieveMsgFlag();
        this.filterPolicy = filter;
        this.eventHandler = handler;
        this.listener = listener;
        this.journalStore = store;
        this.messageSelector = selector;
        this.redelivery = redelivery;
        this.transacted = transacted;
        this.acknowledge = acknowledge;
        this.heartbeat = heartbeat;
        this.direction = direction;
        this.startDateTime = new Date();
        // Parameterized (not raw) collections so the assignments are type-checked.
        this.metrics = Collections.synchronizedList(new LinkedList<ZmqSocketMetrics>());
        this.socketSessions = Collections.synchronizedMap(new HashMap<String, ZmqSocketSession>());
    }

    /**
     * Splits the configured address string on commas.
     *
     * @return one entry per comma separated socket address
     */
    protected String[] getSocketAddrs() {
        return this.addr.split(",");
    }

    /**
     * Polls the socket sessions until every one of them reports a status contained in
     * {@code onStatus}, or until the stopwatch reports that {@code millis} has passed.
     *
     * @param millis   the time budget in milliseconds; a negative value selects the long
     *                 (5000 ms) poll interval
     * @param onStatus the set of acceptable statuses
     * @return {@code true} when the last check found all sessions in an acceptable status
     */
    protected boolean waitOnStatus(long millis, EnumSet<ZmqSocketStatus> onStatus) {
        final Stopwatch timer = new Stopwatch();

        // Pick the pause between two checks.
        final long pollInterval;
        if (millis < 0L) {
            pollInterval = SOCKET_STATUS_TIMEOUT_MILLI_SECOND;
        } else if (millis < SOCKET_WAIT_MILLI_SECOND) {
            pollInterval = millis / 2L;
        } else {
            pollInterval = SOCKET_WAIT_MILLI_SECOND;
        }

        while (true) {
            boolean allMatch = true;
            for (ZmqSocketSession session : this.socketSessions.values()) {
                if (!onStatus.contains(session.getStatus())) {
                    allMatch = false;
                    break;
                }
            }
            if (allMatch) {
                return true;
            }
            // Always sleep once after a failed check, then decide whether to try again.
            timer.sleep(pollInterval);
            if (!timer.before(millis)) {
                return false;
            }
        }
    }

    /**
     * Opens the gateway: creates the ZMQ context, opens the journal store (when configured),
     * starts the listener thread (when a listener is set), starts one socket session per
     * configured address and, when the socket context asks for it, a proxy session. Finally
     * waits up to {@code timeout} for the sessions to reach RUNNING, PAUSED or ERROR.
     *
     * <p>Does nothing when the gateway is already active. When the journal store cannot be
     * opened the failure is logged and the gateway is left inactive.
     *
     * @param timeout the time in milliseconds to wait for the socket sessions to settle
     */
    @Override
    public void open(int timeout) {
        if (this.active.get()) {
            return;
        }
        this.context = ZMQ.context((int)this.socketContext.getIOThreads());
        this.active.set(true);
        if (this.journalStore != null) {
            try {
                this.journalStore.open();
            }
            catch (ZmqException ex) {
                LOGGER.log(Level.SEVERE, "Unable to journal store: " + this.journalStore, ex);
                // Nothing was started, so do not keep reporting the gateway as active.
                // The context is left in place for close() to release.
                this.active.set(false);
                return;
            }
        }
        this.listenerExecutor = Executors.newFixedThreadPool(LISTENER_THREAD_POOL);
        if (this.listener != null) {
            this.listenerExecutor.execute(new ListenerThread());
        }
        final String[] socketAddrs = this.getSocketAddrs();
        this.socketExecutor = Executors.newFixedThreadPool(socketAddrs.length);
        this.proxyExecutor = this.socketContext.isProxy() ? Executors.newFixedThreadPool(1) : null;
        // Heartbeats and acknowledgements need traffic in both directions.
        final boolean socketOutgoing = this.direction == ZmqGateway.Direction.OUTGOING || this.heartbeat || this.acknowledge;
        final boolean socketIncoming = this.direction == ZmqGateway.Direction.INCOMING || this.heartbeat || this.acknowledge;
        boolean interrupted = false;
        for (String socketAddr : socketAddrs) {
            ZMQ.Socket socket = this.getSocket(this.context, this.socketContext);
            this.subscribeFilters(socket);
            // Sessions are keyed by their own address, so look up the previous session (and its
            // metrics) by socketAddr; using the full comma separated list never matches when
            // more than one address is configured.
            ZmqSocketSession previous = this.socketSessions.get(socketAddr);
            ZmqSocketMetrics socketMetrics = previous != null ? previous.getMetrics() : null;
            if (socketMetrics == null) {
                socketMetrics = new ZmqSocketMetrics(socketAddr, SOCKET_METRIC_BUCKET_COUNT, SOCKET_METRIC_BUCKET_INTERVAL_MILLI_SECOND, socketOutgoing, socketIncoming);
                this.metrics.add(socketMetrics);
            }
            ZmqSocketListener socketListener = this.getSocketListener(socketAddr, socketIncoming, socketOutgoing);
            ZmqSocketSession socketSession = new ZmqSocketSession(this.name, this.active, socket, this.type, socketAddr, this.bound, socketIncoming, socketOutgoing, this.flags, SOCKET_WAIT_MILLI_SECOND, this.heartbeat, this.acknowledge, socketListener, this.filterPolicy, this.eventHandler, socketMetrics);
            this.socketSessions.put(socketAddr, socketSession);
            this.socketExecutor.execute(socketSession);
            if (socketSession.isBound()) {
                interrupted |= this.awaitBinding(socketSession);
            }
        }
        if (this.socketContext.isProxy()) {
            this.openProxy();
        }
        this.waitOnStatus(timeout, EnumSet.of(ZmqSocketStatus.RUNNING, ZmqSocketStatus.PAUSED, ZmqSocketStatus.ERROR));
        LOGGER.info("Gateway openned: " + this.toString());
        if (interrupted) {
            // Hand the interrupt back to the caller now that the open sequence is complete.
            Thread.currentThread().interrupt();
        }
    }

    /** Subscribes a SUB socket to every consumer filter of the filter policy, if any. */
    private void subscribeFilters(ZMQ.Socket socket) {
        if (this.type != ZmqSocketType.SUB || this.filterPolicy == null) {
            return;
        }
        String[] filters = this.filterPolicy.getConsumerFilters();
        if (filters == null) {
            return;
        }
        for (String filter : filters) {
            // NOTE(review): getBytes() uses the platform default charset — confirm the
            // publishing side encodes its filters the same way.
            socket.subscribe(filter.getBytes());
        }
    }

    /**
     * Blocks while a bound session is still STOPPED or PENDING.
     *
     * @return {@code true} when the waiting thread was interrupted while sleeping
     */
    private boolean awaitBinding(ZmqSocketSession socketSession) {
        boolean interrupted = false;
        ZmqSocketStatus status = socketSession.getStatus();
        while (status == ZmqSocketStatus.STOPPED || status == ZmqSocketStatus.PENDING) {
            try {
                Thread.sleep(SOCKET_WAIT_MILLI_SECOND);
            }
            catch (InterruptedException ex) {
                // Keep waiting (as before) but remember the interrupt for the caller.
                interrupted = true;
                LOGGER.warning("Binding sleep interrupted: " + this);
            }
            status = socketSession.getStatus();
        }
        return interrupted;
    }

    /** Creates the proxy context, the front/back proxy sockets and starts the proxy session. */
    private void openProxy() {
        this.proxyContext = ZMQ.context((int)this.socketContext.getIOThreads());
        String proxyName = "proxy(" + this.name + ")";
        String frontSocketAddr = this.socketContext.getProxyAddr();
        ZmqSocketType frontSocketType = this.socketContext.getProxyType() == null ? ZmqSocketType.ROUTER : this.socketContext.getProxyType();
        // NOTE(review): both proxy sockets are created from the main context, not from
        // proxyContext — kept as found; confirm this is intended.
        ZMQ.Socket frontSocket = this.context.socket(frontSocketType.getType());
        String backSocketAddr = this.addr;
        ZmqSocketType backSocketType = this.socketContext.getOutProxyType() == null ? ZmqSocketType.DEALER : this.socketContext.getOutProxyType();
        ZMQ.Socket backSocket = this.context.socket(backSocketType.getType());
        // Both ends of the proxy are flagged as bound.
        this.proxySession = new ZmqProxySession(proxyName, this.active, frontSocket, frontSocketType, frontSocketAddr, true, backSocket, backSocketType, backSocketAddr, true);
        this.proxyExecutor.execute(this.proxySession);
    }

    /**
     * Creates a socket of the type named by the socket context and copies the context's
     * settings onto it by reflection: every non-null value returned by a no-argument
     * {@code getXxx()} method of the context is passed to a one-argument, void
     * {@code setXxx(..)} method of the socket when one exists. Reflection failures are
     * logged and otherwise ignored.
     *
     * @param context       the ZMQ context used to create the socket
     * @param socketContext the settings to apply
     * @return the new socket, with its send time-out initialised to 0 before settings are applied
     */
    protected ZMQ.Socket getSocket(ZMQ.Context context, ZmqSocketContext socketContext) {
        int socketType = socketContext.getType().getType();
        ZMQ.Socket socket = context.socket(socketType);
        socket.setSendTimeOut(0);

        // Collect the non-null property values of the context, keyed by property name.
        Map<String, Object> valueMap = new HashMap<String, Object>();
        for (Method getter : socketContext.getClass().getMethods()) {
            String getterMethodName = getter.getName();
            // Method.getReturnType() is never null; a void method reports Void.TYPE.
            if (getter.getParameterTypes().length != 0 || getter.getReturnType() == Void.TYPE || !getterMethodName.startsWith("get")) {
                continue;
            }
            try {
                Object value = getter.invoke(socketContext);
                if (value != null) {
                    valueMap.put(getterMethodName.substring(3), value);
                }
            }
            catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
                LOGGER.log(Level.WARNING, "Ignoring 'getter' as potential 'setter': " + getterMethodName, ex);
            }
        }

        // Apply each collected value to the matching socket setter.
        for (Method setter : socket.getClass().getMethods()) {
            String setterMethodName = setter.getName();
            // The previous "getReturnType() == null" test could never be true, so no setting
            // was ever applied; void setters are identified by Void.TYPE.
            if (setter.getParameterTypes().length != 1 || setter.getReturnType() != Void.TYPE || !setterMethodName.startsWith("set")) {
                continue;
            }
            Object value = valueMap.get(setterMethodName.substring(3));
            if (value == null) {
                continue;
            }
            try {
                setter.invoke(socket, value);
            }
            catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
                LOGGER.log(Level.WARNING, "Ignoring 'setting' of socket context': " + setterMethodName, ex);
            }
        }
        return socket;
    }

    /**
     * Builds the callback object handed to a socket session. Each callback simply forwards
     * to the gateway method of the matching name (socketOpen, socketSend, socketError,
     * socketReceive, socketClose).
     *
     * @param socketAddr     the socket address (not used by this implementation)
     * @param socketIncoming the incoming flag (not used by this implementation)
     * @param socketOutgoing the outgoing flag (not used by this implementation)
     * @return a new listener bound to this gateway
     */
    protected ZmqSocketListener getSocketListener(String socketAddr, boolean socketIncoming, boolean socketOutgoing) {
        final AbstractZmqGateway gateway = this;
        return new ZmqSocketListener(){

            @Override
            public boolean open(ZmqSocketSession session) {
                return gateway.socketOpen(session);
            }

            @Override
            public ZmqEvent send(ZmqSocketSession session) {
                return gateway.socketSend(session);
            }

            @Override
            public void error(ZmqSocketSession session, ZmqEvent event) {
                gateway.socketError(session, event);
            }

            @Override
            public ZmqEvent receive(ZmqSocketSession session, ZmqEvent event) {
                return gateway.socketReceive(session, event);
            }

            @Override
            public boolean close(ZmqSocketSession session) {
                return gateway.socketClose(session);
            }
        };
    }

    /**
     * @return the current value of the gateway's active flag
     */
    @Override
    public boolean isActive() {
        return active.get();
    }

    /**
     * Closes the gateway: clears the active flag, closes the proxy context, gives
     * outstanding acknowledgements a short grace period, waits up to {@code timeout} for the
     * socket sessions to stop, closes the journal store, shuts down the executors, reports
     * any still un-acknowledged send events as lost and finally closes the ZMQ context.
     *
     * <p>Safe to call on a gateway that was never opened. If the calling thread is
     * interrupted while waiting, the close sequence still runs to completion and the
     * interrupt status is restored before returning.
     *
     * @param timeout the time in milliseconds to wait for the socket sessions to stop
     */
    @Override
    public void close(int timeout) {
        this.active.set(false);
        if (this.proxyContext != null) {
            this.proxyContext.close();
        }
        boolean interrupted = false;
        if (this.acknowledge) {
            int totalCount = 0;
            for (ZmqSocketSession socketSession : this.socketSessions.values()) {
                int sessionCount = socketSession.trackedCount();
                if (sessionCount > 0) {
                    LOGGER.info("Gateway [" + this.name + "@" + socketSession.getAddr() + "] waiting for acknowledgement message(s): " + sessionCount);
                }
                totalCount += sessionCount;
            }
            if (totalCount > 0) {
                // Give in-flight acknowledgements a moment to arrive.
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException ex) {
                    interrupted = true;
                    LOGGER.throwing(AbstractZmqGateway.class.getCanonicalName(), "close()", ex);
                }
            }
        }
        this.waitOnStatus(timeout, EnumSet.of(ZmqSocketStatus.STOPPED));
        if (this.journalStore != null) {
            try {
                this.journalStore.close();
            }
            catch (ZmqException ex) {
                LOGGER.log(Level.SEVERE, "Gateway [" + this.name + "] unable to close the journal store: " + this.journalStore, ex);
            }
        }
        String listenerFailure = "Gateway [" + this.name + "] listener threads failed to stop: ";
        interrupted |= this.shutdownExecutor(this.listenerExecutor, Level.SEVERE, listenerFailure, listenerFailure);
        interrupted |= this.shutdownExecutor(this.proxyExecutor, Level.WARNING, "Proxy thread fails to stop, until context terminated (ZMQ issue): ", "Proxy threads failed to stop: ");
        interrupted |= this.shutdownExecutor(this.socketExecutor, Level.SEVERE, "Socket threads failed to stop: ", "Socket threads failed to stop: ");
        if (this.acknowledge) {
            for (ZmqSocketSession socketSession : this.socketSessions.values()) {
                List<ZmqSocketSession.TrackEvent> lostEvents = socketSession.untrackAll();
                for (ZmqSocketSession.TrackEvent lostEvent : lostEvents) {
                    if (!(lostEvent.getEvent() instanceof ZmqSendEvent)) continue;
                    LOGGER.warning("Gateway [" + this.name + "] has un-acknowledged message (LOST): " + lostEvent);
                }
            }
        }
        // The context only exists once open() has run.
        if (this.context != null) {
            this.context.close();
        }
        LOGGER.info("Gateway closed: " + this.toString());
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Shuts an executor down and waits up to three seconds for its tasks to finish.
     *
     * @param executor           the executor to stop; {@code null} is ignored
     * @param timeoutLevel       the log level used when the tasks do not finish in time
     * @param timeoutPrefix      the log message prefix used when the tasks do not finish in time
     * @param interruptedPrefix  the log message prefix used when the wait is interrupted
     * @return {@code true} when the calling thread was interrupted while waiting
     */
    private boolean shutdownExecutor(ExecutorService executor, Level timeoutLevel, String timeoutPrefix, String interruptedPrefix) {
        if (executor == null) {
            return false;
        }
        try {
            executor.shutdown();
            if (!executor.awaitTermination(3L, TimeUnit.SECONDS)) {
                LOGGER.log(timeoutLevel, timeoutPrefix + this.toString());
            }
            return false;
        }
        catch (InterruptedException ex) {
            LOGGER.log(Level.SEVERE, interruptedPrefix + this.toString(), ex);
            return true;
        }
    }

    /**
     * Callback asking whether a socket session may open. A session that does not bind is
     * always allowed; a binding session is refused while any session of this gateway is
     * RUNNING.
     *
     * @param source the session that wants to open
     * @return {@code true} when the session may open
     */
    protected boolean socketOpen(ZmqSocketSession source) {
        if (!source.isBound()) {
            return true;
        }
        for (ZmqSocketSession session : this.socketSessions.values()) {
            if (session.getStatus() == ZmqSocketStatus.RUNNING) {
                return false;
            }
        }
        return true;
    }

    /**
     * Callback invoked (through the socket listener) when a socket session closes.
     * This implementation performs no work.
     *
     * @param source the session that is closing (unused)
     * @return always {@code true}
     */
    protected boolean socketClose(ZmqSocketSession source) {
        return true;
    }

    /**
     * Callback asking for the next event a socket session should send.
     *
     * <p>While the session is RUNNING the journal store (when configured) is consulted first
     * and then the outgoing queue is polled for up to 500 ms. When nothing is found and
     * heartbeats are enabled on an outgoing session that has been idle in both directions
     * for more than one second, either a heartbeat is produced or — when nothing has been
     * received for more than three seconds while RUNNING — the session is paused and its
     * tracked send events are passed to {@link #socketError(ZmqSocketSession, ZmqEvent)}.
     * With acknowledgements enabled the returned event is tracked on the session.
     *
     * @param source the session asking for work
     * @return the event to send, or {@code null} when there is nothing to send
     */
    protected ZmqEvent socketSend(ZmqSocketSession source) {
        final String socketAddr = source.getAddr();
        final boolean socketOutgoing = source.isOutgoing();
        ZmqEvent outbound = null;

        if (source.getStatus() == ZmqSocketStatus.RUNNING) {
            if (this.journalStore != null) {
                try {
                    final ZmqJournalEntry entry = this.journalStore.read();
                    final boolean replayable = entry != null && !source.isTracked(entry.getMessageId());
                    if (replayable) {
                        outbound = this.eventHandler.createSendEvent(entry.getMessageId(), entry.getMessage());
                    }
                }
                catch (ZmqException ex) {
                    LOGGER.log(Level.WARNING, "Socket [" + this.name + "@" + socketAddr + "] failed to read from the journal store", ex);
                }
            }
            if (outbound == null) {
                try {
                    outbound = (ZmqEvent)this.outgoingQueue.poll(SOCKET_WAIT_MILLI_SECOND, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException ex) {
                    LOGGER.log(Level.WARNING, "Socket [" + this.name + "@" + socketAddr + "] polling of outgoing queue interrupted", ex);
                }
            }
        }

        final boolean heartbeatCandidate = this.heartbeat && socketOutgoing && outbound == null;
        if (heartbeatCandidate) {
            final long lastReceive = source.getLastReceiveTime();
            final long lastSend = source.getLastSendTime();
            final long now = System.nanoTime();
            // Idle times converted from nanoseconds to milliseconds.
            final long receiveIdleMillis = (now - lastReceive) / 1000000L;
            final long sendIdleMillis = (now - lastSend) / 1000000L;
            final ZmqSocketStatus currentStatus = source.getStatus();
            final boolean idleBothWays = receiveIdleMillis > HEARTBEAT_RATE_MILLI_SECOND && sendIdleMillis > HEARTBEAT_RATE_MILLI_SECOND;
            if (idleBothWays && receiveIdleMillis > AUTO_PAUSE_IDLE_MILLI_SECOND && currentStatus == ZmqSocketStatus.RUNNING) {
                // Nothing heard for too long: pause and hand tracked sends back for re-queueing.
                source.pause();
                for (ZmqSocketSession.TrackEvent tracked : source.untrackAll()) {
                    if (tracked.getEvent() instanceof ZmqSendEvent) {
                        this.socketError(source, tracked.getEvent());
                    }
                }
            } else if (idleBothWays) {
                outbound = this.eventHandler.createHeartbeatEvent();
                if (LOGGER.isLoggable(Level.FINEST)) {
                    LOGGER.log(Level.FINEST, "Socket [" + this.name + "@" + socketAddr + "] send heartbeat: " + outbound);
                }
            }
        }

        if (outbound == null) {
            return null;
        }
        final boolean traceEnabled = LOGGER.isLoggable(Level.FINEST);
        if (socketOutgoing && this.acknowledge) {
            source.track(outbound);
            if (LOGGER.isLoggable(Level.FINEST)) {
                LOGGER.log(Level.FINEST, "Socket [" + this.name + "@" + socketAddr + "] tacking event: " + outbound);
            }
        }
        if (traceEnabled) {
            LOGGER.log(Level.FINEST, "Socket [" + this.name + "@" + socketAddr + "] send event: " + outbound);
        }
        return outbound;
    }

    /**
     * Callback invoked when a session could not deliver an event. Send events are put back
     * on the outgoing queue; every other kind of event is ignored.
     *
     * @param source the session reporting the failure
     * @param event  the event that failed
     */
    public void socketError(ZmqSocketSession source, ZmqEvent event) {
        if (!(event instanceof ZmqSendEvent)) {
            return;
        }
        final ZmqSendEvent failed = (ZmqSendEvent)event;
        try {
            this.outgoingQueue.put(failed);
            if (LOGGER.isLoggable(Level.FINEST)) {
                LOGGER.log(Level.FINEST, "Socket [" + source.getAddr() + "] send event: " + failed);
            }
        }
        catch (InterruptedException ex) {
            LOGGER.log(Level.SEVERE, "Socket [" + this.name + "@" + source.getAddr() + "] was unable to re-send event: " + event, ex);
        }
    }

    /**
     * Callback invoked when a session has received an event.
     *
     * <ul>
     * <li>Send events are journaled (when a journal store is configured) and placed on the
     *     incoming queue; on failure the error is logged and {@code null} is returned.</li>
     * <li>Heartbeat events are answered with an ACK event when the session acknowledges and
     *     is incoming.</li>
     * <li>ACK events release the matching tracked event on the session.</li>
     * </ul>
     *
     * @param source the session that received the event
     * @param event  the received event
     * @return the event to send back in reply, or {@code null} when no reply is required
     */
    public ZmqEvent socketReceive(ZmqSocketSession source, ZmqEvent event) {
        String socketAddr = source.getAddr();
        if (LOGGER.isLoggable(Level.FINEST)) {
            LOGGER.log(Level.FINEST, "Socket [" + this.name + "@" + socketAddr + "] consume event: " + event);
        }
        if (event instanceof ZmqSendEvent) {
            ZmqSendEvent sendEvent = (ZmqSendEvent)event;
            try {
                if (this.journalStore != null) {
                    this.journalStore.create(event.getMessageId(), sendEvent.getMessage());
                }
                this.incomingQueue.put(sendEvent);
            }
            catch (InterruptedException ex) {
                LOGGER.log(Level.SEVERE, "Socket [" + this.name + "@" + socketAddr + "] for gateway " + this.name + " cannot consume message due to intenral error: " + event, ex);
                return null;
            }
            catch (ZmqException ex) {
                LOGGER.log(Level.SEVERE, "Socket [" + this.name + "@" + socketAddr + "] for gateway " + this.name + " cannot store messahe due to intenral error: " + event, ex);
                return null;
            }
        }
        ZmqAckEvent replyEvent = null;
        if (event instanceof ZmqHeartbeatEvent) {
            try {
                if (source.isAcknowledge() && source.isIncoming()) {
                    replyEvent = this.eventHandler.createAckEvent(event);
                }
            }
            catch (ZmqException ex) {
                LOGGER.log(Level.SEVERE, "Socket [" + this.name + "@" + socketAddr + "] received corrupt event: " + event, ex);
            }
        } else if (event instanceof ZmqAckEvent) {
            ZmqAckEvent ackEvent = (ZmqAckEvent)event;
            Object messageId = ackEvent.getMessageId();
            if (messageId == null) {
                LOGGER.log(Level.SEVERE, "Socket [" + this.name + "@" + socketAddr + "] received corrupt event: " + event);
            } else {
                LOGGER.log(Level.INFO, "Socket [" + this.name + "@" + socketAddr + "] received ACK event: " + event);
                ZmqSocketSession.TrackEvent trackedEvent = source.untrack(messageId);
                if (trackedEvent == null) {
                    LOGGER.log(Level.WARNING, "Socket [" + this.name + "@" + socketAddr + "] received ACK for untracked event: " + event);
                }
            }
        }
        if (replyEvent != null && LOGGER.isLoggable(Level.FINEST)) {
            // Log the reply that is about to be sent, not the event that triggered it.
            LOGGER.log(Level.FINEST, "Socket [" + this.name + "@" + socketAddr + "] reply event: " + replyEvent);
        }
        return replyEvent;
    }

    /**
     * Intentionally empty in this class: the body performs no work.
     */
    @Override
    public void start() {
    }

    /**
     * Intentionally empty in this class: the body performs no work.
     */
    @Override
    public void stop() {
    }

    /**
     * Commits the current transaction. Every event in the outgoing snapshot is released to
     * the outgoing queue (and journaled when a journal store is configured); every event in
     * the incoming snapshot is reported as delivered to the re-delivery policy (when
     * configured) and removed from the journal store. Both snapshots are then emptied.
     *
     * @throws ZmqException when the gateway is not active, is not transacted, or the journal
     *                      store fails
     */
    @Override
    public void commit() throws ZmqException {
        if (!this.active.get()) {
            throw new ZmqException("The gateway has been close: " + this.toString());
        }
        if (!this.transacted) {
            throw new ZmqException("No transacion started: " + this.toString());
        }
        final boolean journaled = this.journalStore != null;

        synchronized (this.outgoingSnapshot) {
            for (ZmqSendEvent pending : this.outgoingSnapshot) {
                this.outgoingQueue.add(pending);
                if (journaled) {
                    this.journalStore.create(pending.getMessageId(), pending.getMessage());
                }
            }
            this.outgoingSnapshot.clear();
        }

        synchronized (this.incomingSnapshot) {
            if (this.redelivery != null) {
                this.redelivery.delivered(this.incomingSnapshot);
            }
            if (journaled) {
                for (ZmqSendEvent consumed : this.incomingSnapshot) {
                    this.journalStore.delete(consumed.getMessageId());
                }
            }
            this.incomingSnapshot.clear();
        }

        LOGGER.fine("Transaction committed: " + this.toString());
    }

    /**
     * Rolls the current transaction back. The outgoing snapshot is discarded; the incoming
     * snapshot is handed to the re-delivery policy (when configured) and then emptied.
     *
     * @throws ZmqException when the gateway is not active or is not transacted
     */
    @Override
    public void rollback() throws ZmqException {
        if (!this.active.get()) {
            throw new ZmqException("The gateway has been close: " + this.toString());
        }
        if (!this.transacted) {
            throw new ZmqException("No transacion started: " + this.toString());
        }

        synchronized (this.outgoingSnapshot) {
            this.outgoingSnapshot.clear();
        }

        synchronized (this.incomingSnapshot) {
            if (this.redelivery != null) {
                this.redelivery.redeliver(this.incomingSnapshot);
            }
            this.incomingSnapshot.clear();
        }

        LOGGER.fine("Transaction rolledback: " + this.toString());
    }

    /**
     * Sends a message. In a transacted gateway the event is only parked in the outgoing
     * snapshot until {@link #commit()}; otherwise it is queued for sending immediately and
     * journaled when a journal store is configured.
     *
     * @param message the message to send
     * @throws ZmqException when the send event cannot be created or journaled
     */
    @Override
    public void send(ZmqMessage message) throws ZmqException {
        final ZmqSendEvent outbound = this.eventHandler.createSendEvent(message);

        if (!this.transacted) {
            this.outgoingQueue.add(outbound);
            if (this.journalStore != null) {
                this.journalStore.create(outbound.getMessageId(), outbound.getMessage());
            }
            return;
        }

        synchronized (this.outgoingSnapshot) {
            this.outgoingSnapshot.add(outbound);
        }
    }

    /**
     * Evaluates the message selector against the message properties.
     *
     * @param message the message to check
     * @return {@code true} when no selector is configured or the selector accepts the message
     */
    protected boolean isValidMessage(ZmqMessage message) {
        if (this.messageSelector == null) {
            return true;
        }
        return this.messageSelector.evaluate(message.getProperties());
    }

    /**
     * Receives the next message, retrying in 500 ms slices for as long as the gateway stays
     * active.
     *
     * @return the received message, never {@code null}
     * @throws ZmqException when the gateway is closed before a message arrives
     */
    @Override
    public ZmqMessage receive() throws ZmqException {
        ZmqMessage received;
        do {
            received = this.receive(500);
        } while (received == null && this.active.get());

        if (received == null) {
            throw new ZmqException("Receive request, buy gateway has been close: " + this.toString());
        }
        return received;
    }

    /**
     * Receives a message, waiting at most {@code timeout} milliseconds on the incoming queue.
     *
     * <p>Sources are tried in this order: the re-delivery policy, the journal store, then the
     * incoming queue. A queued message rejected by the message selector is dropped and the
     * call retries with whatever is left of the time budget.
     *
     * @param timeout the maximum time to wait on the incoming queue, in milliseconds
     * @return the message, or {@code null} when none arrived in time
     * @throws ZmqException when the gateway is not active, the journal store fails, or the
     *                      wait on the incoming queue is interrupted
     */
    @Override
    public ZmqMessage receive(int timeout) throws ZmqException {
        // Timing is only measured when it is going to be logged.
        final Stopwatch stopwatch = LOGGER.isLoggable(Level.FINER) ? new Stopwatch() : null;
        if (!this.active.get()) {
            throw new ZmqException("Receive request, buy gateway has been close: " + this.toString());
        }

        // 1. Messages waiting for re-delivery take priority.
        if (this.redelivery != null) {
            ZmqSendEvent redelivered = this.redelivery.getNextRedeliver();
            if (redelivered != null) {
                ZmqMessage message = redelivered.getMessage();
                if (this.transacted) {
                    synchronized (this.incomingSnapshot) {
                        this.incomingSnapshot.add(redelivered);
                    }
                }
                if (stopwatch != null) {
                    LOGGER.log(Level.FINER, "Receive re-delivery message: " + stopwatch.lapsedTime() + " (msec) :" + this.toString());
                }
                return message;
            }
        }

        // 2. Then anything still held in the journal store.
        if (this.journalStore != null) {
            ZmqJournalEntry journalEntry = this.journalStore.read();
            if (journalEntry != null) {
                if (this.transacted) {
                    ZmqSendEvent journaled = this.eventHandler.createSendEvent(journalEntry.getMessageId(), journalEntry.getMessage());
                    // NOTE(review): a received entry is parked in the OUTGOING snapshot here,
                    // which commit() releases to the outgoing queue — kept as found; confirm
                    // this is not meant to be the incoming snapshot.
                    synchronized (this.outgoingSnapshot) {
                        this.outgoingSnapshot.add(journaled);
                    }
                } else {
                    this.journalStore.delete(journalEntry.getMessageId());
                }
                return journalEntry.getMessage();
            }
        }

        // 3. Finally wait on the incoming queue.
        long startTime = System.currentTimeMillis();
        try {
            ZmqSendEvent polled = (ZmqSendEvent)this.incomingQueue.poll(timeout, TimeUnit.MILLISECONDS);
            if (polled != null) {
                ZmqMessage message = polled.getMessage();
                if (this.isValidMessage(message)) {
                    if (this.transacted) {
                        synchronized (this.incomingSnapshot) {
                            this.incomingSnapshot.add(polled);
                        }
                    } else if (this.journalStore != null) {
                        // NOTE(review): socketReceive() already journals the event on arrival;
                        // creating it again on consumption is kept as found — confirm whether
                        // a delete was intended.
                        this.journalStore.create(polled.getMessageId(), message);
                    }
                    if (stopwatch != null) {
                        LOGGER.log(Level.FINER, "Gateway [" + this.name + "] receive incoming message: " + stopwatch.lapsedTime() + " (msec)");
                    }
                    return message;
                }
            }
        }
        catch (InterruptedException ex) {
            throw new ZmqException("Unable to poll internal queue: " + this.toString(), ex);
        }

        // Nothing usable yet: retry with the remaining time budget, if any.
        long lapsedTime = System.currentTimeMillis() - startTime;
        if (lapsedTime < (long)timeout) {
            int remainingTimeout = (int)((long)timeout - lapsedTime);
            ZmqMessage message = this.receive(remainingTimeout);
            if (stopwatch != null) {
                // The two messages were previously attached to the opposite outcomes.
                if (message == null) {
                    LOGGER.log(Level.FINER, "Gateway [" + this.name + "] receive no message (Timeout): " + stopwatch.lapsedTime() + " (msec)");
                } else {
                    LOGGER.log(Level.FINER, "Gateway [" + this.name + "] receive incoming message (Wait): " + stopwatch.lapsedTime() + " (msec)");
                }
            }
            return message;
        }
        return null;
    }

    /**
     * Sets (or clears) the asynchronous message listener.
     *
     * <p>When a listener replaces "no listener" on an opened gateway a new listener thread is
     * started. The listener is stored <em>before</em> the thread is submitted, because the
     * thread exits as soon as it sees no listener. On a gateway that has not been opened yet
     * (or has been closed) no thread is started here; {@link #open(int)} starts one when a
     * listener is set.
     *
     * @param listener the new listener, or {@code null} to stop asynchronous delivery
     */
    @Override
    public void setListener(ZmqGatewayListener listener) {
        final boolean startThread = listener != null && this.listener == null;
        this.listener = listener;
        final ExecutorService executor = this.listenerExecutor;
        if (startThread && executor != null && !executor.isShutdown()) {
            executor.execute(new ListenerThread());
        }
    }

    /**
     * @return the gateway's own (synchronized) list of socket metrics, not a copy
     */
    @Override
    public List<ZmqSocketMetrics> getMetrics() {
        return metrics;
    }

    /**
     * @return the gateway name given at construction
     */
    @Override
    public String getName() {
        return name;
    }

    /**
     * @return the address string taken from the socket context at construction
     */
    @Override
    public String getAddr() {
        return addr;
    }

    /**
     * @return the socket type taken from the socket context at construction
     */
    @Override
    public ZmqSocketType getType() {
        return type;
    }

    /**
     * @return the gateway's private copy of the socket context
     */
    @Override
    public ZmqSocketContext getSocketContext() {
        return socketContext;
    }

    /**
     * @return the bind flag taken from the socket context at construction
     */
    @Override
    public boolean isBound() {
        return bound;
    }

    /**
     * @return the transacted flag given at construction
     */
    @Override
    public boolean isTransacted() {
        return transacted;
    }

    /**
     * @return the acknowledge flag given at construction
     */
    @Override
    public boolean isAcknowledged() {
        return acknowledge;
    }

    /**
     * @return the heartbeat flag given at construction
     */
    @Override
    public boolean isHeartbeat() {
        return heartbeat;
    }

    /**
     * @return the direction given at construction
     */
    @Override
    public ZmqGateway.Direction getDirection() {
        return direction;
    }

    /**
     * @return the date captured when the gateway object was constructed
     */
    @Override
    public Date getStartTime() {
        return startDateTime;
    }

    /**
     * @return the ZMQ context created by open(), or {@code null} before the first open
     */
    protected ZMQ.Context getContext() {
        return context;
    }

    /**
     * @return the filter policy given at construction (may be {@code null})
     */
    protected ZmqFilterPolicy getFilterPolicy() {
        return filterPolicy;
    }

    /**
     * @return the event handler given at construction
     */
    protected ZmqEventHandler getEventHandler() {
        return eventHandler;
    }

    /**
     * @return the message selector given at construction (may be {@code null})
     */
    protected ZmqMessageSelector getMessageSelector() {
        return messageSelector;
    }

    /**
     * @return the canonical class name followed by the main gateway settings in brackets
     */
    public String toString() {
        final StringBuilder text = new StringBuilder();
        text.append(this.getClass().getCanonicalName());
        text.append(" [active=").append(this.active);
        text.append(", name=").append(this.name);
        text.append(", type=").append(this.type);
        text.append(", isBound=").append(this.bound);
        text.append(", addr=").append(this.addr);
        text.append(", proxyAddr=").append(this.socketContext.getProxyAddr());
        text.append(", transacted=").append(this.transacted);
        text.append(", acknowleged=").append(this.acknowledge);
        text.append(", heartbeat=").append(this.heartbeat);
        text.append(", direction=").append(this.direction);
        text.append(", eventHandler=").append(this.eventHandler);
        text.append(", journalStore=").append(this.journalStore);
        text.append("]");
        return text.toString();
    }

    /**
     * Task that pumps received messages to the gateway listener. It runs until the gateway
     * becomes inactive or the listener is cleared.
     */
    private class ListenerThread
    implements Runnable {
        private ListenerThread() {
        }

        /**
         * Polls {@code receive(500)} in a loop, passing each message to the listener's
         * {@code onMessage} and each {@code ZmqException} to its {@code onException}.
         * The listener field is read once per delivery so that a concurrent
         * {@code setListener(null)} cannot cause a NullPointerException.
         */
        @Override
        public void run() {
            while (AbstractZmqGateway.this.active.get() && AbstractZmqGateway.this.listener != null) {
                try {
                    ZmqMessage message = AbstractZmqGateway.this.receive(500);
                    if (message == null) continue;
                    ZmqGatewayListener target = AbstractZmqGateway.this.listener;
                    if (target == null) {
                        // The listener was removed while the message was being fetched.
                        LOGGER.warning("Gateway [" + AbstractZmqGateway.this.name + "] listener removed, message not delivered: " + message);
                        break;
                    }
                    target.onMessage(message);
                }
                catch (ZmqException ex) {
                    ZmqGatewayListener target = AbstractZmqGateway.this.listener;
                    if (target != null) {
                        target.onException(ex);
                    }
                }
            }
        }
    }
}

