/*
 * Decompiled with CFR 0.152.
 */
package com.solacesystems.jcsmp.impl;

import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.impl.flow.FlowHandleImpl;
import com.solacesystems.jcsmp.impl.flow.SubscriberQueueHooks;
import com.solacesystems.jcsmp.protocol.CSMPSubscriberChannel;
import com.solacesystems.jcsmp.statistics.StatType;

public class DefaultFlowQueueHookImpl
implements SubscriberQueueHooks {
    volatile boolean backPressuring = false;
    int enqueueCalls = 0;
    int dequeueCalls = 0;
    FlowHandleImpl flowHandle;
    final int congested = JCSMPFactory.onlyInstance().getGlobalProperties().getConsumerDefaultFlowCongestionLimit();
    final int uncongested = JCSMPFactory.onlyInstance().getGlobalProperties().getConsumerDefaultFlowUncongestedLimit();
    final int CALL_THRESHOLD = Math.max(this.uncongested / 10, 1);

    public DefaultFlowQueueHookImpl(FlowHandleImpl flowHandle) {
        this.flowHandle = flowHandle;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void postDequeue() {
        if (++this.dequeueCalls == Integer.MAX_VALUE) {
            this.dequeueCalls = 0;
        }
        if (this.dequeueCalls % this.CALL_THRESHOLD != 0) {
            return;
        }
        if (this.backPressuring) {
            CSMPSubscriberChannel channel = this.flowHandle.getCSMPSubscriberChannel();
            int sz = this.flowHandle.getMessageQueueSize();
            if (sz < this.uncongested && channel.connected()) {
                if (this.flowHandle != null) {
                    this.flowHandle.getLogTrace().info("Subscriber flow uncongested, re-enabling connection read.");
                }
                try {
                    channel.start();
                }
                catch (JCSMPException ex) {
                    if (this.flowHandle != null) {
                        this.flowHandle.getLogTrace().warn((Object)ex);
                    }
                }
                finally {
                    this.backPressuring = false;
                }
            }
        }
    }

    @Override
    public void postEnqueue() {
        if (++this.enqueueCalls == Integer.MAX_VALUE) {
            this.enqueueCalls = 0;
        }
        if (this.enqueueCalls % this.CALL_THRESHOLD != 0) {
            return;
        }
        if (!this.backPressuring) {
            CSMPSubscriberChannel channel = this.flowHandle.getCSMPSubscriberChannel();
            int sz = this.flowHandle.getMessageQueueSize();
            if (sz > this.congested && channel.connected()) {
                this.flowHandle.getLogTrace().info("Subscriber flow congested, disabling connection read.");
                this.flowHandle.getSession().getSessionStats().incStat(StatType.SUBSCRIBER_CONGESTED_EVENT);
                channel.stop();
                this.backPressuring = true;
            }
        }
    }
}

