/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.client.internal.mqtt.handler.subscribe;

import com.hivemq.client.internal.annotations.CallByThread;
import com.hivemq.client.internal.logging.InternalLogger;
import com.hivemq.client.internal.logging.InternalLoggerFactory;
import com.hivemq.client.internal.mqtt.MqttClientConfig;
import com.hivemq.client.internal.mqtt.MqttClientConnectionConfig;
import com.hivemq.client.internal.mqtt.datatypes.MqttUserPropertiesImpl;
import com.hivemq.client.internal.mqtt.handler.MqttSessionAwareHandler;
import com.hivemq.client.internal.mqtt.handler.disconnect.MqttDisconnectUtil;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttGlobalIncomingPublishFlow;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingPublishFlows;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttSubscribedPublishFlow;
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttSubOrUnsubAckFlow;
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttSubOrUnsubWithFlow;
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttSubscribeWithFlow;
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttSubscriptionFlow;
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttUnsubscribeWithFlow;
import com.hivemq.client.internal.mqtt.handler.util.FlowWithEventLoop;
import com.hivemq.client.internal.mqtt.ioc.ClientScope;
import com.hivemq.client.internal.mqtt.message.MqttCommonReasonCode;
import com.hivemq.client.internal.mqtt.message.subscribe.MqttStatefulSubscribe;
import com.hivemq.client.internal.mqtt.message.subscribe.MqttSubscribe;
import com.hivemq.client.internal.mqtt.message.subscribe.suback.MqttSubAck;
import com.hivemq.client.internal.mqtt.message.unsubscribe.MqttStatefulUnsubscribe;
import com.hivemq.client.internal.mqtt.message.unsubscribe.MqttUnsubscribe;
import com.hivemq.client.internal.mqtt.message.unsubscribe.unsuback.MqttUnsubAck;
import com.hivemq.client.internal.mqtt.message.unsubscribe.unsuback.mqtt3.Mqtt3UnsubAckView;
import com.hivemq.client.internal.shaded.io.netty.channel.ChannelHandlerContext;
import com.hivemq.client.internal.shaded.io.netty.channel.EventLoop;
import com.hivemq.client.internal.shaded.javax.inject.Inject;
import com.hivemq.client.internal.shaded.org.jetbrains.annotations.NotNull;
import com.hivemq.client.internal.shaded.org.jetbrains.annotations.Nullable;
import com.hivemq.client.internal.util.Ranges;
import com.hivemq.client.internal.util.collections.ImmutableList;
import com.hivemq.client.internal.util.collections.IntIndex;
import com.hivemq.client.internal.util.collections.NodeList;
import com.hivemq.client.mqtt.MqttClientState;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5SubAckException;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5UnsubAckException;
import com.hivemq.client.mqtt.mqtt5.message.Mqtt5ReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.disconnect.Mqtt5DisconnectReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAckReasonCode;
import java.io.IOException;
import java.util.List;

/**
 * Handler that queues SUBSCRIBE and UNSUBSCRIBE requests, writes them to the channel with a packet identifier and
 * matches incoming SUBACK and UNSUBACK messages against the requests that are awaiting an acknowledgement.
 * <p>
 * At most {@link #MAX_SUB_PENDING} requests are written without having been acknowledged; further requests stay in
 * the queue until {@link #run()} is invoked again after an acknowledgement has been processed.
 * <p>
 * {@link #subscribe}, {@link #unsubscribe} and {@link #subscribeGlobal} hand their work over to the event loop of the
 * given flow; all remaining state of this class is accessed without synchronization.
 */
@ClientScope
public class MqttSubscriptionHandler
extends MqttSessionAwareHandler
implements Runnable {
    @NotNull
    public static final String NAME = "subscription";
    @NotNull
    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(MqttSubscriptionHandler.class);
    /** Indexes pending requests by their packet identifier. */
    private static final @NotNull IntIndex.Spec<MqttSubOrUnsubWithFlow> INDEX_SPEC = new IntIndex.Spec<MqttSubOrUnsubWithFlow>(x -> x.packetIdentifier, 4);
    /** Maximum number of (UN)SUBSCRIBE messages that may await an acknowledgement at the same time. */
    public static final int MAX_SUB_PENDING = 10;
    /** Upper bound of the packet identifier range reserved for this handler. */
    private static final int MAX_PACKET_IDENTIFIER = 65535;
    @NotNull
    private final MqttClientConfig clientConfig;
    @NotNull
    private final MqttIncomingPublishFlows incomingPublishFlows;
    /** All queued requests in sending order: already written ones as well as ones still waiting to be written. */
    @NotNull
    private final NodeList<MqttSubOrUnsubWithFlow> pending = new NodeList<>();
    /** Pool of packet identifiers; it holds exactly {@link #MAX_SUB_PENDING} identifiers. */
    @NotNull
    private final Ranges packetIdentifiers;
    private int nextSubscriptionIdentifier = 1;
    /** Requests that have been written on the current connection and are awaiting their acknowledgement. */
    @NotNull
    private final IntIndex<MqttSubOrUnsubWithFlow> pendingIndex = new IntIndex<MqttSubOrUnsubWithFlow>(INDEX_SPEC);
    /** Next entry of {@link #pending} that has to be written, or {@code null} if nothing is waiting. */
    @Nullable
    private MqttSubOrUnsubWithFlow sendPending;
    /** Request that is being written right now; only set for the duration of the {@code ctx.write} call. */
    @Nullable
    private MqttSubOrUnsubWithFlow currentPending;
    private boolean subscriptionIdentifiersAvailable;

    /**
     * Creates the handler and reserves the topmost {@link #MAX_SUB_PENDING} packet identifiers
     * ({@code 65526..65535}) for (UN)SUBSCRIBE messages.
     *
     * @param clientConfig         configuration of the client this handler belongs to.
     * @param incomingPublishFlows registry of subscriptions that is kept in sync with the acknowledgements.
     */
    @Inject
    MqttSubscriptionHandler(@NotNull MqttClientConfig clientConfig, @NotNull MqttIncomingPublishFlows incomingPublishFlows) {
        this.clientConfig = clientConfig;
        this.incomingPublishFlows = incomingPublishFlows;
        // Derive the lower bound from MAX_SUB_PENDING so the pool size and the pending limit cannot drift apart.
        int minPacketIdentifier = MAX_PACKET_IDENTIFIER - MAX_SUB_PENDING + 1;
        this.packetIdentifiers = new Ranges(minPacketIdentifier, MAX_PACKET_IDENTIFIER);
    }

    /**
     * Prepares the handler for a new connection: optionally re-queues the known subscriptions, resets the index of
     * written requests and schedules {@link #run()} on the event loop if anything is queued.
     *
     * @param connectionConfig configuration of the new connection.
     * @param eventLoop        event loop on which the queued requests are sent.
     */
    @Override
    public void onSessionStartOrResume(@NotNull MqttClientConnectionConfig connectionConfig, @NotNull EventLoop eventLoop) {
        this.subscriptionIdentifiersAvailable = connectionConfig.areSubscriptionIdentifiersAvailable();
        if (!this.hasSession || this.clientConfig.isResubscribeIfSessionPresent()) {
            // Re-subscriptions are put in front of the queue and carry no flow, so nobody is notified about them.
            this.incomingPublishFlows.getSubscriptions().forEach((subscriptionIdentifier, subscriptions) -> {
                MqttSubscribe subscribe = new MqttSubscribe(ImmutableList.copyOf(subscriptions), MqttUserPropertiesImpl.NO_USER_PROPERTIES);
                this.pending.addFirst(new MqttSubscribeWithFlow(subscribe, (int)subscriptionIdentifier, null));
            });
        }
        // Nothing has been written on the new connection yet, so sending restarts at the head of the queue.
        this.pendingIndex.clear();
        this.sendPending = this.pending.getFirst();
        if (this.sendPending != null) {
            eventLoop.execute(this);
        }
        super.onSessionStartOrResume(connectionConfig, eventLoop);
    }

    /**
     * Registers the subscriptions with the incoming publish flows and queues the SUBSCRIBE message. The work is
     * executed on the event loop of the flow and is skipped if {@code flow.init()} returns {@code false}.
     *
     * @param subscribe the SUBSCRIBE message to send.
     * @param flow      receives the SUBACK or the error.
     */
    public void subscribe(@NotNull MqttSubscribe subscribe, @NotNull MqttSubscriptionFlow<MqttSubAck> flow) {
        flow.getEventLoop().execute(() -> {
            if (!flow.init()) {
                return;
            }
            int subscriptionIdentifier = this.nextSubscriptionIdentifier++;
            MqttSubscribedPublishFlow publishFlow = flow instanceof MqttSubscribedPublishFlow ? (MqttSubscribedPublishFlow)flow : null;
            this.incomingPublishFlows.subscribe(subscribe, subscriptionIdentifier, publishFlow);
            this.queue(new MqttSubscribeWithFlow(subscribe, subscriptionIdentifier, flow));
        });
    }

    /**
     * Queues the UNSUBSCRIBE message. The work is executed on the event loop of the flow and is skipped if
     * {@code flow.init()} returns {@code false}.
     *
     * @param unsubscribe the UNSUBSCRIBE message to send.
     * @param flow        receives the UNSUBACK or the error.
     */
    public void unsubscribe(@NotNull MqttUnsubscribe unsubscribe, @NotNull MqttSubOrUnsubAckFlow<MqttUnsubAck> flow) {
        flow.getEventLoop().execute(() -> {
            if (flow.init()) {
                this.queue(new MqttUnsubscribeWithFlow(unsubscribe, flow));
            }
        });
    }

    /**
     * Registers a global incoming publish flow. The work is executed on the event loop of the flow and is skipped
     * if {@code flow.init()} returns {@code false}.
     *
     * @param flow the global flow to register.
     */
    public void subscribeGlobal(@NotNull MqttGlobalIncomingPublishFlow flow) {
        flow.getEventLoop().execute(() -> {
            if (flow.init()) {
                this.incomingPublishFlows.subscribeGlobal(flow);
            }
        });
    }

    /** Appends the request to the queue and starts sending if no other request is waiting to be written. */
    private void queue(@NotNull MqttSubOrUnsubWithFlow subOrUnsubWithFlow) {
        this.pending.add(subOrUnsubWithFlow);
        if (this.sendPending == null) {
            this.sendPending = subOrUnsubWithFlow;
            this.run();
        }
    }

    /**
     * Writes queued requests, starting at {@link #sendPending}, until the queue is exhausted or
     * {@link #MAX_SUB_PENDING} requests await an acknowledgement, then flushes the channel if anything was written.
     * Does nothing while no channel handler context is available.
     */
    @Override
    @CallByThread(value="Netty EventLoop")
    public void run() {
        ChannelHandlerContext ctx = this.ctx;
        if (ctx == null) {
            return;
        }
        int written = 0;
        MqttSubOrUnsubWithFlow next = this.sendPending;
        while (next != null && this.pendingIndex.size() < MAX_SUB_PENDING) {
            // Identifier 0 marks a request that has none yet; a request that already has one keeps it.
            if (next.packetIdentifier == 0) {
                int packetIdentifier = this.packetIdentifiers.getId();
                if (packetIdentifier == -1) {
                    LOGGER.error("No Packet Identifier available for (UN)SUBSCRIBE. This must not happen and is a bug.");
                    // Leave the loop instead of returning so that messages written earlier in this pass still get flushed.
                    break;
                }
                next.packetIdentifier = packetIdentifier;
            }
            this.pendingIndex.put(next);
            if (next instanceof MqttSubscribeWithFlow) {
                this.writeSubscribe(ctx, (MqttSubscribeWithFlow)next);
            } else {
                this.writeUnsubscribe(ctx, (MqttUnsubscribeWithFlow)next);
            }
            ++written;
            this.sendPending = next = (MqttSubOrUnsubWithFlow)next.getNext();
        }
        if (written > 0) {
            ctx.flush();
        }
    }

    /**
     * Writes the SUBSCRIBE message without flushing. The subscription identifier is replaced by {@code -1} if the
     * connection does not offer subscription identifiers.
     */
    private void writeSubscribe(@NotNull ChannelHandlerContext ctx, @NotNull MqttSubscribeWithFlow subscribeWithFlow) {
        int subscriptionIdentifier = this.subscriptionIdentifiersAvailable ? subscribeWithFlow.subscriptionIdentifier : -1;
        MqttStatefulSubscribe statefulSubscribe = subscribeWithFlow.subscribe.createStateful(subscribeWithFlow.packetIdentifier, subscriptionIdentifier);
        // exceptionCaught reads currentPending to find the request whose write failed.
        this.currentPending = subscribeWithFlow;
        ctx.write(statefulSubscribe, ctx.voidPromise());
        this.currentPending = null;
    }

    /** Writes the UNSUBSCRIBE message without flushing. */
    private void writeUnsubscribe(@NotNull ChannelHandlerContext ctx, @NotNull MqttUnsubscribeWithFlow unsubscribeWithFlow) {
        MqttStatefulUnsubscribe statefulUnsubscribe = unsubscribeWithFlow.unsubscribe.createStateful(unsubscribeWithFlow.packetIdentifier);
        // exceptionCaught reads currentPending to find the request whose write failed.
        this.currentPending = unsubscribeWithFlow;
        ctx.write(statefulUnsubscribe, ctx.voidPromise());
        this.currentPending = null;
    }

    /** Handles SUBACK and UNSUBACK messages; every other message is passed on to the next handler. */
    @Override
    public void channelRead(@NotNull ChannelHandlerContext ctx, @NotNull Object msg) {
        if (msg instanceof MqttSubAck) {
            this.readSubAck(ctx, (MqttSubAck)msg);
        } else if (msg instanceof MqttUnsubAck) {
            this.readUnsubAck(ctx, (MqttUnsubAck)msg);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    /**
     * Matches the SUBACK with its SUBSCRIBE, forwards the reason codes to the incoming publish flows and completes
     * the flow of the request (if it has one). Disconnects with a protocol error if the packet identifier is
     * unknown or belongs to an UNSUBSCRIBE.
     */
    private void readSubAck(@NotNull ChannelHandlerContext ctx, @NotNull MqttSubAck subAck) {
        MqttSubOrUnsubWithFlow subOrUnsubWithFlow = this.pendingIndex.remove(subAck.getPacketIdentifier());
        if (subOrUnsubWithFlow == null) {
            MqttDisconnectUtil.disconnect(ctx.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, "Unknown packet identifier for SUBACK");
            return;
        }
        if (!(subOrUnsubWithFlow instanceof MqttSubscribeWithFlow)) {
            MqttDisconnectUtil.disconnect(ctx.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, "SUBACK received for an UNSUBSCRIBE");
            return;
        }
        MqttSubscribeWithFlow subscribeWithFlow = (MqttSubscribeWithFlow)subOrUnsubWithFlow;
        MqttSubscriptionFlow<MqttSubAck> flow = subscribeWithFlow.getFlow();
        ImmutableList<Mqtt5SubAckReasonCode> reasonCodes = subAck.getReasonCodes();
        boolean countNotMatching = subscribeWithFlow.subscribe.getSubscriptions().size() != reasonCodes.size();
        boolean allErrors = MqttCommonReasonCode.allErrors(reasonCodes);
        this.incomingPublishFlows.subAck(subscribeWithFlow.subscribe, subscribeWithFlow.subscriptionIdentifier, reasonCodes);
        // Re-subscriptions queued on session start have no flow, so there is nobody to notify for them.
        if (flow != null) {
            if (!countNotMatching && !allErrors) {
                if (!flow.isCancelled()) {
                    flow.onSuccess(subAck);
                } else {
                    LOGGER.warn("Subscribe was successful but the SubAck flow has been cancelled");
                }
            } else {
                String errorMessage = countNotMatching ? "Count of Reason Codes in SUBACK does not match count of subscriptions in SUBSCRIBE" : "SUBACK contains only Error Codes";
                if (!flow.isCancelled()) {
                    flow.onError(new Mqtt5SubAckException(subAck, errorMessage));
                } else {
                    LOGGER.warn(errorMessage + " but the SubAck flow has been cancelled");
                }
            }
        }
        this.completePending(subscribeWithFlow);
    }

    /**
     * Matches the UNSUBACK with its UNSUBSCRIBE, removes the subscriptions from the incoming publish flows on
     * success and completes the flow of the request. Disconnects with a protocol error if the packet identifier is
     * unknown or belongs to a SUBSCRIBE.
     */
    private void readUnsubAck(@NotNull ChannelHandlerContext ctx, @NotNull MqttUnsubAck unsubAck) {
        MqttSubOrUnsubWithFlow subOrUnsubWithFlow = this.pendingIndex.remove(unsubAck.getPacketIdentifier());
        if (subOrUnsubWithFlow == null) {
            MqttDisconnectUtil.disconnect(ctx.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, "Unknown packet identifier for UNSUBACK");
            return;
        }
        if (!(subOrUnsubWithFlow instanceof MqttUnsubscribeWithFlow)) {
            MqttDisconnectUtil.disconnect(ctx.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, "UNSUBACK received for a SUBSCRIBE");
            return;
        }
        MqttUnsubscribeWithFlow unsubscribeWithFlow = (MqttUnsubscribeWithFlow)subOrUnsubWithFlow;
        // A single typed cast replaces the per-call casts; the flow of an UNSUBSCRIBE is the one passed to unsubscribe().
        @SuppressWarnings("unchecked")
        MqttSubOrUnsubAckFlow<MqttUnsubAck> flow = (MqttSubOrUnsubAckFlow<MqttUnsubAck>)unsubscribeWithFlow.getFlow();
        ImmutableList<Mqtt5UnsubAckReasonCode> reasonCodes = unsubAck.getReasonCodes();
        boolean countNotMatching = unsubscribeWithFlow.unsubscribe.getTopicFilters().size() != reasonCodes.size();
        boolean allErrors = MqttCommonReasonCode.allErrors(reasonCodes);
        // The shared Mqtt3UnsubAckView.REASON_CODES_ALL_SUCCESS list is recognised by identity and accepted
        // without comparing its size to the number of topic filters.
        if (reasonCodes == Mqtt3UnsubAckView.REASON_CODES_ALL_SUCCESS || !countNotMatching && !allErrors) {
            this.incomingPublishFlows.unsubscribe(unsubscribeWithFlow.unsubscribe, reasonCodes);
            if (!flow.isCancelled()) {
                flow.onSuccess(unsubAck);
            } else {
                LOGGER.warn("Unsubscribe was successful but the UnsubAck flow has been cancelled");
            }
        } else {
            String errorMessage = countNotMatching ? "Count of Reason Codes in UNSUBACK does not match count of Topic Filters in UNSUBSCRIBE" : "UNSUBACK contains only Error Codes";
            if (!flow.isCancelled()) {
                flow.onError(new Mqtt5UnsubAckException(unsubAck, errorMessage));
            } else {
                LOGGER.warn(errorMessage + " but the UnsubAck flow has been cancelled");
            }
        }
        this.completePending(unsubscribeWithFlow);
    }

    /** Removes the acknowledged request, releases its packet identifier and continues sending queued requests. */
    private void completePending(@NotNull MqttSubOrUnsubWithFlow oldPending) {
        this.pending.remove(oldPending);
        this.packetIdentifiers.returnId(oldPending.packetIdentifier);
        this.run();
    }

    /**
     * Fails the request that is currently being written if the exception is not an {@link IOException}; every
     * other exception is passed on to the next handler.
     */
    @Override
    public void exceptionCaught(@NotNull ChannelHandlerContext ctx, @NotNull Throwable cause) {
        if (cause instanceof IOException || this.currentPending == null) {
            ctx.fireExceptionCaught(cause);
            return;
        }
        // Drop the failed request completely: queue entry, packet identifier and index entry.
        this.pending.remove(this.currentPending);
        this.packetIdentifiers.returnId(this.currentPending.packetIdentifier);
        this.pendingIndex.remove(this.currentPending.packetIdentifier);
        MqttSubscriptionFlow<?> flow = this.currentPending.getFlow();
        if (flow != null) {
            flow.onError(cause);
        }
        if (this.currentPending instanceof MqttSubscribeWithFlow) {
            // Report the failed SUBSCRIBE to the incoming publish flows as an unspecified error.
            MqttSubscribeWithFlow subscribeWithFlow = (MqttSubscribeWithFlow)this.currentPending;
            this.incomingPublishFlows.subAck(subscribeWithFlow.subscribe, subscribeWithFlow.subscriptionIdentifier, ImmutableList.of(Mqtt5SubAckReasonCode.UNSPECIFIED_ERROR));
        }
        this.currentPending = null;
    }

    /**
     * Releases all packet identifiers held by queued requests. Unless the client is configured to resubscribe
     * after session expiry and is not in state {@code DISCONNECTED}, additionally clears the incoming publish
     * flows, fails every queued request with the given cause and resets the subscription identifier counter.
     *
     * @param cause the reason why the session ended.
     */
    @Override
    public void onSessionEnd(@NotNull Throwable cause) {
        super.onSessionEnd(cause);
        this.pendingIndex.clear();
        this.sendPending = null;
        // Scan the whole queue instead of stopping at the first request without an identifier:
        // onSessionStartOrResume inserts re-subscriptions at the head, so requests without an identifier
        // (presumably 0 right after construction) can precede requests that still hold one.
        for (MqttSubOrUnsubWithFlow current = this.pending.getFirst(); current != null; current = (MqttSubOrUnsubWithFlow)current.getNext()) {
            if (current.packetIdentifier != 0) {
                this.packetIdentifiers.returnId(current.packetIdentifier);
                current.packetIdentifier = 0;
            }
        }
        if (this.clientConfig.isResubscribeIfSessionExpired() && this.clientConfig.getState() != MqttClientState.DISCONNECTED) {
            // The queue is kept so the requests can be sent again in the next session.
            return;
        }
        this.incomingPublishFlows.clear(cause);
        for (MqttSubOrUnsubWithFlow current = this.pending.getFirst(); current != null; current = (MqttSubOrUnsubWithFlow)current.getNext()) {
            MqttSubscriptionFlow<?> flow = current.getFlow();
            if (flow != null) {
                flow.onError(cause);
            }
        }
        this.pending.clear();
        this.nextSubscriptionIdentifier = 1;
    }
}

