/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.ConsumeOptions;
import io.nats.client.Dispatcher;
import io.nats.client.JetStreamApiException;
import io.nats.client.MessageHandler;
import io.nats.client.PullRequestOptions;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.impl.NatsMessageConsumerBase;
import io.nats.client.impl.PullManagerObserver;
import io.nats.client.impl.SimplifiedSubscriptionMaker;
import java.io.IOException;

class NatsMessageConsumer
extends NatsMessageConsumerBase
implements PullManagerObserver {
    protected final ConsumeOptions opts;
    protected final int thresholdMessages;
    protected final long thresholdBytes;
    protected final SimplifiedSubscriptionMaker subscriptionMaker;
    protected final Dispatcher userDispatcher;
    protected final MessageHandler userMessageHandler;

    NatsMessageConsumer(SimplifiedSubscriptionMaker subscriptionMaker, ConsumerInfo cachedConsumerInfo, ConsumeOptions opts, Dispatcher userDispatcher, MessageHandler userMessageHandler) throws IOException, JetStreamApiException {
        super(cachedConsumerInfo);
        this.subscriptionMaker = subscriptionMaker;
        this.opts = opts;
        this.userDispatcher = userDispatcher;
        this.userMessageHandler = userMessageHandler;
        int bm = opts.getBatchSize();
        long bb = opts.getBatchBytes();
        int rePullMessages = Math.max(1, bm * opts.getThresholdPercent() / 100);
        long rePullBytes = bb == 0L ? 0L : Math.max(1L, bb * (long)opts.getThresholdPercent() / 100L);
        this.thresholdMessages = bm - rePullMessages;
        this.thresholdBytes = bb == 0L ? Integer.MIN_VALUE : bb - rePullBytes;
        this.doSub();
    }

    @Override
    public void heartbeatError() {
        try {
            this.lenientClose();
            this.doSub();
        }
        catch (JetStreamApiException | IOException e) {
            this.setupHbAlarmToTrigger();
        }
    }

    void doSub() throws JetStreamApiException, IOException {
        MessageHandler mh = this.userMessageHandler == null ? null : msg -> {
            this.userMessageHandler.onMessage(msg);
            if (this.stopped.get() && this.pmm.noMorePending()) {
                this.finished.set(true);
            }
        };
        try {
            super.initSub(this.subscriptionMaker.subscribe(mh, this.userDispatcher, this.pmm, null));
            this.repull();
            this.stopped.set(false);
            this.finished.set(false);
        }
        catch (JetStreamApiException | IOException e) {
            this.setupHbAlarmToTrigger();
        }
    }

    private void setupHbAlarmToTrigger() {
        this.pmm.resetTracking();
        this.pmm.initOrResetHeartbeatTimer();
    }

    @Override
    public void pendingUpdated() {
        if (!this.stopped.get() && (this.pmm.pendingMessages <= this.thresholdMessages || this.pmm.trackingBytes && this.pmm.pendingBytes <= this.thresholdBytes)) {
            this.repull();
        }
    }

    private void repull() {
        int rePullMessages = Math.max(1, this.opts.getBatchSize() - this.pmm.pendingMessages);
        long rePullBytes = this.opts.getBatchBytes() == 0L ? 0L : this.opts.getBatchBytes() - this.pmm.pendingBytes;
        PullRequestOptions pro = PullRequestOptions.builder(rePullMessages).maxBytes(rePullBytes).expiresIn(this.opts.getExpiresInMillis()).idleHeartbeat(this.opts.getIdleHeartbeat()).build();
        this.sub._pull(pro, false, this);
    }
}

