/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.transport.netty;

import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.core.deps.io.netty.channel.ChannelDuplexHandler;
import com.couchbase.client.core.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.core.deps.io.netty.channel.ChannelPromise;
import com.couchbase.client.core.util.NanoTimestamp;
import com.couchbase.client.dcp.config.DcpControl;
import com.couchbase.client.dcp.message.DcpBufferAckRequest;
import com.couchbase.client.dcp.message.DcpControlRequest;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.metrics.LogLevel;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlowControlDiagnosticHandler
extends ChannelDuplexHandler {
    private static final Logger log = LoggerFactory.getLogger(FlowControlDiagnosticHandler.class);
    private static final Duration warningThreshold = Duration.ofMinutes(1L);
    private final String channelDesc;
    private long flowControlBufferSizeInBytes;
    private long unAckedBytes;
    private ScheduledFuture<?> scheduledTask;
    private NanoTimestamp lastAckTime = NanoTimestamp.now();

    public FlowControlDiagnosticHandler(String channelDesc) {
        this.channelDesc = Objects.requireNonNull(channelDesc);
    }

    public void channelActive(ChannelHandlerContext ctx) {
        this.scheduledTask = ctx.executor().scheduleAtFixedRate(this::logStatus, 10L, 10L, TimeUnit.SECONDS);
        ctx.fireChannelActive();
    }

    public void channelInactive(ChannelHandlerContext ctx) {
        if (this.scheduledTask != null) {
            this.scheduledTask.cancel(true);
        }
        ctx.fireChannelInactive();
    }

    private void logStatus() {
        if (this.flowControlBufferSizeInBytes > 0L) {
            LogLevel logLevel = this.bufferFull() && this.lastAckTime.hasElapsed(warningThreshold) ? LogLevel.WARN : LogLevel.DEBUG;
            logLevel.log(log, "{} STATUS: {} ; time since last ack = {}", this.channelDesc, this.bufferStatus(), this.lastAckTime.elapsed());
        }
    }

    private boolean bufferFull() {
        return this.unAckedBytes >= this.flowControlBufferSizeInBytes;
    }

    private Object bufferStatus() {
        return new Object(){

            public String toString() {
                int percent = (int)Math.ceil((double)FlowControlDiagnosticHandler.this.unAckedBytes / (double)FlowControlDiagnosticHandler.this.flowControlBufferSizeInBytes * 100.0);
                String fullOrNot = FlowControlDiagnosticHandler.this.bufferFull() ? "BUFFER FULL" : "ok";
                return "un-acked bytes = " + FlowControlDiagnosticHandler.this.unAckedBytes + " / " + FlowControlDiagnosticHandler.this.flowControlBufferSizeInBytes + " (" + percent + "%) " + fullOrNot;
            }
        };
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buf = (ByteBuf)msg;
        if (MessageUtil.requiresFlowControlAck(buf)) {
            int messageSize = buf.readableBytes();
            this.unAckedBytes += (long)messageSize;
            if (log.isTraceEnabled()) {
                log.trace("{} Received {} flow-controlled bytes; new {}", new Object[]{this.channelDesc, messageSize, this.bufferStatus()});
            }
        }
        ctx.fireChannelRead(msg);
    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        ByteBuf buf = (ByteBuf)msg;
        if (DcpControlRequest.is(buf) && MessageUtil.getKeyAsString(buf).equals(DcpControl.Names.CONNECTION_BUFFER_SIZE.value())) {
            this.flowControlBufferSizeInBytes = Long.parseLong(MessageUtil.getContentAsString(buf));
            log.debug("{} Flow control buffer size initialized to {} bytes", (Object)this.channelDesc, (Object)this.flowControlBufferSizeInBytes);
        }
        if (DcpBufferAckRequest.is(buf)) {
            long ackedBytes = MessageUtil.getExtras(buf).readUnsignedInt();
            if (ackedBytes > Integer.MAX_VALUE) {
                log.error("{} Acked byte count {} is > Integer.MAX_VALUE; is that a problem?", (Object)this.channelDesc, (Object)ackedBytes);
            }
            if (this.unAckedBytes < 0L) {
                log.error("{} Un-acked byte count {} is < 0", (Object)this.channelDesc, (Object)this.unAckedBytes);
            }
            promise = promise.unvoid();
            promise.addListener(future -> {
                if (future.isSuccess()) {
                    this.lastAckTime = NanoTimestamp.now();
                    this.unAckedBytes -= ackedBytes;
                    log.debug("{} Wrote flow control ack for {} bytes; new {}", new Object[]{this.channelDesc, ackedBytes, this.bufferStatus()});
                } else {
                    log.error("{} Failed to write flow control ack for {} bytes; some other component will close this chanel.", (Object)this.channelDesc, (Object)ackedBytes);
                }
            });
        }
        ctx.write(msg, promise);
    }
}

