/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.bolt.transport;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.util.AttributeKey;
import java.time.Clock;
import java.time.Duration;
import java.util.function.Supplier;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.neo4j.bolt.transport.DefaultThrottleLock;
import org.neo4j.bolt.transport.ThrottleLock;
import org.neo4j.bolt.transport.TransportThrottle;
import org.neo4j.bolt.transport.TransportThrottleException;

/**
 * {@link TransportThrottle} that makes writers wait while a channel is not writable.
 *
 * <p>{@link #install(Channel)} sets the channel's write-buffer water marks, stores a
 * {@link ThrottleLock} obtained from the configured supplier as a channel attribute and adds a
 * listener that calls {@link #release(Channel)} whenever the channel's writability changes.
 * {@link #acquire(Channel)} waits on that lock for as long as the channel is open and not
 * writable and, when a positive maximum duration is configured, gives up with a
 * {@link TransportThrottleException} once that duration has been exceeded.
 */
public class TransportWriteThrottle implements TransportThrottle {
    /** Channel attribute holding the lock that writers wait on. */
    static final AttributeKey<ThrottleLock> LOCK_KEY = AttributeKey.valueOf("BOLT.WRITE_THROTTLE.LOCK");
    /** Channel attribute set to {@code TRUE} once the maximum wait duration has been exceeded. */
    static final AttributeKey<Boolean> MAX_DURATION_EXCEEDED_KEY = AttributeKey.valueOf("BOLT.WRITE_THROTTLE.MAX_DURATION_EXCEEDED");
    /**
     * Second argument passed to {@link ThrottleLock#lock}; presumably a timeout in milliseconds
     * after which the writability check is repeated (confirm against the ThrottleLock contract).
     */
    private static final long LOCK_WAIT_TIMEOUT = 1000L;

    private final int lowWaterMark;
    private final int highWaterMark;
    private final Clock clock;
    /** Maximum total wait in milliseconds; values {@code <= 0} disable the limit. */
    private final long maxLockDuration;
    private final Supplier<ThrottleLock> lockSupplier;
    /** Single listener instance that is added to every channel this throttle is installed on. */
    private final ChannelInboundHandler listener;

    /**
     * Creates a throttle that uses a {@link DefaultThrottleLock} per channel.
     *
     * @param lowWaterMark low water mark passed to the channel's {@link WriteBufferWaterMark}
     * @param highWaterMark high water mark passed to the channel's {@link WriteBufferWaterMark}
     * @param clock time source used to measure how long a writer has been waiting
     * @param maxLockDuration maximum time a writer may wait; zero or negative disables the limit
     */
    public TransportWriteThrottle(int lowWaterMark, int highWaterMark, Clock clock, Duration maxLockDuration) {
        this(lowWaterMark, highWaterMark, clock, maxLockDuration, DefaultThrottleLock::new);
    }

    /**
     * Creates a throttle with a custom lock factory.
     *
     * @param lowWaterMark low water mark passed to the channel's {@link WriteBufferWaterMark}
     * @param highWaterMark high water mark passed to the channel's {@link WriteBufferWaterMark}
     * @param clock time source used to measure how long a writer has been waiting
     * @param maxLockDuration maximum time a writer may wait; zero or negative disables the limit
     * @param lockSupplier invoked once per {@link #install(Channel)} to create the channel's lock
     */
    public TransportWriteThrottle(int lowWaterMark, int highWaterMark, Clock clock, Duration maxLockDuration, Supplier<ThrottleLock> lockSupplier) {
        this.lowWaterMark = lowWaterMark;
        this.highWaterMark = highWaterMark;
        this.clock = clock;
        this.maxLockDuration = maxLockDuration.toMillis();
        this.lockSupplier = lockSupplier;
        this.listener = new ChannelStatusListener();
    }

    /**
     * Attaches a fresh lock to the channel, applies the configured water marks and appends the
     * writability listener to the end of the channel's pipeline.
     *
     * @param channel channel to install the throttle on
     */
    @Override
    public void install(Channel channel) {
        ThrottleLock lock = this.lockSupplier.get();
        channel.attr(LOCK_KEY).set(lock);
        channel.config().setWriteBufferWaterMark(new WriteBufferWaterMark(this.lowWaterMark, this.highWaterMark));
        channel.pipeline().addLast(this.listener);
    }

    /**
     * Waits until the channel is writable or closed.
     *
     * <p>Returns immediately when the maximum duration was already exceeded on an earlier call
     * for this channel, or when no lock is attached to the channel.
     *
     * @param channel channel about to be written to
     * @throws TransportThrottleException if a positive maximum duration is configured and the
     *         channel stayed unwritable for longer than that
     * @throws RuntimeException wrapping the {@link InterruptedException} if the waiting thread is
     *         interrupted; the thread's interrupt flag is restored first
     */
    @Override
    public void acquire(Channel channel) throws TransportThrottleException {
        if (isDurationAlreadyExceeded(channel)) {
            return;
        }
        ThrottleLock lock = channel.attr(LOCK_KEY).get();
        if (lock == null) {
            // Throttle not installed (or already uninstalled) on this channel: nothing to wait on.
            return;
        }

        // Explicit flag rather than "startTimeMillis == 0" so that a clock reading of exactly 0
        // is not mistaken for "timer not started yet".
        boolean timerStarted = false;
        long startTimeMillis = 0L;
        while (channel.isOpen() && !channel.isWritable()) {
            if (this.maxLockDuration > 0L) {
                long currentTimeMillis = this.clock.millis();
                if (!timerStarted) {
                    // The timer only starts once we actually have to wait.
                    timerStarted = true;
                    startTimeMillis = currentTimeMillis;
                } else if (currentTimeMillis - startTimeMillis > this.maxLockDuration) {
                    // Remember the verdict on the channel so later acquire() calls do not block.
                    setDurationExceeded(channel);
                    throw new TransportThrottleException(String.format("Bolt connection [%s] will be closed because the client did not consume outgoing buffers for %s which is not expected.", channel.remoteAddress(), DurationFormatUtils.formatDurationHMS(this.maxLockDuration)));
                }
            }
            try {
                lock.lock(channel, LOCK_WAIT_TIMEOUT);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(ex);
            }
        }
    }

    /**
     * Unlocks the channel's lock if the channel is currently writable.
     *
     * <p>Does nothing when the channel is not writable or has no lock attached (for example
     * after {@link #uninstall(Channel)}, which leaves the writability listener in the pipeline).
     *
     * @param channel channel whose waiting writers may proceed
     */
    @Override
    public void release(Channel channel) {
        if (!channel.isWritable()) {
            return;
        }
        ThrottleLock lock = channel.attr(LOCK_KEY).get();
        if (lock != null) {
            lock.unlock(channel);
        }
    }

    /**
     * Detaches the lock from the channel by clearing the {@link #LOCK_KEY} attribute.
     *
     * @param channel channel to remove the throttle state from
     */
    @Override
    public void uninstall(Channel channel) {
        channel.attr(LOCK_KEY).set(null);
    }

    /** Returns {@code true} if the channel has been marked as having exceeded the maximum wait. */
    private static boolean isDurationAlreadyExceeded(Channel channel) {
        return Boolean.TRUE.equals(channel.attr(MAX_DURATION_EXCEEDED_KEY).get());
    }

    /** Marks the channel as having exceeded the maximum wait. */
    private static void setDurationExceeded(Channel channel) {
        channel.attr(MAX_DURATION_EXCEEDED_KEY).set(Boolean.TRUE);
    }

    /**
     * Pipeline handler that forwards writability changes to {@link #release(Channel)}.
     * The event is handled here only; this handler does not pass it on to later handlers.
     */
    @ChannelHandler.Sharable
    private class ChannelStatusListener extends ChannelInboundHandlerAdapter {
        private ChannelStatusListener() {
        }

        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) {
            TransportWriteThrottle.this.release(ctx.channel());
        }
    }
}

