/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.transport.amqp.client;

import jakarta.jms.InvalidDestinationException;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.BufferOverflowException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.transport.amqp.client.AmqpAbstractResource;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpSupport;
import org.apache.activemq.transport.amqp.client.AmqpTransactionId;
import org.apache.activemq.transport.amqp.client.AmqpTransferTagGenerator;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sender resource of the AMQP client: opens a proton {@link Sender} link on an
 * {@link AmqpSession} and sends {@link AmqpMessage} instances over it.
 *
 * <p>The blocking public operations ({@link #send(AmqpMessage)}, {@link #close()})
 * hand their work to the session's scheduler and then wait on a
 * {@link ClientFuture} for the result. Deliveries that are not presettled are
 * kept in a pending set until {@link #processDeliveryUpdates} sees a remote
 * state for them.
 */
public class AmqpSender
extends AmqpAbstractResource<Sender> {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Delivery tag used for presettled sends, which do not draw from the tag generator. */
    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];

    /** Default time in milliseconds that {@link #send} waits for the send request to complete. */
    public static final long DEFAULT_SEND_TIMEOUT = 15000L;

    /** Outcomes set on the link source when the caller does not supply its own. */
    public static final Symbol[] DEFAULT_OUTCOMES = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL};

    private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true);

    /** Set by the first {@link #close()} call so later calls return without doing anything. */
    private final AtomicBoolean closed = new AtomicBoolean();

    private final AmqpSession session;
    private final String address;
    private final String senderId;
    private final Target userSpecifiedTarget;
    private final SenderSettleMode userSpecifiedSenderSettlementMode;
    private final ReceiverSettleMode userSpecifiedReceiverSettlementMode;
    private final Symbol[] outcomes;
    private boolean presettle;
    private long sendTimeout = DEFAULT_SEND_TIMEOUT;

    /** Unsettled deliveries awaiting a remote state, in the order they were sent. */
    private final Set<Delivery> pending = new LinkedHashSet<Delivery>();

    /** Scratch buffer for message encoding; replaced by a larger one when a message does not fit. */
    private byte[] encodeBuffer = new byte[8192];

    private Symbol[] desiredCapabilities;
    private Symbol[] offeredCapabilities;
    private Map<Symbol, Object> properties;

    /**
     * Creates a sender for the given address with no explicit settlement modes
     * and {@link #DEFAULT_OUTCOMES} as the source outcomes.
     *
     * @param session  the session that will own the link
     * @param address  the target address; may be {@code null} but not empty
     * @param senderId identifier used as the source address and in the link name
     * @throws IllegalArgumentException if {@code address} is an empty string
     */
    public AmqpSender(AmqpSession session, String address, String senderId) {
        this(session, address, senderId, null, null, DEFAULT_OUTCOMES);
    }

    /**
     * Creates a sender for the given address with caller supplied settlement
     * modes and outcomes.
     *
     * @param session      the session that will own the link
     * @param address      the target address; may be {@code null} but not empty
     * @param senderId     identifier used as the source address and in the link name
     * @param senderMode   sender settle mode to request, or {@code null} to derive it from {@link #isPresettle()}
     * @param receiverMode receiver settle mode to request, or {@code null} for {@link ReceiverSettleMode#FIRST}
     * @param outcomes     outcomes to set on the link source
     * @throws IllegalArgumentException if {@code address} is an empty string
     */
    public AmqpSender(AmqpSession session, String address, String senderId, SenderSettleMode senderMode, ReceiverSettleMode receiverMode, Symbol[] outcomes) {
        if (address != null && address.isEmpty()) {
            throw new IllegalArgumentException("Address cannot be empty.");
        }
        this.session = session;
        this.address = address;
        this.senderId = senderId;
        this.userSpecifiedTarget = null;
        this.userSpecifiedSenderSettlementMode = senderMode;
        this.userSpecifiedReceiverSettlementMode = receiverMode;
        this.outcomes = outcomes;
    }

    /**
     * Creates a sender that uses a caller supplied {@link Target} as is. The
     * sender's address is taken from the target.
     *
     * @param session  the session that will own the link
     * @param target   the target to set on the link; must not be {@code null}
     * @param senderId identifier used as the source address and in the link name
     * @throws IllegalArgumentException if {@code target} is {@code null}
     */
    public AmqpSender(AmqpSession session, Target target, String senderId) {
        if (target == null) {
            throw new IllegalArgumentException("User specified Target cannot be null");
        }
        this.session = session;
        this.address = target.getAddress();
        this.senderId = senderId;
        this.userSpecifiedTarget = target;
        this.userSpecifiedSenderSettlementMode = null;
        this.userSpecifiedReceiverSettlementMode = null;
        this.outcomes = DEFAULT_OUTCOMES;
    }

    /**
     * Sends the message without an explicit transaction id. Equivalent to
     * {@code send(message, null)}.
     *
     * @param message the message to send
     * @throws IOException           declared by the wait on the send request
     * @throws IllegalStateException if the sender is already closed
     */
    public void send(AmqpMessage message) throws IOException {
        // The two-argument overload performs the closed check.
        this.send(message, null);
    }

    /**
     * Sends the message, optionally inside the given transaction, and waits for
     * the send request to complete.
     *
     * <p>The actual send runs on the session's scheduler. Any exception raised
     * there fails the request and is also reported to the connection via
     * {@code fireClientException}. The wait uses the configured send timeout
     * when it is positive and is unbounded otherwise.
     *
     * @param message the message to send
     * @param txId    transaction to enlist the delivery in, or {@code null} to
     *                fall back to the session's current transaction, if any
     * @throws IOException           declared by the wait on the send request
     * @throws IllegalStateException if the sender is already closed
     */
    public void send(AmqpMessage message, AmqpTransactionId txId) throws IOException {
        this.checkClosed();
        ClientFuture sendRequest = new ClientFuture();
        this.session.getScheduler().execute(() -> {
            try {
                this.doSend(message, sendRequest, txId);
                this.session.pumpToProtonTransport(sendRequest);
            }
            catch (Exception e) {
                sendRequest.onFailure(e);
                this.session.getConnection().fireClientException(e);
            }
        });
        if (this.sendTimeout <= 0L) {
            sendRequest.sync();
        } else {
            sendRequest.sync(this.sendTimeout, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Closes the sender and waits for the close request to complete. Only the
     * first call does any work; later calls return immediately.
     *
     * @throws IOException declared by the wait on the close request
     */
    public void close() throws IOException {
        if (this.closed.compareAndSet(false, true)) {
            ClientFuture request = new ClientFuture();
            this.session.getScheduler().execute(() -> {
                try {
                    this.checkClosed();
                    this.close(request);
                    this.session.pumpToProtonTransport(request);
                }
                catch (Exception e) {
                    // Mirror send(): hand the failure to the request instead of
                    // letting the task die while the caller waits in sync() below
                    // without a timeout.
                    request.onFailure(e);
                }
            });
            request.sync();
        }
    }

    /**
     * @return the session this sender was created with
     */
    public AmqpSession getSession() {
        return this.session;
    }

    /**
     * @return the underlying proton sender wrapped by {@link UnmodifiableProxy#senderProxy}
     */
    public Sender getSender() {
        return UnmodifiableProxy.senderProxy(this.getEndpoint());
    }

    /**
     * @return the target address, taken from the constructor argument or from
     *         the user supplied target
     */
    public String getAddress() {
        return this.address;
    }

    /**
     * @return {@code true} if messages are sent presettled
     */
    public boolean isPresettle() {
        return this.presettle;
    }

    /**
     * Sets whether messages are sent presettled. When no sender settle mode was
     * supplied at construction, this value also selects the settle mode
     * requested in {@link #doOpen()}.
     *
     * @param presettle {@code true} to send presettled
     */
    public void setPresettle(boolean presettle) {
        this.presettle = presettle;
    }

    /**
     * @return the send timeout in milliseconds
     */
    public long getSendTimeout() {
        return this.sendTimeout;
    }

    /**
     * Sets how long {@link #send} waits for completion.
     *
     * @param sendTimeout timeout in milliseconds; zero or negative waits without a timeout
     */
    public void setSendTimeout(long sendTimeout) {
        this.sendTimeout = sendTimeout;
    }

    /**
     * Sets the desired capabilities applied to the link in {@link #doOpen()}.
     *
     * @param desiredCapabilities capabilities to set on the link
     * @throws IllegalStateException if the endpoint has already been created
     */
    public void setDesiredCapabilities(Symbol[] desiredCapabilities) {
        if (this.getEndpoint() != null) {
            throw new IllegalStateException("Endpoint already established");
        }
        this.desiredCapabilities = desiredCapabilities;
    }

    /**
     * Sets the offered capabilities applied to the link in {@link #doOpen()}.
     *
     * @param offeredCapabilities capabilities to set on the link
     * @throws IllegalStateException if the endpoint has already been created
     */
    public void setOfferedCapabilities(Symbol[] offeredCapabilities) {
        if (this.getEndpoint() != null) {
            throw new IllegalStateException("Endpoint already established");
        }
        this.offeredCapabilities = offeredCapabilities;
    }

    /**
     * Sets the properties applied to the link in {@link #doOpen()}.
     *
     * @param properties properties to set on the link
     * @throws IllegalStateException if the endpoint has already been created
     */
    public void setProperties(Map<Symbol, Object> properties) {
        if (this.getEndpoint() != null) {
            throw new IllegalStateException("Endpoint already established");
        }
        this.properties = properties;
    }

    /**
     * Rejects use of a sender whose {@code isClosed()} reports {@code true}.
     * Note that this consults the inherited state check, not the local
     * {@link #closed} flag.
     *
     * @throws IllegalStateException if the sender is already closed
     */
    private void checkClosed() {
        if (this.isClosed()) {
            throw new IllegalStateException("Sender is already closed");
        }
    }

    /**
     * Builds the proton sender link, configures source, target, settlement
     * modes, capabilities and properties on it, registers it as this resource's
     * endpoint and then delegates to the superclass to open it.
     */
    @Override
    protected void doOpen() {
        Source source = new Source();
        source.setAddress(this.senderId);
        source.setOutcomes(this.outcomes);

        // A user supplied target is used untouched; otherwise build one from the address.
        Target target = this.userSpecifiedTarget;
        if (target == null) {
            target = new Target();
            target.setAddress(this.address);
        }

        String senderName = this.senderId + ":" + this.address;
        Sender sender = this.session.getEndpoint().sender(senderName);
        sender.setSource(source);
        sender.setTarget(target);

        if (this.userSpecifiedSenderSettlementMode != null) {
            sender.setSenderSettleMode(this.userSpecifiedSenderSettlementMode);
            // An explicit SETTLED mode overrides whatever setPresettle() configured.
            if (SenderSettleMode.SETTLED.equals(this.userSpecifiedSenderSettlementMode)) {
                this.presettle = true;
            }
        } else if (this.presettle) {
            sender.setSenderSettleMode(SenderSettleMode.SETTLED);
        } else {
            sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
        }

        if (this.userSpecifiedReceiverSettlementMode != null) {
            sender.setReceiverSettleMode(this.userSpecifiedReceiverSettlementMode);
        } else {
            sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
        }

        sender.setDesiredCapabilities(this.desiredCapabilities);
        sender.setOfferedCapabilities(this.offeredCapabilities);
        sender.setProperties(this.properties);

        this.setEndpoint(sender);
        super.doOpen();
    }

    /**
     * Completes the open only when the remote end supplied a target. With a
     * {@code null} remote target nothing is done here; see
     * {@link #getOpenAbortException()} for the error reported in that case.
     */
    @Override
    protected void doOpenCompletion() {
        org.apache.qpid.proton.amqp.transport.Target remoteTarget = this.getEndpoint().getRemoteTarget();
        if (remoteTarget != null) {
            super.doOpenCompletion();
        }
    }

    /**
     * Runs the state inspector's opened-resource check; any {@link Throwable}
     * it raises is recorded by marking the inspector invalid with its message.
     */
    @Override
    protected void doOpenInspection() {
        try {
            this.getStateInspector().inspectOpenedResource(this.getSender());
        }
        catch (Throwable error) {
            this.getStateInspector().markAsInvalid(error.getMessage());
        }
    }

    /**
     * Runs the state inspector's closed-resource check; any {@link Throwable}
     * it raises is recorded by marking the inspector invalid with its message.
     */
    @Override
    protected void doClosedInspection() {
        try {
            this.getStateInspector().inspectClosedResource(this.getSender());
        }
        catch (Throwable error) {
            this.getStateInspector().markAsInvalid(error.getMessage());
        }
    }

    /**
     * Runs the state inspector's detached-resource check; any {@link Throwable}
     * it raises is recorded by marking the inspector invalid with its message.
     */
    @Override
    protected void doDetachedInspection() {
        try {
            this.getStateInspector().inspectDetachedResource(this.getSender());
        }
        catch (Throwable error) {
            this.getStateInspector().markAsInvalid(error.getMessage());
        }
    }

    /**
     * Runs the state inspector's delivery-update check for one delivery; any
     * {@link Throwable} it raises is recorded by marking the inspector invalid
     * with its message.
     *
     * @param delivery the delivery whose remote state changed
     */
    protected void doDeliveryUpdateInspection(Delivery delivery) {
        try {
            this.getStateInspector().inspectDeliveryUpdate(this.getSender(), delivery);
        }
        catch (Throwable error) {
            this.getStateInspector().markAsInvalid(error.getMessage());
        }
    }

    /**
     * Runs the state inspector's credit check; any {@link Throwable} it raises
     * is recorded by marking the inspector invalid with its message.
     */
    private void doCreditInspection() {
        try {
            this.getStateInspector().inspectCredit(this.getSender());
        }
        catch (Throwable error) {
            this.getStateInspector().markAsInvalid(error.getMessage());
        }
    }

    /**
     * @return the superclass's abort exception when the remote supplied a
     *         target, otherwise an {@link InvalidDestinationException} stating
     *         that link creation was refused
     */
    @Override
    protected Exception getOpenAbortException() {
        org.apache.qpid.proton.amqp.transport.Target remoteTarget = this.getEndpoint().getRemoteTarget();
        if (remoteTarget != null) {
            return super.getOpenAbortException();
        }
        return new InvalidDestinationException("Link creation was refused");
    }

    /**
     * Creates a delivery for the message, attaches transactional state when a
     * transaction applies, writes the encoded message and either settles the
     * delivery immediately (presettled) or records it as pending.
     *
     * @param message the message to send
     * @param request completed successfully here for presettled sends; otherwise
     *                stored as the delivery context for {@link #processDeliveryUpdates}
     * @param txId    explicit transaction id, or {@code null} to use the
     *                session's transaction when the session is in one
     * @throws Exception if encoding or sending fails
     */
    private void doSend(AmqpMessage message, AsyncResult request, AmqpTransactionId txId) throws Exception {
        logger.trace("Producer sending message: {}", message);

        Delivery delivery;
        if (this.presettle) {
            delivery = this.getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
        } else {
            byte[] tag = this.tagGenerator.getNextTag();
            delivery = this.getEndpoint().delivery(tag, 0, tag.length);
        }
        delivery.setContext(request);

        // An explicit transaction id wins over the session's current transaction.
        Binary amqpTxId = null;
        if (txId != null) {
            amqpTxId = txId.getRemoteTxId();
        } else if (this.session.isInTransaction()) {
            amqpTxId = this.session.getTransactionId().getRemoteTxId();
        }
        if (amqpTxId != null) {
            TransactionalState state = new TransactionalState();
            state.setTxnId(amqpTxId);
            delivery.disposition(state);
        }

        this.encodeAndSend(message.getWrappedMessage(), delivery);

        if (this.presettle) {
            delivery.settle();
            request.onSuccess();
        } else {
            this.pending.add(delivery);
            this.getEndpoint().advance();
        }
    }

    /**
     * Encodes the message into the scratch buffer, doubling the buffer each
     * time it overflows, and writes the encoded bytes to the sender.
     *
     * @param message  the proton message to encode
     * @param delivery the delivery the bytes belong to (not used directly here)
     * @throws IOException if the sender accepts no bytes while encoded data
     *                     remains, rather than retrying forever
     */
    private void encodeAndSend(Message message, Delivery delivery) throws IOException {
        int encodedSize;
        while (true) {
            try {
                encodedSize = message.encode(this.encodeBuffer, 0, this.encodeBuffer.length);
                break;
            }
            catch (BufferOverflowException e) {
                // Message did not fit: grow the buffer and encode again.
                this.encodeBuffer = new byte[this.encodeBuffer.length * 2];
            }
        }

        int sentSoFar = 0;
        // Bounded by remaining bytes so a zero-length encoding ends immediately.
        while (sentSoFar < encodedSize) {
            int sent = this.getEndpoint().send(this.encodeBuffer, sentSoFar, encodedSize - sentSoFar);
            if (sent <= 0) {
                // No progress was made; fail instead of spinning on this thread.
                throw new IOException(this + " failed to send any data from current Message.");
            }
            sentSoFar += sent;
        }
    }

    /**
     * Logs the sender's current credit and runs the credit inspection.
     *
     * @param connection the connection reporting the flow update (unused here)
     */
    @Override
    public void processFlowUpdates(AmqpConnection connection) throws IOException {
        logger.trace("Sender {} flow update, credit = {}", this, this.getEndpoint().getCredit());
        this.doCreditInspection();
    }

    /**
     * Scans every pending delivery and, for each one that has a remote state,
     * resolves the associated send request from the outcome, returns its tag to
     * the generator, settles it and drops it from the pending set.
     *
     * <p>Accepted completes the request successfully. Rejected, released and
     * modified outcomes produce an error that fails the request, or is reported
     * to the connection when there is no request or it is already complete.
     *
     * @param connection the connection used to report errors with no waiting request
     * @param updated    the delivery that triggered the update; not consulted,
     *                   the whole pending set is scanned instead
     */
    @Override
    public void processDeliveryUpdates(AmqpConnection connection, Delivery updated) throws IOException {
        // Collected separately so the pending set is not modified while iterating it.
        ArrayList<Delivery> toRemove = new ArrayList<Delivery>();

        for (Delivery delivery : this.pending) {
            DeliveryState state = delivery.getRemoteState();
            if (state == null) {
                continue;
            }

            this.doDeliveryUpdateInspection(delivery);

            Outcome outcome = null;
            if (state instanceof TransactionalState) {
                logger.trace("State of delivery is Transactional, retrieving outcome: {}", state);
                outcome = ((TransactionalState)state).getOutcome();
            } else if (state instanceof Outcome) {
                outcome = (Outcome)state;
            } else {
                logger.warn("Message send updated with unsupported state: {}", state);
            }

            AsyncResult request = (AsyncResult)delivery.getContext();
            Exception deliveryError = null;

            // NOTE(review): an outcome matching none of the branches below (including
            // null) settles the delivery without completing the request.
            if (outcome instanceof Accepted) {
                logger.trace("Outcome of delivery was accepted: {}", delivery);
                if (request != null && !request.isComplete()) {
                    request.onSuccess();
                }
            } else if (outcome instanceof Rejected) {
                logger.trace("Outcome of delivery was rejected: {}", delivery);
                ErrorCondition remoteError = ((Rejected)outcome).getError();
                if (remoteError == null) {
                    // Fall back to the link's remote condition when the outcome carries no error.
                    remoteError = this.getEndpoint().getRemoteCondition();
                }
                deliveryError = AmqpSupport.convertToException(remoteError);
            } else if (outcome instanceof Released) {
                logger.trace("Outcome of delivery was released: {}", delivery);
                deliveryError = new IOException("Delivery failed: released by receiver");
            } else if (outcome instanceof Modified) {
                logger.trace("Outcome of delivery was modified: {}", delivery);
                deliveryError = new IOException("Delivery failed: failure at remote");
            }

            if (deliveryError != null) {
                if (request != null && !request.isComplete()) {
                    request.onFailure(deliveryError);
                } else {
                    connection.fireClientException(deliveryError);
                }
            }

            this.tagGenerator.returnTag(delivery.getTag());
            delivery.settle();
            toRemove.add(delivery);
        }

        this.pending.removeAll(toRemove);
    }

    /**
     * @return the simple class name followed by the sender's address
     */
    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "{ address = " + this.address + "}";
    }
}

