/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.protocol.amqp.broker;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.RunnableEx;
import org.apache.activemq.artemis.utils.SelectorTranslator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.jboss.logging.Logger;

public class AMQPSessionCallback
implements SessionCallback {
    private static final Logger logger = Logger.getLogger(AMQPSessionCallback.class);
    protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0L);
    private final AMQPConnectionCallback protonSPI;
    private final ProtonProtocolManager manager;
    private final StorageManager storageManager;
    private final AMQPConnectionContext connection;
    private final Connection transportConnection;
    private ServerSession serverSession;
    private final OperationContext operationContext;
    private AMQPSessionContext protonSession;
    private final Executor sessionExecutor;
    private final AtomicBoolean draining = new AtomicBoolean(false);

    public AMQPSessionCallback(AMQPConnectionCallback protonSPI, ProtonProtocolManager manager, AMQPConnectionContext connection, Connection transportConnection, Executor executor, OperationContext operationContext) {
        this.protonSPI = protonSPI;
        this.manager = manager;
        this.storageManager = manager.getServer().getStorageManager();
        this.connection = connection;
        this.transportConnection = transportConnection;
        this.sessionExecutor = executor;
        this.operationContext = operationContext;
    }

    public boolean isWritable(ReadyListener callback, Object protocolContext) {
        ProtonServerSenderContext senderContext = (ProtonServerSenderContext)protocolContext;
        return this.transportConnection.isWritable(callback) && senderContext.getSender().getLocalState() != EndpointState.CLOSED;
    }

    public void onFlowConsumer(Object consumer, int credits, boolean drain) {
        ServerConsumerImpl serverConsumer = (ServerConsumerImpl)consumer;
        if (drain) {
            if (this.draining.compareAndSet(false, true)) {
                final ProtonServerSenderContext plugSender = (ProtonServerSenderContext)serverConsumer.getProtocolContext();
                serverConsumer.forceDelivery(1L, new Runnable(){

                    @Override
                    public void run() {
                        try {
                            plugSender.reportDrained();
                        }
                        finally {
                            AMQPSessionCallback.this.draining.set(false);
                        }
                    }
                });
            }
        } else {
            serverConsumer.receiveCredits(-1);
        }
    }

    public void withinContext(RunnableEx run) throws Exception {
        OperationContext context = this.recoverContext();
        try {
            run.run();
        }
        finally {
            this.resetContext(context);
        }
    }

    public void afterIO(IOCallback ioCallback) {
        OperationContext context = this.recoverContext();
        try {
            this.manager.getServer().getStorageManager().afterCompleteOperations(ioCallback);
        }
        finally {
            this.resetContext(context);
        }
    }

    public void browserFinished(ServerConsumer consumer) {
    }

    public boolean supportsDirectDelivery() {
        return false;
    }

    public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception {
        this.protonSession = protonSession;
        String name = UUIDGenerator.getInstance().generateStringUUID();
        String user = null;
        String passcode = null;
        if (saslResult != null) {
            user = saslResult.getUser();
            if (saslResult instanceof PlainSASLResult) {
                passcode = ((PlainSASLResult)saslResult).getPassword();
            }
        }
        this.serverSession = this.manager.getServer().createSession(name, user, passcode, 102400, (RemotingConnection)this.protonSPI.getProtonConnectionDelegate(), false, false, false, true, (String)null, (SessionCallback)this, true, this.operationContext, this.manager.getPrefixes());
    }

    public void afterDelivery() throws Exception {
    }

    public void start() {
    }

    public Object createSender(ProtonServerSenderContext protonSender, String queue, String filter, boolean browserOnly) throws Exception {
        long consumerID = this.consumerIDGenerator.generateID();
        filter = SelectorTranslator.convertToActiveMQFilterString((String)filter);
        ServerConsumer consumer = this.serverSession.createConsumer(consumerID, SimpleString.toSimpleString((String)queue), SimpleString.toSimpleString((String)filter), browserOnly);
        consumer.setStarted(true);
        consumer.setProtocolContext((Object)protonSender);
        return consumer;
    }

    public void startSender(Object brokerConsumer) throws Exception {
        ServerConsumer serverConsumer = (ServerConsumer)brokerConsumer;
        serverConsumer.receiveCredits(-1);
    }

    public void createTemporaryQueue(String queueName, RoutingType routingType) throws Exception {
        this.serverSession.createQueue(SimpleString.toSimpleString((String)queueName), SimpleString.toSimpleString((String)queueName), routingType, null, true, false);
    }

    public void createTemporaryQueue(String address, String queueName, RoutingType routingType, String filter) throws Exception {
        this.serverSession.createQueue(SimpleString.toSimpleString((String)address), SimpleString.toSimpleString((String)queueName), routingType, SimpleString.toSimpleString((String)filter), true, false);
    }

    public void createUnsharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
        this.serverSession.createQueue(SimpleString.toSimpleString((String)address), SimpleString.toSimpleString((String)queueName), routingType, SimpleString.toSimpleString((String)filter), false, true, 1, false, false);
    }

    public void createSharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
        this.serverSession.createQueue(SimpleString.toSimpleString((String)address), SimpleString.toSimpleString((String)queueName), routingType, SimpleString.toSimpleString((String)filter), false, true, -1, false, false);
    }

    public void createSharedVolatileQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
        this.serverSession.createQueue(SimpleString.toSimpleString((String)address), SimpleString.toSimpleString((String)queueName), routingType, SimpleString.toSimpleString((String)filter), false, false, -1, true, true);
    }

    public QueueQueryResult queueQuery(String queueName, RoutingType routingType, boolean autoCreate) throws Exception {
        QueueQueryResult queueQueryResult = this.serverSession.executeQueueQuery(SimpleString.toSimpleString((String)queueName));
        if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) {
            try {
                this.serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), routingType, null, false, true);
            }
            catch (ActiveMQQueueExistsException activeMQQueueExistsException) {
                // empty catch block
            }
            queueQueryResult = this.serverSession.executeQueueQuery(SimpleString.toSimpleString((String)queueName));
        }
        if (queueQueryResult.getRoutingType() != routingType) {
            throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType);
        }
        return queueQueryResult;
    }

    public boolean bindingQuery(String address) throws Exception {
        BindingQueryResult bindingQueryResult = this.serverSession.executeBindingQuery(SimpleString.toSimpleString((String)address));
        if (!bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateQueues()) {
            try {
                this.serverSession.createQueue(new SimpleString(address), new SimpleString(address), RoutingType.ANYCAST, null, false, true);
            }
            catch (ActiveMQQueueExistsException activeMQQueueExistsException) {
                // empty catch block
            }
            bindingQueryResult = this.serverSession.executeBindingQuery(SimpleString.toSimpleString((String)address));
        }
        return bindingQueryResult.isExists();
    }

    public AddressQueryResult addressQuery(String addressName, RoutingType routingType, boolean autoCreate) throws Exception {
        AddressQueryResult addressQueryResult = this.serverSession.executeAddressQuery(SimpleString.toSimpleString((String)addressName));
        if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) {
            try {
                this.serverSession.createAddress(SimpleString.toSimpleString((String)addressName), routingType, true);
            }
            catch (ActiveMQQueueExistsException activeMQQueueExistsException) {
                // empty catch block
            }
            addressQueryResult = this.serverSession.executeAddressQuery(SimpleString.toSimpleString((String)addressName));
        }
        return addressQueryResult;
    }

    public void closeSender(Object brokerConsumer) throws Exception {
        final ServerConsumer consumer = (ServerConsumer)brokerConsumer;
        final CountDownLatch latch = new CountDownLatch(1);
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                try {
                    consumer.close(false);
                    latch.countDown();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        };
        Executor executor = this.protonSPI.getExeuctor();
        if (executor != null) {
            executor.execute(runnable);
        } else {
            runnable.run();
        }
        try {
            latch.await(10L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            throw new ActiveMQAMQPInternalErrorException("Unable to close consumers for queue: " + consumer.getQueue());
        }
    }

    public String tempQueueName() {
        return UUIDGenerator.getInstance().generateStringUUID();
    }

    public void close() throws Exception {
        if (this.serverSession != null) {
            OperationContext context = this.recoverContext();
            try {
                this.serverSession.close(false);
            }
            finally {
                this.resetContext(context);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void ack(Transaction transaction, Object brokerConsumer, Message message) throws Exception {
        if (transaction == null) {
            transaction = this.serverSession.getCurrentTransaction();
        }
        OperationContext oldContext = this.recoverContext();
        try {
            ((ServerConsumer)brokerConsumer).individualAcknowledge(transaction, message.getMessageID());
        }
        finally {
            this.resetContext(oldContext);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception {
        OperationContext oldContext = this.recoverContext();
        try {
            ((ServerConsumer)brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
            ((ServerConsumer)brokerConsumer).getQueue().forceDelivery();
        }
        finally {
            this.resetContext(oldContext);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void reject(Object brokerConsumer, Message message) throws Exception {
        OperationContext oldContext = this.recoverContext();
        try {
            ((ServerConsumer)brokerConsumer).reject(message.getMessageID());
        }
        finally {
            this.resetContext(oldContext);
        }
    }

    public void resumeDelivery(Object consumer) {
        ((ServerConsumer)consumer).receiveCredits(-1);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void serverSend(Transaction transaction, Receiver receiver, Delivery delivery, String address, int messageFormat, byte[] data) throws Exception {
        AMQPMessage message = new AMQPMessage((long)messageFormat, data);
        if (address != null) {
            message.setAddress(new SimpleString(address));
        } else {
            if (message.getAddress() == null) {
                this.rejectMessage(delivery, Symbol.valueOf((String)"failed"), "Missing 'to' field for message sent to an anonymous producer");
                return;
            }
            if (!this.bindingQuery(message.getAddress().toString())) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
            }
        }
        OperationContext oldcontext = this.recoverContext();
        try {
            PagingStore store = this.manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString());
            if (store.isRejectingMessages()) {
                if (delivery.remotelySettled()) {
                    if (transaction != null) {
                        String amqpAddress = delivery.getLink().getTarget().getAddress();
                        ActiveMQAMQPResourceLimitExceededException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress);
                        transaction.markAsRollbackOnly((ActiveMQException)e);
                    }
                } else {
                    this.rejectMessage(delivery, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address);
                }
            } else {
                this.serverSend(transaction, (Message)message, delivery, receiver);
            }
        }
        finally {
            this.resetContext(oldcontext);
        }
    }

    private void rejectMessage(final Delivery delivery, Symbol errorCondition, String errorMessage) {
        ErrorCondition condition = new ErrorCondition();
        condition.setCondition(errorCondition);
        condition.setDescription(errorMessage);
        final Rejected rejected = new Rejected();
        rejected.setError(condition);
        this.afterIO(new IOCallback(){

            public void done() {
                AMQPSessionCallback.this.connection.lock();
                try {
                    delivery.disposition((DeliveryState)rejected);
                    delivery.settle();
                }
                finally {
                    AMQPSessionCallback.this.connection.unlock();
                }
                AMQPSessionCallback.this.connection.flush();
            }

            public void onError(int errorCode, String errorMessage) {
            }
        });
    }

    private void serverSend(Transaction transaction, Message message, final Delivery delivery, final Receiver receiver) throws Exception {
        message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
        this.invokeIncoming((AMQPMessage)message, (ActiveMQProtonRemotingConnection)this.transportConnection.getProtocolConnection());
        this.serverSession.send(transaction, message, false, false);
        this.afterIO(new IOCallback(){

            public void done() {
                AMQPSessionCallback.this.connection.lock();
                try {
                    if (delivery.getRemoteState() instanceof TransactionalState) {
                        TransactionalState txAccepted = new TransactionalState();
                        txAccepted.setOutcome((Outcome)Accepted.getInstance());
                        txAccepted.setTxnId(((TransactionalState)delivery.getRemoteState()).getTxnId());
                        delivery.disposition((DeliveryState)txAccepted);
                    } else {
                        delivery.disposition((DeliveryState)Accepted.getInstance());
                    }
                    delivery.settle();
                }
                finally {
                    AMQPSessionCallback.this.connection.unlock();
                }
                AMQPSessionCallback.this.connection.flush();
            }

            public void onError(int errorCode, String errorMessage) {
                AMQPSessionCallback.this.connection.lock();
                try {
                    receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
                    AMQPSessionCallback.this.connection.flush();
                }
                finally {
                    AMQPSessionCallback.this.connection.unlock();
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void offerProducerCredit(String address, final int credits, final int threshold, final Receiver receiver) {
        try {
            if (address == null) {
                this.connection.lock();
                try {
                    receiver.flow(credits);
                }
                finally {
                    this.connection.unlock();
                }
                this.connection.flush();
                return;
            }
            PagingStore store = this.manager.getServer().getPagingManager().getPageStore(new SimpleString(address));
            store.checkMemory(new Runnable(){

                @Override
                public void run() {
                    AMQPSessionCallback.this.connection.lock();
                    try {
                        if (receiver.getRemoteCredit() <= threshold) {
                            receiver.flow(credits);
                        }
                    }
                    finally {
                        AMQPSessionCallback.this.connection.unlock();
                    }
                    AMQPSessionCallback.this.connection.flush();
                }
            });
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void deleteQueue(String queueName) throws Exception {
        this.manager.getServer().destroyQueue(new SimpleString(queueName));
    }

    public void resetContext(OperationContext oldContext) {
        this.storageManager.setContext(oldContext);
    }

    public OperationContext recoverContext() {
        OperationContext oldContext = this.storageManager.getContext();
        this.manager.getServer().getStorageManager().setContext(this.serverSession.getSessionContext());
        return oldContext;
    }

    public void sendProducerCreditsMessage(int credits, SimpleString address) {
    }

    public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
        return false;
    }

    public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
        ProtonServerSenderContext plugSender = (ProtonServerSenderContext)consumer.getProtocolContext();
        try {
            return plugSender.deliverMessage(ref, deliveryCount, this.transportConnection);
        }
        catch (Exception e) {
            this.connection.lock();
            try {
                plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage()));
                this.connection.flush();
            }
            finally {
                this.connection.unlock();
            }
            throw new IllegalStateException("Can't deliver message " + e, e);
        }
    }

    public int sendLargeMessage(MessageReference ref, Message message, ServerConsumer consumer, long bodySize, int deliveryCount) {
        return 0;
    }

    public int sendLargeMessageContinuation(ServerConsumer consumer, byte[] body, boolean continues, boolean requiresResponse) {
        return 0;
    }

    public void closed() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void disconnect(ServerConsumer consumer, String queueName) {
        ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName);
        this.connection.lock();
        try {
            ((ProtonServerSenderContext)consumer.getProtocolContext()).close(ec);
            this.connection.flush();
        }
        catch (ActiveMQAMQPException e) {
            logger.error((Object)("Error closing link for " + consumer.getQueue().getAddress()));
        }
        finally {
            this.connection.unlock();
        }
    }

    public boolean hasCredits(ServerConsumer consumer) {
        ProtonServerSenderContext plugSender = (ProtonServerSenderContext)consumer.getProtocolContext();
        return plugSender != null && plugSender.getSender().getCredit() > 0;
    }

    public Transaction getTransaction(Binary txid, boolean remove) throws ActiveMQAMQPException {
        return this.protonSPI.getTransaction(txid, remove);
    }

    public Binary newTransaction() {
        return this.protonSPI.newTransaction();
    }

    public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
        return this.serverSession.getMatchingQueue(address, routingType);
    }

    public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception {
        return this.serverSession.getMatchingQueue(address, queueName, routingType);
    }

    public AddressInfo getAddress(SimpleString address) {
        return this.serverSession.getAddress(address);
    }

    public void removeTemporaryQueue(String address) throws Exception {
        this.serverSession.deleteQueue(SimpleString.toSimpleString((String)address));
    }

    public RoutingType getDefaultRoutingType(String address) {
        return ((AddressSettings)this.manager.getServer().getAddressSettingsRepository().getMatch(address)).getDefaultQueueRoutingType();
    }

    public void check(SimpleString address, CheckType checkType, SecurityAuth session) throws Exception {
        this.manager.getServer().getSecurityStore().check(address, checkType, session);
    }

    public void invokeIncoming(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
        this.protonSPI.invokeIncomingInterceptors(message, connection);
    }

    public void invokeOutgoing(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
        this.protonSPI.invokeOutgoingInterceptors(message, connection);
    }

    public void addProducer(ServerProducer serverProducer) {
        this.serverSession.addProducer(serverProducer);
    }

    public void removeProducer(String name) {
        this.serverSession.removeProducer(name);
    }
}

