/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.ConsumeOptions;
import io.nats.client.Dispatcher;
import io.nats.client.JetStreamApiException;
import io.nats.client.MessageHandler;
import io.nats.client.PullRequestOptions;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.impl.NatsMessageConsumerBase;
import io.nats.client.impl.PullManagerObserver;
import io.nats.client.impl.SimplifiedSubscriptionMaker;
import java.io.IOException;

/**
 * Consumer that issues an initial pull request for a full batch and then
 * issues smaller "re-pull" requests whenever the amount of pending work
 * reported by the pull manager falls to or below a threshold.
 *
 * <p>The thresholds and the re-pull request are derived once, in the
 * constructor, from the supplied {@link ConsumeOptions}.
 */
class NatsMessageConsumer
extends NatsMessageConsumerBase
implements PullManagerObserver {
    /** Pull request reused for every re-pull issued from {@link #pendingUpdated()}. */
    protected final PullRequestOptions rePullPro;
    /** Pending-message count at or below which a re-pull is issued (batch size minus re-pull size). */
    protected final int thresholdMessages;
    /**
     * Pending-byte count at or below which a re-pull is issued, or
     * {@code Integer.MIN_VALUE} when the configured batch byte limit is 0.
     */
    protected final long thresholdBytes;

    /**
     * Computes the re-pull sizes and thresholds, subscribes, and issues the initial pull.
     *
     * @param subscriptionMaker  used to create the underlying subscription
     * @param cachedConsumerInfo consumer info passed through to the base class
     * @param opts               source of batch size, batch bytes, threshold percent,
     *                           expiration and idle heartbeat
     * @param userDispatcher     dispatcher handed to {@code subscriptionMaker.subscribe}
     * @param userMessageHandler the caller's handler; when {@code null}, a {@code null}
     *                           handler is passed to {@code subscriptionMaker.subscribe}
     * @throws IOException           declared by the calls made here (subscribe / pull)
     * @throws JetStreamApiException declared by the calls made here (subscribe / pull)
     */
    NatsMessageConsumer(SimplifiedSubscriptionMaker subscriptionMaker, ConsumerInfo cachedConsumerInfo, ConsumeOptions opts, Dispatcher userDispatcher, MessageHandler userMessageHandler) throws IOException, JetStreamApiException {
        super(cachedConsumerInfo);
        int bm = opts.getBatchSize();
        long bb = opts.getBatchBytes();

        // Widen to long before multiplying: in int arithmetic bm * percent wraps
        // once bm exceeds Integer.MAX_VALUE / 100, which would corrupt the re-pull
        // size and therefore thresholdMessages. The byte computation below already
        // uses long arithmetic.
        // NOTE(review): the narrowing cast assumes the threshold percent is <= 100,
        // so the quotient never exceeds bm - confirm against ConsumeOptions.
        int rePullMessages = Math.max(1, (int)((long)bm * (long)opts.getThresholdPercent() / 100L));
        // A batch byte limit of 0 keeps the re-pull byte limit at 0 as well.
        long rePullBytes = bb == 0L ? 0L : Math.max(1L, bb * (long)opts.getThresholdPercent() / 100L);
        this.rePullPro = PullRequestOptions.builder(rePullMessages).maxBytes(rePullBytes).expiresIn(opts.getExpiresInMillis()).idleHeartbeat(opts.getIdleHeartbeat()).build();

        this.thresholdMessages = bm - rePullMessages;
        // With no byte limit configured the threshold is a large negative sentinel;
        // pendingUpdated() additionally guards the byte comparison with pmm.trackingBytes.
        this.thresholdBytes = bb == 0L ? Integer.MIN_VALUE : bb - rePullBytes;

        // Wrap the user's handler so that, after each message it processes, the
        // consumer is marked finished once it has been stopped and the pull
        // manager reports nothing more pending.
        MessageHandler mh = userMessageHandler == null ? null : msg -> {
            userMessageHandler.onMessage(msg);
            if (this.stopped.get() && this.pmm.noMorePending()) {
                this.finished.set(true);
            }
        };
        this.initSub(subscriptionMaker.subscribe(mh, userDispatcher));

        // Initial pull for the full batch; this instance is passed as the observer.
        this.sub._pull(PullRequestOptions.builder(bm).maxBytes(bb).expiresIn(opts.getExpiresInMillis()).idleHeartbeat(opts.getIdleHeartbeat()).build(), false, this);
    }

    /**
     * Issues a re-pull using {@link #rePullPro} when the consumer has not been
     * stopped and either the pending message count is at or below
     * {@link #thresholdMessages}, or bytes are being tracked and the pending
     * byte count is at or below {@link #thresholdBytes}.
     */
    @Override
    public void pendingUpdated() {
        if (!this.stopped.get() && (this.pmm.pendingMessages <= this.thresholdMessages || (this.pmm.trackingBytes && this.pmm.pendingBytes <= this.thresholdBytes))) {
            this.sub._pull(this.rePullPro, false, this);
        }
    }
}

