/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.sdk.iot.service.transport.amqps;

import com.microsoft.azure.sdk.iot.deps.auth.IotHubSSLContext;
import com.microsoft.azure.sdk.iot.deps.ws.impl.WebSocketImpl;
import com.microsoft.azure.sdk.iot.service.IotHubServiceClientProtocol;
import com.microsoft.azure.sdk.iot.service.Message;
import com.microsoft.azure.sdk.iot.service.Tools;
import com.microsoft.azure.sdk.iot.service.exceptions.IotHubException;
import com.microsoft.azure.sdk.iot.service.transport.TransportUtils;
import com.microsoft.azure.sdk.iot.service.transport.amqps.AmqpResponseVerification;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Ssl;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.TransportInternal;
import org.apache.qpid.proton.engine.impl.TransportLayer;
import org.apache.qpid.proton.reactor.Handshaker;

/**
 * Proton event handler that sends queued cloud-to-device messages over a single AMQP sender link.
 *
 * <p>Messages are queued with {@code createProtonMessage(...)}; one queued message is written to
 * the link each time a link-flow event arrives with available credit. When a delivery event is
 * received, the remote delivery state is recorded and the link, session and connection are closed.
 * {@link #sendComplete()} then reports any failure that was recorded along the way.
 */
public class AmqpSendHandler extends BaseHandler {
    public static final String SEND_TAG = "sender";
    public static final String SEND_PORT_AMQPS = ":5671";
    public static final String SEND_PORT_AMQPS_WS = ":443";
    public static final String ENDPOINT = "/messages/devicebound";
    public static final String DEVICE_PATH_FORMAT = "/devices/%s/messages/devicebound";
    public static final String MODULE_PATH_FORMAT = "/devices/%s/modules/%s/messages/devicebound";
    public static final String WEBSOCKET_PATH = "/$iothub/websocket";
    public static final String WEBSOCKET_SUB_PROTOCOL = "AMQPWSB10";

    /** Initial size of the encode buffer; it is doubled until the message fits. */
    private static final int INITIAL_ENCODE_BUFFER_SIZE = 1024;

    /** Remote delivery states received in {@link #onDelivery(Event)}, consumed by {@link #sendComplete()}. */
    private final Queue<AmqpResponseVerification> sendStatusQueue = new LinkedBlockingQueue<AmqpResponseVerification>();
    /** Messages built by {@code createProtonMessage(...)} that have not been written to the link yet. */
    private final Queue<org.apache.qpid.proton.message.Message> messagesToBeSent = new LinkedBlockingQueue<org.apache.qpid.proton.message.Message>();
    /** Host name with the protocol-specific port suffix appended. */
    protected final String hostName;
    protected final String userName;
    protected final String sasToken;
    /** Counter used to build the next delivery tag. */
    private int nextTag = 0;
    protected final IotHubServiceClientProtocol iotHubServiceClientProtocol;
    /** Host name exactly as given to the constructor (no port suffix); used for the web socket layer. */
    protected final String webSocketHostName;
    private boolean isConnected = false;
    /** Most recent failure recorded by the handler callbacks, or {@code null} if none. */
    private Exception savedException = null;
    /** Set once the remote peer has opened the link. */
    private boolean connectionWasOpened = false;

    /**
     * Creates the handler and registers a {@link Handshaker} child handler.
     *
     * @param hostName host to connect to; must not be null or empty
     * @param userName user name passed to SASL PLAIN; must not be null or empty
     * @param sasToken token passed as the SASL PLAIN password; must not be null or empty
     * @param iotHubServiceClientProtocol protocol to use; {@code AMQPS_WS} selects port 443 and
     *        adds a web socket transport layer, any other value selects port 5671
     * @throws IllegalArgumentException if any argument is null (or empty, for the strings)
     */
    public AmqpSendHandler(String hostName, String userName, String sasToken, IotHubServiceClientProtocol iotHubServiceClientProtocol) {
        if (Tools.isNullOrEmpty(hostName).booleanValue()) {
            throw new IllegalArgumentException("hostName can not be null or empty");
        }
        if (Tools.isNullOrEmpty(userName).booleanValue()) {
            throw new IllegalArgumentException("userName can not be null or empty");
        }
        if (Tools.isNullOrEmpty(sasToken).booleanValue()) {
            throw new IllegalArgumentException("sasToken can not be null or empty");
        }
        if (iotHubServiceClientProtocol == null) {
            throw new IllegalArgumentException("iotHubServiceClientProtocol cannot be null");
        }
        this.iotHubServiceClientProtocol = iotHubServiceClientProtocol;
        this.webSocketHostName = hostName;
        String port = iotHubServiceClientProtocol == IotHubServiceClientProtocol.AMQPS_WS ? SEND_PORT_AMQPS_WS : SEND_PORT_AMQPS;
        this.hostName = hostName + port;
        this.userName = userName;
        this.sasToken = sasToken;
        this.add(new Handshaker());
    }

    /**
     * Builds a Proton message addressed to a device and queues it for sending.
     *
     * @param deviceId id substituted into {@link #DEVICE_PATH_FORMAT}
     * @param message service message whose properties and body are copied
     */
    public void createProtonMessage(String deviceId, Message message) {
        this.populateProtonMessage(String.format(DEVICE_PATH_FORMAT, deviceId), message);
    }

    /**
     * Builds a Proton message addressed to a module and queues it for sending.
     *
     * @param deviceId device id substituted into {@link #MODULE_PATH_FORMAT}
     * @param moduleId module id substituted into {@link #MODULE_PATH_FORMAT}
     * @param message service message whose properties and body are copied
     */
    public void createProtonMessage(String deviceId, String moduleId, Message message) {
        this.populateProtonMessage(String.format(MODULE_PATH_FORMAT, deviceId, moduleId), message);
    }

    /**
     * Converts the service message into a Proton message and appends it to the send queue.
     *
     * @param targetPath value used for the AMQP {@code to} property
     * @param message source of the message id, correlation id, expiry, user id, application
     *        properties and body
     */
    private void populateProtonMessage(String targetPath, Message message) {
        org.apache.qpid.proton.message.Message protonMessage = Proton.message();

        Properties properties = new Properties();
        properties.setMessageId(message.getMessageId());
        properties.setTo(targetPath);
        properties.setAbsoluteExpiryTime(message.getExpiryTimeUtc());
        properties.setCorrelationId(message.getCorrelationId());
        if (message.getUserId() != null) {
            // Explicit charset: the platform default would encode non-ASCII ids differently per machine.
            properties.setUserId(new Binary(message.getUserId().getBytes(java.nio.charset.StandardCharsets.UTF_8)));
        }
        protonMessage.setProperties(properties);

        Map<String, String> customProperties = message.getProperties();
        if (customProperties != null && !customProperties.isEmpty()) {
            // Copy so later changes to the caller's map do not affect the queued message.
            HashMap<String, String> applicationPropertiesMap = new HashMap<String, String>(customProperties);
            protonMessage.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap));
        }

        protonMessage.setBody(new Data(new Binary(message.getBytes())));
        this.messagesToBeSent.add(protonMessage);
    }

    /**
     * Creates an SSL domain initialised with the IoT Hub SSL context.
     *
     * <p>If the SSL context cannot be created the exception is recorded in {@code savedException}
     * (surfaced later by {@link #sendComplete()}) and the domain is still initialised and returned.
     *
     * @param mode mode the domain is initialised with
     * @return the initialised domain
     */
    private SslDomain makeDomain(SslDomain.Mode mode) {
        SslDomain domain = Proton.sslDomain();
        try {
            domain.setSslContext(new IotHubSSLContext().getSSLContext());
        }
        catch (Exception e) {
            this.savedException = e;
        }
        domain.init(mode);
        return domain;
    }

    /**
     * Configures the transport once it is bound to the connection: an optional web socket layer
     * (for {@code AMQPS_WS}), SASL PLAIN credentials and SSL with peer verification.
     *
     * @param event event carrying the connection whose transport is configured
     */
    public void onConnectionBound(Event event) {
        Transport transport = event.getConnection().getTransport();
        if (transport == null) {
            return;
        }
        if (this.iotHubServiceClientProtocol == IotHubServiceClientProtocol.AMQPS_WS) {
            WebSocketImpl webSocket = new WebSocketImpl();
            webSocket.configure(this.webSocketHostName, WEBSOCKET_PATH, 0, WEBSOCKET_SUB_PROTOCOL, null, null);
            ((TransportInternal) transport).addTransportLayer(webSocket);
        }
        Sasl sasl = transport.sasl();
        sasl.plain(this.userName, this.sasToken);
        SslDomain domain = this.makeDomain(SslDomain.Mode.CLIENT);
        domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
        transport.ssl(domain);
    }

    /**
     * Sets the connection host name, creates the session and the sender link (tagged with the
     * client version property) and opens all three.
     *
     * @param event event carrying the connection being initialised
     */
    public void onConnectionInit(Event event) {
        Connection conn = event.getConnection();
        conn.setHostname(this.hostName);
        Session ssn = conn.session();

        HashMap<Symbol, String> properties = new HashMap<Symbol, String>();
        properties.put(Symbol.getSymbol("com.microsoft:client-version"), TransportUtils.USER_AGENT_STRING);
        Sender snd = ssn.sender(SEND_TAG);
        snd.setProperties(properties);

        conn.open();
        ssn.open();
        snd.open();
        this.isConnected = true;
    }

    /**
     * Records a transport failure so that {@link #sendComplete()} reports it.
     *
     * <p>Any exception recorded earlier (for example by {@code makeDomain}) is kept as the cause
     * instead of being discarded, so the root failure stays visible.
     *
     * @param event the transport error event (not inspected)
     */
    public void onTransportError(Event event) {
        this.isConnected = false;
        // A null cause is permitted, so this matches the previous behaviour when nothing was saved.
        this.savedException = new IOException("A Transport error occurred", this.savedException);
    }

    /**
     * Sets the link's target address to {@link #ENDPOINT}.
     *
     * @param event event carrying the link being initialised
     */
    public void onLinkInit(Event event) {
        Link link = event.getLink();
        Target target = new Target();
        target.setAddress(ENDPOINT);
        link.setTarget(target);
    }

    /**
     * Sends the next queued message, if any, when the link has credit.
     *
     * <p>The message is only taken off the queue once credit has been confirmed; with no credit
     * it stays queued for a later flow event rather than being dropped.
     *
     * @param event event carrying the sender link that received flow
     */
    public void onLinkFlow(Event event) {
        if (this.messagesToBeSent.isEmpty()) {
            return;
        }
        Sender snd = (Sender) event.getLink();
        if (snd.getCredit() <= 0) {
            // Nothing can be sent right now; keep the message queued.
            return;
        }
        org.apache.qpid.proton.message.Message protonMessage = this.messagesToBeSent.poll();
        if (protonMessage == null) {
            return;
        }

        byte[] msgData = new byte[INITIAL_ENCODE_BUFFER_SIZE];
        int length;
        while (true) {
            try {
                length = protonMessage.encode(msgData, 0, msgData.length);
                break;
            }
            catch (BufferOverflowException e) {
                // Buffer too small for this message: double it and retry.
                msgData = new byte[msgData.length * 2];
            }
        }

        byte[] tag = String.valueOf(this.nextTag).getBytes(java.nio.charset.StandardCharsets.UTF_8);
        // Wrap back to 0 instead of overflowing into negative tags.
        this.nextTag = (this.nextTag == Integer.MAX_VALUE || this.nextTag < 0) ? 0 : this.nextTag + 1;

        snd.delivery(tag);
        snd.send(msgData, 0, length);
        snd.advance();
    }

    /**
     * Marks that the remote peer opened the link; {@link #sendComplete()} fails without this.
     *
     * @param event the link remote-open event (not inspected)
     */
    public void onLinkRemoteOpen(Event event) {
        this.connectionWasOpened = true;
    }

    /**
     * Records the remote delivery state, settles the delivery and closes the sender, its session
     * and its connection.
     *
     * @param event event carrying the delivery and the sender; ignored unless its type is
     *        {@code DELIVERY}
     */
    public void onDelivery(Event event) {
        if (event.getType() != Event.Type.DELIVERY) {
            return;
        }
        Delivery delivery = event.getDelivery();
        DeliveryState remoteState = delivery.getRemoteState();
        this.sendStatusQueue.add(new AmqpResponseVerification(remoteState));
        delivery.settle();

        Sender snd = event.getSender();
        snd.close();
        snd.getSession().close();
        snd.getSession().getConnection().close();
        this.isConnected = false;
    }

    /**
     * Closes the tail of the transport when the remote peer closes the connection.
     *
     * @param event event carrying the transport to close
     */
    public void onConnectionRemoteClose(Event event) {
        event.getTransport().close_tail();
    }

    /**
     * Reports the outcome of the send after the handler has run.
     *
     * <p>Consumes at most one recorded delivery state per call.
     *
     * @throws IOException if a failure was recorded by a callback, or the link was never opened
     *         by the remote peer
     * @throws IotHubException if the next recorded delivery state carries an exception
     */
    public void sendComplete() throws IotHubException, IOException {
        if (this.savedException != null) {
            throw new IOException("Connection failed to be established", this.savedException);
        }
        if (!this.connectionWasOpened) {
            throw new IOException("Connection failed to open");
        }
        AmqpResponseVerification verifier = this.sendStatusQueue.poll();
        if (verifier != null && verifier.getException() != null) {
            throw verifier.getException();
        }
    }
}

