/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.sdk.iot.deps.transport.amqp;

import com.microsoft.azure.sdk.iot.deps.transport.amqp.AmqpDeviceOperations;
import com.microsoft.azure.sdk.iot.deps.transport.amqp.AmqpListener;
import com.microsoft.azure.sdk.iot.deps.transport.amqp.AmqpMessage;
import com.microsoft.azure.sdk.iot.deps.transport.amqp.AmqpReactor;
import com.microsoft.azure.sdk.iot.deps.transport.amqp.SaslHandler;
import com.microsoft.azure.sdk.iot.deps.transport.amqp.SaslListenerImpl;
import com.microsoft.azure.sdk.iot.deps.util.CustomLogger;
import com.microsoft.azure.sdk.iot.deps.util.ObjectLock;
import com.microsoft.azure.sdk.iot.deps.ws.impl.WebSocketImpl;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.HandlerException;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.SaslListener;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.TransportInternal;
import org.apache.qpid.proton.engine.impl.TransportLayer;
import org.apache.qpid.proton.reactor.FlowController;
import org.apache.qpid.proton.reactor.Handshaker;
import org.apache.qpid.proton.reactor.Reactor;
import org.apache.qpid.proton.reactor.ReactorOptions;

public class AmqpsConnection
extends BaseHandler {
    private static final int MAX_WAIT_TO_OPEN_CLOSE_CONNECTION = 60000;
    private static final int MAX_WAIT_TO_TERMINATE_EXECUTOR = 30;
    private static final String WEB_SOCKET_PATH = "/$iothub/websocket";
    private static final String WEB_SOCKET_SUB_PROTOCOL = "AMQPWSB10";
    private static final int AMQP_PORT = 5671;
    private static final int AMQP_WEB_SOCKET_PORT = 443;
    private static final int THREAD_POOL_MAX_NUMBER = 1;
    private int linkCredit;
    private long nextTag;
    private Boolean useWebSockets;
    private Boolean isOpen;
    private String hostName;
    private String fullHostAddress;
    private Connection connection;
    private Session session;
    private ExecutorService executorService;
    private AmqpDeviceOperations amqpDeviceOperations;
    private Reactor reactor;
    private SaslListenerImpl saslListener;
    private AmqpListener msgListener;
    private CountDownLatch openLatch;
    private final ObjectLock closeLock;
    private SSLContext sslContext;
    private CustomLogger logger = new CustomLogger();

    public AmqpsConnection(String hostName, AmqpDeviceOperations amqpDeviceOperations, SSLContext sslContext, SaslHandler saslHandler, boolean useWebSockets) throws IOException {
        if (hostName == null || hostName.isEmpty()) {
            throw new IllegalArgumentException("The hostname cannot be null or empty.");
        }
        this.linkCredit = -1;
        this.nextTag = 0L;
        this.amqpDeviceOperations = amqpDeviceOperations;
        this.useWebSockets = useWebSockets;
        if (saslHandler != null) {
            this.saslListener = new SaslListenerImpl(saslHandler);
        }
        this.openLatch = new CountDownLatch(1);
        this.closeLock = new ObjectLock();
        this.sslContext = sslContext;
        this.isOpen = false;
        this.fullHostAddress = String.format("%s:%d", hostName, this.useWebSockets != false ? 443 : 5671);
        this.hostName = hostName;
        this.add((Handler)new Handshaker());
        this.add((Handler)new FlowController());
        try {
            ReactorOptions options = new ReactorOptions();
            options.setEnableSaslByDefault(false);
            this.reactor = Proton.reactor((ReactorOptions)options, (Handler[])new Handler[]{this});
        }
        catch (IOException e) {
            this.logger.LogError(e);
            throw new IOException("Could not create Proton reactor", e);
        }
    }

    public void setListener(AmqpListener listener) {
        if (listener == null) {
            throw new IllegalArgumentException("The listener cannot be null.");
        }
        this.msgListener = listener;
    }

    public boolean isConnected() throws Exception {
        if (this.saslListener != null && this.saslListener.getSavedException() != null) {
            throw this.saslListener.getSavedException();
        }
        return this.isOpen;
    }

    public void open() throws IOException {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        if (!this.isOpen.booleanValue()) {
            try {
                this.openAmqpAsync();
            }
            catch (Exception e) {
                this.logger.LogError(e);
                this.close();
                throw new IOException("Error opening Amqp connection: ", e);
            }
            try {
                this.openLatch.await(60000L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                this.logger.LogError(e);
                this.close();
                throw new IOException("Waited too long for the connection to open.");
            }
        }
        if (!this.isOpen.booleanValue()) {
            throw new IOException("Failed to open the connection");
        }
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    public void openAmqpAsync() {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        this.openLatch = new CountDownLatch(1);
        if (this.executorService == null) {
            this.executorService = Executors.newFixedThreadPool(1);
        }
        AmqpReactor amqpReactor = new AmqpReactor(this.reactor);
        ReactorRunner reactorRunner = new ReactorRunner(amqpReactor, this.logger);
        this.executorService.submit(reactorRunner);
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() throws IOException {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        if (this.isOpen.booleanValue()) {
            this.amqpDeviceOperations.closeLinks();
            if (this.session != null) {
                this.session.close();
            }
            if (this.connection != null) {
                this.connection.close();
            }
            if (this.reactor != null) {
                this.reactor.stop();
            }
            try {
                ObjectLock objectLock = this.closeLock;
                synchronized (objectLock) {
                    this.closeLock.waitLock(60000L);
                }
            }
            catch (InterruptedException e) {
                this.logger.LogError(e);
                throw new IOException("Waited too long for the connection to open.");
            }
            if (this.executorService != null) {
                this.executorService.shutdown();
                try {
                    if (!this.executorService.awaitTermination(30L, TimeUnit.SECONDS)) {
                        this.executorService.shutdownNow();
                        if (!this.executorService.awaitTermination(30L, TimeUnit.SECONDS)) {
                            this.logger.LogInfo("Pool did not terminate", new Object[0]);
                        }
                    }
                }
                catch (InterruptedException ie) {
                    this.executorService.shutdownNow();
                }
            }
            this.isOpen = false;
        }
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    public void onReactorInit(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        event.getReactor().connectionToHost(this.hostName, this.useWebSockets != false ? 443 : 5671, (Handler)this);
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onReactorFinal(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        this.reactor = null;
        ObjectLock objectLock = this.closeLock;
        synchronized (objectLock) {
            this.closeLock.notifyLock();
        }
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    public void onConnectionInit(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        this.connection = event.getConnection();
        this.connection.setHostname(this.fullHostAddress);
        this.session = this.connection.session();
        this.connection.open();
        this.session.open();
        try {
            this.amqpDeviceOperations.openLinks(this.session);
        }
        catch (Exception e) {
            this.logger.LogDebug("openLinks has thrown exception: %s", e.getMessage());
        }
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    private SslDomain makeDomain() throws IOException {
        SslDomain domain = Proton.sslDomain();
        domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
        domain.init(SslDomain.Mode.CLIENT);
        domain.setSslContext(this.sslContext);
        return domain;
    }

    public void onConnectionBound(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        Transport transport = event.getConnection().getTransport();
        if (transport != null) {
            if (this.saslListener != null) {
                transport.sasl().setListener((SaslListener)this.saslListener);
            }
            if (this.useWebSockets.booleanValue()) {
                WebSocketImpl webSocket = new WebSocketImpl();
                webSocket.configure(this.hostName, WEB_SOCKET_PATH, 0, WEB_SOCKET_SUB_PROTOCOL, null, null);
                ((TransportInternal)transport).addTransportLayer((TransportLayer)webSocket);
            }
            try {
                SslDomain domain = this.makeDomain();
                transport.ssl(domain);
            }
            catch (IOException e) {
                this.logger.LogDebug("onConnectionBound has thrown exception while creating ssl context: %s", e.getMessage());
            }
        }
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    public void onConnectionUnbound(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        this.isOpen = false;
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    public void onLinkInit(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        try {
            Link link = event.getLink();
            this.amqpDeviceOperations.initLink(link);
        }
        catch (Exception e) {
            this.logger.LogDebug("Exception in onLinkInit: %s", e.getMessage());
        }
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    public void onLinkRemoteOpen(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        String linkName = event.getLink().getName();
        if (this.amqpDeviceOperations.isReceiverLinkTag(linkName)) {
            this.isOpen = true;
            if (this.msgListener != null) {
                this.msgListener.connectionEstablished();
                this.openLatch.countDown();
            }
        }
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    public boolean sendAmqpMessage(AmqpMessage message) throws Exception {
        boolean result;
        if (this.saslListener != null && this.saslListener.getSavedException() != null) {
            throw this.saslListener.getSavedException();
        }
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        if (!this.isOpen.booleanValue()) {
            result = false;
        } else {
            byte[] msgData = new byte[1024];
            int length = 0;
            boolean encodingComplete = false;
            do {
                try {
                    length = message.encode(msgData, 0);
                    encodingComplete = true;
                }
                catch (BufferOverflowException e) {
                    msgData = new byte[msgData.length * 2];
                }
            } while (!encodingComplete);
            if (length > 0) {
                byte[] tag = String.valueOf(this.nextTag).getBytes();
                this.nextTag = this.nextTag == Integer.MAX_VALUE || this.nextTag < 0L ? 0L : ++this.nextTag;
                this.amqpDeviceOperations.sendMessage(tag, msgData, length, 0);
                result = true;
            } else {
                result = false;
            }
        }
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
        return result;
    }

    public void onDelivery(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        AmqpMessage message = this.amqpDeviceOperations.receiverMessageFromLink(event.getLink().getName());
        if (message == null) {
            if (event.getType() == Event.Type.DELIVERY) {
                Delivery d = event.getDelivery();
                DeliveryState remoteState = d.getRemoteState();
                boolean state = remoteState.equals(Accepted.getInstance());
                d.free();
            }
        } else {
            this.msgListener.messageReceived(message);
        }
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    public void onLinkFlow(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        this.linkCredit = event.getLink().getCredit();
        this.logger.LogDebug("The link credit value is %s, method name is %s", this.linkCredit, this.logger.getMethodName());
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    public void onLinkRemoteClose(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    public void onTransportError(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        this.isOpen = false;
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    public void onTransportHeadClosed(Event event) {
        this.openLatch.countDown();
    }

    private class ReactorRunner
    implements Callable {
        private static final String THREAD_NAME = "azure-iot-sdk-ReactorRunner";
        private final AmqpReactor amqpReactor;
        private final CustomLogger logger;

        ReactorRunner(AmqpReactor reactor, CustomLogger logger) {
            this.amqpReactor = reactor;
            this.logger = logger;
        }

        public Object call() {
            Thread.currentThread().setName(THREAD_NAME);
            try {
                this.amqpReactor.run();
            }
            catch (HandlerException e) {
                this.logger.LogError(e);
                throw e;
            }
            return null;
        }
    }
}

