/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.Message;
import io.nats.client.PullRequestOptions;
import io.nats.client.SubscribeOptions;
import io.nats.client.impl.Headers;
import io.nats.client.impl.MessageManager;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsJetStreamSubscription;
import io.nats.client.impl.NatsMessage;
import io.nats.client.impl.PullManagerObserver;
import io.nats.client.support.Status;

/**
 * Message manager for pull-based subscriptions.
 *
 * <p>Keeps a running count of the messages (and, when a byte limit was requested,
 * the bytes) still outstanding across pull requests, and classifies status messages
 * that arrive on the subscription.
 *
 * <p>The pending counters are mutated under {@code stateChangeLock} in
 * {@link #startPullRequest} and {@code trackIncoming}.
 */
class PullMessageManager
extends MessageManager {
    /** Number of messages still expected from outstanding pull requests. */
    protected int pendingMessages;
    /** Number of bytes still expected; only meaningful while {@link #trackingBytes} is true. */
    protected long pendingBytes;
    /** True when the accumulated byte budget of outstanding pull requests is positive. */
    protected boolean trackingBytes;
    /** Whether 404/408/"Exceeded Max" statuses are reported via the pull status warning callback. */
    protected boolean raiseStatusWarnings;
    /** Optional observer notified of pending-count updates and heartbeat errors; may be null. */
    protected PullManagerObserver pullManagerObserver;

    /**
     * Creates the manager and starts with all pending tracking cleared.
     *
     * @param conn     connection passed through to the superclass
     * @param so       subscribe options passed through to the superclass
     * @param syncMode sync-mode flag passed through to the superclass
     */
    protected PullMessageManager(NatsConnection conn, SubscribeOptions so, boolean syncMode) {
        super(conn, so, syncMode);
        this.resetTracking();
    }

    /**
     * Runs the superclass startup and registers {@link #beforeQueueProcessorImpl}
     * as the subscription's before-queue processor.
     *
     * @param sub the subscription being started
     */
    @Override
    protected void startup(NatsJetStreamSubscription sub) {
        super.startup(sub);
        sub.setBeforeQueueProcessor(this::beforeQueueProcessorImpl);
    }

    /**
     * Records a new pull request: adds its batch size and max bytes to the pending
     * counters, stores the warning flag and observer, and (re)configures the
     * heartbeat timer.
     *
     * @param pullSubject         subject of the pull request (not used by this implementation)
     * @param pro                 options supplying batch size, max bytes and idle heartbeat
     * @param raiseStatusWarnings whether status warnings should be raised from now on
     * @param pullManagerObserver observer to notify from now on; may be null
     */
    @Override
    protected void startPullRequest(String pullSubject, PullRequestOptions pro, boolean raiseStatusWarnings, PullManagerObserver pullManagerObserver) {
        synchronized (this.stateChangeLock) {
            this.raiseStatusWarnings = raiseStatusWarnings;
            this.pullManagerObserver = pullManagerObserver;
            this.pendingMessages += pro.getBatchSize();
            this.pendingBytes += pro.getMaxBytes();
            this.trackingBytes = this.pendingBytes > 0L;
            this.configureIdleHeartbeat(pro.getIdleHeartbeat(), -1L);
            // NOTE(review): hb is presumably updated by configureIdleHeartbeat above - confirm in MessageManager.
            if (this.hb) {
                this.initOrResetHeartbeatTimer();
            } else {
                this.shutdownHeartbeatTimer();
            }
        }
    }

    /**
     * Runs the superclass heartbeat-error handling, clears all pending tracking
     * and notifies the observer, if one is set.
     */
    @Override
    protected void handleHeartbeatError() {
        super.handleHeartbeatError();
        // NOTE(review): resetTracking runs here without stateChangeLock, unlike trackIncoming -
        // confirm whether the caller already provides the required synchronization.
        this.resetTracking();
        // Read the field once so the null check and the call use the same reference
        // even if startPullRequest replaces the observer in between.
        PullManagerObserver observer = this.pullManagerObserver;
        if (observer != null) {
            observer.heartbeatError();
        }
    }

    /**
     * Refreshes the last-message-received marker and, unless {@code m} is the
     * {@code Integer.MIN_VALUE} sentinel, subtracts the given amounts from the
     * pending counters.
     *
     * <p>When the message count (or the byte count, if bytes are tracked) drops
     * below one, all tracking is reset. The observer, if set, is notified after
     * every counted update.
     *
     * @param m messages to subtract, or {@code Integer.MIN_VALUE} for "nothing to count"
     * @param b bytes to subtract; ignored unless bytes are being tracked
     */
    private void trackIncoming(int m, long b) {
        synchronized (this.stateChangeLock) {
            this.updateLastMessageReceived();
            if (m == Integer.MIN_VALUE) {
                // Sentinel: activity only, no pending counts to adjust.
                return;
            }
            this.pendingMessages -= m;
            boolean zero = this.pendingMessages < 1;
            if (this.trackingBytes) {
                this.pendingBytes -= b;
                zero |= this.pendingBytes < 1L;
            }
            if (zero) {
                this.resetTracking();
            }
            if (this.pullManagerObserver != null) {
                this.pullManagerObserver.pendingUpdated();
            }
        }
    }

    /**
     * Clears the pending message and byte counters, turns byte tracking off and
     * refreshes the last-message-received marker.
     */
    protected void resetTracking() {
        this.pendingMessages = 0;
        this.pendingBytes = 0L;
        this.trackingBytes = false;
        this.updateLastMessageReceived();
    }

    /**
     * Inspects an incoming message before it is queued and updates pending tracking.
     *
     * <ul>
     *   <li>No status: counted as one message plus its consumed byte count.</li>
     *   <li>Heartbeat status: only refreshes the last-message-received marker.</li>
     *   <li>Any other status: the {@code Nats-Pending-Messages} and
     *       {@code Nats-Pending-Bytes} headers, when both parse as numbers, are
     *       subtracted from the pending counters.</li>
     * </ul>
     *
     * @param msg the incoming message
     * @return {@code false} for heartbeat statuses, {@code true} otherwise
     *         (presumably whether the message should continue to the queue - confirm with caller)
     */
    @Override
    protected Boolean beforeQueueProcessorImpl(NatsMessage msg) {
        Status status = msg.getStatus();
        if (status == null) {
            this.trackIncoming(1, msg.consumeByteCount());
            return true;
        }
        if (status.isHeartbeat()) {
            this.trackIncoming(Integer.MIN_VALUE, Integer.MIN_VALUE);
            return false;
        }
        int m = Integer.MIN_VALUE;
        long b = Long.MIN_VALUE;
        Headers h = msg.getHeaders();
        if (h != null) {
            try {
                m = Integer.parseInt(h.getFirst("Nats-Pending-Messages"));
                b = Long.parseLong(h.getFirst("Nats-Pending-Bytes"));
            }
            catch (NumberFormatException ignored) {
                // A missing or malformed header means there is nothing reliable to subtract;
                // restore the sentinel so trackIncoming skips the counters entirely.
                m = Integer.MIN_VALUE;
            }
        }
        this.trackIncoming(m, b);
        return true;
    }

    /**
     * Classifies a message taken from the queue: regular messages are tracked and
     * reported as {@code MESSAGE}; status messages are delegated to {@link #manageStatus}.
     *
     * @param msg the message to classify
     * @return the result describing how the message was handled
     */
    @Override
    protected MessageManager.ManageResult manage(Message msg) {
        if (msg.getStatus() == null) {
            this.trackJsMessage(msg);
            return MessageManager.ManageResult.MESSAGE;
        }
        return this.manageStatus(msg);
    }

    /**
     * Maps a status message to a manage result and raises the matching callback.
     *
     * <ul>
     *   <li>404, 408: optional warning callback, {@code STATUS_TERMINUS}.</li>
     *   <li>409 whose message starts with "Exceeded Max": optional warning callback,
     *       {@code STATUS_HANDLED}.</li>
     *   <li>409 with a batch-completed, message-size-exceeds-max-bytes or
     *       server-shutdown message: {@code STATUS_TERMINUS}, no callback.</li>
     *   <li>Anything else, including a 409 without a message: error callback,
     *       {@code STATUS_ERROR}.</li>
     * </ul>
     *
     * @param msg a message whose status is non-null
     * @return the result describing how the status was handled
     */
    protected MessageManager.ManageResult manageStatus(Message msg) {
        Status status = msg.getStatus();
        switch (status.getCode()) {
            case 404:
            case 408: {
                if (this.raiseStatusWarnings) {
                    this.conn.executeCallback((c, el) -> el.pullStatusWarning(c, this.sub, status));
                }
                return MessageManager.ManageResult.STATUS_TERMINUS;
            }
            case 409: {
                String statMsg = status.getMessage();
                if (statMsg == null) {
                    // Defensive: without a message the 409 cannot be classified; treat it as unknown.
                    break;
                }
                if (statMsg.startsWith("Exceeded Max")) {
                    if (this.raiseStatusWarnings) {
                        this.conn.executeCallback((c, el) -> el.pullStatusWarning(c, this.sub, status));
                    }
                    return MessageManager.ManageResult.STATUS_HANDLED;
                }
                if (statMsg.equals(Status.BATCH_COMPLETED)
                        || statMsg.equals(Status.MESSAGE_SIZE_EXCEEDS_MAX_BYTES)
                        || statMsg.equals(Status.SERVER_SHUTDOWN)) {
                    return MessageManager.ManageResult.STATUS_TERMINUS;
                }
                break;
            }
            default:
                break;
        }
        // Unknown or unclassifiable status: surface it as an error.
        this.conn.executeCallback((c, el) -> el.pullStatusError(c, this.sub, status));
        return MessageManager.ManageResult.STATUS_ERROR;
    }

    /**
     * Reports whether nothing more is expected from outstanding pull requests.
     *
     * @return {@code true} when fewer than one message is pending, or when bytes are
     *         tracked and fewer than one byte is pending
     */
    protected boolean noMorePending() {
        return this.pendingMessages < 1 || this.trackingBytes && this.pendingBytes < 1L;
    }
}

