/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.JetStreamStatusException;
import io.nats.client.Message;
import io.nats.client.PullRequestOptions;
import io.nats.client.impl.Headers;
import io.nats.client.impl.MessageManager;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsJetStreamSubscription;
import io.nats.client.impl.NatsMessage;
import io.nats.client.support.PullStatus;
import io.nats.client.support.Status;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

class PullMessageManager
extends MessageManager {
    protected final AtomicLong pendingMessages = new AtomicLong(0L);
    protected final AtomicLong pendingBytes = new AtomicLong(0L);
    protected final AtomicBoolean trackingBytes = new AtomicBoolean(false);

    protected PullMessageManager(NatsConnection conn, boolean syncMode) {
        super(conn, syncMode);
    }

    @Override
    protected void startup(NatsJetStreamSubscription sub) {
        super.startup(sub);
        sub.setBeforeQueueProcessor(this::beforeQueueProcessorImpl);
    }

    @Override
    protected void startPullRequest(PullRequestOptions pro) {
        this.pendingMessages.addAndGet(pro.getBatchSize());
        this.pendingBytes.addAndGet(pro.getMaxBytes());
        this.trackingBytes.set(this.pendingBytes.get() > 0L);
        this.initIdleHeartbeat(pro.getIdleHeartbeat(), -1L);
        if (this.hb) {
            this.initOrResetHeartbeatTimer();
        } else {
            this.shutdownHeartbeatTimer();
        }
    }

    @Override
    protected PullStatus getPullStatus() {
        return new PullStatus(this.pendingMessages.get(), this.pendingBytes.get(), this.hb);
    }

    private void trackPending(long m, long b) {
        boolean reachedEnd = false;
        if (m > 0L && this.pendingMessages.addAndGet(-m) < 1L) {
            reachedEnd = true;
        }
        if (this.trackingBytes.get() && b > 0L && this.pendingBytes.addAndGet(-b) < 1L) {
            reachedEnd = true;
        }
        if (reachedEnd) {
            this.pendingMessages.set(0L);
            this.pendingBytes.set(0L);
            this.trackingBytes.set(false);
            if (this.hb) {
                this.shutdownHeartbeatTimer();
            }
        }
    }

    @Override
    protected Boolean beforeQueueProcessorImpl(NatsMessage msg) {
        if (this.hb) {
            this.messageReceived();
            Status status = msg.getStatus();
            return status == null || !status.isHeartbeat();
        }
        return true;
    }

    @Override
    protected boolean manage(Message msg) {
        int statusCode;
        if (msg.getStatus() == null) {
            this.trackJsMessage(msg);
            this.trackPending(1L, this.bytesInMessage(msg));
            return false;
        }
        Status status = msg.getStatus();
        Headers h = msg.getHeaders();
        if (h != null) {
            String s = h.getFirst("Nats-Pending-Messages");
            long m = s == null ? -1L : Long.parseLong(s);
            s = h.getFirst("Nats-Pending-Bytes");
            long b = s == null ? -1L : Long.parseLong(s);
            this.trackPending(m, b);
        }
        if ((statusCode = status.getCode()) == 404 || statusCode == 408) {
            return true;
        }
        if (statusCode == 409 && status.getMessage().contains("Exceed")) {
            this.conn.executeCallback((c, el) -> el.pullStatusWarning(c, this.sub, status));
            return true;
        }
        this.conn.executeCallback((c, el) -> el.pullStatusError(c, this.sub, status));
        if (this.syncMode) {
            throw new JetStreamStatusException(this.sub, status);
        }
        return true;
    }

    private long bytesInMessage(Message msg) {
        NatsMessage nm = (NatsMessage)msg;
        return nm.subject.length() + nm.headerLen + nm.dataLen + (nm.replyTo == null ? 0 : nm.replyTo.length());
    }
}

