/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.JetStreamStatusException;
import io.nats.client.Message;
import io.nats.client.PullRequestOptions;
import io.nats.client.impl.Headers;
import io.nats.client.impl.MessageManager;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsJetStreamSubscription;
import io.nats.client.impl.NatsMessage;
import io.nats.client.support.Status;

class PullMessageManager
extends MessageManager {
    protected long pendingMessages = 0L;
    protected long pendingBytes = 0L;
    protected boolean trackingBytes = false;

    protected PullMessageManager(NatsConnection conn, boolean syncMode) {
        super(conn, syncMode);
    }

    @Override
    protected void startup(NatsJetStreamSubscription sub) {
        super.startup(sub);
        sub.setBeforeQueueProcessor(this::beforeQueueProcessorImpl);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void startPullRequest(PullRequestOptions pro) {
        Object object = this.stateChangeLock;
        synchronized (object) {
            this.pendingMessages += (long)pro.getBatchSize();
            this.pendingBytes += (long)pro.getMaxBytes();
            this.trackingBytes = this.pendingBytes > 0L;
            this.configureIdleHeartbeat(pro.getIdleHeartbeat(), -1L);
            if (this.hb) {
                this.initOrResetHeartbeatTimer();
            } else {
                this.shutdownHeartbeatTimer();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void trackPending(long m, long b) {
        Object object = this.stateChangeLock;
        synchronized (object) {
            boolean zero;
            this.pendingMessages -= m;
            boolean bl = zero = this.pendingMessages < 1L;
            if (this.trackingBytes) {
                this.pendingBytes -= b;
                zero |= this.pendingBytes < 1L;
            }
            if (zero) {
                this.pendingMessages = 0L;
                this.pendingBytes = 0L;
                this.trackingBytes = false;
                if (this.hb) {
                    this.shutdownHeartbeatTimer();
                }
            }
        }
    }

    @Override
    protected Boolean beforeQueueProcessorImpl(NatsMessage msg) {
        String s;
        this.messageReceived();
        Status status = msg.getStatus();
        if (status == null) {
            this.trackPending(1L, this.bytesInMessage(msg));
            return true;
        }
        if (status.isHeartbeat()) {
            return false;
        }
        Headers h = msg.getHeaders();
        if (h != null && (s = h.getFirst("Nats-Pending-Messages")) != null) {
            try {
                long m = Long.parseLong(s);
                long b = Long.parseLong(h.getFirst("Nats-Pending-Bytes"));
                this.trackPending(m, b);
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        return true;
    }

    @Override
    protected MessageManager.ManageResult manage(Message msg) {
        Status status = msg.getStatus();
        if (status == null) {
            this.trackJsMessage(msg);
            return MessageManager.ManageResult.MESSAGE;
        }
        switch (status.getCode()) {
            case 404: 
            case 408: {
                return MessageManager.ManageResult.TERMINUS;
            }
            case 409: {
                String statMsg = status.getMessage();
                if (statMsg.startsWith("Exceeded Max")) {
                    this.conn.executeCallback((c, el) -> el.pullStatusWarning(c, this.sub, status));
                    return MessageManager.ManageResult.STATUS;
                }
                if (!statMsg.equals(Status.BATCH_COMPLETED) && !statMsg.equals(Status.MESSAGE_SIZE_EXCEEDS_MAX_BYTES)) break;
                return MessageManager.ManageResult.TERMINUS;
            }
        }
        this.conn.executeCallback((c, el) -> el.pullStatusError(c, this.sub, status));
        if (this.syncMode) {
            throw new JetStreamStatusException(this.sub, status);
        }
        return MessageManager.ManageResult.ERROR;
    }

    private long bytesInMessage(Message msg) {
        NatsMessage nm = (NatsMessage)msg;
        return nm.subject.length() + nm.headerLen + nm.dataLen + (nm.replyTo == null ? 0 : nm.replyTo.length());
    }
}

