/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.protocol.proton.plug;

import io.netty.buffer.ByteBuf;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage;
import org.apache.activemq.artemis.core.protocol.proton.plug.ActiveMQProtonConnectionCallback;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.SelectorTranslator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.ProtonJMessage;
import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.AMQPSessionContext;
import org.proton.plug.SASLResult;
import org.proton.plug.context.ProtonPlugSender;
import org.proton.plug.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.proton.plug.sasl.PlainSASLResult;

public class ProtonSessionIntegrationCallback
implements AMQPSessionCallback,
SessionCallback {
    /** Source of the consumer ids handed to the server session in {@code createSender}; seeded with 0. */
    protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0L);
    /** Connection-level callback; provides the connection delegate used by {@code init} and the executor used by {@code closeSender}. */
    private final ActiveMQProtonConnectionCallback protonSPI;
    /** Protocol manager; gives access to the server, the message converter and the pub/sub prefix. */
    private final ProtonProtocolManager manager;
    /** AMQP connection context; its lock is taken and {@code flush()} is called whenever proton state is changed here. */
    private final AMQPConnectionContext connection;
    /** Transport-level connection; only consulted by {@code isWritable}. */
    private final Connection transportConnection;
    /** Broker session created by {@code init}; stays {@code null} until {@code init} succeeds. */
    private ServerSession serverSession;
    /** Proton session context recorded by {@code init}; not read anywhere else in this class. */
    private AMQPSessionContext protonSession;
    /** Executor received by the constructor; not read anywhere else in this class. */
    private final Executor closeExecutor;
    /** Set while a drain requested through {@code onFlowConsumer} is in flight, so overlapping drain requests are ignored. */
    private final AtomicBoolean draining = new AtomicBoolean(false);

    /**
     * Creates the callback. Each argument is stored unchanged in the corresponding field;
     * no other work is done here.
     *
     * @param protonSPI           connection-level callback
     * @param manager             protocol manager giving access to the server
     * @param connection          AMQP connection context (lock and flush)
     * @param transportConnection underlying transport connection
     * @param executor            executor kept in {@code closeExecutor}
     */
    public ProtonSessionIntegrationCallback(ActiveMQProtonConnectionCallback protonSPI,
                                            ProtonProtocolManager manager,
                                            AMQPConnectionContext connection,
                                            Connection transportConnection,
                                            Executor executor) {
        this.closeExecutor = executor;
        this.transportConnection = transportConnection;
        this.connection = connection;
        this.manager = manager;
        this.protonSPI = protonSPI;
    }

    /**
     * Delegates the writability check to the transport connection.
     *
     * @param callback listener forwarded to the transport connection
     * @return whatever the transport connection reports
     */
    public boolean isWritable(ReadyListener callback) {
        final boolean writable = transportConnection.isWritable(callback);
        return writable;
    }

    /**
     * Handles a flow update for a consumer.
     * <p>
     * Without {@code drain} the consumer simply receives {@code -1} credits. With {@code drain}
     * a forced delivery is requested and, once it runs its callback, the proton sender is told
     * it has been drained. Only one drain is processed at a time: a request that arrives while
     * the {@code draining} flag is set is ignored.
     *
     * @param consumer the broker consumer; must be a {@code ServerConsumerImpl}
     * @param credits  not used by this implementation
     * @param drain    whether the peer asked for a drain
     */
    public void onFlowConsumer(Object consumer, int credits, boolean drain) {
        final ServerConsumerImpl target = (ServerConsumerImpl) consumer;

        if (!drain) {
            target.receiveCredits(-1);
            return;
        }

        if (!draining.compareAndSet(false, true)) {
            // A drain is already in flight for this session.
            return;
        }

        final ProtonPlugSender plugSender = (ProtonPlugSender) target.getProtocolContext();
        final Runnable markDrained = new Runnable() {
            @Override
            public void run() {
                try {
                    plugSender.getSender().drained();
                } finally {
                    // Always release the flag so later drain requests are honoured.
                    draining.set(false);
                }
            }
        };
        target.forceDelivery(1L, markDrained);
    }

    /**
     * No-op in this implementation.
     *
     * @param consumer ignored
     */
    public void browserFinished(ServerConsumer consumer) {
        // nothing to do
    }

    /**
     * Records the proton session and creates the backing broker session.
     * <p>
     * The session name is a freshly generated UUID string. The user comes from
     * {@code saslResult} when one is supplied; the password is only available when the
     * result is a {@code PlainSASLResult}. Otherwise both stay {@code null}.
     *
     * @param protonSession proton session context to remember
     * @param saslResult    SASL outcome, may be {@code null}
     * @throws Exception if the server fails to create the session
     */
    public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception {
        this.protonSession = protonSession;

        final String sessionName = UUIDGenerator.getInstance().generateStringUUID();
        final String user = saslResult != null ? saslResult.getUser() : null;
        // instanceof is false for null, so a missing SASL result also yields a null password.
        final String passcode = saslResult instanceof PlainSASLResult
                ? ((PlainSASLResult) saslResult).getPassword()
                : null;

        // NOTE(review): 102400 and the boolean flags are passed positionally; confirm their
        // meaning against the createSession signature before changing any of them.
        this.serverSession = manager.getServer().createSession(
                sessionName,
                user,
                passcode,
                102400,
                (RemotingConnection) protonSPI.getProtonConnectionDelegate(),
                false,
                false,
                false,
                true,
                (String) null,
                (SessionCallback) this,
                true);
    }

    /**
     * No-op in this implementation.
     *
     * @throws Exception never thrown here; declared by the signature only
     */
    public void afterDelivery() throws Exception {
        // nothing to do
    }

    /** No-op in this implementation. */
    public void start() {
        // nothing to do
    }

    /**
     * Creates a broker consumer for the given queue and ties it to the proton sender.
     * <p>
     * The filter is first passed through {@code SelectorTranslator.convertToActiveMQFilterString}.
     * The new consumer is marked as started and gets {@code protonSender} as its protocol
     * context, which is what the other methods of this class read back.
     *
     * @param protonSender sender stored as the consumer's protocol context
     * @param queue        name of the queue to consume from
     * @param filter       selector to translate and apply
     * @param browserOnly  forwarded to {@code createConsumer}
     * @return the created {@code ServerConsumer}
     * @throws Exception if the consumer cannot be created
     */
    public Object createSender(ProtonPlugSender protonSender, String queue, String filter, boolean browserOnly) throws Exception {
        final long id = consumerIDGenerator.generateID();
        final String brokerFilter = SelectorTranslator.convertToActiveMQFilterString(filter);

        final ServerConsumer created = serverSession.createConsumer(
                id,
                SimpleString.toSimpleString(queue),
                SimpleString.toSimpleString(brokerFilter),
                browserOnly);

        created.setStarted(true);
        created.setProtocolContext((Object) protonSender);
        return created;
    }

    /**
     * Gives the consumer {@code -1} credits.
     *
     * @param brokerConsumer the consumer; must be a {@code ServerConsumer}
     * @throws Exception declared by the signature
     */
    public void startSender(Object brokerConsumer) throws Exception {
        ((ServerConsumer) brokerConsumer).receiveCredits(-1);
    }

    /**
     * Creates a queue whose address and name are both {@code queueName}, with no filter
     * and the flag pair {@code (true, false)}.
     *
     * @param queueName used for both the address and the queue name
     * @throws Exception if the session fails to create the queue
     */
    public void createTemporaryQueue(String queueName) throws Exception {
        final SimpleString address = SimpleString.toSimpleString(queueName);
        final SimpleString name = SimpleString.toSimpleString(queueName);
        serverSession.createQueue(address, name, null, true, false);
    }

    /**
     * Creates a temporary queue named {@code queueName} bound to {@code address}, with no filter.
     * <p>
     * The flag pair is {@code (true, false)}, the same pair the single-argument
     * {@code createTemporaryQueue(String)} overload passes. This method previously passed
     * {@code (false, true)}, which is exactly what {@code createDurableQueue} passes, so a
     * "temporary" queue was created with the durable flag pair.
     *
     * @param address   address the queue is bound to
     * @param queueName name of the queue
     * @throws Exception if the session fails to create the queue
     */
    public void createTemporaryQueue(String address, String queueName) throws Exception {
        this.serverSession.createQueue(SimpleString.toSimpleString((String)address), SimpleString.toSimpleString((String)queueName), null, true, false);
    }

    /**
     * Creates a queue named {@code queueName} bound to {@code address}, with no filter
     * and the flag pair {@code (false, true)}.
     *
     * @param address   address the queue is bound to
     * @param queueName name of the queue
     * @throws Exception if the session fails to create the queue
     */
    public void createDurableQueue(String address, String queueName) throws Exception {
        final SimpleString boundAddress = SimpleString.toSimpleString(address);
        final SimpleString name = SimpleString.toSimpleString(queueName);
        serverSession.createQueue(boundAddress, name, null, false, true);
    }

    /**
     * Checks whether a queue is usable, creating it when auto-creation is enabled.
     *
     * @param queueName queue to look up; also used as the address when the queue is created
     * @return {@code true} if the queue already exists or was just auto-created,
     *         {@code false} if it does not exist and auto-creation is off
     * @throws Exception if the query or the queue creation fails
     */
    public boolean queueQuery(String queueName) throws Exception {
        final QueueQueryResult result = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));

        if (result.isExists()) {
            return true;
        }
        if (!result.isAutoCreateJmsQueues()) {
            return false;
        }

        serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), null, false, true);
        return true;
    }

    /**
     * Checks whether an address has bindings, creating a queue of the same name when
     * auto-creation is enabled.
     *
     * @param address address to look up; also used as the queue name when a queue is created
     * @return {@code true} if the binding query reports existence or a queue was just
     *         auto-created, {@code false} otherwise
     * @throws Exception if the query or the queue creation fails
     */
    public boolean bindingQuery(String address) throws Exception {
        final BindingQueryResult result = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));

        if (result.isExists()) {
            return true;
        }
        if (!result.isAutoCreateJmsQueues()) {
            return false;
        }

        serverSession.createQueue(new SimpleString(address), new SimpleString(address), null, false, true);
        return true;
    }

    /**
     * Closes the broker consumer, on the connection callback's executor when it has one
     * and on the calling thread otherwise.
     * <p>
     * Any exception thrown by the close itself is discarded.
     *
     * @param brokerConsumer the consumer; must be a {@code ServerConsumer}
     * @throws Exception declared by the signature
     */
    public void closeSender(final Object brokerConsumer) throws Exception {
        final Runnable closeTask = new Runnable() {
            @Override
            public void run() {
                try {
                    ((ServerConsumer) brokerConsumer).close(false);
                } catch (Exception ignored) {
                    // close failures are deliberately not propagated
                }
            }
        };

        final Executor executor = protonSPI.getExeuctor();
        if (executor == null) {
            closeTask.run();
        } else {
            executor.execute(closeTask);
        }
    }

    /**
     * Converts a broker message to its proton form using the manager's converter.
     *
     * @param message       the message; must be a {@code ServerMessage}
     * @param deliveryCount forwarded to the converter
     * @return the converter's result, cast to {@code ProtonJMessage}
     * @throws Exception if the conversion fails
     */
    public ProtonJMessage encodeMessage(Object message, int deliveryCount) throws Exception {
        final ServerMessage serverMessage = (ServerMessage) message;
        final Object outbound = manager.getConverter().outbound(serverMessage, deliveryCount);
        return (ProtonJMessage) outbound;
    }

    /**
     * Returns the id of the session's current transaction as a {@code Binary}.
     * <p>
     * When the session has no current transaction, a new one is created and installed
     * through {@code resetTX} first.
     *
     * @return the transaction id encoded with {@code ByteUtil.longToBytes}
     */
    public Binary getCurrentTXID() {
        Transaction current = serverSession.getCurrentTransaction();
        if (current == null) {
            current = serverSession.newTransaction();
            serverSession.resetTX(current);
        }
        final long id = current.getID();
        return new Binary(ByteUtil.longToBytes(id));
    }

    /**
     * Produces a name for a temporary queue.
     *
     * @return a newly generated UUID string
     */
    public String tempQueueName() {
        final UUIDGenerator generator = UUIDGenerator.getInstance();
        return generator.generateStringUUID();
    }

    /**
     * Commits on the server session with the session's context installed on the storage
     * manager; the context is cleared again whether or not the commit succeeds.
     *
     * @throws Exception if the commit fails
     */
    public void commitCurrentTX() throws Exception {
        recoverContext();
        try {
            serverSession.commit();
        } finally {
            resetContext();
        }
    }

    /**
     * Rolls back on the server session, if one exists.
     * <p>
     * Does nothing when {@code serverSession} is still {@code null}. Otherwise the rollback
     * runs with the session's context installed, and the context is cleared afterwards
     * regardless of the outcome.
     *
     * @param lastMessageDelivered forwarded to {@code ServerSession.rollback}
     * @throws Exception if the rollback fails
     */
    public void rollbackCurrentTX(boolean lastMessageDelivered) throws Exception {
        if (serverSession == null) {
            return;
        }
        recoverContext();
        try {
            serverSession.rollback(lastMessageDelivered);
        } finally {
            resetContext();
        }
    }

    /**
     * Closes the server session, if one exists.
     * <p>
     * Does nothing when {@code serverSession} is still {@code null}. Otherwise the close
     * runs with the session's context installed, and the context is cleared afterwards
     * regardless of the outcome.
     *
     * @throws Exception if closing the session fails
     */
    public void close() throws Exception {
        if (serverSession == null) {
            return;
        }
        recoverContext();
        try {
            serverSession.close(false);
        } finally {
            resetContext();
        }
    }

    /**
     * Acknowledges a single message on the consumer within the session's current transaction.
     * <p>
     * Runs with the session's context installed on the storage manager; the context is
     * cleared afterwards regardless of the outcome.
     *
     * @param brokerConsumer the consumer; must be a {@code ServerConsumer}
     * @param message        the message; must be a {@code ServerMessage}
     * @throws Exception if the acknowledgement fails
     */
    public void ack(Object brokerConsumer, Object message) throws Exception {
        recoverContext();
        try {
            final ServerConsumer serverConsumer = (ServerConsumer) brokerConsumer;
            final Transaction tx = serverSession.getCurrentTransaction();
            serverConsumer.individualAcknowledge(tx, ((ServerMessage) message).getMessageID());
        } finally {
            resetContext();
        }
    }

    /**
     * Cancels a single message on the consumer.
     * <p>
     * Runs with the session's context installed on the storage manager; the context is
     * cleared afterwards regardless of the outcome.
     *
     * @param brokerConsumer the consumer; must be a {@code ServerConsumer}
     * @param message        the message; must be a {@code ServerMessage}
     * @param updateCounts   forwarded to {@code individualCancel}
     * @throws Exception if the cancel fails
     */
    public void cancel(Object brokerConsumer, Object message, boolean updateCounts) throws Exception {
        recoverContext();
        try {
            final ServerConsumer serverConsumer = (ServerConsumer) brokerConsumer;
            serverConsumer.individualCancel(((ServerMessage) message).getMessageID(), updateCounts);
        } finally {
            resetContext();
        }
    }

    /**
     * Gives the consumer {@code -1} credits.
     *
     * @param consumer the consumer; must be a {@code ServerConsumer}
     */
    public void resumeDelivery(Object consumer) {
        final ServerConsumer serverConsumer = (ServerConsumer) consumer;
        serverConsumer.receiveCredits(-1);
    }

    /**
     * Converts an encoded AMQP message and routes it into the broker.
     * <p>
     * The message address is overridden with {@code address} when that is not {@code null}.
     * If the paging store for the target address is rejecting messages:
     * <ul>
     *   <li>a remotely settled delivery is dropped, and the current transaction (if any) is
     *       marked rollback-only with an "Address is full" error;</li>
     *   <li>any other delivery is rejected back to the peer.</li>
     * </ul>
     * Otherwise the message is sent through the server session.
     * <p>
     * The session's context is installed on the storage manager for the duration of the work
     * and is always cleared before returning. Previously only the normal send path cleared it;
     * the reject and drop paths, and any exception from the paging lookup, left it installed.
     * The send path clears it a second time here, which only sets the context to {@code null} again.
     *
     * @param receiver       receiver the delivery arrived on
     * @param delivery       the proton delivery
     * @param address        address to force onto the message, or {@code null} to keep the message's own
     * @param messageFormat  AMQP message format, passed to {@code EncodedMessage}
     * @param messageEncoded buffer holding the encoded message; its backing array is read directly
     * @throws Exception if conversion or sending fails
     */
    public void serverSend(Receiver receiver, Delivery delivery, String address, int messageFormat, ByteBuf messageEncoded) throws Exception {
        EncodedMessage encodedMessage = new EncodedMessage(messageFormat, messageEncoded.array(), messageEncoded.arrayOffset(), messageEncoded.writerIndex());
        ServerMessage message = this.manager.getConverter().inbound((Object)encodedMessage);
        if (address != null) {
            message.setAddress(new SimpleString(address));
        }
        this.recoverContext();
        try {
            PagingStore store = this.manager.getServer().getPagingManager().getPageStore(message.getAddress());
            if (store.isRejectingMessages()) {
                if (delivery.remotelySettled()) {
                    // Pre-settled deliveries cannot be rejected; fail the surrounding transaction instead.
                    Transaction tx = this.serverSession.getCurrentTransaction();
                    if (tx != null) {
                        String amqpAddress = delivery.getLink().getTarget().getAddress();
                        ActiveMQAMQPResourceLimitExceededException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress);
                        tx.markAsRollbackOnly((ActiveMQException)e);
                    }
                } else {
                    this.rejectMessage(delivery);
                }
            } else {
                this.serverSend(message, delivery, receiver);
            }
        }
        finally {
            // Clear the context on every path, not just the one that reaches the private serverSend.
            this.resetContext();
        }
    }

    /**
     * Marks the delivery as rejected with a {@code RESOURCE_LIMIT_EXCEEDED} error naming the
     * link's target address, then flushes the connection.
     *
     * @param delivery the delivery to reject
     */
    private void rejectMessage(Delivery delivery) {
        final String targetAddress = delivery.getLink().getTarget().getAddress();

        final Rejected outcome = new Rejected();
        outcome.setError(new ErrorCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + targetAddress));

        delivery.disposition((DeliveryState) outcome);
        connection.flush();
    }

    /**
     * Sends the message through the server session and registers a completion callback
     * with the storage manager.
     * <p>
     * On completion the delivery is accepted and settled; on error an {@code ILLEGAL_STATE}
     * condition carrying the error code and message is set on the receiver. Both callbacks
     * run under the connection lock and flush the connection. The storage context is cleared
     * when this method returns, whether or not the send succeeded.
     *
     * @param message  message to send
     * @param delivery delivery to accept and settle on completion
     * @param receiver receiver to flag on error
     * @throws Exception if the send or the callback registration fails
     */
    private void serverSend(ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception {
        final IOCallback completion = new IOCallback() {

            public void done() {
                synchronized (connection.getLock()) {
                    delivery.disposition((DeliveryState) Accepted.getInstance());
                    delivery.settle();
                    connection.flush();
                }
            }

            public void onError(int errorCode, String errorMessage) {
                synchronized (connection.getLock()) {
                    receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
                    connection.flush();
                }
            }
        };

        try {
            serverSession.send(message, false);
            manager.getServer().getStorageManager().afterCompleteOperations(completion);
        } finally {
            resetContext();
        }
    }

    /**
     * Returns the pub/sub prefix configured on the protocol manager.
     *
     * @return the manager's pub/sub prefix
     */
    public String getPubSubPrefix() {
        final String prefix = manager.getPubSubPrefix();
        return prefix;
    }

    /**
     * Hands a credit top-up task to the paging store's memory check for the address.
     * <p>
     * When the task is run, it grants {@code credits} to the receiver and flushes the
     * connection, but only if the receiver's remote credit is below {@code threshold}.
     *
     * @param address   address whose paging store is consulted
     * @param credits   number of credits to flow
     * @param threshold remote-credit level below which the top-up happens
     * @param receiver  receiver to grant the credits to
     * @throws RuntimeException wrapping any exception raised while looking up the store or
     *                          invoking its memory check
     */
    public void offerProducerCredit(String address, final int credits, final int threshold, final Receiver receiver) {
        final Runnable topUp = new Runnable() {
            @Override
            public void run() {
                if (receiver.getRemoteCredit() >= threshold) {
                    return;
                }
                receiver.flow(credits);
                connection.flush();
            }
        };

        try {
            manager.getServer().getPagingManager().getPageStore(new SimpleString(address)).checkMemory(topUp);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Asks the server to destroy the queue named {@code address}.
     *
     * @param address name of the queue to destroy
     * @throws Exception if the server fails to destroy the queue
     */
    public void deleteQueue(String address) throws Exception {
        final SimpleString queueName = new SimpleString(address);
        manager.getServer().destroyQueue(queueName);
    }

    /** Clears the storage manager's context by setting it to {@code null}. */
    private void resetContext() {
        manager.getServer()
               .getStorageManager()
               .setContext(null);
    }

    /** Installs the server session's context on the storage manager. */
    private void recoverContext() {
        manager.getServer()
               .getStorageManager()
               .setContext(serverSession.getSessionContext());
    }

    /**
     * No-op in this implementation.
     *
     * @param credits ignored
     * @param address ignored
     */
    public void sendProducerCreditsMessage(int credits, SimpleString address) {
        // nothing to do
    }

    /**
     * Always answers {@code false}; none of the arguments are inspected.
     *
     * @param consumer ignored
     * @param ref      ignored
     * @param failed   ignored
     * @return {@code false}
     */
    public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
        final boolean handledHere = false;
        return handledHere;
    }

    /**
     * No-op in this implementation.
     *
     * @param credits ignored
     * @param address ignored
     */
    public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
        // nothing to do
    }

    /**
     * Delivers a message through the proton sender stored as the consumer's protocol context.
     * <p>
     * If delivery throws, an {@code INTERNAL_ERROR} condition carrying the exception message
     * is set on the proton sender under the connection lock, the connection is flushed, and
     * the failure is rethrown wrapped in an {@code IllegalStateException}.
     *
     * @param ref           not used by this implementation
     * @param message       message to deliver
     * @param consumer      consumer whose protocol context is the {@code ProtonPlugSender}
     * @param deliveryCount forwarded to the sender
     * @return the value returned by {@code ProtonPlugSender.deliverMessage}
     * @throws IllegalStateException if the delivery fails
     */
    public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
        final ProtonPlugSender sender = (ProtonPlugSender) consumer.getProtocolContext();
        try {
            return sender.deliverMessage((Object) message, deliveryCount);
        } catch (Exception e) {
            synchronized (connection.getLock()) {
                final ErrorCondition failure = new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage());
                sender.getSender().setCondition(failure);
                connection.flush();
            }
            throw new IllegalStateException("Can't deliver message " + e, e);
        }
    }

    /**
     * Does nothing and reports {@code 0}; none of the arguments are inspected.
     *
     * @param ref           ignored
     * @param message       ignored
     * @param consumer      ignored
     * @param bodySize      ignored
     * @param deliveryCount ignored
     * @return {@code 0}
     */
    public int sendLargeMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
        final int nothingSent = 0;
        return nothingSent;
    }

    /**
     * Does nothing and reports {@code 0}; none of the arguments are inspected.
     *
     * @param consumer         ignored
     * @param body             ignored
     * @param continues        ignored
     * @param requiresResponse ignored
     * @return {@code 0}
     */
    public int sendLargeMessageContinuation(ServerConsumer consumer, byte[] body, boolean continues, boolean requiresResponse) {
        final int nothingSent = 0;
        return nothingSent;
    }

    /** No-op in this implementation. */
    public void closed() {
        // nothing to do
    }

    /**
     * Closes the proton link associated with the consumer and flushes the connection,
     * all under the connection lock.
     * <p>
     * {@code createSender} stores a {@code ProtonPlugSender} as the consumer's protocol
     * context, and every other reader in this class casts it to {@code ProtonPlugSender} and
     * calls {@code getSender()}. The previous unconditional cast to {@code Link} therefore
     * failed with a {@code ClassCastException} for consumers created by this class. The link
     * is now obtained through {@code getSender()} in that case. A context that is itself a
     * {@code Link} is still closed directly, and a {@code null} context is skipped.
     *
     * @param consumer  consumer whose link should be closed
     * @param queueName not used by this implementation
     * @throws ClassCastException if the protocol context is neither a {@code ProtonPlugSender}
     *                            nor a {@code Link}
     */
    public void disconnect(ServerConsumer consumer, String queueName) {
        synchronized (this.connection.getLock()) {
            Object context = consumer.getProtocolContext();
            Link link = context instanceof ProtonPlugSender
                    ? ((ProtonPlugSender) context).getSender()
                    : (Link) context;
            if (link != null) {
                link.close();
            }
            this.connection.flush();
        }
    }

    /**
     * Tells whether the consumer's proton sender currently has credit.
     *
     * @param consumer consumer whose protocol context is read as a {@code ProtonPlugSender}
     * @return {@code false} when the consumer has no protocol context; otherwise whether the
     *         sender's credit is greater than zero
     */
    public boolean hasCredits(ServerConsumer consumer) {
        final ProtonPlugSender sender = (ProtonPlugSender) consumer.getProtocolContext();
        if (sender == null) {
            return false;
        }
        return sender.getSender().getCredit() > 0;
    }
}

