/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.transport;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.Version;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.CassandraDaemon;
import org.apache.cassandra.service.IEndpointLifecycleSubscriber;
import org.apache.cassandra.service.MigrationListener;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.service.NativeTransportService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.transport.CBUtil;
import org.apache.cassandra.transport.Connection;
import org.apache.cassandra.transport.ConnectionLimitHandler;
import org.apache.cassandra.transport.Event;
import org.apache.cassandra.transport.Frame;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.ServerConnection;
import org.apache.cassandra.transport.messages.EventMessage;
import org.apache.cassandra.utils.FBUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Server
implements CassandraDaemon.Server {
    /** Class-wide logger; assigned in the static initializer. */
    private static final Logger logger;
    /** Selects the epoll transport over NIO; set from {@code NativeTransportService.useEpoll()} in the static initializer. */
    private static final boolean useEpoll;
    // Version number constants. NOTE(review): presumably native protocol versions — confirm against the frame/message codecs.
    public static final int VERSION_3 = 3;
    public static final int VERSION_4 = 4;
    public static final int CURRENT_VERSION = 4;
    public static final int MIN_SUPPORTED_VERSION = 3;
    /** Holds the listening channel (added in {@code start()}) and every channel registered through {@code addConnection}. */
    private final ConnectionTracker connectionTracker = new ConnectionTracker();
    /** Produces a {@link ServerConnection} wired to this server's tracker for each new channel. */
    private final Connection.Factory connectionFactory = new Connection.Factory(){

        @Override
        public Connection newConnection(Channel channel, int version) {
            return new ServerConnection(channel, version, Server.this.connectionTracker);
        }
    };
    /** Address and port the server binds to; resolved from the builder. */
    public final InetSocketAddress socket;
    /** When true, {@code start()} installs one of the SSL-capable channel initializers. */
    public boolean useSSL = false;
    /** Running flag: set at the end of a successful {@code start()}, cleared by {@code stop()}. */
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    /** Event loop group handed to the bootstrap; either supplied by the builder or created in the constructor. */
    private EventLoopGroup workerGroup;
    /** Optional executor for the message dispatcher handler; stays null when the builder supplies none. */
    private EventExecutor eventExecutorGroup;

    private Server(Builder builder) {
        this.socket = builder.getSocket();
        this.useSSL = builder.useSSL;
        this.workerGroup = builder.workerGroup != null ? builder.workerGroup : (useEpoll ? new EpollEventLoopGroup() : new NioEventLoopGroup());
        if (builder.eventExecutorGroup != null) {
            this.eventExecutorGroup = builder.eventExecutorGroup;
        }
        EventNotifier notifier = new EventNotifier(this);
        StorageService.instance.register(notifier);
        MigrationManager.instance.register(notifier);
    }

    /**
     * Stops the server if it is running; otherwise does nothing.
     * The running flag is flipped with a compare-and-set, so only the caller
     * that wins the flip goes on to close the channels.
     */
    @Override
    public void stop() {
        boolean wasRunning = isRunning.compareAndSet(true, false);
        if (!wasRunning) {
            return;
        }
        close();
    }

    /**
     * Reports the current value of the running flag.
     *
     * @return {@code true} once {@code start()} has completed and until {@code stop()} clears the flag
     */
    @Override
    public boolean isRunning() {
        return isRunning.get();
    }

    /**
     * Binds the listening socket and starts accepting clients. Returns
     * immediately when the server is already running.
     *
     * <p>Child channels are configured with TCP_NODELAY, SO_LINGER of 0, the
     * configured keep-alive setting, the shared buffer allocator and write
     * buffer water marks of 32768 (high) and 8192 (low). The channel
     * initializer is chosen from the SSL flag and the client encryption
     * options: plain, optionally encrypted, or always encrypted.
     *
     * @throws IllegalStateException if the socket cannot be bound
     */
    @Override
    public synchronized void start() {
        if (isRunning()) {
            return;
        }

        ServerBootstrap bootstrap = new ServerBootstrap()
                .channel(useEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_LINGER, 0)
                .childOption(ChannelOption.SO_KEEPALIVE, DatabaseDescriptor.getRpcKeepAlive())
                .childOption(ChannelOption.ALLOCATOR, CBUtil.allocator)
                .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32768)
                .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8192);
        if (workerGroup != null) {
            bootstrap = bootstrap.group(workerGroup);
        }

        // Read the encryption options up front, even when SSL is disabled.
        EncryptionOptions.ClientEncryptionOptions clientEnc = DatabaseDescriptor.getClientEncryptionOptions();
        ChannelHandler childInitializer;
        if (!useSSL) {
            childInitializer = new Initializer(this);
        } else if (clientEnc.optional) {
            logger.info("Enabling optionally encrypted CQL connections between client and server");
            childInitializer = new OptionalSecureInitializer(this, clientEnc);
        } else {
            logger.info("Enabling encrypted CQL connections between client and server");
            childInitializer = new SecureInitializer(this, clientEnc);
        }
        bootstrap.childHandler(childInitializer);

        logger.info("Using Netty Version: {}", Version.identify().entrySet());
        String mode = useSSL ? "encrypted" : "unencrypted";
        logger.info("Starting listening for CQL clients on {} ({})...", socket, mode);

        ChannelFuture bound = bootstrap.bind(socket);
        boolean success = bound.awaitUninterruptibly().isSuccess();
        if (!success) {
            throw new IllegalStateException(String.format("Failed to bind port %d on %s.", socket.getPort(), socket.getAddress().getHostAddress()));
        }

        // The listening channel is tracked alongside client channels so closeAll() shuts it down too.
        connectionTracker.allChannels.add(bound.channel());
        isRunning.set(true);
    }

    /**
     * Returns the client count reported by the connection tracker.
     *
     * @return the value of {@code ConnectionTracker.getConnectedClients()}
     */
    public int getConnectedClients() {
        return connectionTracker.getConnectedClients();
    }

    /**
     * Closes every tracked channel via the connection tracker, then logs
     * that the server has stopped listening.
     */
    private void close() {
        connectionTracker.closeAll();
        logger.info("Stop listening for CQL clients");
    }

    // Static setup: installs the SLF4J-backed factory as Netty's default internal
    // logger factory, then creates this class's logger and reads the epoll setting.
    static {
        // Done first so it is in place before any other statement in this initializer runs.
        InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
        logger = LoggerFactory.getLogger(Server.class);
        useEpoll = NativeTransportService.useEpoll();
    }

    private static class EventNotifier
    extends MigrationListener
    implements IEndpointLifecycleSubscriber {
        private final Server server;
        private final Map<InetAddress, LatestEvent> latestEvents = new ConcurrentHashMap<InetAddress, LatestEvent>();
        private static final InetAddress bindAll;

        private EventNotifier(Server server) {
            this.server = server;
        }

        private InetAddress getRpcAddress(InetAddress endpoint) {
            try {
                InetAddress rpcAddress = InetAddress.getByName(StorageService.instance.getRpcaddress(endpoint));
                return rpcAddress.equals(bindAll) ? endpoint : rpcAddress;
            }
            catch (UnknownHostException e) {
                logger.error("Problem retrieving RPC address for {}", (Object)endpoint, (Object)e);
                return endpoint;
            }
        }

        private void send(InetAddress endpoint, Event.NodeEvent event) {
            if (logger.isTraceEnabled()) {
                logger.trace("Sending event for endpoint {}, rpc address {}", (Object)endpoint, (Object)event.nodeAddress());
            }
            if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && event.nodeAddress().equals(DatabaseDescriptor.getBroadcastRpcAddress())) {
                return;
            }
            this.send(event);
        }

        private void send(Event event) {
            this.server.connectionTracker.send(event);
        }

        @Override
        public void onJoinCluster(InetAddress endpoint) {
            this.onTopologyChange(endpoint, Event.TopologyChange.newNode(this.getRpcAddress(endpoint), this.server.socket.getPort()));
        }

        @Override
        public void onLeaveCluster(InetAddress endpoint) {
            this.onTopologyChange(endpoint, Event.TopologyChange.removedNode(this.getRpcAddress(endpoint), this.server.socket.getPort()));
        }

        @Override
        public void onMove(InetAddress endpoint) {
            this.onTopologyChange(endpoint, Event.TopologyChange.movedNode(this.getRpcAddress(endpoint), this.server.socket.getPort()));
        }

        @Override
        public void onUp(InetAddress endpoint) {
            this.onStatusChange(endpoint, Event.StatusChange.nodeUp(this.getRpcAddress(endpoint), this.server.socket.getPort()));
        }

        @Override
        public void onDown(InetAddress endpoint) {
            this.onStatusChange(endpoint, Event.StatusChange.nodeDown(this.getRpcAddress(endpoint), this.server.socket.getPort()));
        }

        private void onTopologyChange(InetAddress endpoint, Event.TopologyChange event) {
            LatestEvent ret;
            LatestEvent prev;
            if (logger.isTraceEnabled()) {
                logger.trace("Topology changed event : {}, {}", (Object)endpoint, (Object)event.change);
            }
            if (((prev = this.latestEvents.get(endpoint)) == null || prev.topology != event.change) && (ret = this.latestEvents.put(endpoint, LatestEvent.forTopologyChange(event.change, prev))) == prev) {
                this.send(endpoint, event);
            }
        }

        private void onStatusChange(InetAddress endpoint, Event.StatusChange event) {
            LatestEvent ret;
            LatestEvent prev;
            if (logger.isTraceEnabled()) {
                logger.trace("Status changed event : {}, {}", (Object)endpoint, (Object)event.status);
            }
            if (((prev = this.latestEvents.get(endpoint)) == null || prev.status != event.status) && (ret = this.latestEvents.put(endpoint, LatestEvent.forStatusChange(event.status, null))) == prev) {
                this.send(endpoint, event);
            }
        }

        @Override
        public void onCreateKeyspace(String ksName) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, ksName));
        }

        @Override
        public void onCreateColumnFamily(String ksName, String cfName) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
        }

        @Override
        public void onCreateUserType(String ksName, String typeName) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
        }

        @Override
        public void onCreateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.FUNCTION, ksName, functionName, AbstractType.asCQLTypeStringList(argTypes)));
        }

        @Override
        public void onCreateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.AGGREGATE, ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
        }

        @Override
        public void onUpdateKeyspace(String ksName) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName));
        }

        @Override
        public void onUpdateColumnFamily(String ksName, String cfName, boolean affectsStatements) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
        }

        @Override
        public void onUpdateUserType(String ksName, String typeName) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
        }

        @Override
        public void onUpdateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.FUNCTION, ksName, functionName, AbstractType.asCQLTypeStringList(argTypes)));
        }

        @Override
        public void onUpdateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.AGGREGATE, ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
        }

        @Override
        public void onDropKeyspace(String ksName) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName));
        }

        @Override
        public void onDropColumnFamily(String ksName, String cfName) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, ksName, cfName));
        }

        @Override
        public void onDropUserType(String ksName, String typeName) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName));
        }

        @Override
        public void onDropFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.FUNCTION, ksName, functionName, AbstractType.asCQLTypeStringList(argTypes)));
        }

        @Override
        public void onDropAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.AGGREGATE, ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
        }

        static {
            try {
                bindAll = InetAddress.getByAddress(new byte[4]);
            }
            catch (UnknownHostException e) {
                throw new AssertionError((Object)e);
            }
        }
    }

    /**
     * Immutable pair of the last status and the last topology change recorded
     * for an endpoint. Either component may be {@code null}.
     */
    private static class LatestEvent {
        public final Event.StatusChange.Status status;
        public final Event.TopologyChange.Change topology;

        private LatestEvent(Event.StatusChange.Status status, Event.TopologyChange.Change topology) {
            this.status = status;
            this.topology = topology;
        }

        @Override
        public String toString() {
            return String.format("Status %s, Topology %s", status, topology);
        }

        /**
         * Builds an entry with the given status, carrying over the topology of
         * {@code prev} when one is supplied.
         *
         * @param status the new status
         * @param prev   the previous entry, or {@code null} for no carried-over topology
         */
        public static LatestEvent forStatusChange(Event.StatusChange.Status status, LatestEvent prev) {
            Event.TopologyChange.Change carriedTopology = null;
            if (prev != null) {
                carriedTopology = prev.topology;
            }
            return new LatestEvent(status, carriedTopology);
        }

        /**
         * Builds an entry with the given topology change, carrying over the
         * status of {@code prev} when one is supplied.
         *
         * @param change the new topology change
         * @param prev   the previous entry, or {@code null} for no carried-over status
         */
        public static LatestEvent forTopologyChange(Event.TopologyChange.Change change, LatestEvent prev) {
            Event.StatusChange.Status carriedStatus = null;
            if (prev != null) {
                carriedStatus = prev.status;
            }
            return new LatestEvent(carriedStatus, change);
        }
    }

    /**
     * Channel initializer for always-encrypted connections: builds the regular
     * pipeline and then places an {@link SslHandler} at its head under the
     * name {@code "ssl"}.
     */
    private static class SecureInitializer extends AbstractSecureIntializer {
        public SecureInitializer(Server server, EncryptionOptions encryptionOptions) {
            super(server, encryptionOptions);
        }

        @Override
        protected void initChannel(Channel channel) throws Exception {
            // Create the handler before touching the pipeline, so a failure here leaves it untouched.
            SslHandler ssl = createSslHandler();
            super.initChannel(channel);
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addFirst("ssl", ssl);
        }
    }

    /**
     * Channel initializer for optionally encrypted connections. After building
     * the regular pipeline it puts a one-shot detection handler at the head.
     * That handler inspects the first bytes received and either swaps itself
     * for an {@link SslHandler} (encrypted traffic) or simply removes itself
     * (plain traffic).
     */
    private static class OptionalSecureInitializer
    extends AbstractSecureIntializer {
        public OptionalSecureInitializer(Server server, EncryptionOptions encryptionOptions) {
            super(server, encryptionOptions);
        }

        @Override
        protected void initChannel(Channel channel) throws Exception {
            super.initChannel(channel);
            channel.pipeline().addFirst("sslDetectionHandler", new ByteToMessageDecoder(){

                @Override
                protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
                    // Fewer than 5 readable bytes: return and wait for more data before deciding.
                    if (byteBuf.readableBytes() < 5) {
                        return;
                    }
                    if (SslHandler.isEncrypted(byteBuf)) {
                        // createSslHandler() belongs to the enclosing initializer, not to this
                        // decoder, so the call must be qualified with the outer instance.
                        SslHandler sslHandler = OptionalSecureInitializer.this.createSslHandler();
                        // Here 'this' is the detection decoder itself, which is being replaced.
                        channelHandlerContext.pipeline().replace(this, "ssl", sslHandler);
                    } else {
                        // Plain connection: drop the detection handler and continue without SSL.
                        channelHandlerContext.pipeline().remove(this);
                    }
                }
            });
        }
    }

    /**
     * Base class for the SSL-capable initializers. It creates the
     * {@link SSLContext} once at construction time and hands out a freshly
     * configured {@link SslHandler} for each call to {@link #createSslHandler()}.
     */
    protected static abstract class AbstractSecureIntializer extends Initializer {
        private final SSLContext sslContext;
        private final EncryptionOptions encryptionOptions;

        /**
         * @param server            the owning server
         * @param encryptionOptions options used to build the SSL context and configure engines
         * @throws RuntimeException if the SSL context cannot be created
         */
        protected AbstractSecureIntializer(Server server, EncryptionOptions encryptionOptions) {
            super(server);
            this.encryptionOptions = encryptionOptions;
            try {
                sslContext = SSLFactory.createSSLContext(encryptionOptions, encryptionOptions.require_client_auth);
            } catch (IOException e) {
                throw new RuntimeException("Failed to setup secure pipeline", e);
            }
        }

        /**
         * Creates a server-mode SSL engine restricted to the configured cipher
         * suites and the accepted protocols, and wraps it in a handler.
         *
         * @return a new handler backed by a new engine
         */
        protected final SslHandler createSslHandler() {
            SSLEngine engine = sslContext.createSSLEngine();
            engine.setUseClientMode(false);
            engine.setEnabledCipherSuites(
                    SSLFactory.filterCipherSuites(engine.getSupportedCipherSuites(), encryptionOptions.cipher_suites));
            engine.setNeedClientAuth(encryptionOptions.require_client_auth);
            engine.setEnabledProtocols(SSLFactory.ACCEPTED_PROTOCOLS);
            return new SslHandler(engine);
        }
    }

    /**
     * Channel initializer for plain connections. All handlers except the
     * frame decoder are single static instances reused for every channel;
     * the frame decoder is created per channel.
     */
    private static class Initializer extends ChannelInitializer<Channel> {
        private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
        private static final Message.ProtocolEncoder messageEncoder = new Message.ProtocolEncoder();
        private static final Frame.Decompressor frameDecompressor = new Frame.Decompressor();
        private static final Frame.Compressor frameCompressor = new Frame.Compressor();
        private static final Frame.Encoder frameEncoder = new Frame.Encoder();
        private static final Message.Dispatcher dispatcher = new Message.Dispatcher();
        private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler();
        private final Server server;

        public Initializer(Server server) {
            this.server = server;
        }

        /**
         * Populates the channel pipeline: an optional connection limit handler
         * at the head, then the frame and message codecs, and finally the
         * dispatcher, which runs on the server's event executor when one is set.
         */
        @Override
        protected void initChannel(Channel channel) throws Exception {
            ChannelPipeline p = channel.pipeline();

            // The limit handler is only installed when at least one of the two limits is positive.
            boolean limitsConfigured = DatabaseDescriptor.getNativeTransportMaxConcurrentConnections() > 0L
                    || DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp() > 0L;
            if (limitsConfigured) {
                p.addFirst("connectionLimitHandler", connectionLimitHandler);
            }

            p.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory));
            p.addLast("frameEncoder", frameEncoder);
            p.addLast("frameDecompressor", frameDecompressor);
            p.addLast("frameCompressor", frameCompressor);
            p.addLast("messageDecoder", messageDecoder);
            p.addLast("messageEncoder", messageEncoder);

            EventExecutor executor = server.eventExecutorGroup;
            if (executor == null) {
                p.addLast("executor", dispatcher);
            } else {
                p.addLast(executor, "executor", dispatcher);
            }
        }
    }

    /**
     * Keeps track of every channel belonging to a server and of the channels
     * registered for each {@link Event.Type}, so that events can be pushed to
     * the registered channels and all channels can be closed at once.
     */
    public static class ConnectionTracker
    implements Connection.Tracker {
        /** Every tracked channel; the server also adds its listening channel here when it starts. */
        public final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        /** One channel group per event type, filled in the constructor. */
        private final EnumMap<Event.Type, ChannelGroup> groups = new EnumMap<Event.Type, ChannelGroup>(Event.Type.class);

        public ConnectionTracker() {
            for (Event.Type type : Event.Type.values()) {
                this.groups.put(type, new DefaultChannelGroup(type.toString(), GlobalEventExecutor.INSTANCE));
            }
        }

        /**
         * Starts tracking a channel. The {@code connection} argument is not used.
         */
        @Override
        public void addConnection(Channel ch, Connection connection) {
            this.allChannels.add(ch);
        }

        /**
         * Registers a channel to receive events of the given type.
         *
         * @param type the event type to subscribe to
         * @param ch   the channel to add to that type's group
         */
        public void register(Event.Type type, Channel ch) {
            this.groups.get(type).add(ch);
        }

        /**
         * Writes and flushes the event, wrapped in an {@link EventMessage}, to
         * every channel registered for the event's type.
         */
        public void send(Event event) {
            this.groups.get(event.type).writeAndFlush(new EventMessage(event));
        }

        /** Closes all tracked channels and waits, uninterruptibly, for the close to finish. */
        public void closeAll() {
            this.allChannels.close().awaitUninterruptibly();
        }

        /**
         * Returns the number of tracked channels minus one, or zero when
         * nothing is tracked. The subtraction accounts for the listening
         * channel the server adds to {@link #allChannels} on start.
         *
         * @return the client channel count, never negative
         */
        public int getConnectedClients() {
            // Read the size once: the group can change concurrently, and reading it
            // twice could see 1 then 0 and report -1.
            int tracked = this.allChannels.size();
            return tracked != 0 ? tracked - 1 : 0;
        }
    }

    /**
     * Fluent builder for {@link Server}. A host and a port are both required;
     * the SSL flag defaults to off and the Netty groups are optional.
     */
    public static class Builder {
        private EventLoopGroup workerGroup;
        private EventExecutor eventExecutorGroup;
        private boolean useSSL = false;
        private InetAddress hostAddr;
        private int port = -1;
        /** Cached address; reset to null whenever the host or the port changes. */
        private InetSocketAddress socket;

        /** Turns SSL on or off for the server being built. */
        public Builder withSSL(boolean useSSL) {
            this.useSSL = useSSL;
            return this;
        }

        /** Supplies the event loop group the server should use instead of creating its own. */
        public Builder withEventLoopGroup(EventLoopGroup eventLoopGroup) {
            workerGroup = eventLoopGroup;
            return this;
        }

        /** Supplies the executor on which the server's dispatcher handler is registered. */
        public Builder withEventExecutor(EventExecutor eventExecutor) {
            eventExecutorGroup = eventExecutor;
            return this;
        }

        /** Sets the bind host and discards any cached socket address. */
        public Builder withHost(InetAddress host) {
            hostAddr = host;
            socket = null;
            return this;
        }

        /** Sets the bind port and discards any cached socket address. */
        public Builder withPort(int port) {
            this.port = port;
            socket = null;
            return this;
        }

        /**
         * Creates the server.
         *
         * @throws IllegalStateException if the port or the host has not been set
         */
        public Server build() {
            return new Server(this);
        }

        /**
         * Returns the socket address, building and caching it on first use.
         *
         * @throws IllegalStateException if the port is still -1 or the host is null
         */
        private InetSocketAddress getSocket() {
            if (socket == null) {
                // The port is validated before the host, so a builder missing both reports the port.
                if (port == -1) {
                    throw new IllegalStateException("Missing port number");
                }
                if (hostAddr == null) {
                    throw new IllegalStateException("Missing host");
                }
                socket = new InetSocketAddress(hostAddr, port);
            }
            return socket;
        }
    }
}

