/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.protocol.mqtt;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.EmptyByteBuf;
import io.netty.handler.codec.mqtt.MqttMessageType;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTLogger;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTMessageInfo;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;

public class MQTTPublishManager {
    private static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
    private SimpleString managementAddress;
    private ServerConsumer managementConsumer;
    private MQTTSession session;
    private MQTTLogger log = MQTTLogger.LOGGER;
    private final Object lock = new Object();

    public MQTTPublishManager(MQTTSession session) {
        this.session = session;
    }

    synchronized void start() throws Exception {
        this.createManagementAddress();
        this.createManagementQueue();
        this.createManagementConsumer();
    }

    synchronized void stop(boolean clean) throws Exception {
        if (this.managementConsumer != null) {
            this.managementConsumer.removeItself();
            this.managementConsumer.setStarted(false);
            this.managementConsumer.close(false);
            if (clean) {
                this.session.getServer().destroyQueue(this.managementAddress);
            }
        }
    }

    private void createManagementConsumer() throws Exception {
        long consumerId = this.session.getServer().getStorageManager().generateID();
        this.managementConsumer = this.session.getServerSession().createConsumer(consumerId, this.managementAddress, null, false, false, Integer.valueOf(-1));
        this.managementConsumer.setStarted(true);
    }

    private void createManagementAddress() {
        String clientId = this.session.getSessionState().getClientId();
        this.managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + clientId);
    }

    private void createManagementQueue() throws Exception {
        if (this.session.getServer().locateQueue(this.managementAddress) == null) {
            this.session.getServerSession().createQueue(this.managementAddress, this.managementAddress, null, false, true);
        }
    }

    boolean isManagementConsumer(ServerConsumer consumer) {
        return consumer == this.managementConsumer;
    }

    private int generateMqttId(int qos) {
        if (qos == 1) {
            return this.session.getSessionState().generateId();
        }
        Integer mqttid = this.session.getSessionState().generateId();
        if (mqttid == null) {
            mqttid = (int)this.session.getServer().getStorageManager().generateID();
        }
        return mqttid;
    }

    protected void sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
        if (this.isManagementConsumer(consumer)) {
            this.sendPubRelMessage(message);
        } else {
            int qos = this.decideQoS(message, consumer);
            if (qos == 0) {
                this.sendServerMessage((int)message.getMessageID(), (ServerMessageImpl)message, deliveryCount, qos);
                this.session.getServerSession().acknowledge(consumer.getID(), message.getMessageID());
            } else {
                String consumerAddress = consumer.getQueue().getAddress().toString();
                Integer mqttid = this.generateMqttId(qos);
                this.session.getSessionState().addOutbandMessageRef(mqttid, consumerAddress, message.getMessageID(), qos);
                this.sendServerMessage(mqttid, (ServerMessageImpl)message, deliveryCount, qos);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void handleMessage(int messageId, String topic, int qos, ByteBuf payload, boolean retain) throws Exception {
        Object object = this.lock;
        synchronized (object) {
            ServerMessage serverMessage = MQTTUtil.createServerMessageFromByteBuf(this.session, topic, retain, qos, payload);
            if (qos > 0) {
                serverMessage.setDurable(true);
            }
            if (qos < 2 || !this.session.getSessionState().getPubRec().contains(messageId)) {
                if (qos == 2) {
                    this.session.getSessionState().getPubRec().add(messageId);
                }
                this.session.getServerSession().send(serverMessage, true);
            }
            if (retain) {
                boolean reset = payload instanceof EmptyByteBuf || payload.capacity() == 0;
                this.session.getRetainMessageManager().handleRetainedMessage(serverMessage, topic, reset);
            }
            this.createMessageAck(messageId, qos);
        }
    }

    void sendPubRelMessage(ServerMessage message) {
        if (message.getIntProperty("mqtt.message.type").intValue() == MqttMessageType.PUBREL.value()) {
            int messageId = message.getIntProperty("mqtt.message.id");
            MQTTMessageInfo messageInfo = new MQTTMessageInfo(message.getMessageID(), this.managementConsumer.getID(), message.getAddress().toString());
            this.session.getSessionState().storeMessageRef(messageId, messageInfo, false);
            this.session.getProtocolHandler().sendPubRel(messageId);
        }
    }

    private void createMessageAck(final int messageId, final int qos) {
        this.session.getServer().getStorageManager().afterCompleteOperations(new IOCallback(){

            public void done() {
                if (qos == 1) {
                    MQTTPublishManager.this.session.getProtocolHandler().sendPubAck(messageId);
                } else if (qos == 2) {
                    MQTTPublishManager.this.session.getProtocolHandler().sendPubRec(messageId);
                }
            }

            public void onError(int errorCode, String errorMessage) {
                MQTTPublishManager.this.log.error("Pub Sync Failed");
            }
        });
    }

    void handlePubRec(int messageId) throws Exception {
        MQTTMessageInfo messageRef = this.session.getSessionState().getMessageInfo(messageId);
        if (messageRef != null) {
            ServerMessage pubRel = MQTTUtil.createPubRelMessage(this.session, this.managementAddress, messageId);
            this.session.getServerSession().send(pubRel, true);
            this.session.getServerSession().acknowledge(messageRef.getConsumerId(), messageRef.getServerMessageId());
            this.session.getProtocolHandler().sendPubRel(messageId);
        }
    }

    void handlePubComp(int messageId) throws Exception {
        MQTTMessageInfo messageInfo = this.session.getSessionState().getMessageInfo(messageId);
        if (messageInfo != null) {
            this.session.getServerSession().acknowledge(this.managementConsumer.getID(), messageInfo.getServerMessageId());
        }
    }

    void handlePubRel(int messageId) {
        this.session.getSessionState().getPubRec().remove(messageId);
        this.session.getProtocolHandler().sendPubComp(messageId);
        this.session.getSessionState().removeMessageRef(messageId);
    }

    void handlePubAck(int messageId) throws Exception {
        Pair<String, Long> pub1MessageInfo = this.session.getSessionState().removeOutbandMessageRef(messageId, 1);
        if (pub1MessageInfo != null) {
            String mqttAddress = MQTTUtil.convertCoreAddressFilterToMQTT((String)pub1MessageInfo.getA());
            ServerConsumer consumer = this.session.getSubscriptionManager().getConsumerForAddress(mqttAddress);
            this.session.getServerSession().acknowledge(consumer.getID(), ((Long)pub1MessageInfo.getB()).longValue());
        }
    }

    private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos) {
        String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString()).toString();
        ByteBuf payload = message.getBodyBufferCopy().byteBuf();
        this.session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);
    }

    private int decideQoS(ServerMessage message, ServerConsumer consumer) {
        int qos;
        int subscriptionQoS = this.session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID());
        return subscriptionQoS < (qos = message.getIntProperty("mqtt.qos.level").intValue()) ? subscriptionQoS : qos;
    }
}

