/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.transport.netty;

import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.core.deps.io.netty.channel.Channel;
import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.config.DcpControl;
import com.couchbase.client.dcp.message.DcpBufferAckRequest;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.dcp.transport.netty.DcpPipeline;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ChannelFlowController} that counts acknowledged bytes for a single
 * channel and, once the running total reaches a watermark, writes a
 * {@link DcpBufferAckRequest} carrying that total to the channel.
 *
 * <p>The watermark is an absolute byte count derived in the constructor from
 * the configured connection buffer size and the environment's buffer-ack
 * percentage. When buffer acknowledgement is disabled in the DCP control
 * settings, every {@code ack} call is a no-op.
 *
 * <p>The running counter is only read and modified while holding this
 * instance's monitor.
 */
public class ChannelFlowControllerImpl
implements ChannelFlowController {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChannelFlowControllerImpl.class);

    /** Channel the acknowledgement requests are written to. */
    private final Channel channel;

    /** Description of the channel captured at construction time; used as a log prefix. */
    private final String description;

    /** Whether buffer acknowledgement is enabled; when false, {@code ack} does nothing. */
    private final boolean needsBufferAck;

    /** Number of accumulated bytes at which an acknowledgement is sent (0 when disabled). */
    private final int bufferAckWatermark;

    /** Bytes accumulated since the last reset; accessed only inside {@code synchronized (this)}. */
    private int bufferAckCounter;

    /** Ensures the "channel inactive" notice is logged at INFO only once per instance. */
    private boolean hasLoggedMessageAboutSkippingForInactiveChannel;

    /**
     * Creates a flow controller for the given channel.
     *
     * @param channel     the channel to send acknowledgements on; must not be null
     * @param environment supplies the DCP control settings and the buffer-ack
     *                    watermark percentage
     * @throws NullPointerException  if {@code channel} is null
     * @throws NumberFormatException if buffer ack is enabled and the configured
     *                               connection buffer size is not a parsable integer
     */
    public ChannelFlowControllerImpl(Channel channel, Client.Environment environment) {
        this.channel = Objects.requireNonNull(channel);
        this.description = DcpPipeline.describe(channel).toString();
        this.needsBufferAck = environment.dcpControl().bufferAckEnabled();
        if (this.needsBufferAck) {
            int bufferAckPercent = environment.bufferAckWatermark();
            int bufferSize = Integer.parseInt(environment.dcpControl().get(DcpControl.Names.CONNECTION_BUFFER_SIZE));
            // Convert the percentage into an absolute number of bytes.
            this.bufferAckWatermark = (int) Math.round((double) bufferSize / 100.0 * (double) bufferAckPercent);
            LOGGER.info("{} BufferAckWatermark absolute is {} ({}%)", this.description, this.bufferAckWatermark, bufferAckPercent);
        } else {
            LOGGER.info("{} Flow control disabled", this.description);
            this.bufferAckWatermark = 0;
        }
        this.bufferAckCounter = 0;
    }

    /**
     * Acknowledges the readable bytes of {@code message}, but only when buffer
     * acknowledgement is enabled and
     * {@link MessageUtil#requiresFlowControlAck(ByteBuf)} reports that the
     * message requires one.
     *
     * @param message the message whose readable byte count should be acknowledged
     */
    @Override
    public void ack(ByteBuf message) {
        if (this.needsBufferAck && MessageUtil.requiresFlowControlAck(message)) {
            this.ack(message.readableBytes());
        }
    }

    /**
     * Adds {@code numBytes} to the running counter and, if the counter has
     * reached the watermark, either sends an acknowledgement for the whole
     * accumulated amount (channel active) or skips it (channel inactive). In
     * both cases the counter is then reset to zero.
     *
     * <p>If anything in that sequence throws while the channel is still active,
     * the failure is logged, the channel is closed and the failure is
     * propagated. If the channel is no longer active the failure is logged at
     * debug level and suppressed.
     *
     * @param numBytes number of bytes to add to the running counter
     */
    @Override
    public void ack(int numBytes) {
        if (!this.needsBufferAck) {
            return;
        }
        try {
            synchronized (this) {
                this.bufferAckCounter += numBytes;
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace("{} BufferAckCounter is now {}", this.description, this.bufferAckCounter);
                }
                if (this.bufferAckCounter >= this.bufferAckWatermark) {
                    if (this.channel.isActive()) {
                        this.sendBufferAck(this.bufferAckCounter);
                    } else {
                        this.logSkippedAckForInactiveChannel();
                    }
                    // Reached only when the branch above completed without throwing.
                    this.bufferAckCounter = 0;
                }
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace("{} Acknowledging {} bytes.", this.description, numBytes);
                }
            }
        } catch (Throwable t) {
            if (!this.channel.isActive()) {
                LOGGER.debug("{} Flow control ack failed (channel already closed?)", this.description, t);
                return;
            }
            LOGGER.error("{} Flow control ACK failed; closing channel.", this.description, t);
            this.channel.close();
            if (t instanceof Error) {
                throw (Error) t;
            }
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            }
            // A blind cast here would replace the real failure with a
            // ClassCastException; wrap instead so the cause is preserved.
            throw new IllegalStateException(t);
        }
    }

    /**
     * Builds a buffer-ack request for {@code bytesToAck} and writes it to the
     * channel. Must be called while holding this instance's monitor.
     */
    private void sendBufferAck(int bytesToAck) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("{} BufferAckWatermark reached; acking now against the server for {} bytes.", this.description, bytesToAck);
        }
        ByteBuf buffer = this.channel.alloc().buffer();
        boolean populated = false;
        try {
            DcpBufferAckRequest.init(buffer);
            DcpBufferAckRequest.ackBytes(buffer, bytesToAck);
            populated = true;
        } finally {
            if (!populated) {
                // The buffer never reached the channel, so nothing else will
                // release it; do it here to avoid leaking it.
                buffer.release();
            }
        }
        // From here on the buffer belongs to the channel pipeline (per Netty's
        // reference-counting convention for outbound messages).
        this.channel.writeAndFlush(buffer).addListener(future -> {
            if (future.isSuccess()) {
                LOGGER.debug("{} Flow control ACK success, confirmed {} bytes", DcpPipeline.describe(this.channel), bytesToAck);
            } else {
                LOGGER.error("{} Flow control ACK failed; closing channel.", this.description, future.cause());
                this.channel.close();
            }
        });
    }

    /**
     * Logs that an acknowledgement was skipped because the channel is inactive:
     * at INFO the first time, at TRACE on subsequent calls. Must be called
     * while holding this instance's monitor.
     */
    private void logSkippedAckForInactiveChannel() {
        String message = "{} Skipping flow control ACK because channel is no longer active.";
        if (!this.hasLoggedMessageAboutSkippingForInactiveChannel) {
            this.hasLoggedMessageAboutSkippingForInactiveChannel = true;
            LOGGER.info(message, this.description);
        } else if (LOGGER.isTraceEnabled()) {
            LOGGER.trace(message, this.description);
        }
    }
}

