/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.ConsumeOptions;
import io.nats.client.Dispatcher;
import io.nats.client.JetStreamApiException;
import io.nats.client.MessageHandler;
import io.nats.client.PullRequestOptions;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.impl.NatsMessageConsumerBase;
import io.nats.client.impl.PullManagerObserver;
import io.nats.client.impl.SimplifiedSubscriptionMaker;
import java.io.IOException;

/**
 * Message consumer that keeps a pull subscription topped up.
 *
 * <p>Whenever the pull manager reports that the pending message count (or the
 * pending byte count, when bytes are tracked) is at or below a threshold derived
 * from {@link ConsumeOptions}, a new pull request is issued for the difference
 * between the configured batch size and what is still pending.
 */
class NatsMessageConsumer
extends NatsMessageConsumerBase
implements PullManagerObserver {
    /** Options supplied by the caller; read for batch sizes, threshold and pull settings. */
    protected final ConsumeOptions consumeOpts;
    /** A re-pull is issued once pending messages are at or below this value. */
    protected final int thresholdMessages;
    /**
     * A re-pull is issued once pending bytes are at or below this value (only
     * consulted when the pull manager is tracking bytes). Set to
     * {@code Integer.MIN_VALUE} when the configured batch bytes is 0.
     */
    protected final long thresholdBytes;
    /** Creates the underlying subscription each time {@link #doSub(boolean)} runs. */
    protected final SimplifiedSubscriptionMaker subscriptionMaker;
    /** Dispatcher handed through to the subscription maker. */
    protected final Dispatcher userDispatcher;
    /** User callback; may be {@code null}, in which case no handler is passed on subscribe. */
    protected final MessageHandler userMessageHandler;

    /**
     * Stores the collaborators, derives the re-pull thresholds from the consume
     * options and makes the first subscription.
     *
     * @param subscriptionMaker  used to create the subscription
     * @param cachedConsumerInfo passed to the superclass constructor
     * @param consumeOpts        source of batch size, batch bytes and threshold percent
     * @param userDispatcher     passed to the subscription maker
     * @param userMessageHandler user callback, or {@code null}
     * @throws IOException           declared by {@link #doSub(boolean)}
     * @throws JetStreamApiException declared by {@link #doSub(boolean)}
     */
    NatsMessageConsumer(SimplifiedSubscriptionMaker subscriptionMaker, ConsumerInfo cachedConsumerInfo, ConsumeOptions consumeOpts, Dispatcher userDispatcher, MessageHandler userMessageHandler) throws IOException, JetStreamApiException {
        super(cachedConsumerInfo);
        this.subscriptionMaker = subscriptionMaker;
        this.consumeOpts = consumeOpts;
        this.userDispatcher = userDispatcher;
        this.userMessageHandler = userMessageHandler;
        int bm = consumeOpts.getBatchSize();
        long bb = consumeOpts.getBatchBytes();
        int thresholdPercent = consumeOpts.getThresholdPercent();
        // Multiply in long: bm * percent wraps in int arithmetic for very large batch
        // sizes, which would collapse the re-pull amount to 1. Clamp before narrowing
        // so the cast back to int can never truncate.
        long scaledMessages = (long)bm * (long)thresholdPercent / 100L;
        int rePullMessages = (int)Math.max(1L, Math.min(scaledMessages, (long)Integer.MAX_VALUE));
        long rePullBytes = bb == 0L ? 0L : Math.max(1L, bb * (long)thresholdPercent / 100L);
        this.thresholdMessages = bm - rePullMessages;
        // With no byte limit the threshold is a large negative sentinel.
        this.thresholdBytes = bb == 0L ? Integer.MIN_VALUE : bb - rePullBytes;
        this.doSub(true);
    }

    /**
     * Handles a heartbeat error. If the consumer has been stopped it is fully
     * closed; otherwise the current subscription is shut down and a new one is
     * made via {@link #doSub(boolean)}.
     */
    @Override
    public void heartbeatError() {
        try {
            if (this.stopped.get()) {
                this.fullClose();
            } else {
                this.shutdownSub();
                this.doSub(false);
            }
        }
        catch (JetStreamApiException | IOException ignored) {
            // Deliberately not propagated: reset tracking and the heartbeat timer instead.
            this.setupHbAlarmToTrigger();
        }
    }

    /**
     * Clears the stopped/finished flags, creates the subscription, initializes it
     * in the superclass and issues a pull request.
     *
     * <p>When a user handler is present it is wrapped so that, after each message,
     * {@code finished} is set once the consumer is stopped and the pull manager
     * reports nothing pending. Failures raised while subscribing are caught here
     * and answered with {@link #setupHbAlarmToTrigger()}.
     *
     * @param first {@code true} for the initial subscription; its negation is passed to
     *              {@code initSub}
     * @throws JetStreamApiException declared for callers; caught internally in the visible code path
     * @throws IOException           declared for callers; caught internally in the visible code path
     */
    void doSub(boolean first) throws JetStreamApiException, IOException {
        MessageHandler mh = this.userMessageHandler == null ? null : msg -> {
            this.userMessageHandler.onMessage(msg);
            if (this.stopped.get() && this.pmm.noMorePending()) {
                this.finished.set(true);
            }
        };
        try {
            this.stopped.set(false);
            this.finished.set(false);
            super.initSub(this.subscriptionMaker.subscribe(mh, this.userDispatcher, this.pmm, null), !first);
            this.repull();
        }
        catch (JetStreamApiException | IOException ignored) {
            // Deliberately not propagated: reset tracking and the heartbeat timer instead.
            // NOTE(review): presumably a later heartbeatError() then retries - confirm.
            this.setupHbAlarmToTrigger();
        }
    }

    /** Resets the pull manager's tracking and initializes or resets its heartbeat timer. */
    private void setupHbAlarmToTrigger() {
        this.pmm.resetTracking();
        this.pmm.initOrResetHeartbeatTimer();
    }

    /**
     * Reacts to a change in pending counts. When stopped, fully closes once nothing
     * is pending; otherwise issues a new pull when pending messages, or tracked
     * pending bytes, are at or below their threshold.
     */
    @Override
    public void pendingUpdated() {
        if (this.stopped.get()) {
            if (this.pmm.noMorePending()) {
                this.fullClose();
            }
        } else if (this.pmm.pendingMessages <= this.thresholdMessages || this.pmm.trackingBytes && this.pmm.pendingBytes <= this.thresholdBytes) {
            this.repull();
        }
    }

    /**
     * Issues a pull request for the gap between the configured batch and what is
     * still pending: at least one message, and for bytes either 0 (no byte limit
     * configured) or batch bytes minus pending bytes.
     */
    private void repull() {
        int rePullMessages = Math.max(1, this.consumeOpts.getBatchSize() - this.pmm.pendingMessages);
        long rePullBytes = this.consumeOpts.getBatchBytes() == 0L ? 0L : this.consumeOpts.getBatchBytes() - this.pmm.pendingBytes;
        PullRequestOptions pro = PullRequestOptions.builder(rePullMessages).maxBytes(rePullBytes).expiresIn(this.consumeOpts.getExpiresInMillis()).idleHeartbeat(this.consumeOpts.getIdleHeartbeat()).group(this.consumeOpts.getGroup()).minPending(this.consumeOpts.getMinPending()).minAckPending(this.consumeOpts.getMinAckPending()).build();
        this.sub._pull(pro, this.consumeOpts.raiseStatusWarnings(), this);
    }
}

