/*
 * Decompiled with CFR 0.152.
 */
package io.lettuce.core;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.ConnectionBuilder;
import io.lettuce.core.ConnectionEvents;
import io.lettuce.core.ConnectionFuture;
import io.lettuce.core.ConnectionPoint;
import io.lettuce.core.ConnectionState;
import io.lettuce.core.DefaultConnectionFuture;
import io.lettuce.core.RedisChannelHandler;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.RedisConnectionStateListener;
import io.lettuce.core.RedisHandshake;
import io.lettuce.core.RedisURI;
import io.lettuce.core.SocketOptions;
import io.lettuce.core.Transports;
import io.lettuce.core.internal.AsyncCloseable;
import io.lettuce.core.internal.Exceptions;
import io.lettuce.core.internal.Futures;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.internal.LettuceStrings;
import io.lettuce.core.protocol.ConnectionWatchdog;
import io.lettuce.core.protocol.RedisHandshakeHandler;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.DefaultClientResources;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.Closeable;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public abstract class AbstractRedisClient {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractRedisClient.class);
    private static final int EVENTLOOP_ACQ_INACTIVE = 0;
    private static final int EVENTLOOP_ACQ_ACTIVE = 1;
    private final AtomicInteger eventLoopGroupCas = new AtomicInteger();
    protected final ConnectionEvents connectionEvents = new ConnectionEvents();
    protected final Set<Closeable> closeableResources = ConcurrentHashMap.newKeySet();
    protected final ChannelGroup channels;
    private final ClientResources clientResources;
    private final Map<Class<? extends EventLoopGroup>, EventLoopGroup> eventLoopGroups = new ConcurrentHashMap<Class<? extends EventLoopGroup>, EventLoopGroup>(2);
    private final boolean sharedResources;
    private final AtomicBoolean shutdown = new AtomicBoolean();
    private volatile ClientOptions clientOptions = ClientOptions.create();
    private volatile Duration defaultTimeout = RedisURI.DEFAULT_TIMEOUT_DURATION;

    protected AbstractRedisClient(ClientResources clientResources) {
        if (clientResources == null) {
            this.sharedResources = false;
            this.clientResources = DefaultClientResources.create();
        } else {
            this.sharedResources = true;
            this.clientResources = clientResources;
        }
        this.channels = new DefaultChannelGroup(this.clientResources.eventExecutorGroup().next());
    }

    protected int getChannelCount() {
        return this.channels.size();
    }

    public Duration getDefaultTimeout() {
        return this.defaultTimeout;
    }

    public void setDefaultTimeout(Duration timeout) {
        LettuceAssert.notNull((Object)timeout, "Timeout duration must not be null");
        LettuceAssert.isTrue(!timeout.isNegative(), "Timeout duration must be greater or equal to zero");
        this.defaultTimeout = timeout;
    }

    @Deprecated
    public void setDefaultTimeout(long timeout, TimeUnit unit) {
        this.setDefaultTimeout(Duration.ofNanos(unit.toNanos(timeout)));
    }

    public ClientOptions getOptions() {
        return this.clientOptions;
    }

    protected void setOptions(ClientOptions clientOptions) {
        LettuceAssert.notNull((Object)clientOptions, "ClientOptions must not be null");
        this.clientOptions = clientOptions;
    }

    public ClientResources getResources() {
        return this.clientResources;
    }

    protected int getResourceCount() {
        return this.closeableResources.size();
    }

    public void addListener(RedisConnectionStateListener listener) {
        LettuceAssert.notNull((Object)listener, "RedisConnectionStateListener must not be null");
        this.connectionEvents.addListener(listener);
    }

    public void removeListener(RedisConnectionStateListener listener) {
        LettuceAssert.notNull((Object)listener, "RedisConnectionStateListener must not be null");
        this.connectionEvents.removeListener(listener);
    }

    protected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder, RedisURI redisURI) {
        Bootstrap redisBootstrap = new Bootstrap();
        redisBootstrap.option(ChannelOption.ALLOCATOR, (Object)ByteBufAllocator.DEFAULT);
        ClientOptions clientOptions = this.getOptions();
        SocketOptions socketOptions = clientOptions.getSocketOptions();
        redisBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)Math.toIntExact(socketOptions.getConnectTimeout().toMillis()));
        if (LettuceStrings.isEmpty(redisURI.getSocket())) {
            redisBootstrap.option(ChannelOption.SO_KEEPALIVE, (Object)socketOptions.isKeepAlive());
            redisBootstrap.option(ChannelOption.TCP_NODELAY, (Object)socketOptions.isTcpNoDelay());
        }
        connectionBuilder.apply(redisURI);
        connectionBuilder.bootstrap(redisBootstrap);
        connectionBuilder.channelGroup(this.channels).connectionEvents(this.connectionEvents);
        connectionBuilder.socketAddressSupplier(socketAddressSupplier);
    }

    protected void channelType(ConnectionBuilder connectionBuilder, ConnectionPoint connectionPoint) {
        LettuceAssert.notNull((Object)connectionPoint, "ConnectionPoint must not be null");
        connectionBuilder.bootstrap().group(this.getEventLoopGroup(connectionPoint));
        if (connectionPoint.getSocket() != null) {
            Transports.NativeTransports.assertAvailable();
            connectionBuilder.bootstrap().channel(Transports.NativeTransports.domainSocketChannelClass());
        } else {
            connectionBuilder.bootstrap().channel(Transports.socketChannelClass());
        }
    }

    private EventLoopGroup getEventLoopGroup(ConnectionPoint connectionPoint) {
        while (!this.eventLoopGroupCas.compareAndSet(0, 1)) {
        }
        try {
            EventLoopGroup eventLoopGroup = this.doGetEventExecutor(connectionPoint);
            return eventLoopGroup;
        }
        finally {
            this.eventLoopGroupCas.set(0);
        }
    }

    private EventLoopGroup doGetEventExecutor(ConnectionPoint connectionPoint) {
        if (connectionPoint.getSocket() == null && !this.eventLoopGroups.containsKey(Transports.eventLoopGroupClass())) {
            this.eventLoopGroups.put(Transports.eventLoopGroupClass(), this.clientResources.eventLoopGroupProvider().allocate(Transports.eventLoopGroupClass()));
        }
        if (connectionPoint.getSocket() != null) {
            Transports.NativeTransports.assertAvailable();
            Class<? extends EventLoopGroup> eventLoopGroupClass = Transports.NativeTransports.eventLoopGroupClass();
            if (!this.eventLoopGroups.containsKey(Transports.NativeTransports.eventLoopGroupClass())) {
                this.eventLoopGroups.put(eventLoopGroupClass, this.clientResources.eventLoopGroupProvider().allocate(eventLoopGroupClass));
            }
        }
        if (connectionPoint.getSocket() == null) {
            return this.eventLoopGroups.get(Transports.eventLoopGroupClass());
        }
        if (connectionPoint.getSocket() != null) {
            Transports.NativeTransports.assertAvailable();
            return this.eventLoopGroups.get(Transports.NativeTransports.eventLoopGroupClass());
        }
        throw new IllegalStateException("This should not have happened in a binary decision. Please file a bug.");
    }

    protected <T> T getConnection(ConnectionFuture<T> connectionFuture) {
        try {
            return connectionFuture.get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), (Throwable)e);
        }
        catch (Exception e) {
            throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), Exceptions.unwrap(e));
        }
    }

    protected <T> T getConnection(CompletableFuture<T> connectionFuture) {
        try {
            return connectionFuture.get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw RedisConnectionException.create(e);
        }
        catch (Exception e) {
            throw RedisConnectionException.create(Exceptions.unwrap(e));
        }
    }

    protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync(ConnectionBuilder connectionBuilder) {
        Mono<SocketAddress> socketAddressSupplier = connectionBuilder.socketAddress();
        if (this.clientResources.eventExecutorGroup().isShuttingDown()) {
            throw new IllegalStateException("Cannot connect, Event executor group is terminated.");
        }
        CompletableFuture<SocketAddress> socketAddressFuture = new CompletableFuture<SocketAddress>();
        CompletableFuture channelReadyFuture = new CompletableFuture();
        socketAddressSupplier.doOnError(socketAddressFuture::completeExceptionally).doOnNext(socketAddressFuture::complete).subscribe(redisAddress -> {
            if (channelReadyFuture.isCancelled()) {
                return;
            }
            this.initializeChannelAsync0(connectionBuilder, channelReadyFuture, (SocketAddress)redisAddress);
        }, channelReadyFuture::completeExceptionally);
        return new DefaultConnectionFuture(socketAddressFuture, channelReadyFuture.thenApply(channel -> connectionBuilder.connection()));
    }

    private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, CompletableFuture<Channel> channelReadyFuture, SocketAddress redisAddress) {
        logger.debug("Connecting to Redis at {}", (Object)redisAddress);
        Bootstrap redisBootstrap = connectionBuilder.bootstrap();
        ChannelInitializer<Channel> initializer = connectionBuilder.build(redisAddress);
        redisBootstrap.handler(initializer);
        this.clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);
        ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);
        channelReadyFuture.whenComplete((c, t) -> {
            if (t instanceof CancellationException) {
                connectFuture.cancel(true);
            }
        });
        connectFuture.addListener(future -> {
            if (!future.isSuccess()) {
                logger.debug("Connecting to Redis at {}: {}", (Object)redisAddress, (Object)future.cause());
                connectionBuilder.endpoint().initialState();
                channelReadyFuture.completeExceptionally(future.cause());
                return;
            }
            RedisHandshakeHandler handshakeHandler = (RedisHandshakeHandler)connectFuture.channel().pipeline().get(RedisHandshakeHandler.class);
            if (handshakeHandler == null) {
                channelReadyFuture.completeExceptionally(new IllegalStateException("RedisHandshakeHandler not registered"));
                return;
            }
            handshakeHandler.channelInitialized().whenComplete((success, throwable) -> {
                if (throwable == null) {
                    logger.debug("Connecting to Redis at {}: Success", (Object)redisAddress);
                    RedisChannelHandler<?, ?> connection = connectionBuilder.connection();
                    connection.registerCloseables(this.closeableResources, connection);
                    channelReadyFuture.complete(connectFuture.channel());
                    return;
                }
                logger.debug("Connecting to Redis at {}, initialization: {}", (Object)redisAddress, throwable);
                connectionBuilder.endpoint().initialState();
                Throwable failure = throwable instanceof RedisConnectionException ? throwable : (throwable instanceof TimeoutException ? new RedisConnectionException("Could not initialize channel within " + connectionBuilder.getTimeout(), (Throwable)throwable) : throwable);
                channelReadyFuture.completeExceptionally(failure);
            });
        });
    }

    public void shutdown() {
        this.shutdown(0L, 2L, TimeUnit.SECONDS);
    }

    public void shutdown(Duration quietPeriod, Duration timeout) {
        this.shutdown(quietPeriod.toNanos(), timeout.toNanos(), TimeUnit.NANOSECONDS);
    }

    public void shutdown(long quietPeriod, long timeout, TimeUnit timeUnit) {
        try {
            this.shutdownAsync(quietPeriod, timeout, timeUnit).get();
        }
        catch (Exception e) {
            throw Exceptions.bubble(e);
        }
    }

    public CompletableFuture<Void> shutdownAsync() {
        return this.shutdownAsync(0L, 2L, TimeUnit.SECONDS);
    }

    public CompletableFuture<Void> shutdownAsync(long quietPeriod, long timeout, TimeUnit timeUnit) {
        if (this.shutdown.compareAndSet(false, true)) {
            logger.debug("Initiate shutdown ({}, {}, {})", new Object[]{quietPeriod, timeout, timeUnit});
            return this.closeResources().thenCompose(value -> this.closeClientResources(quietPeriod, timeout, timeUnit));
        }
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<Void> closeResources() {
        ArrayList<CompletionStage<Void>> closeFutures = new ArrayList<CompletionStage<Void>>();
        ArrayList<Closeable> closeableResources = new ArrayList<Closeable>(this.closeableResources);
        for (Closeable closeableResource : closeableResources) {
            if (closeableResource instanceof AsyncCloseable) {
                closeFutures.add(((AsyncCloseable)((Object)closeableResource)).closeAsync());
            } else {
                try {
                    closeableResource.close();
                }
                catch (Exception e) {
                    logger.debug("Exception on Close: " + e.getMessage(), (Throwable)e);
                }
            }
            this.closeableResources.remove(closeableResource);
        }
        for (Channel c : (Channel[])this.channels.toArray((Object[])new Channel[0])) {
            ChannelPipeline pipeline;
            ConnectionWatchdog commandHandler;
            if (c == null || (commandHandler = (ConnectionWatchdog)(pipeline = c.pipeline()).get(ConnectionWatchdog.class)) == null) continue;
            commandHandler.setListenOnChannelInactive(false);
        }
        try {
            closeFutures.add(Futures.toCompletionStage(this.channels.close()));
        }
        catch (Exception e) {
            logger.debug("Cannot close channels", (Throwable)e);
        }
        return Futures.allOf(closeFutures);
    }

    private CompletableFuture<Void> closeClientResources(long quietPeriod, long timeout, TimeUnit timeUnit) {
        ArrayList<CompletionStage<Boolean>> groupCloseFutures = new ArrayList<CompletionStage<Boolean>>();
        if (!this.sharedResources) {
            Future<Boolean> groupCloseFuture = this.clientResources.shutdown(quietPeriod, timeout, timeUnit);
            groupCloseFutures.add(Futures.toCompletionStage(groupCloseFuture));
        } else {
            for (EventLoopGroup eventExecutors : this.eventLoopGroups.values()) {
                Future<Boolean> groupCloseFuture = this.clientResources.eventLoopGroupProvider().release((EventExecutorGroup)eventExecutors, quietPeriod, timeout, timeUnit);
                groupCloseFutures.add(Futures.toCompletionStage(groupCloseFuture));
            }
        }
        return Futures.allOf(groupCloseFutures);
    }

    protected RedisHandshake createHandshake(ConnectionState state) {
        return new RedisHandshake(this.clientOptions.getConfiguredProtocolVersion(), this.clientOptions.isPingBeforeActivateConnection(), state);
    }
}

