/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.sdk.iot.service.transport.amqps;

import com.microsoft.azure.sdk.iot.service.exceptions.IotHubException;
import com.microsoft.azure.sdk.iot.service.messaging.Message;
import com.microsoft.azure.sdk.iot.service.messaging.SendResult;
import com.microsoft.azure.sdk.iot.service.transport.amqps.CloudToDeviceMessage;
import com.microsoft.azure.sdk.iot.service.transport.amqps.LinkStateCallback;
import com.microsoft.azure.sdk.iot.service.transport.amqps.ProtonJExceptionParser;
import com.microsoft.azure.sdk.iot.service.transport.amqps.SenderLinkHandler;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sender link handler that delivers cloud to device (and cloud to module) messages.
 *
 * <p>Messages handed to {@link #sendAsync} are placed on an in-memory queue. Once the link
 * receives its first flow event, a reactor timer task periodically drains that queue, sends each
 * message over the link and remembers it by delivery tag until {@link #onDelivery} receives the
 * service's outcome for it. Each message's optional callback is invoked with a {@link SendResult}
 * when the outcome arrives, or with a failure when the handler is closed first.
 */
public class CloudToDeviceMessageSenderLinkHandler
extends SenderLinkHandler {
    private static final Logger log = LoggerFactory.getLogger(CloudToDeviceMessageSenderLinkHandler.class);

    /** Delay, in milliseconds, between two consecutive attempts to flush the outgoing queue. */
    private static final int SEND_INTERVAL_MILLISECONDS = 200;

    /** Messages accepted by {@link #sendAsync} that have not been handed to the link yet. */
    private final Queue<CloudToDeviceMessage> outgoingMessageQueue = new ConcurrentLinkedQueue<>();

    /** Messages that were sent and are awaiting an outcome, keyed by their delivery tag. */
    private final Map<Integer, CloudToDeviceMessage> unacknowledgedMessages = new ConcurrentHashMap<>();

    /**
     * Whether the self-rescheduling send timer has been started. Only read and written from
     * {@link #onLinkFlow}.
     * NOTE(review): assumes the reactor dispatches this handler's events sequentially - confirm.
     */
    private boolean sendTimerStarted;

    /**
     * Creates the handler; all arguments are forwarded unchanged to {@link SenderLinkHandler}.
     *
     * @param sender the sender link this handler is associated with
     * @param linkCorrelationId the correlation id of the link
     * @param linkStateCallback the callback notified about link state
     */
    public CloudToDeviceMessageSenderLinkHandler(Sender sender, String linkCorrelationId, LinkStateCallback linkStateCallback) {
        super(sender, linkCorrelationId, linkStateCallback);
    }

    /**
     * Queues a message for sending. The message is not sent by this call; it is picked up by the
     * next run of the send timer task.
     *
     * @param deviceId the id of the target device
     * @param moduleId the id of the target module, or {@code null} when targeting a device
     * @param iotHubMessage the message to send; must not be {@code null}
     * @param callback invoked with the result of the send; may be {@code null}
     * @param context opaque value handed back through the {@link SendResult} given to {@code callback}
     */
    public void sendAsync(String deviceId, String moduleId, Message iotHubMessage, Consumer<SendResult> callback, Object context) {
        if (moduleId == null) {
            log.trace("Queueing cloud to device message with correlation id {}", iotHubMessage.getCorrelationId());
        } else {
            log.trace("Queueing cloud to module message with correlation id {}", iotHubMessage.getCorrelationId());
        }
        this.outgoingMessageQueue.add(new CloudToDeviceMessage(deviceId, moduleId, iotHubMessage, callback, context));
    }

    /**
     * Starts the periodic send timer the first time the link receives a flow event.
     *
     * @param event the flow event; its reactor is used to schedule the timer task
     */
    public void onLinkFlow(Event event) {
        // The timer task reschedules itself forever, so starting another one for every flow
        // event would pile up redundant timer chains that all drain the same queue.
        if (this.sendTimerStarted) {
            return;
        }
        this.sendTimerStarted = true;
        event.getReactor().schedule(SEND_INTERVAL_MILLISECONDS, (Handler) this);
    }

    /**
     * Sends every queued message and schedules the next run of this task.
     *
     * @param event the timer event; its reactor is used to schedule the next run
     */
    public void onTimerTask(Event event) {
        try {
            this.sendQueuedMessages();
        } finally {
            // Reschedule even if sending failed; otherwise later messages would never be sent.
            event.getReactor().schedule(SEND_INTERVAL_MILLISECONDS, (Handler) this);
        }
    }

    /** Drains the outgoing queue, sending each message and recording it as unacknowledged. */
    private void sendQueuedMessages() {
        CloudToDeviceMessage outgoingMessage = this.outgoingMessageQueue.poll();
        while (outgoingMessage != null) {
            int deliveryTag = this.sendMessageAndGetDeliveryTag(outgoingMessage.getProtonMessage());
            this.unacknowledgedMessages.put(deliveryTag, outgoingMessage);
            outgoingMessage = this.outgoingMessageQueue.poll();
        }
    }

    /**
     * Handles the outcome of a previously sent message: looks the message up by delivery tag,
     * reports the result to its callback (if any) and settles the delivery.
     *
     * @param event the delivery event carrying the delivery tag and remote state
     */
    public void onDelivery(Event event) {
        Delivery delivery = event.getDelivery();
        try {
            int deliveryTag = Integer.parseInt(new String(delivery.getTag(), StandardCharsets.UTF_8));
            CloudToDeviceMessage message = this.unacknowledgedMessages.remove(deliveryTag);
            if (message == null) {
                log.debug("Received an acknowledgement for a cloud to device message that this client did not send");
                return;
            }

            String correlationId = message.getCorrelationId();
            log.trace("Acknowledgement arrived for sent cloud to device message with correlation id {}", correlationId);

            IotHubException messageException = toIotHubException(delivery.getRemoteState());
            Consumer<SendResult> onMessageSentCallback = message.getOnMessageSentCallback();
            if (onMessageSentCallback != null) {
                SendResult sendResult = new SendResult(messageException == null, correlationId, message.getOnMessageSentCallbackContext(), messageException);
                onMessageSentCallback.accept(sendResult);
            }
        } finally {
            // Settle even when the tag cannot be parsed or the user callback throws, so the
            // delivery is never left unsettled.
            delivery.settle();
        }
    }

    /**
     * Maps a remote delivery state to the exception to report for it.
     *
     * @param remoteState the remote state of the delivery; may be {@code null}
     * @return {@code null} when the state is not {@link Rejected}, otherwise the exception
     *     describing the rejection
     */
    private static IotHubException toIotHubException(DeliveryState remoteState) {
        if (!(remoteState instanceof Rejected)) {
            return null;
        }

        Rejected rejected = (Rejected) remoteState;
        // The error of a rejected outcome and the condition of that error may both be absent.
        if (rejected.getError() == null || rejected.getError().getCondition() == null) {
            String description = rejected.getError() == null ? null : rejected.getError().getDescription();
            return new IotHubException(description != null ? description : "Message was rejected by the service without an error condition.");
        }

        String error = rejected.getError().getCondition().toString();
        String errorDescription = rejected.getError().getDescription();
        return new ProtonJExceptionParser(error, errorDescription).getIotHubException();
    }

    /** Closes the underlying link and closes the tail of the event's transport. */
    public void onConnectionRemoteClose(Event event) {
        super.onConnectionRemoteClose(event);
        event.getTransport().close_tail();
    }

    /**
     * Closes the link via the superclass, then fails every message that is still queued or still
     * awaiting acknowledgement by invoking its callback (if any) with an unsuccessful
     * {@link SendResult}.
     */
    @Override
    void close() {
        super.close();

        // Drain with poll()/remove() rather than iterate-then-clear: each message is claimed
        // exactly once, so one added or acknowledged concurrently is neither silently dropped
        // nor reported twice.
        CloudToDeviceMessage unsentMessage;
        while ((unsentMessage = this.outgoingMessageQueue.poll()) != null) {
            notifySendFailure(unsentMessage, "Message failed to send because the client was closed while it was still queued.");
        }

        for (Integer deliveryTag : this.unacknowledgedMessages.keySet()) {
            CloudToDeviceMessage unacknowledgedMessage = this.unacknowledgedMessages.remove(deliveryTag);
            if (unacknowledgedMessage != null) {
                notifySendFailure(unacknowledgedMessage, "Message failed to send because the client was closed after it was sent, but before it was acknowledged by the service.");
            }
        }
    }

    /**
     * Reports a failed send to the message's callback, if it has one.
     *
     * @param message the message that could not be sent or was never acknowledged
     * @param reason the text of the {@link IotHubException} handed to the callback
     */
    private static void notifySendFailure(CloudToDeviceMessage message, String reason) {
        Consumer<SendResult> callback = message.getOnMessageSentCallback();
        if (callback == null) {
            return;
        }
        callback.accept(new SendResult(false, message.getCorrelationId(), message.getOnMessageSentCallbackContext(), new IotHubException(reason)));
    }

    /** {@inheritDoc} */
    @Override
    protected String getLinkInstanceType() {
        return "cloudToDeviceSender";
    }
}

