/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.client.internal.mqtt.handler.publish.outgoing;

import com.hivemq.client.internal.annotations.CallByThread;
import com.hivemq.client.internal.mqtt.MqttClientConfig;
import com.hivemq.client.internal.mqtt.exceptions.MqttClientStateExceptions;
import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckFlow;
import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttOutgoingQosHandler;
import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttPublishFlowables;
import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttPublishWithFlow;
import com.hivemq.client.internal.mqtt.ioc.ClientComponent;
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
import com.hivemq.client.internal.mqtt.message.publish.MqttPublishResult;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import io.reactivex.Flowable;
import io.reactivex.internal.subscriptions.EmptySubscription;
import java.util.concurrent.atomic.AtomicInteger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class MqttAckSingleFlowable
extends Flowable<Mqtt5PublishResult> {
    @NotNull
    private final MqttClientConfig clientConfig;
    @NotNull
    private final MqttPublish publish;

    public MqttAckSingleFlowable(@NotNull MqttClientConfig clientConfig, @NotNull MqttPublish publish) {
        this.clientConfig = clientConfig;
        this.publish = publish;
    }

    protected void subscribeActual(@NotNull Subscriber<? super Mqtt5PublishResult> subscriber) {
        if (this.clientConfig.getState().isConnectedOrReconnect()) {
            ClientComponent clientComponent = this.clientConfig.getClientComponent();
            MqttOutgoingQosHandler outgoingQosHandler = clientComponent.outgoingQosHandler();
            MqttPublishFlowables publishFlowables = outgoingQosHandler.getPublishFlowables();
            Flow flow = new Flow(subscriber, this.clientConfig, outgoingQosHandler);
            subscriber.onSubscribe((Subscription)flow);
            publishFlowables.add((Flowable<MqttPublishWithFlow>)Flowable.just((Object)new MqttPublishWithFlow(this.publish, flow)));
        } else {
            EmptySubscription.error((Throwable)MqttClientStateExceptions.notConnected(), subscriber);
        }
    }

    private static class Flow
    extends MqttAckFlow
    implements Subscription,
    Runnable {
        private static final int STATE_NONE = 0;
        private static final int STATE_RESULT = 1;
        private static final int STATE_REQUESTED = 2;
        @NotNull
        private final Subscriber<? super Mqtt5PublishResult> subscriber;
        @NotNull
        private final MqttOutgoingQosHandler outgoingQosHandler;
        @NotNull
        private final AtomicInteger state = new AtomicInteger(0);
        @Nullable
        private MqttPublishResult result;

        Flow(@NotNull Subscriber<? super Mqtt5PublishResult> subscriber, @NotNull MqttClientConfig clientConfig, @NotNull MqttOutgoingQosHandler outgoingQosHandler) {
            super(clientConfig);
            this.subscriber = subscriber;
            this.outgoingQosHandler = outgoingQosHandler;
            this.init();
        }

        @Override
        @CallByThread(value="Netty EventLoop")
        void onNext(@NotNull MqttPublishResult result) {
            if (this.state.get() == 2) {
                this.onNextUnsafe(result);
            } else {
                this.result = result;
                if (!this.state.compareAndSet(0, 1)) {
                    this.result = null;
                    this.onNextUnsafe(result);
                } else if (this.isCancelled()) {
                    this.result = null;
                    if (result.acknowledged()) {
                        this.acknowledged(1L);
                    }
                }
            }
        }

        @CallByThread(value="Netty EventLoop")
        private void onNextUnsafe(@NotNull MqttPublishResult result) {
            this.subscriber.onNext((Object)result);
            if (result.acknowledged()) {
                this.acknowledged(1L);
            }
        }

        @Override
        @CallByThread(value="Netty EventLoop")
        void acknowledged(long acknowledged) {
            if (acknowledged != 1L) {
                throw new IllegalStateException("A single publish must be acknowledged exactly once. This must not happen and is a bug.");
            }
            if (this.setDone()) {
                this.subscriber.onComplete();
            }
            this.outgoingQosHandler.request(1L);
        }

        public void request(long n) {
            if (n > 0L && !this.isCancelled() && this.state.getAndSet(2) == 1) {
                this.eventLoop.execute((Runnable)this);
            }
        }

        @Override
        protected void onCancel() {
            if (this.state.get() == 1) {
                this.eventLoop.execute((Runnable)this);
            }
        }

        @Override
        @CallByThread(value="Netty EventLoop")
        public void run() {
            MqttPublishResult result = this.result;
            if (result != null) {
                this.result = null;
                if (this.isCancelled()) {
                    if (result.acknowledged()) {
                        this.acknowledged(1L);
                    }
                } else {
                    this.onNextUnsafe(result);
                }
            }
        }
    }
}

