/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.client.internal.mqtt.handler.publish.incoming;

import com.hivemq.client.internal.annotations.CallByThread;
import com.hivemq.client.internal.logging.InternalLogger;
import com.hivemq.client.internal.logging.InternalLoggerFactory;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingPublishFlow;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingQosHandler;
import com.hivemq.client.internal.mqtt.ioc.ClientScope;
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulPublish;
import com.hivemq.client.internal.util.collections.ChunkedArrayQueue;
import com.hivemq.client.internal.util.collections.HandleList;
import org.jetbrains.annotations.NotNull;

@ClientScope
class MqttIncomingPublishService {
    @NotNull
    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(MqttIncomingPublishService.class);
    private static final boolean QOS_0_DROP_OLDEST = true;
    @NotNull
    private final MqttIncomingQosHandler incomingQosHandler;
    @NotNull
    private final ChunkedArrayQueue<Object> qos0Queue = new ChunkedArrayQueue(32);
    @NotNull
    private final ChunkedArrayQueue.Iterator qos0It = this.qos0Queue.iterator();
    @NotNull
    private final ChunkedArrayQueue<Object> qos1Or2Queue = new ChunkedArrayQueue(32);
    @NotNull
    private final ChunkedArrayQueue.Iterator qos1Or2It = this.qos1Or2Queue.iterator();
    private int referencedFlowCount;
    private int runIndex;
    private int blockingFlowCount;

    MqttIncomingPublishService(@NotNull MqttIncomingQosHandler incomingQosHandler) {
        this.incomingQosHandler = incomingQosHandler;
    }

    @CallByThread(value="Netty EventLoop")
    void onPublishQos0(@NotNull MqttStatefulPublish publish, int receiveMaximum) {
        HandleList flows;
        if (this.qos0Queue.size() >= 2 * receiveMaximum) {
            LOGGER.warn("QoS 0 publish message dropped.");
            this.qos0It.reset();
            this.qos0It.next();
            flows = (HandleList)this.qos0It.next();
            this.qos0It.remove();
            for (HandleList.Handle h = (HandleList.Handle)flows.getFirst(); h != null; h = (HandleList.Handle)h.getNext()) {
                if (((MqttIncomingPublishFlow)h.getElement()).dereference() != 0) continue;
                --this.referencedFlowCount;
            }
        }
        if (!(flows = this.onPublish(publish)).isEmpty()) {
            this.qos0Queue.offer(publish);
            this.qos0Queue.offer(flows);
        }
    }

    @CallByThread(value="Netty EventLoop")
    boolean onPublishQos1Or2(@NotNull MqttStatefulPublish publish, int receiveMaximum) {
        if (this.qos1Or2Queue.size() >= 2 * receiveMaximum) {
            return false;
        }
        HandleList<MqttIncomingPublishFlow> flows = this.onPublish(publish);
        if (this.qos1Or2Queue.isEmpty() && flows.isEmpty()) {
            this.incomingQosHandler.ack(publish);
        } else {
            this.qos1Or2Queue.offer(publish);
            this.qos1Or2Queue.offer(flows);
        }
        return true;
    }

    @CallByThread(value="Netty EventLoop")
    @NotNull
    private HandleList<MqttIncomingPublishFlow> onPublish(@NotNull MqttStatefulPublish publish) {
        HandleList<MqttIncomingPublishFlow> flows = this.incomingQosHandler.getIncomingPublishFlows().findMatching(publish);
        if (flows.isEmpty()) {
            LOGGER.warn("No publish flow registered for {}.", publish);
        }
        this.drain();
        for (HandleList.Handle h = (HandleList.Handle)flows.getFirst(); h != null; h = (HandleList.Handle)h.getNext()) {
            if (((MqttIncomingPublishFlow)h.getElement()).reference() != 1) continue;
            ++this.referencedFlowCount;
        }
        this.emit((MqttPublish)publish.stateless(), flows);
        return flows;
    }

    @CallByThread(value="Netty EventLoop")
    void drain() {
        HandleList flows;
        MqttStatefulPublish publish;
        ++this.runIndex;
        this.blockingFlowCount = 0;
        this.qos1Or2It.reset();
        while (this.qos1Or2It.hasNext()) {
            publish = (MqttStatefulPublish)this.qos1Or2It.next();
            flows = (HandleList)this.qos1Or2It.next();
            this.emit((MqttPublish)publish.stateless(), flows);
            if (this.qos1Or2It.getIterated() == 2 && flows.isEmpty()) {
                this.qos1Or2It.remove();
                this.incomingQosHandler.ack(publish);
                continue;
            }
            if (this.blockingFlowCount != this.referencedFlowCount) continue;
            return;
        }
        this.qos0It.reset();
        while (this.qos0It.hasNext()) {
            publish = (MqttStatefulPublish)this.qos0It.next();
            flows = (HandleList)this.qos0It.next();
            this.emit((MqttPublish)publish.stateless(), flows);
            if (this.qos0It.getIterated() == 2 && flows.isEmpty()) {
                this.qos0It.remove();
                continue;
            }
            if (this.blockingFlowCount != this.referencedFlowCount) continue;
            return;
        }
    }

    @CallByThread(value="Netty EventLoop")
    private void emit(@NotNull MqttPublish publish, @NotNull HandleList<MqttIncomingPublishFlow> flows) {
        for (HandleList.Handle h = (HandleList.Handle)flows.getFirst(); h != null; h = (HandleList.Handle)h.getNext()) {
            MqttIncomingPublishFlow flow = (MqttIncomingPublishFlow)h.getElement();
            if (flow.isCancelled()) {
                flows.remove(h);
                if (flow.dereference() != 0) continue;
                --this.referencedFlowCount;
                continue;
            }
            long requested = flow.requested(this.runIndex);
            if (requested > 0L) {
                flow.onNext(publish);
                flows.remove(h);
                if (flow.dereference() != 0) continue;
                --this.referencedFlowCount;
                flow.checkDone();
                continue;
            }
            if (requested != 0L) continue;
            ++this.blockingFlowCount;
            if (this.blockingFlowCount == this.referencedFlowCount) break;
        }
    }
}

