/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.protocol.amqp.proton;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpJmsSelectorFilter;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpNoLocalFilter;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Sender;
import org.jboss.logging.Logger;

public class ProtonServerSenderContext
extends ProtonInitializable
implements ProtonDeliveryHandler {
    private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);
    private static final Symbol COPY = Symbol.valueOf((String)"copy");
    private static final Symbol TOPIC = Symbol.valueOf((String)"topic");
    private static final Symbol QUEUE = Symbol.valueOf((String)"queue");
    private static final Symbol SHARED = Symbol.valueOf((String)"shared");
    private static final Symbol GLOBAL = Symbol.valueOf((String)"global");
    private Object brokerConsumer;
    protected final AMQPSessionContext protonSession;
    protected final Sender sender;
    protected final AMQPConnectionContext connection;
    protected boolean closed = false;
    protected final AMQPSessionCallback sessionSPI;
    private boolean multicast;
    private RoutingType defaultRoutingType = RoutingType.ANYCAST;
    protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0);
    private RoutingType routingTypeToUse = this.defaultRoutingType;
    private boolean shared = false;
    private boolean global = false;
    private boolean isVolatile = false;

    public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) {
        this.connection = connection;
        this.sender = sender;
        this.protonSession = protonSession;
        this.sessionSPI = server;
    }

    public Object getBrokerConsumer() {
        return this.brokerConsumer;
    }

    @Override
    public void onFlow(int currentCredits, boolean drain) {
        this.creditsSemaphore.setCredits(currentCredits);
        this.sessionSPI.onFlowConsumer(this.brokerConsumer, currentCredits, drain);
    }

    public Sender getSender() {
        return this.sender;
    }

    public void start() throws ActiveMQAMQPException {
        this.sessionSPI.start();
        try {
            if (this.brokerConsumer != null) {
                this.sessionSPI.startSender(this.brokerConsumer);
            }
        }
        catch (Exception e) {
            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorStartingConsumer(e.getMessage());
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public void initialise() throws Exception {
        Map.Entry<Symbol, DescribedType> filter;
        super.initialise();
        Source source = (Source)this.sender.getRemoteSource();
        String queue = null;
        String selector = null;
        HashMap<Symbol, DescribedType> supportedFilters = new HashMap<Symbol, DescribedType>();
        if (source != null && (filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS)) != null) {
            selector = filter.getValue().getDescribed().toString();
            try {
                SelectorParser.parse((String)selector);
            }
            catch (FilterException e) {
                throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid filter", ActiveMQExceptionType.INVALID_FILTER_EXPRESSION);
            }
            supportedFilters.put(filter.getKey(), filter.getValue());
        }
        if (source == null) {
            String clientId = this.getClientId();
            String pubId = this.sender.getName();
            queue = ProtonServerSenderContext.createQueueName(clientId, pubId, true, this.global, false);
            QueueQueryResult result = this.sessionSPI.queueQuery(queue, RoutingType.MULTICAST, false);
            this.multicast = true;
            this.routingTypeToUse = RoutingType.MULTICAST;
            if (!result.isExists()) throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + this.sender.getName());
            source = new Source();
            source.setAddress(queue);
            source.setDurable(TerminusDurability.UNSETTLED_STATE);
            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
            source.setDistributionMode(COPY);
            source.setCapabilities(new Symbol[]{TOPIC});
            SimpleString filterString = result.getFilterString();
            if (filterString != null) {
                selector = filterString.toString();
                boolean noLocal = false;
                String remoteContainerId = this.sender.getSession().getConnection().getRemoteContainer();
                String noLocalFilter = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
                if (selector.endsWith(noLocalFilter)) {
                    if (selector.length() > noLocalFilter.length()) {
                        noLocalFilter = " AND " + noLocalFilter;
                        selector = selector.substring(0, selector.length() - noLocalFilter.length());
                    } else {
                        selector = null;
                    }
                    noLocal = true;
                }
                if (noLocal) {
                    supportedFilters.put(AmqpSupport.NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
                }
                if (selector != null && !selector.trim().isEmpty()) {
                    supportedFilters.put(AmqpSupport.JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(selector));
                }
            }
            this.sender.setSource((org.apache.qpid.proton.amqp.transport.Source)source);
        } else if (source.getDynamic()) {
            queue = UUID.randomUUID().toString();
            try {
                this.sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST);
            }
            catch (Exception e) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
            }
            source.setAddress(queue);
        } else {
            SimpleString matchingAnycastQueue;
            Set routingTypes;
            AddressQueryResult addressQueryResult;
            boolean clientDefined;
            SimpleString addressToUse;
            SimpleString queueNameToUse = null;
            this.shared = ProtonServerSenderContext.hasCapabilities(SHARED, source);
            this.global = ProtonServerSenderContext.hasCapabilities(GLOBAL, source);
            if (CompositeAddress.isFullyQualified((String)source.getAddress())) {
                CompositeAddress compositeAddress = CompositeAddress.getQueueName((String)source.getAddress());
                addressToUse = new SimpleString(compositeAddress.getAddress());
                queueNameToUse = new SimpleString(compositeAddress.getQueueName());
            } else {
                addressToUse = new SimpleString(source.getAddress());
            }
            boolean bl = clientDefined = ProtonServerSenderContext.hasCapabilities(TOPIC, source) || ProtonServerSenderContext.hasCapabilities(QUEUE, source);
            if (clientDefined) {
                this.multicast = ProtonServerSenderContext.hasCapabilities(TOPIC, source);
                addressQueryResult = this.sessionSPI.addressQuery(addressToUse.toString(), this.multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
                if (!addressQueryResult.isExists()) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
                }
                routingTypes = addressQueryResult.getRoutingTypes();
                if (this.multicast && !routingTypes.contains(RoutingType.MULTICAST)) {
                    throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for topic support");
                }
                if (!this.multicast && !routingTypes.contains(RoutingType.ANYCAST)) {
                    throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for queue support");
                }
            } else {
                addressQueryResult = this.sessionSPI.addressQuery(addressToUse.toString(), this.defaultRoutingType, true);
                if (!addressQueryResult.isExists()) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
                }
                routingTypes = addressQueryResult.getRoutingTypes();
                this.multicast = routingTypes.contains(RoutingType.MULTICAST) && routingTypes.size() == 1;
            }
            RoutingType routingType = this.routingTypeToUse = this.multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST;
            if (this.multicast) {
                Map.Entry<Symbol, DescribedType> filter2 = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
                if (filter2 != null) {
                    String remoteContainerId = this.sender.getSession().getConnection().getRemoteContainer();
                    String noLocalFilter = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
                    selector = selector != null ? selector + " AND " + noLocalFilter : noLocalFilter;
                    supportedFilters.put(filter2.getKey(), filter2.getValue());
                }
                if (queueNameToUse != null) {
                    SimpleString matchingAnycastQueue2 = this.sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST);
                    queue = matchingAnycastQueue2.toString();
                }
                if (queue != null) {
                    this.multicast = false;
                } else if (TerminusDurability.UNSETTLED_STATE.equals((Object)source.getDurable()) || TerminusDurability.CONFIGURATION.equals((Object)source.getDurable())) {
                    String pubId;
                    String clientId = this.getClientId();
                    queue = ProtonServerSenderContext.createQueueName(clientId, pubId = this.sender.getName(), this.shared, this.global, false);
                    QueueQueryResult result = this.sessionSPI.queueQuery(queue, this.routingTypeToUse, false);
                    if (result.isExists()) {
                        if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString((String)selector)) || this.sender.getSource() != null && !this.sender.getSource().getAddress().equals(result.getAddress().toString())) {
                            if (result.getConsumerCount() != 0) throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
                            this.sessionSPI.deleteQueue(queue);
                            this.sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
                        }
                    } else if (this.shared) {
                        this.sessionSPI.createSharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
                    } else {
                        this.sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
                    }
                } else {
                    this.isVolatile = true;
                    if (this.shared && this.sender.getName() != null) {
                        queue = ProtonServerSenderContext.createQueueName(this.getClientId(), this.sender.getName(), this.shared, this.global, this.isVolatile);
                        try {
                            this.sessionSPI.createSharedVolatileQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
                        }
                        catch (ActiveMQQueueExistsException clientId) {}
                    } else {
                        queue = UUID.randomUUID().toString();
                        try {
                            this.sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector);
                        }
                        catch (Exception e) {
                            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
                        }
                    }
                }
            } else if (queueNameToUse != null) {
                matchingAnycastQueue = this.sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.ANYCAST);
                if (matchingAnycastQueue == null) throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
                queue = matchingAnycastQueue.toString();
            } else {
                matchingAnycastQueue = this.sessionSPI.getMatchingQueue(addressToUse, RoutingType.ANYCAST);
                queue = matchingAnycastQueue != null ? matchingAnycastQueue.toString() : addressToUse.toString();
            }
            if (queue == null) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet();
            }
            try {
                if (!this.sessionSPI.queueQuery(queue, this.routingTypeToUse, !this.multicast).isExists()) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
                }
            }
            catch (ActiveMQAMQPNotFoundException e) {
                throw e;
            }
            catch (Exception e) {
                throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
            }
        }
        source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
        boolean browseOnly = !this.multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
        try {
            this.brokerConsumer = this.sessionSPI.createSender(this, queue, this.multicast ? null : selector, browseOnly);
            return;
        }
        catch (ActiveMQAMQPResourceLimitExceededException e1) {
            throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
        }
        catch (Exception e) {
            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
        }
    }

    protected String getClientId() {
        return this.connection.getRemoteContainer();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close(ErrorCondition condition) throws ActiveMQAMQPException {
        this.closed = true;
        if (condition != null) {
            this.sender.setCondition(condition);
        }
        this.protonSession.removeSender(this.sender);
        Object object = this.connection.getLock();
        synchronized (object) {
            this.sender.close();
        }
        this.connection.flush();
        try {
            this.sessionSPI.closeSender(this.brokerConsumer);
        }
        catch (Exception e) {
            log.warn((Object)e.getMessage(), (Throwable)e);
            throw new ActiveMQAMQPInternalErrorException(e.getMessage());
        }
    }

    @Override
    public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
        try {
            this.closed = true;
            this.sessionSPI.closeSender(this.brokerConsumer);
            if (remoteLinkClose) {
                Source source = (Source)this.sender.getSource();
                if (source != null && source.getAddress() != null && this.multicast) {
                    String queueName = source.getAddress();
                    QueueQueryResult result = this.sessionSPI.queueQuery(queueName, this.routingTypeToUse, false);
                    if (result.isExists() && source.getDynamic()) {
                        this.sessionSPI.deleteQueue(queueName);
                    } else {
                        String queue;
                        String clientId = this.getClientId();
                        String pubId = this.sender.getName();
                        if (pubId.contains("|")) {
                            pubId = pubId.split("\\|")[0];
                        }
                        if ((result = this.sessionSPI.queueQuery(queue = ProtonServerSenderContext.createQueueName(clientId, pubId, this.shared, this.global, this.isVolatile), this.multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false)).isExists() && !this.isVolatile && result.getConsumerCount() == 0) {
                            this.sessionSPI.deleteQueue(queue);
                        }
                    }
                } else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
                    try {
                        this.sessionSPI.removeTemporaryQueue(source.getAddress());
                    }
                    catch (Exception exception) {}
                }
            }
        }
        catch (Exception e) {
            log.warn((Object)e.getMessage(), (Throwable)e);
            throw new ActiveMQAMQPInternalErrorException(e.getMessage());
        }
    }

    @Override
    public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
        if (this.closed) {
            return;
        }
        Message message = ((MessageReference)delivery.getContext()).getMessage();
        boolean preSettle = this.sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
        DeliveryState remoteState = delivery.getRemoteState();
        boolean settleImmediate = true;
        if (remoteState != null) {
            if (remoteState instanceof TransactionalState) {
                TransactionalState txState = (TransactionalState)remoteState;
                ProtonTransactionImpl tx = (ProtonTransactionImpl)this.sessionSPI.getTransaction(txState.getTxnId());
                if (txState.getOutcome() != null) {
                    settleImmediate = false;
                    Outcome outcome = txState.getOutcome();
                    if (outcome instanceof Accepted) {
                        if (!delivery.remotelySettled()) {
                            TransactionalState txAccepted = new TransactionalState();
                            txAccepted.setOutcome((Outcome)Accepted.getInstance());
                            txAccepted.setTxnId(txState.getTxnId());
                            delivery.disposition((DeliveryState)txAccepted);
                        }
                        try {
                            this.sessionSPI.ack((Transaction)tx, this.brokerConsumer, message);
                            tx.addDelivery(delivery, this);
                        }
                        catch (Exception e) {
                            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
                        }
                    }
                }
            } else {
                if (remoteState instanceof Accepted) {
                    if (delivery.isSettled()) {
                        return;
                    }
                    try {
                        this.sessionSPI.ack(null, this.brokerConsumer, message);
                    }
                    catch (Exception e) {
                        log.warn((Object)e.toString(), (Throwable)e);
                        throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
                    }
                }
                if (remoteState instanceof Released) {
                    try {
                        this.sessionSPI.cancel(this.brokerConsumer, message, false);
                    }
                    catch (Exception e) {
                        throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
                    }
                }
                if (remoteState instanceof Rejected) {
                    try {
                        this.sessionSPI.cancel(this.brokerConsumer, message, true);
                    }
                    catch (Exception e) {
                        throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
                    }
                }
                if (remoteState instanceof Modified) {
                    try {
                        Modified modification = (Modified)remoteState;
                        if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
                            this.sessionSPI.cancel(this.brokerConsumer, message, true);
                        } else {
                            this.sessionSPI.cancel(this.brokerConsumer, message, false);
                        }
                    }
                    catch (Exception e) {
                        throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
                    }
                }
            }
            if (!preSettle) {
                this.protonSession.replaceTag(delivery.getTag());
            }
            if (settleImmediate) {
                this.settle(delivery);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void settle(Delivery delivery) {
        Object object = this.connection.getLock();
        synchronized (object) {
            delivery.settle();
        }
    }

    public synchronized void checkState() {
        this.sessionSPI.resumeDelivery(this.brokerConsumer);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int deliverMessage(MessageReference messageReference, int deliveryCount) throws Exception {
        AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage());
        if (this.closed) {
            return 0;
        }
        if (!this.creditsSemaphore.tryAcquire()) {
            try {
                this.creditsSemaphore.acquire();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
        boolean preSettle = this.sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
        byte[] tag = preSettle ? new byte[]{} : this.protonSession.getTag();
        ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
        try {
            message.sendBuffer(nettyBuffer, deliveryCount);
            int size = nettyBuffer.writerIndex();
            Object object = this.connection.getLock();
            synchronized (object) {
                block14: {
                    if (this.sender.getLocalState() != EndpointState.CLOSED) break block14;
                    int n = 0;
                    return n;
                }
                Delivery delivery = this.sender.delivery(tag, 0, tag.length);
                delivery.setMessageFormat((int)message.getMessageFormat());
                delivery.setContext((Object)messageReference);
                this.sender.send(nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes());
                if (preSettle) {
                    this.sessionSPI.ack(null, this.brokerConsumer, messageReference.getMessage());
                    delivery.settle();
                } else {
                    this.sender.advance();
                }
            }
            this.connection.flush();
            int n = size;
            return n;
        }
        finally {
            nettyBuffer.release();
        }
    }

    private static boolean hasCapabilities(Symbol symbol, Source source) {
        if (source != null && source.getCapabilities() != null) {
            for (Symbol cap : source.getCapabilities()) {
                if (!symbol.equals(cap)) continue;
                return true;
            }
        }
        return false;
    }

    private static String createQueueName(String clientId, String pubId, boolean shared, boolean global, boolean isVolatile) {
        String queue;
        String string = queue = clientId == null || clientId.isEmpty() ? pubId : clientId + "." + pubId;
        if (shared) {
            if (queue.contains("|")) {
                queue = queue.split("\\|")[0];
            }
            if (isVolatile) {
                queue = queue + ":shared-volatile";
            }
            if (global) {
                queue = queue + ":global";
            }
        }
        return queue;
    }
}

