/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.sdk.iot.service.transport.amqps;

import com.microsoft.azure.sdk.iot.service.transport.amqps.LinkStateCallback;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.reactor.FlowController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class SenderLinkHandler
extends BaseHandler {
    private static final Logger log = LoggerFactory.getLogger(SenderLinkHandler.class);
    private static final String API_VERSION_KEY = "com.microsoft:api-version";
    private final Map<Symbol, Object> amqpProperties = new HashMap<Symbol, Object>();
    String senderLinkTag;
    private final String linkCorrelationId;
    String senderLinkAddress;
    final Sender senderLink;
    private long nextTag = 0L;
    private final LinkStateCallback linkStateCallback;

    protected abstract String getLinkInstanceType();

    SenderLinkHandler(Sender sender, String linkCorrelationId, LinkStateCallback linkStateCallback) {
        this.amqpProperties.put(Symbol.getSymbol((String)API_VERSION_KEY), "2021-04-12");
        this.linkCorrelationId = linkCorrelationId;
        this.senderLink = sender;
        this.linkStateCallback = linkStateCallback;
        BaseHandler.setHandler((Extendable)sender, (Handler)this);
        this.add((Handler)new FlowController());
    }

    public void onLinkRemoteOpen(Event event) {
        log.debug("{} sender link with link correlation id {} was successfully opened", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
        this.linkStateCallback.onSenderLinkRemoteOpen();
    }

    public void onLinkLocalOpen(Event event) {
        log.trace("{} sender link with link correlation id {} opened locally", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
    }

    public void onLinkInit(Event event) {
        Link link = event.getLink();
        Target target = new Target();
        target.setAddress(this.senderLinkAddress);
        link.setTarget((org.apache.qpid.proton.amqp.transport.Target)target);
        link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
        link.setProperties(this.amqpProperties);
        link.open();
        log.trace("Opening {} sender link with correlation id {}", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
    }

    public void onLinkRemoteClose(Event event) {
        Link link = event.getLink();
        if (link.getLocalState() == EndpointState.ACTIVE) {
            log.debug("{} sender link with link correlation id {} was closed remotely unexpectedly", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
            link.close();
        } else {
            log.trace("Closing amqp session now that this {} sender link with link correlation id {} has closed remotely and locally", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
            event.getSession().close();
        }
    }

    public void onLinkLocalClose(Event event) {
        Link link = event.getLink();
        if (link.getRemoteState() == EndpointState.CLOSED) {
            log.trace("Closing amqp session now that this {} sender link with link correlation id {} has closed remotely and locally", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
            event.getSession().close();
        } else {
            log.trace("{} sender link with correlation id {} was closed locally", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
        }
    }

    boolean isOpen() {
        return this.senderLink != null && this.senderLink.getLocalState() == EndpointState.ACTIVE && this.senderLink.getRemoteState() == EndpointState.ACTIVE;
    }

    void close() {
        if (this.senderLink.getLocalState() != EndpointState.CLOSED) {
            log.debug("Closing {} sender link with link correlation id {}", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
            this.senderLink.close();
        }
    }

    int sendMessageAndGetDeliveryTag(Message protonMessage) {
        int length;
        this.nextTag = this.nextTag == Integer.MAX_VALUE || this.nextTag < 0L ? 0L : ++this.nextTag;
        byte[] msgData = new byte[1024];
        while (true) {
            try {
                length = protonMessage.encode(msgData, 0, msgData.length);
            }
            catch (BufferOverflowException e) {
                msgData = new byte[msgData.length * 2];
                continue;
            }
            break;
        }
        byte[] deliveryTag = String.valueOf(this.nextTag).getBytes(StandardCharsets.UTF_8);
        Delivery delivery = this.senderLink.delivery(deliveryTag);
        try {
            log.trace("Sending {} bytes over the amqp {} sender link with link correlation id {}", new Object[]{length, this.getLinkInstanceType(), this.linkCorrelationId});
            int bytesSent = this.senderLink.send(msgData, 0, length);
            log.trace("{} bytes sent over the amqp {} sender link with link correlation id {}", new Object[]{bytesSent, this.getLinkInstanceType(), this.linkCorrelationId});
            if (bytesSent != length) {
                throw new IOException(String.format("Amqp send operation did not send all of the expected bytes for %s sender link with link correlation id %s, retrying to send the message", this.getLinkInstanceType(), this.linkCorrelationId));
            }
            boolean canAdvance = this.senderLink.advance();
            if (!canAdvance) {
                throw new IOException(String.format("Failed to advance the senderLink after sending a message on %s sender link with link correlation id %s, retrying to send the message", this.getLinkInstanceType(), this.linkCorrelationId));
            }
            String deliveryTagString = new String(deliveryTag, StandardCharsets.UTF_8);
            log.trace("Message was sent over {} sender link with delivery tag {} and hash {}", new Object[]{this.getLinkInstanceType(), deliveryTagString, delivery.hashCode()});
            return Integer.parseInt(deliveryTagString);
        }
        catch (Exception e) {
            log.warn("Encountered a problem while sending a message on {} sender link with link correlation id {}", new Object[]{this.getLinkInstanceType(), this.linkCorrelationId, e});
            this.senderLink.advance();
            delivery.free();
            return -1;
        }
    }
}

