/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.protocol.mqtt;

import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.LinkedListIterator;

/**
 * Stores and replays MQTT retained messages for a single {@link MQTTSession}.
 *
 * <p>The retained message for a topic is kept in a core queue whose name is derived from the
 * topic via {@link MQTTUtil#convertMQTTAddressFilterToCoreRetain(String)}. This class only ever
 * reads the first reference in such a queue, and replaces it whenever a new retained message is
 * handled.
 */
public class MQTTRetainMessageManager {
    private final MQTTSession session;

    /**
     * @param session the MQTT session whose server and server session are used for all queue
     *                lookups, queue creation, transactions and routing
     */
    public MQTTRetainMessageManager(MQTTSession session) {
        this.session = session;
    }

    /**
     * Replaces (or clears) the retained message held for {@code address}.
     *
     * <p>The retain queue is located, or created if it does not exist yet. While holding the
     * queue's monitor, the first reference currently in the queue (if any) is removed and
     * acknowledged within {@code tx}; then, unless {@code reset} is set, a copy of
     * {@code message} with a freshly generated ID is routed to the queue.
     *
     * @param message the message to retain; it is copied, never routed directly
     * @param address the MQTT topic the message was published to
     * @param reset   when {@code true} the existing retained message is removed and nothing is
     *                stored in its place
     * @param tx      the transaction under which the acknowledge and the routing are performed
     * @throws Exception if queue creation, acknowledgement or routing fails
     */
    void handleRetainedMessage(ServerMessage message, String address, boolean reset, Transaction tx) throws Exception {
        SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address));

        Queue queue = this.session.getServer().locateQueue(retainAddress);
        if (queue == null) {
            // NOTE(review): arguments are presumably (address, name, filter, temporary, durable),
            // i.e. an unfiltered, non-temporary, durable queue - confirm against ServerSession.createQueue.
            queue = this.session.getServerSession().createQueue(retainAddress, retainAddress, null, false, true);
        }

        try (LinkedListIterator<MessageReference> iterator = queue.iterator()) {
            synchronized (queue) {
                // Drop the previously retained message, if there is one.
                if (iterator.hasNext()) {
                    MessageReference ref = iterator.next();
                    iterator.remove();
                    queue.acknowledge(tx, ref);
                }

                if (!reset) {
                    long copyId = this.session.getServer().getStorageManager().generateID();
                    this.sendToQueue(message.copy(copyId), queue, tx);
                }
            }
        }
    }

    /**
     * Copies the retained message of every retain queue matching {@code address} into
     * {@code queue}.
     *
     * <p>All copies are routed under one new transaction while holding {@code queue}'s monitor.
     * The transaction is committed only if every matching queue was processed without error;
     * any failure rolls it back and is rethrown to the caller.
     *
     * @param queue   the destination queue that receives the copies
     * @param address the MQTT topic filter used to find matching retain queues
     * @throws Exception if the binding query, message copy, routing or commit fails
     */
    void addRetainedMessagesToQueue(Queue queue, String address) throws Exception {
        String retainAddress = MQTTUtil.convertMQTTAddressFilterToCoreRetain(address);
        BindingQueryResult bindingQueryResult = this.session.getServerSession().executeBindingQuery(new SimpleString(retainAddress));

        Transaction tx = this.session.getServerSession().newTransaction();
        try {
            synchronized (queue) {
                for (SimpleString retainedQueueName : bindingQueryResult.getQueueNames()) {
                    Queue retainedQueue = this.session.getServer().locateQueue(retainedQueueName);
                    if (retainedQueue == null) {
                        // The queue vanished between the binding query and the lookup; nothing to replay.
                        continue;
                    }

                    // try-with-resources closes the iterator on every path AND lets a failure
                    // propagate to the rollback handler below instead of being discarded.
                    try (LinkedListIterator<MessageReference> i = retainedQueue.iterator()) {
                        if (i.hasNext()) {
                            long copyId = this.session.getServer().getStorageManager().generateID();
                            ServerMessage message = i.next().getMessage().copy(copyId);
                            this.sendToQueue(message, queue, tx);
                        }
                    }
                }
            }
        }
        catch (Throwable t) {
            tx.rollback();
            throw t;
        }
        tx.commit();
    }

    /**
     * Routes {@code message} to {@code queue} and hands the resulting routing context to the
     * post office for processing under {@code tx}.
     *
     * @param message the message to deliver
     * @param queue   the queue the message is routed to
     * @param tx      the transaction the routing context is bound to
     * @throws Exception if routing or route processing fails
     */
    private void sendToQueue(ServerMessage message, Queue queue, Transaction tx) throws Exception {
        RoutingContext context = new RoutingContextImpl(tx);
        queue.route(message, context);
        this.session.getServer().getPostOffice().processRoute(message, context, false);
    }
}

