/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.Message;
import io.nats.client.NatsSystemClock;
import io.nats.client.PullRequestOptions;
import io.nats.client.SubscribeOptions;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsJetStreamMetaData;
import io.nats.client.impl.NatsJetStreamSubscription;
import io.nats.client.impl.NatsMessage;
import io.nats.client.impl.PullManagerObserver;
import io.nats.client.support.ScheduledTask;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

abstract class MessageManager {
    protected static final int THRESHOLD = 3;
    protected final ReentrantLock stateChangeLock = new ReentrantLock();
    protected final NatsConnection conn;
    protected final SubscribeOptions so;
    protected final boolean syncMode;
    protected NatsJetStreamSubscription sub;
    protected long lastStreamSeq;
    protected long lastConsumerSeq;
    protected AtomicLong lastMsgReceivedNanoTime;
    protected final AtomicBoolean hb;
    protected final AtomicLong idleHeartbeatSettingMillis;
    protected final AtomicLong alarmPeriodSettingNanos;
    protected final AtomicReference<ScheduledTask> heartbeatTaskRef;

    protected MessageManager(NatsConnection conn, SubscribeOptions so, boolean syncMode) {
        this.conn = conn;
        this.so = so;
        this.syncMode = syncMode;
        this.lastStreamSeq = 0L;
        this.lastConsumerSeq = 0L;
        this.hb = new AtomicBoolean(false);
        this.idleHeartbeatSettingMillis = new AtomicLong();
        this.alarmPeriodSettingNanos = new AtomicLong();
        this.lastMsgReceivedNanoTime = new AtomicLong(NatsSystemClock.nanoTime());
        this.heartbeatTaskRef = new AtomicReference();
    }

    protected boolean isSyncMode() {
        return this.syncMode;
    }

    protected long getLastStreamSequence() {
        return this.lastStreamSeq;
    }

    protected long getLastConsumerSequence() {
        return this.lastConsumerSeq;
    }

    protected long getLastMsgReceivedNanoTime() {
        return this.lastMsgReceivedNanoTime.get();
    }

    protected boolean isHb() {
        return this.hb.get();
    }

    protected long getIdleHeartbeatSetting() {
        return this.idleHeartbeatSettingMillis.get();
    }

    protected long getAlarmPeriodSettingNanos() {
        return this.alarmPeriodSettingNanos.get();
    }

    protected void startup(NatsJetStreamSubscription sub) {
        this.sub = sub;
    }

    protected void shutdown() {
        this.shutdownHeartbeatTimer();
    }

    protected void startPullRequest(String pullSubject, PullRequestOptions pullRequestOptions, boolean raiseStatusWarnings, PullManagerObserver pullManagerObserver) {
    }

    protected Boolean beforeQueueProcessorImpl(NatsMessage msg) {
        return true;
    }

    protected abstract ManageResult manage(Message var1);

    protected void trackJsMessage(Message msg) {
        this.stateChangeLock.lock();
        try {
            NatsJetStreamMetaData meta = msg.metaData();
            this.lastStreamSeq = meta.streamSequence();
            ++this.lastConsumerSeq;
        }
        finally {
            this.stateChangeLock.unlock();
        }
    }

    protected void handleHeartbeatError() {
        this.conn.executeCallback((c, el) -> el.heartbeatAlarm(c, this.sub, this.lastStreamSeq, this.lastConsumerSeq));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void configureIdleHeartbeat(Duration configIdleHeartbeat, long configMessageAlarmTime) {
        this.stateChangeLock.lock();
        try {
            long idleSettingMillis = configIdleHeartbeat == null ? 0L : configIdleHeartbeat.toMillis();
            this.idleHeartbeatSettingMillis.set(idleSettingMillis);
            if (idleSettingMillis <= 0L) {
                this.alarmPeriodSettingNanos.set(0L);
                this.hb.set(false);
            } else {
                long alarmPeriodSettingMillis = configMessageAlarmTime < idleSettingMillis ? idleSettingMillis * 3L : configMessageAlarmTime;
                this.alarmPeriodSettingNanos.set(alarmPeriodSettingMillis * 1000000L);
                this.hb.set(true);
            }
        }
        finally {
            this.stateChangeLock.unlock();
        }
    }

    protected void updateLastMessageReceived() {
        this.lastMsgReceivedNanoTime.set(NatsSystemClock.nanoTime());
    }

    protected void initOrResetHeartbeatTimer() {
        this.stateChangeLock.lock();
        try {
            ScheduledTask hbTask = this.heartbeatTaskRef.get();
            if (hbTask != null) {
                hbTask.shutdown();
            }
            this.heartbeatTaskRef.set(new ScheduledTask(this.conn.getScheduledExecutor(), this.alarmPeriodSettingNanos.get(), TimeUnit.NANOSECONDS, () -> {
                long sinceLast = NatsSystemClock.nanoTime() - this.lastMsgReceivedNanoTime.get();
                if (sinceLast > this.alarmPeriodSettingNanos.get()) {
                    this.handleHeartbeatError();
                }
            }));
            this.updateLastMessageReceived();
        }
        finally {
            this.stateChangeLock.unlock();
        }
    }

    protected void shutdownHeartbeatTimer() {
        this.stateChangeLock.lock();
        try {
            ScheduledTask hbTask = this.heartbeatTaskRef.get();
            if (hbTask != null) {
                hbTask.shutdown();
                this.heartbeatTaskRef.set(null);
            }
        }
        finally {
            this.stateChangeLock.unlock();
        }
    }

    public static enum ManageResult {
        MESSAGE,
        STATUS_HANDLED,
        STATUS_TERMINUS,
        STATUS_ERROR;

    }
}

