/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.client.internal.mqtt.handler.publish.outgoing;

import com.hivemq.client.internal.annotations.CallByThread;
import com.hivemq.client.internal.mqtt.MqttClientConfig;
import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckFlow;
import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttOutgoingQosHandler;
import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttPublishFlowableAckLink;
import com.hivemq.client.internal.mqtt.message.publish.MqttPublishResult;
import com.hivemq.client.internal.shaded.org.jetbrains.annotations.NotNull;
import com.hivemq.client.internal.shaded.org.jetbrains.annotations.Nullable;
import com.hivemq.client.internal.util.collections.ChunkedArrayQueue;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.plugins.RxJavaPlugins;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class MqttAckFlowableFlow
extends MqttAckFlow
implements Subscription,
Runnable {
    private static final int STATE_NO_NEW_REQUESTS = 0;
    private static final int STATE_NEW_REQUESTS = 1;
    private static final int STATE_BLOCKED = 2;
    @NotNull
    private final Subscriber<? super MqttPublishResult> subscriber;
    @NotNull
    private final MqttOutgoingQosHandler outgoingQosHandler;
    private long requested;
    @NotNull
    private final AtomicLong newRequested = new AtomicLong();
    @NotNull
    private final AtomicInteger requestState = new AtomicInteger(0);
    private volatile long acknowledged;
    @NotNull
    private final AtomicLong published = new AtomicLong();
    @Nullable
    private Throwable error;
    @NotNull
    private final ChunkedArrayQueue<MqttPublishResult> queue = new ChunkedArrayQueue(32);
    private final @NotNull AtomicReference< @Nullable MqttPublishFlowableAckLink.LinkedFlow> linkedFlow = new AtomicReference();

    MqttAckFlowableFlow(@NotNull Subscriber<? super MqttPublishResult> subscriber, @NotNull MqttClientConfig clientConfig, @NotNull MqttOutgoingQosHandler outgoingQosHandler) {
        super(clientConfig);
        this.subscriber = subscriber;
        this.outgoingQosHandler = outgoingQosHandler;
        this.init();
    }

    @Override
    @CallByThread(value="Netty EventLoop")
    void onNext(@NotNull MqttPublishResult result) {
        this.queue.offer(result);
        this.run();
    }

    @Override
    @CallByThread(value="Netty EventLoop")
    public void run() {
        long emitted = 0L;
        long acknowledged = 0L;
        long requested = this.requested();
        block0: while (emitted < requested) {
            MqttPublishResult result;
            while ((result = this.queue.poll()) != null) {
                this.subscriber.onNext((Object)result);
                if (result.acknowledged()) {
                    ++acknowledged;
                }
                if (++emitted < requested) continue;
                if (this.isCancelled()) {
                    while ((result = this.queue.poll()) != null) {
                        if (!result.acknowledged()) continue;
                        ++acknowledged;
                    }
                    break block0;
                }
                requested = this.addNewRequested();
                continue block0;
            }
            break block0;
        }
        if (this.requested != Long.MAX_VALUE) {
            this.requested -= emitted;
        }
        this.acknowledged(acknowledged);
    }

    @Override
    @CallByThread(value="Netty EventLoop")
    void acknowledged(long newAcknowledged) {
        if (newAcknowledged > 0L) {
            long acknowledged;
            this.acknowledged = acknowledged = this.acknowledged + newAcknowledged;
            if (acknowledged == this.published.get() && this.setDone()) {
                if (this.error != null) {
                    this.subscriber.onError(this.error);
                } else {
                    this.subscriber.onComplete();
                }
            }
            this.outgoingQosHandler.request(newAcknowledged);
        }
    }

    void onComplete(long published) {
        if (!this.published.compareAndSet(0L, published)) {
            return;
        }
        if (this.acknowledged == published && this.setDone()) {
            this.subscriber.onComplete();
        }
    }

    void onError(@NotNull Throwable error, long published) {
        this.error = error;
        if (!this.published.compareAndSet(0L, published)) {
            RxJavaPlugins.onError((Throwable)error);
            return;
        }
        if (this.acknowledged == published && this.setDone()) {
            this.subscriber.onError(error);
        }
    }

    public void request(long n) {
        if (n > 0L && !this.isCancelled()) {
            BackpressureHelper.add((AtomicLong)this.newRequested, (long)n);
            if (this.requestState.getAndSet(1) == 2) {
                this.eventLoop.execute(this);
            }
        }
    }

    @CallByThread(value="Netty EventLoop")
    private long requested() {
        return this.requested > 0L ? this.requested : this.addNewRequested();
    }

    @CallByThread(value="Netty EventLoop")
    private long addNewRequested() {
        long newRequested;
        do {
            if (this.requestState.compareAndSet(0, 2)) {
                return 0L;
            }
            this.requestState.set(0);
        } while ((newRequested = this.newRequested.getAndSet(0L)) <= 0L);
        this.requested = BackpressureHelper.addCap((long)this.requested, (long)newRequested);
        return this.requested;
    }

    @Override
    protected void onCancel() {
        if (this.requestState.get() == 2) {
            this.eventLoop.execute(this);
        }
        this.cancelLink();
    }

    private void cancelLink() {
        MqttPublishFlowableAckLink.LinkedFlow linkedFlow = this.linkedFlow.getAndSet(MqttPublishFlowableAckLink.LinkedFlow.CANCELLED);
        if (linkedFlow != null) {
            linkedFlow.cancelLink();
        }
    }

    void link(@NotNull MqttPublishFlowableAckLink.LinkedFlow linkedFlow) {
        if (!this.linkedFlow.compareAndSet(null, linkedFlow)) {
            linkedFlow.cancelLink();
        }
    }
}

