/*
 * Decompiled with CFR 0.152.
 */
package io.lettuce.core.protocol;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.ConnectionEvents;
import io.lettuce.core.RedisChannelWriter;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.RedisException;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.internal.LettuceFactories;
import io.lettuce.core.protocol.ChannelLogDescriptor;
import io.lettuce.core.protocol.CommandHandler;
import io.lettuce.core.protocol.ConnectionFacade;
import io.lettuce.core.protocol.ConnectionWatchdog;
import io.lettuce.core.protocol.DemandAware;
import io.lettuce.core.protocol.Endpoint;
import io.lettuce.core.protocol.HasQueuedCommands;
import io.lettuce.core.protocol.RedisCommand;
import io.lettuce.core.protocol.SharedLock;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.EncoderException;
import io.netty.util.Recycler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.logging.InternalLogLevel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

public class DefaultEndpoint
implements RedisChannelWriter,
Endpoint {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultEndpoint.class);
    private static final AtomicLong ENDPOINT_COUNTER = new AtomicLong();
    private static final AtomicIntegerFieldUpdater<DefaultEndpoint> QUEUE_SIZE = AtomicIntegerFieldUpdater.newUpdater(DefaultEndpoint.class, "queueSize");
    protected volatile Channel channel;
    private final Reliability reliability;
    private final ClientOptions clientOptions;
    private final Queue<RedisCommand<?, ?, ?>> disconnectedBuffer;
    private final Queue<RedisCommand<?, ?, ?>> commandBuffer;
    private final boolean boundedQueues;
    private final long endpointId = ENDPOINT_COUNTER.incrementAndGet();
    private final AtomicBoolean closed = new AtomicBoolean();
    private final SharedLock sharedLock = new SharedLock();
    private final boolean debugEnabled = logger.isDebugEnabled();
    private String logPrefix;
    private boolean autoFlushCommands = true;
    private ConnectionWatchdog connectionWatchdog;
    private ConnectionFacade connectionFacade;
    private volatile Throwable connectionError;
    private volatile int queueSize = 0;

    public DefaultEndpoint(ClientOptions clientOptions) {
        LettuceAssert.notNull(clientOptions, "ClientOptions must not be null");
        this.clientOptions = clientOptions;
        this.reliability = clientOptions.isAutoReconnect() ? Reliability.AT_LEAST_ONCE : Reliability.AT_MOST_ONCE;
        this.disconnectedBuffer = LettuceFactories.newConcurrentQueue(clientOptions.getRequestQueueSize());
        this.commandBuffer = LettuceFactories.newConcurrentQueue(clientOptions.getRequestQueueSize());
        this.boundedQueues = clientOptions.getRequestQueueSize() != Integer.MAX_VALUE;
    }

    @Override
    public void setConnectionFacade(ConnectionFacade connectionFacade) {
        this.connectionFacade = connectionFacade;
    }

    @Override
    public void setAutoFlushCommands(boolean autoFlush) {
        this.autoFlushCommands = autoFlush;
    }

    @Override
    public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
        LettuceAssert.notNull(command, "Command must not be null");
        try {
            this.sharedLock.incrementWriters();
            this.validateWrite(1);
            if (this.autoFlushCommands) {
                if (this.isConnected()) {
                    this.writeToChannelAndFlush(command);
                } else {
                    this.writeToDisconnectedBuffer(command);
                }
            } else {
                this.writeToBuffer(command);
            }
        }
        finally {
            this.sharedLock.decrementWriters();
            if (this.debugEnabled) {
                logger.debug("{} write() done", (Object)this.logPrefix());
            }
        }
        return command;
    }

    @Override
    public <K, V> Collection<RedisCommand<K, V, ?>> write(Collection<? extends RedisCommand<K, V, ?>> commands) {
        LettuceAssert.notNull(commands, "Commands must not be null");
        try {
            this.sharedLock.incrementWriters();
            this.validateWrite(commands.size());
            if (this.autoFlushCommands) {
                if (this.isConnected()) {
                    this.writeToChannelAndFlush(commands);
                } else {
                    this.writeToDisconnectedBuffer(commands);
                }
            } else {
                this.writeToBuffer((RedisCommand)((Object)commands));
            }
        }
        finally {
            this.sharedLock.decrementWriters();
            if (this.debugEnabled) {
                logger.debug("{} write() done", (Object)this.logPrefix());
            }
        }
        return commands;
    }

    private void validateWrite(int commands) {
        if (this.isClosed()) {
            throw new RedisException("Connection is closed");
        }
        if (this.usesBoundedQueues()) {
            boolean connected = this.isConnected();
            if (QUEUE_SIZE.get(this) + commands > this.clientOptions.getRequestQueueSize()) {
                throw new RedisException("Request queue size exceeded: " + this.clientOptions.getRequestQueueSize() + ". Commands are not accepted until the queue size drops.");
            }
            if (!connected && this.disconnectedBuffer.size() + commands > this.clientOptions.getRequestQueueSize()) {
                throw new RedisException("Request queue size exceeded: " + this.clientOptions.getRequestQueueSize() + ". Commands are not accepted until the queue size drops.");
            }
            if (connected && this.commandBuffer.size() + commands > this.clientOptions.getRequestQueueSize()) {
                throw new RedisException("Command buffer size exceeded: " + this.clientOptions.getRequestQueueSize() + ". Commands are not accepted until the queue size drops.");
            }
        }
        if (!this.isConnected() && this.isRejectCommand()) {
            throw new RedisException("Currently not connected. Commands are rejected.");
        }
    }

    private boolean usesBoundedQueues() {
        return this.boundedQueues;
    }

    private void writeToBuffer(Iterable<? extends RedisCommand<?, ?, ?>> commands) {
        for (RedisCommand<?, ?, ?> command : commands) {
            this.writeToBuffer(command);
        }
    }

    private void writeToDisconnectedBuffer(Collection<? extends RedisCommand<?, ?, ?>> commands) {
        for (RedisCommand<?, ?, ?> command : commands) {
            this.writeToDisconnectedBuffer(command);
        }
    }

    private void writeToDisconnectedBuffer(RedisCommand<?, ?, ?> command) {
        if (this.connectionError != null) {
            if (this.debugEnabled) {
                logger.debug("{} writeToDisconnectedBuffer() Completing command {} due to connection error", (Object)this.logPrefix(), command);
            }
            command.completeExceptionally(this.connectionError);
            return;
        }
        if (this.debugEnabled) {
            logger.debug("{} writeToDisconnectedBuffer() buffering (disconnected) command {}", (Object)this.logPrefix(), command);
        }
        this.disconnectedBuffer.add(command);
    }

    protected <C extends RedisCommand<?, ?, T>, T> void writeToBuffer(C command) {
        if (this.debugEnabled) {
            logger.debug("{} writeToBuffer() buffering command {}", (Object)this.logPrefix(), command);
        }
        if (this.connectionError != null) {
            if (this.debugEnabled) {
                logger.debug("{} writeToBuffer() Completing command {} due to connection error", (Object)this.logPrefix(), command);
            }
            command.completeExceptionally(this.connectionError);
            return;
        }
        this.commandBuffer.add(command);
    }

    private void writeToChannelAndFlush(RedisCommand<?, ?, ?> command) {
        QUEUE_SIZE.incrementAndGet(this);
        if (this.reliability == Reliability.AT_MOST_ONCE) {
            this.channelWriteAndFlush(command).addListener((GenericFutureListener)AtMostOnceWriteListener.newInstance(this, command));
        }
        if (this.reliability == Reliability.AT_LEAST_ONCE) {
            this.channelWriteAndFlush(command).addListener((GenericFutureListener)RetryListener.newInstance(this, command));
        }
    }

    private void writeToChannelAndFlush(Collection<? extends RedisCommand<?, ?, ?>> commands) {
        QUEUE_SIZE.addAndGet(this, commands.size());
        if (this.reliability == Reliability.AT_MOST_ONCE) {
            for (RedisCommand<?, ?, ?> command : commands) {
                this.channelWrite(command).addListener((GenericFutureListener)AtMostOnceWriteListener.newInstance(this, command));
            }
        }
        if (this.reliability == Reliability.AT_LEAST_ONCE) {
            for (RedisCommand<?, ?, ?> command : commands) {
                this.channelWrite(command).addListener((GenericFutureListener)RetryListener.newInstance(this, command));
            }
        }
        this.channelFlush();
    }

    private void channelFlush() {
        if (this.debugEnabled) {
            logger.debug("{} write() channelFlush", (Object)this.logPrefix());
        }
        this.channel.flush();
    }

    private ChannelFuture channelWrite(RedisCommand<?, ?, ?> command) {
        if (this.debugEnabled) {
            logger.debug("{} write() channelWrite command {}", (Object)this.logPrefix(), command);
        }
        return this.channel.write(command);
    }

    private ChannelFuture channelWriteAndFlush(RedisCommand<?, ?, ?> command) {
        if (this.debugEnabled) {
            logger.debug("{} write() writeAndFlush command {}", (Object)this.logPrefix(), command);
        }
        return this.channel.writeAndFlush(command);
    }

    private boolean isRejectCommand() {
        if (this.clientOptions == null) {
            return false;
        }
        switch (this.clientOptions.getDisconnectedBehavior()) {
            case REJECT_COMMANDS: {
                return true;
            }
            case ACCEPT_COMMANDS: {
                return false;
            }
        }
        return !this.clientOptions.isAutoReconnect();
    }

    @Override
    public void notifyChannelActive(Channel channel) {
        this.logPrefix = null;
        this.channel = channel;
        this.connectionError = null;
        if (this.isClosed()) {
            logger.info("{} Closing channel because endpoint is already closed", (Object)this.logPrefix());
            channel.close();
            return;
        }
        if (this.connectionWatchdog != null) {
            this.connectionWatchdog.arm();
        }
        this.sharedLock.doExclusive(() -> {
            try {
                if (this.debugEnabled) {
                    logger.debug("{} activateEndpointAndExecuteBufferedCommands {} command(s) buffered", (Object)this.logPrefix(), (Object)this.disconnectedBuffer.size());
                }
                if (this.debugEnabled) {
                    logger.debug("{} activating endpoint", (Object)this.logPrefix());
                }
                this.connectionFacade.activated();
                this.flushCommands(this.disconnectedBuffer);
            }
            catch (Exception e) {
                if (this.debugEnabled) {
                    logger.debug("{} channelActive() ran into an exception", (Object)this.logPrefix());
                }
                if (this.clientOptions.isCancelCommandsOnReconnectFailure()) {
                    this.reset();
                }
                throw e;
            }
        });
    }

    @Override
    public void notifyChannelInactive(Channel channel) {
        if (this.isClosed()) {
            this.cancelBufferedCommands("Connection closed");
        }
        this.sharedLock.doExclusive(() -> {
            if (this.debugEnabled) {
                logger.debug("{} deactivating endpoint handler", (Object)this.logPrefix());
            }
            this.connectionFacade.deactivated();
        });
        if (this.channel == channel) {
            this.channel = null;
        }
    }

    @Override
    public void notifyException(Throwable t) {
        if (t instanceof RedisConnectionException && RedisConnectionException.isProtectedMode(t.getMessage())) {
            this.connectionError = t;
            if (this.connectionWatchdog != null) {
                this.connectionWatchdog.setListenOnChannelInactive(false);
                this.connectionWatchdog.setReconnectSuspended(false);
            }
            this.doExclusive(this::drainCommands).forEach(cmd -> cmd.completeExceptionally(t));
        }
        if (!this.isConnected()) {
            this.connectionError = t;
        }
    }

    @Override
    public void registerConnectionWatchdog(ConnectionWatchdog connectionWatchdog) {
        this.connectionWatchdog = connectionWatchdog;
    }

    @Override
    public void flushCommands() {
        this.flushCommands(this.commandBuffer);
    }

    private void flushCommands(Queue<RedisCommand<?, ?, ?>> queue) {
        if (this.debugEnabled) {
            logger.debug("{} flushCommands()", (Object)this.logPrefix());
        }
        if (this.isConnected()) {
            List commands = this.sharedLock.doExclusive(() -> {
                if (queue.isEmpty()) {
                    return Collections.emptyList();
                }
                return DefaultEndpoint.drainCommands(queue);
            });
            if (this.debugEnabled) {
                logger.debug("{} flushCommands() Flushing {} commands", (Object)this.logPrefix(), (Object)commands.size());
            }
            if (!commands.isEmpty()) {
                this.writeToChannelAndFlush(commands);
            }
        }
    }

    @Override
    public void close() {
        if (this.debugEnabled) {
            logger.debug("{} close()", (Object)this.logPrefix());
        }
        if (this.isClosed()) {
            return;
        }
        if (this.closed.compareAndSet(false, true)) {
            if (this.connectionWatchdog != null) {
                this.connectionWatchdog.prepareClose();
            }
            this.cancelBufferedCommands("Close");
            Channel currentChannel = this.channel;
            if (currentChannel != null) {
                ChannelFuture close = currentChannel.close();
                if (currentChannel.isOpen()) {
                    close.syncUninterruptibly();
                }
            }
        }
    }

    @Override
    public void reset() {
        if (this.debugEnabled) {
            logger.debug("{} reset()", (Object)this.logPrefix());
        }
        if (this.channel != null) {
            this.channel.pipeline().fireUserEventTriggered((Object)new ConnectionEvents.Reset());
        }
        this.cancelBufferedCommands("Reset");
    }

    public void initialState() {
        this.commandBuffer.clear();
        Channel currentChannel = this.channel;
        if (currentChannel != null) {
            ChannelFuture close = currentChannel.close();
            if (currentChannel.isOpen()) {
                close.syncUninterruptibly();
            }
        }
    }

    @Override
    public void notifyDrainQueuedCommands(HasQueuedCommands queuedCommands) {
        if (this.isClosed()) {
            this.cancelCommands("Connection closed", queuedCommands.drainQueue());
            this.cancelCommands("Connection closed", this.drainCommands());
            return;
        }
        this.sharedLock.doExclusive(() -> {
            Collection<RedisCommand<RedisCommand<?, ?, ?>, RedisCommand<?, ?, ?>, RedisCommand<?, ?, ?>>> commands = queuedCommands.drainQueue();
            if (this.debugEnabled) {
                logger.debug("{} notifyQueuedCommands adding {} command(s) to buffer", (Object)this.logPrefix(), (Object)commands.size());
            }
            commands.addAll(DefaultEndpoint.drainCommands(this.disconnectedBuffer));
            for (RedisCommand<?, ?, ?> redisCommand : commands) {
                if (!(redisCommand instanceof DemandAware.Sink)) continue;
                ((DemandAware.Sink)((Object)redisCommand)).removeSource();
            }
            try {
                this.disconnectedBuffer.addAll(commands);
            }
            catch (RuntimeException e) {
                if (this.debugEnabled) {
                    logger.debug("{} notifyQueuedCommands Queue overcommit. Cannot add all commands to buffer (disconnected).", (Object)this.logPrefix(), (Object)commands.size());
                }
                commands.removeAll(this.disconnectedBuffer);
                for (RedisCommand<?, ?, ?> redisCommand : commands) {
                    redisCommand.completeExceptionally(e);
                }
            }
            if (this.isConnected()) {
                this.flushCommands(this.disconnectedBuffer);
            }
        });
    }

    public boolean isClosed() {
        return this.closed.get();
    }

    protected <T> T doExclusive(Supplier<T> supplier) {
        return this.sharedLock.doExclusive(supplier);
    }

    protected List<RedisCommand<?, ?, ?>> drainCommands() {
        ArrayList target = new ArrayList();
        target.addAll(DefaultEndpoint.drainCommands(this.disconnectedBuffer));
        target.addAll(DefaultEndpoint.drainCommands(this.commandBuffer));
        return target;
    }

    private static List<RedisCommand<?, ?, ?>> drainCommands(Queue<? extends RedisCommand<?, ?, ?>> source) {
        RedisCommand<?, ?, ?> cmd;
        ArrayList target = new ArrayList(source.size());
        while ((cmd = source.poll()) != null) {
            target.add(cmd);
        }
        return target;
    }

    private void cancelBufferedCommands(String message) {
        this.cancelCommands(message, this.doExclusive(this::drainCommands));
    }

    private void cancelCommands(String message, Iterable<? extends RedisCommand<?, ?, ?>> toCancel) {
        for (RedisCommand<?, ?, ?> cmd : toCancel) {
            if (cmd.getOutput() != null) {
                cmd.getOutput().setError(message);
            }
            cmd.cancel();
        }
    }

    private boolean isConnected() {
        Channel channel = this.channel;
        return channel != null && channel.isActive();
    }

    protected String logPrefix() {
        String buffer;
        if (this.logPrefix != null) {
            return this.logPrefix;
        }
        this.logPrefix = buffer = "[" + ChannelLogDescriptor.logDescriptor(this.channel) + ", epid=0x" + Long.toHexString(this.endpointId) + ']';
        return this.logPrefix;
    }

    private static enum Reliability {
        AT_MOST_ONCE,
        AT_LEAST_ONCE;

    }

    static class RetryListener
    extends ListenerSupport
    implements GenericFutureListener<Future<Void>> {
        private static final Recycler<RetryListener> RECYCLER = new Recycler<RetryListener>(){

            protected RetryListener newObject(Recycler.Handle<RetryListener> handle) {
                return new RetryListener(handle);
            }
        };
        private final Recycler.Handle<RetryListener> handle;

        RetryListener(Recycler.Handle<RetryListener> handle) {
            this.handle = handle;
        }

        static RetryListener newInstance(DefaultEndpoint endpoint, RedisCommand<?, ?, ?> command) {
            RetryListener entry = (RetryListener)RECYCLER.get();
            if (command == null) {
                System.out.println();
            }
            entry.endpoint = endpoint;
            entry.sentCommand = command;
            return entry;
        }

        static RetryListener newInstance(DefaultEndpoint endpoint, Collection<? extends RedisCommand<?, ?, ?>> commands) {
            RetryListener entry = (RetryListener)RECYCLER.get();
            entry.endpoint = endpoint;
            entry.sentCommands = commands;
            return entry;
        }

        public void operationComplete(Future<Void> future) {
            try {
                this.doComplete(future);
            }
            finally {
                this.recycle();
            }
        }

        private void doComplete(Future<Void> future) {
            Throwable cause = future.cause();
            boolean success = future.isSuccess();
            this.dequeue();
            if (success) {
                return;
            }
            if (cause instanceof EncoderException || cause instanceof Error || cause.getCause() instanceof Error) {
                this.complete(cause);
                return;
            }
            Channel channel = this.endpoint.channel;
            if (channel != null) {
                RedisCommand sentCommand = this.sentCommand;
                Collection sentCommands = this.sentCommands;
                DefaultEndpoint endpoint = this.endpoint;
                channel.eventLoop().submit(() -> this.requeueCommands(sentCommand, sentCommands, endpoint));
            } else {
                this.requeueCommands(this.sentCommand, this.sentCommands, this.endpoint);
            }
            if (!(cause instanceof ClosedChannelException)) {
                String message = "Unexpected exception during request: {}";
                InternalLogLevel logLevel = InternalLogLevel.WARN;
                if (cause instanceof IOException && CommandHandler.SUPPRESS_IO_EXCEPTION_MESSAGES.contains(cause.getMessage())) {
                    logLevel = InternalLogLevel.DEBUG;
                }
                logger.log(logLevel, message, (Object)cause.toString(), (Object)cause);
            }
        }

        private void requeueCommands(RedisCommand<?, ?, ?> sentCommand, Collection sentCommands, DefaultEndpoint endpoint) {
            if (sentCommand != null) {
                try {
                    endpoint.write(sentCommand);
                }
                catch (Exception e) {
                    sentCommand.completeExceptionally(e);
                }
            } else {
                try {
                    endpoint.write(sentCommands);
                }
                catch (Exception e) {
                    for (RedisCommand command : sentCommands) {
                        command.completeExceptionally(e);
                    }
                }
            }
        }

        private void recycle() {
            this.endpoint = null;
            this.sentCommand = null;
            this.sentCommands = null;
            this.handle.recycle((Object)this);
        }
    }

    static class AtMostOnceWriteListener
    extends ListenerSupport
    implements ChannelFutureListener {
        private static final Recycler<AtMostOnceWriteListener> RECYCLER = new Recycler<AtMostOnceWriteListener>(){

            protected AtMostOnceWriteListener newObject(Recycler.Handle<AtMostOnceWriteListener> handle) {
                return new AtMostOnceWriteListener(handle);
            }
        };
        private final Recycler.Handle<AtMostOnceWriteListener> handle;

        AtMostOnceWriteListener(Recycler.Handle<AtMostOnceWriteListener> handle) {
            this.handle = handle;
        }

        static AtMostOnceWriteListener newInstance(DefaultEndpoint endpoint, RedisCommand<?, ?, ?> command) {
            AtMostOnceWriteListener entry = (AtMostOnceWriteListener)RECYCLER.get();
            entry.endpoint = endpoint;
            entry.sentCommand = command;
            return entry;
        }

        static AtMostOnceWriteListener newInstance(DefaultEndpoint endpoint, Collection<? extends RedisCommand<?, ?, ?>> commands) {
            AtMostOnceWriteListener entry = (AtMostOnceWriteListener)RECYCLER.get();
            entry.endpoint = endpoint;
            entry.sentCommands = commands;
            return entry;
        }

        public void operationComplete(ChannelFuture future) {
            try {
                this.dequeue();
                if (!future.isSuccess() && future.cause() != null) {
                    this.complete(future.cause());
                }
            }
            finally {
                this.recycle();
            }
        }

        private void recycle() {
            this.endpoint = null;
            this.sentCommand = null;
            this.sentCommands = null;
            this.handle.recycle((Object)this);
        }
    }

    static class ListenerSupport {
        Collection<? extends RedisCommand<?, ?, ?>> sentCommands;
        RedisCommand<?, ?, ?> sentCommand;
        DefaultEndpoint endpoint;

        ListenerSupport() {
        }

        void dequeue() {
            if (this.sentCommand != null) {
                QUEUE_SIZE.decrementAndGet(this.endpoint);
            } else {
                QUEUE_SIZE.addAndGet(this.endpoint, -this.sentCommands.size());
            }
        }

        protected void complete(Throwable t) {
            if (this.sentCommand != null) {
                this.sentCommand.completeExceptionally(t);
            } else {
                for (RedisCommand<?, ?, ?> sentCommand : this.sentCommands) {
                    sentCommand.completeExceptionally(t);
                }
            }
        }
    }
}

