/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.cluster.messaging.impl;

import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import io.atomix.cluster.messaging.ManagedMessagingService;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.MessagingException;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.impl.ChannelPool;
import io.atomix.cluster.messaging.impl.ClientConnection;
import io.atomix.cluster.messaging.impl.Connection;
import io.atomix.cluster.messaging.impl.HandlerRegistry;
import io.atomix.cluster.messaging.impl.LocalClientConnection;
import io.atomix.cluster.messaging.impl.MessagingMetrics;
import io.atomix.cluster.messaging.impl.MessagingMetricsImpl;
import io.atomix.cluster.messaging.impl.MessagingProtocol;
import io.atomix.cluster.messaging.impl.NettyDnsMetrics;
import io.atomix.cluster.messaging.impl.ProtocolMessage;
import io.atomix.cluster.messaging.impl.ProtocolReply;
import io.atomix.cluster.messaging.impl.ProtocolRequest;
import io.atomix.cluster.messaging.impl.ProtocolVersion;
import io.atomix.cluster.messaging.impl.RemoteClientConnection;
import io.atomix.cluster.messaging.impl.RemoteServerConnection;
import io.atomix.utils.concurrent.OrderedFuture;
import io.atomix.utils.concurrent.Threads;
import io.atomix.utils.net.Address;
import io.camunda.zeebe.util.StringUtil;
import io.camunda.zeebe.util.TlsConfigUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.compression.SnappyFrameDecoder;
import io.netty.handler.codec.compression.SnappyFrameEncoder;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslProvider;
import io.netty.resolver.AddressResolverGroup;
import io.netty.resolver.dns.BiDnsQueryLifecycleObserverFactory;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.resolver.dns.DnsQueryLifecycleObserverFactory;
import io.netty.resolver.dns.LoggingDnsQueryLifeCycleObserverFactory;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.agrona.CloseHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class NettyMessagingService
implements ManagedMessagingService {
    private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(5L);
    private static final String TLS_PROTOCOL = "TLSv1.3";
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final Address advertisedAddress;
    private final Collection<Address> bindingAddresses = new ArrayList<Address>();
    private final int preamble;
    private final ProtocolVersion protocolVersion;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final HandlerRegistry handlers = new HandlerRegistry();
    private final Map<Channel, RemoteClientConnection> connections = Maps.newConcurrentMap();
    private final AtomicLong messageIdGenerator = new AtomicLong(0L);
    private final ChannelPool channelPool;
    private final Set<CompletableFuture<?>> openFutures = Sets.newConcurrentHashSet();
    private final MessagingConfig config;
    private EventLoopGroup serverGroup;
    private EventLoopGroup clientGroup;
    private Class<? extends ServerChannel> serverChannelClass;
    private Class<? extends SocketChannel> clientChannelClass;
    private Class<? extends DatagramChannel> clientDataGramChannelClass;
    private Channel serverChannel;
    private ScheduledExecutorService timeoutExecutor;
    private volatile LocalClientConnection localConnection;
    private SslContext serverSslContext;
    private SslContext clientSslContext;
    private DnsAddressResolverGroup dnsResolverGroup;
    private final MessagingMetrics messagingMetrics = new MessagingMetricsImpl();

    public NettyMessagingService(String cluster, Address advertisedAddress, MessagingConfig config) {
        this(cluster, advertisedAddress, config, ProtocolVersion.latest());
    }

    NettyMessagingService(String cluster, Address advertisedAddress, MessagingConfig config, ProtocolVersion protocolVersion) {
        this.preamble = cluster.hashCode();
        this.advertisedAddress = advertisedAddress;
        this.protocolVersion = protocolVersion;
        this.config = config;
        this.channelPool = new ChannelPool(this::openChannel, config.getConnectionPoolSize());
        this.initAddresses(config);
    }

    NettyMessagingService(String cluster, Address advertisedAddress, MessagingConfig config, ProtocolVersion protocolVersion, Function<Function<Address, CompletableFuture<Channel>>, ChannelPool> channelPoolFactor) {
        this.preamble = cluster.hashCode();
        this.advertisedAddress = advertisedAddress;
        this.protocolVersion = protocolVersion;
        this.config = config;
        this.channelPool = channelPoolFactor.apply(this::openChannel);
        this.initAddresses(config);
    }

    private void initAddresses(MessagingConfig config) {
        int port;
        int n = port = config.getPort() != null ? config.getPort().intValue() : this.advertisedAddress.port();
        if (config.getInterfaces().isEmpty()) {
            this.bindingAddresses.add(Address.from((String)this.advertisedAddress.host(), (int)port));
        } else {
            List addresses = config.getInterfaces().stream().map(iface -> Address.from((String)iface, (int)port)).collect(Collectors.toList());
            this.bindingAddresses.addAll(addresses);
        }
    }

    @Override
    public Address address() {
        return this.advertisedAddress;
    }

    @Override
    public Collection<Address> bindingAddresses() {
        return this.bindingAddresses;
    }

    @Override
    public CompletableFuture<Void> sendAsync(Address address, String type, byte[] payload, boolean keepAlive) {
        long messageId = this.messageIdGenerator.incrementAndGet();
        ProtocolRequest message = new ProtocolRequest(messageId, this.advertisedAddress, type, payload);
        return this.executeOnPooledConnection(address, type, c -> c.sendAsync(message), MoreExecutors.directExecutor());
    }

    @Override
    public CompletableFuture<byte[]> sendAndReceive(Address address, String type, byte[] payload, boolean keepAlive) {
        return this.sendAndReceive(address, type, payload, keepAlive, DEFAULT_TIMEOUT, MoreExecutors.directExecutor());
    }

    @Override
    public CompletableFuture<byte[]> sendAndReceive(Address address, String type, byte[] payload, boolean keepAlive, Executor executor) {
        return this.sendAndReceive(address, type, payload, keepAlive, DEFAULT_TIMEOUT, executor);
    }

    @Override
    public CompletableFuture<byte[]> sendAndReceive(Address address, String type, byte[] payload, boolean keepAlive, Duration timeout) {
        return this.sendAndReceive(address, type, payload, keepAlive, timeout, MoreExecutors.directExecutor());
    }

    @Override
    public CompletableFuture<byte[]> sendAndReceive(Address address, String type, byte[] payload, boolean keepAlive, Duration timeout, Executor executor) {
        if (!this.started.get()) {
            return CompletableFuture.failedFuture(new IllegalStateException("MessagingService is closed."));
        }
        long messageId = this.messageIdGenerator.incrementAndGet();
        ProtocolRequest message = new ProtocolRequest(messageId, this.advertisedAddress, type, payload);
        CompletableFuture<Object> responseFuture = keepAlive ? this.executeOnPooledConnection(address, type, c -> c.sendAndReceive(message), executor) : this.executeOnTransientConnection(address, c -> c.sendAndReceive(message), executor);
        ScheduledFuture<?> timeoutFuture = this.timeoutExecutor.schedule(() -> {
            responseFuture.completeExceptionally(new TimeoutException(String.format("Request %s to %s timed out in %s", type, address, timeout)));
            this.openFutures.remove(responseFuture);
        }, timeout.toNanos(), TimeUnit.NANOSECONDS);
        responseFuture.whenComplete((ignored, error) -> timeoutFuture.cancel(true));
        return responseFuture;
    }

    @Override
    public void registerHandler(String type, BiConsumer<Address, byte[]> handler, Executor executor) {
        this.handlers.register(type, (message, connection) -> executor.execute(() -> handler.accept(message.sender(), message.payload())));
    }

    @Override
    public void registerHandler(String type, BiFunction<Address, byte[], byte[]> handler, Executor executor) {
        this.handlers.register(type, (message, connection) -> executor.execute(() -> {
            ProtocolReply.Status status;
            byte[] responsePayload;
            block2: {
                responsePayload = null;
                status = ProtocolReply.Status.OK;
                try {
                    responsePayload = (byte[])handler.apply(message.sender(), message.payload());
                }
                catch (Exception e) {
                    this.log.warn("Unexpected error while handling message {} from {}", new Object[]{message.subject(), message.sender(), e});
                    status = ProtocolReply.Status.ERROR_HANDLER_EXCEPTION;
                    String exceptionMessage = e.getMessage();
                    if (exceptionMessage == null) break block2;
                    responsePayload = StringUtil.getBytes((String)exceptionMessage);
                }
            }
            connection.reply(message.id(), status, Optional.ofNullable(responsePayload));
        }));
    }

    @Override
    public void registerHandler(String type, BiFunction<Address, byte[], CompletableFuture<byte[]>> handler) {
        this.handlers.register(type, (message, connection) -> {
            long id = message.id();
            String subject = message.subject();
            Address sender = message.sender();
            byte[] payload = message.payload();
            ((CompletableFuture)handler.apply(sender, payload)).whenComplete((result, error) -> {
                ProtocolReply.Status status;
                byte[] responsePayload = null;
                if (error == null) {
                    status = ProtocolReply.Status.OK;
                    responsePayload = result;
                } else {
                    this.log.warn("Unexpected error while handling message {} from {}", new Object[]{subject, sender, error});
                    status = ProtocolReply.Status.ERROR_HANDLER_EXCEPTION;
                    String exceptionMessage = error.getMessage();
                    if (exceptionMessage != null) {
                        responsePayload = StringUtil.getBytes((String)error.getMessage());
                    }
                }
                connection.reply(id, status, Optional.ofNullable(responsePayload));
            });
        });
    }

    @Override
    public void unregisterHandler(String type) {
        this.handlers.unregister(type);
    }

    @Override
    public boolean isRunning() {
        return this.started.get();
    }

    public CompletableFuture<MessagingService> start() {
        if (this.started.get()) {
            this.log.warn("Already running at local address: {}", (Object)this.advertisedAddress);
            return CompletableFuture.completedFuture(this);
        }
        CompletionStage<Object> serviceLoader = this.config.isTlsEnabled() ? this.loadServerSslContext().thenCompose(ok -> this.loadClientSslContext()) : CompletableFuture.completedFuture(null);
        this.initTransport();
        return ((CompletableFuture)((CompletableFuture)((CompletableFuture)serviceLoader).thenCompose(ok -> this.bootstrapServer())).thenRun(() -> {
            NettyDnsMetrics metrics = new NettyDnsMetrics();
            this.dnsResolverGroup = new DnsAddressResolverGroup(new DnsNameResolverBuilder(this.clientGroup.next()).dnsQueryLifecycleObserverFactory((DnsQueryLifecycleObserverFactory)new BiDnsQueryLifecycleObserverFactory(ignored -> metrics, (DnsQueryLifecycleObserverFactory)new LoggingDnsQueryLifeCycleObserverFactory())).socketChannelType(this.clientChannelClass).channelType(this.clientDataGramChannelClass));
            this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new DefaultThreadFactory("netty-messaging-timeout-"));
            this.localConnection = new LocalClientConnection(this.handlers);
            this.started.set(true);
            this.log.info("Started messaging service bound to {}, advertising {}, and using {}", new Object[]{this.bindingAddresses, this.advertisedAddress, this.config.isTlsEnabled() ? "TLS" : "plaintext"});
        })).thenApply(v -> this);
    }

    public CompletableFuture<Void> stop() {
        if (this.started.compareAndSet(true, false)) {
            return CompletableFuture.supplyAsync(() -> {
                boolean interrupted = false;
                try {
                    if (this.dnsResolverGroup != null) {
                        CloseHelper.close(error -> this.log.warn("Failed to close DNS resolvers", error), (AutoCloseable)this.dnsResolverGroup);
                    }
                    try {
                        this.serverChannel.close().sync();
                    }
                    catch (InterruptedException e) {
                        interrupted = true;
                    }
                    Future serverShutdownFuture = this.serverGroup.shutdownGracefully(this.config.getShutdownQuietPeriod().toMillis(), this.config.getShutdownTimeout().toMillis(), TimeUnit.MILLISECONDS);
                    Future clientShutdownFuture = this.clientGroup.shutdownGracefully(this.config.getShutdownQuietPeriod().toMillis(), this.config.getShutdownTimeout().toMillis(), TimeUnit.MILLISECONDS);
                    try {
                        serverShutdownFuture.sync();
                    }
                    catch (InterruptedException e) {
                        interrupted = true;
                    }
                    try {
                        clientShutdownFuture.sync();
                    }
                    catch (InterruptedException e) {
                        interrupted = true;
                    }
                    this.timeoutExecutor.shutdown();
                    for (Map.Entry<Channel, RemoteClientConnection> entry : this.connections.entrySet()) {
                        Channel channel = entry.getKey();
                        channel.close();
                        RemoteClientConnection connection = entry.getValue();
                        connection.close();
                    }
                    for (CompletableFuture completableFuture : this.openFutures) {
                        completableFuture.completeExceptionally(new IllegalStateException("MessagingService has been closed."));
                    }
                    this.openFutures.clear();
                }
                catch (Throwable throwable) {
                    this.log.info("Stopped messaging service bound to {}, advertising {}, and using {}", new Object[]{this.bindingAddresses, this.advertisedAddress, this.config.isTlsEnabled() ? "TLS" : "plaintext"});
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                    throw throwable;
                }
                this.log.info("Stopped messaging service bound to {}, advertising {}, and using {}", new Object[]{this.bindingAddresses, this.advertisedAddress, this.config.isTlsEnabled() ? "TLS" : "plaintext"});
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
                return null;
            });
        }
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<Void> loadClientSslContext() {
        try {
            SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
            if (this.config.getKeyStore() != null) {
                sslContextBuilder.trustManager(TlsConfigUtil.getCertificateChain((File)this.config.getKeyStore(), (String)this.config.getKeyStorePassword()));
            } else {
                sslContextBuilder.trustManager(this.config.getCertificateChain());
            }
            this.clientSslContext = sslContextBuilder.sslProvider(SslProvider.OPENSSL_REFCNT).protocols(new String[]{TLS_PROTOCOL}).build();
            return CompletableFuture.completedFuture(null);
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(new MessagingException("Failed to start messaging service; invalid client TLS configuration", e));
        }
    }

    private CompletableFuture<Void> loadServerSslContext() {
        try {
            SslContextBuilder sslContextBuilder;
            if (this.config.getKeyStore() != null) {
                PrivateKey privateKey = TlsConfigUtil.getPrivateKey((File)this.config.getKeyStore(), (String)this.config.getKeyStorePassword());
                X509Certificate[] certChain = TlsConfigUtil.getCertificateChain((File)this.config.getKeyStore(), (String)this.config.getKeyStorePassword());
                sslContextBuilder = SslContextBuilder.forServer((PrivateKey)privateKey, (X509Certificate[])certChain);
            } else {
                sslContextBuilder = SslContextBuilder.forServer((File)this.config.getCertificateChain(), (File)this.config.getPrivateKey());
            }
            this.serverSslContext = sslContextBuilder.sslProvider(SslProvider.OPENSSL_REFCNT).protocols(new String[]{TLS_PROTOCOL}).build();
            return CompletableFuture.completedFuture(null);
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(new MessagingException("Failed to start messaging service; invalid server TLS configuration", e));
        }
    }

    private void initTransport() {
        if (Epoll.isAvailable()) {
            this.initEpollTransport();
        } else {
            this.initNioTransport();
        }
    }

    private X509Certificate[] getCertificateChain(File keyStoreFile, String password) throws CertificateException, KeyStoreException, IOException, NoSuchAlgorithmException {
        KeyStore keyStore = this.getKeyStore(keyStoreFile, password);
        String alias = keyStore.aliases().nextElement();
        return (X509Certificate[])Arrays.stream(keyStore.getCertificateChain(alias)).map(X509Certificate.class::cast).toArray(X509Certificate[]::new);
    }

    private PrivateKey getPrivateKey(File keyStoreFile, String password) throws CertificateException, KeyStoreException, IOException, NoSuchAlgorithmException, UnrecoverableKeyException {
        KeyStore keyStore = this.getKeyStore(keyStoreFile, password);
        String alias = keyStore.aliases().nextElement();
        return (PrivateKey)keyStore.getKey(alias, password.toCharArray());
    }

    private KeyStore getKeyStore(File keyStoreFile, String password) throws KeyStoreException {
        KeyStore keyStore = KeyStore.getInstance("PKCS12");
        try {
            keyStore.load(new FileInputStream(keyStoreFile), password.toCharArray());
        }
        catch (Exception e) {
            throw new IllegalStateException(String.format("Keystore failed to load file: %s, please ensure it is a valid PKCS12 keystore", keyStoreFile.toPath()), e);
        }
        return keyStore;
    }

    private void initEpollTransport() {
        this.clientGroup = new EpollEventLoopGroup(0, Threads.namedThreads((String)"netty-messaging-event-epoll-client-%d", (Logger)this.log));
        this.serverGroup = new EpollEventLoopGroup(0, Threads.namedThreads((String)"netty-messaging-event-epoll-server-%d", (Logger)this.log));
        this.serverChannelClass = EpollServerSocketChannel.class;
        this.clientChannelClass = EpollSocketChannel.class;
        this.clientDataGramChannelClass = EpollDatagramChannel.class;
    }

    private void initNioTransport() {
        this.clientGroup = new NioEventLoopGroup(0, Threads.namedThreads((String)"netty-messaging-event-nio-client-%d", (Logger)this.log));
        this.serverGroup = new NioEventLoopGroup(0, Threads.namedThreads((String)"netty-messaging-event-nio-server-%d", (Logger)this.log));
        this.serverChannelClass = NioServerSocketChannel.class;
        this.clientChannelClass = NioSocketChannel.class;
        this.clientDataGramChannelClass = NioDatagramChannel.class;
    }

    private <T> CompletableFuture<T> executeOnPooledConnection(Address address, String type, Function<ClientConnection, CompletableFuture<T>> callback, Executor executor) {
        CompletableFuture future = new CompletableFuture();
        this.executeOnPooledConnection(address, type, callback, executor, future);
        return future;
    }

    private <T> void executeOnPooledConnection(Address address, String type, Function<ClientConnection, CompletableFuture<T>> callback, Executor executor, CompletableFuture<T> responseFuture) {
        if (address.equals((Object)this.advertisedAddress)) {
            callback.apply(this.localConnection).whenComplete((result, error) -> {
                if (error == null) {
                    executor.execute(() -> responseFuture.complete(result));
                } else {
                    executor.execute(() -> responseFuture.completeExceptionally((Throwable)error));
                }
            });
            return;
        }
        this.openFutures.add(responseFuture);
        this.channelPool.getChannel(address, type).whenComplete((channel, channelError) -> {
            if (channelError == null) {
                responseFuture.whenComplete((response, error) -> {
                    if (error instanceof TimeoutException) {
                        channel.close();
                    }
                });
                RemoteClientConnection connection = this.getOrCreateClientConnection((Channel)channel);
                ((CompletableFuture)callback.apply(connection)).whenComplete((result, sendError) -> {
                    if (sendError == null) {
                        executor.execute(() -> {
                            responseFuture.complete(result);
                            this.openFutures.remove(responseFuture);
                        });
                    } else {
                        Throwable cause = Throwables.getRootCause((Throwable)sendError);
                        if (!(cause instanceof TimeoutException) && !(cause instanceof MessagingException)) {
                            channel.close().addListener(f -> {
                                this.log.debug("Closing connection to {}", (Object)channel.remoteAddress());
                                connection.close();
                                this.connections.remove(channel);
                            });
                        }
                        executor.execute(() -> {
                            responseFuture.completeExceptionally((Throwable)sendError);
                            this.openFutures.remove(responseFuture);
                        });
                    }
                });
            } else {
                executor.execute(() -> {
                    responseFuture.completeExceptionally((Throwable)channelError);
                    this.openFutures.remove(responseFuture);
                });
            }
        });
    }

    private <T> CompletableFuture<T> executeOnTransientConnection(Address address, Function<ClientConnection, CompletableFuture<T>> callback, Executor executor) {
        CompletableFuture future = new CompletableFuture();
        if (address.equals((Object)this.advertisedAddress)) {
            callback.apply(this.localConnection).whenComplete((result, error) -> {
                if (error == null) {
                    executor.execute(() -> future.complete(result));
                } else {
                    executor.execute(() -> future.completeExceptionally((Throwable)error));
                }
            });
            return future;
        }
        this.openChannel(address).whenComplete((channel, channelError) -> {
            if (channelError == null) {
                ((CompletableFuture)callback.apply(this.getOrCreateClientConnection((Channel)channel))).whenComplete((result, sendError) -> {
                    if (sendError == null) {
                        executor.execute(() -> future.complete(result));
                    } else {
                        executor.execute(() -> future.completeExceptionally((Throwable)sendError));
                    }
                    channel.close();
                });
            } else {
                executor.execute(() -> future.completeExceptionally((Throwable)channelError));
            }
        });
        return future;
    }

    private RemoteClientConnection getOrCreateClientConnection(Channel channel) {
        RemoteClientConnection connection = this.connections.get(channel);
        if (connection == null) {
            connection = this.connections.computeIfAbsent(channel, c -> new RemoteClientConnection(this.messagingMetrics, (Channel)c));
            channel.closeFuture().addListener(f -> {
                RemoteClientConnection removedConnection = this.connections.remove(channel);
                if (removedConnection != null) {
                    removedConnection.close();
                }
            });
        }
        return connection;
    }

    private CompletableFuture<Channel> openChannel(Address address) {
        return this.bootstrapClient(address);
    }

    private CompletableFuture<Channel> bootstrapClient(Address address) {
        OrderedFuture future = new OrderedFuture();
        InetSocketAddress socketAddress = address.socketAddress();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.option(ChannelOption.ALLOCATOR, (Object)PooledByteBufAllocator.DEFAULT);
        bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, (Object)new WriteBufferWaterMark(327680, 655360));
        bootstrap.option(ChannelOption.SO_RCVBUF, (Object)0x100000);
        bootstrap.option(ChannelOption.SO_SNDBUF, (Object)0x100000);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, (Object)true);
        bootstrap.option(ChannelOption.TCP_NODELAY, (Object)true);
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)1000);
        bootstrap.group(this.clientGroup);
        bootstrap.channel(this.clientChannelClass);
        bootstrap.resolver((AddressResolverGroup)this.dnsResolverGroup);
        bootstrap.remoteAddress((SocketAddress)socketAddress);
        bootstrap.handler((ChannelHandler)new BasicClientChannelInitializer((CompletableFuture<Channel>)future));
        Channel channel = bootstrap.connect().addListener(arg_0 -> NettyMessagingService.lambda$bootstrapClient$40((CompletableFuture)future, address, arg_0)).channel();
        channel.closeFuture().addListener(arg_0 -> NettyMessagingService.lambda$bootstrapClient$41((CompletableFuture)future, channel, address, arg_0));
        return future;
    }

    private CompletableFuture<Void> bootstrapServer() {
        ServerBootstrap b = new ServerBootstrap();
        b.option(ChannelOption.SO_REUSEADDR, (Object)true);
        b.option(ChannelOption.SO_BACKLOG, (Object)128);
        b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, (Object)new WriteBufferWaterMark(8192, 32768));
        b.childOption(ChannelOption.SO_RCVBUF, (Object)0x100000);
        b.childOption(ChannelOption.SO_SNDBUF, (Object)0x100000);
        b.childOption(ChannelOption.SO_KEEPALIVE, (Object)true);
        b.childOption(ChannelOption.TCP_NODELAY, (Object)true);
        b.childOption(ChannelOption.ALLOCATOR, (Object)PooledByteBufAllocator.DEFAULT);
        b.group(this.serverGroup, this.clientGroup);
        b.channel(this.serverChannelClass);
        b.childHandler((ChannelHandler)new BasicServerChannelInitializer());
        return this.bind(b);
    }

    private CompletableFuture<Void> bind(ServerBootstrap bootstrap) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.bind(bootstrap, this.bindingAddresses.iterator(), future);
        return future;
    }

    private void bind(ServerBootstrap bootstrap, Iterator<Address> addressIterator, CompletableFuture<Void> future) {
        if (addressIterator.hasNext()) {
            Address address = addressIterator.next();
            bootstrap.bind(address.host(), address.port()).addListener((GenericFutureListener)((ChannelFutureListener)f -> {
                if (f.isSuccess()) {
                    this.log.info("TCP server listening for connections on {}", (Object)address);
                    this.serverChannel = f.channel();
                    this.bind(bootstrap, addressIterator, future);
                } else {
                    this.log.warn("Failed to bind TCP server to port {} due to {}", (Object)address, (Object)f.cause());
                    future.completeExceptionally(f.cause());
                }
            }));
        } else {
            future.complete(null);
        }
    }

    private static /* synthetic */ void lambda$bootstrapClient$41(CompletableFuture future, Channel channel, Address address, Future onClose) throws Exception {
        if (!future.isDone()) {
            future.completeExceptionally(new MessagingException.ConnectionClosed(String.format("Channel %s for address %s was closed unexpectedly before the request was handled", channel, address)));
        }
    }

    private static /* synthetic */ void lambda$bootstrapClient$40(CompletableFuture future, Address address, Future onConnect) throws Exception {
        if (!onConnect.isSuccess()) {
            future.completeExceptionally(new ConnectException(String.format("Failed to connect channel for address %s (resolved: %s) : %s", address, address.getAddress(), onConnect.cause())));
        }
    }

    private class BasicClientChannelInitializer
    extends ChannelInitializer<SocketChannel> {
        private final CompletableFuture<Channel> future;

        BasicClientChannelInitializer(CompletableFuture<Channel> future) {
            this.future = future;
        }

        protected void initChannel(SocketChannel channel) {
            if (NettyMessagingService.this.config.isTlsEnabled()) {
                SslHandler sslHandler = NettyMessagingService.this.clientSslContext.newHandler(channel.alloc());
                channel.pipeline().addLast("tls", (ChannelHandler)sslHandler);
            }
            channel.pipeline().addLast("handshake", (ChannelHandler)new ClientHandshakeHandlerAdapter(this.future));
            switch (NettyMessagingService.this.config.getCompressionAlgorithm()) {
                case GZIP: {
                    channel.pipeline().addLast(new ChannelHandler[]{ZlibCodecFactory.newZlibEncoder((ZlibWrapper)ZlibWrapper.GZIP)});
                    channel.pipeline().addLast(new ChannelHandler[]{ZlibCodecFactory.newZlibDecoder((ZlibWrapper)ZlibWrapper.GZIP)});
                    break;
                }
                case SNAPPY: {
                    channel.pipeline().addLast(new ChannelHandler[]{new SnappyFrameEncoder()});
                    channel.pipeline().addLast(new ChannelHandler[]{new SnappyFrameDecoder()});
                    break;
                }
                case NONE: {
                    break;
                }
                default: {
                    NettyMessagingService.this.log.debug("Unknown compression algorithm. Proceeding without compression.");
                }
            }
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            this.future.completeExceptionally(cause);
            super.exceptionCaught(ctx, cause);
        }
    }

    private final class BasicServerChannelInitializer
    extends ChannelInitializer<SocketChannel> {
        private BasicServerChannelInitializer() {
        }

        protected void initChannel(SocketChannel channel) {
            if (NettyMessagingService.this.config.isTlsEnabled()) {
                SslHandler sslHandler = NettyMessagingService.this.serverSslContext.newHandler(channel.alloc());
                channel.pipeline().addLast("tls", (ChannelHandler)sslHandler);
            }
            channel.pipeline().addLast("handshake", (ChannelHandler)new ServerHandshakeHandlerAdapter());
            switch (NettyMessagingService.this.config.getCompressionAlgorithm()) {
                case GZIP: {
                    channel.pipeline().addLast(new ChannelHandler[]{ZlibCodecFactory.newZlibEncoder((ZlibWrapper)ZlibWrapper.GZIP)});
                    channel.pipeline().addLast(new ChannelHandler[]{ZlibCodecFactory.newZlibDecoder((ZlibWrapper)ZlibWrapper.GZIP)});
                    break;
                }
                case SNAPPY: {
                    channel.pipeline().addLast(new ChannelHandler[]{new SnappyFrameEncoder()});
                    channel.pipeline().addLast(new ChannelHandler[]{new SnappyFrameDecoder()});
                    break;
                }
                case NONE: {
                    break;
                }
                default: {
                    NettyMessagingService.this.log.debug("Unknown compression algorithm. Proceeding without compression.");
                }
            }
        }
    }

    private class MessageDispatcher<M extends ProtocolMessage>
    extends SimpleChannelInboundHandler<Object> {
        private final Connection<M> connection;

        MessageDispatcher(Connection<M> connection) {
            this.connection = connection;
        }

        public void channelInactive(ChannelHandlerContext context) throws Exception {
            this.connection.close();
            context.close();
        }

        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
            NettyMessagingService.this.log.error("Exception inside channel handling pipeline", cause);
            this.connection.close();
            context.close();
        }

        public boolean acceptInboundMessage(Object msg) {
            return msg instanceof ProtocolMessage;
        }

        protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception {
            try {
                this.connection.dispatch((ProtocolMessage)message);
            }
            catch (RejectedExecutionException e) {
                NettyMessagingService.this.log.warn("Unable to dispatch message due to {}", (Object)e.getMessage());
            }
        }
    }

    private final class ServerHandshakeHandlerAdapter
    extends HandshakeHandlerAdapter<ProtocolRequest> {
        private ServerHandshakeHandlerAdapter() {
        }

        public void channelRead(ChannelHandlerContext context, Object message) throws Exception {
            this.readProtocolVersion(context, (ByteBuf)message).ifPresent(version -> {
                ProtocolVersion protocolVersion = ProtocolVersion.valueOf(version);
                if (protocolVersion == null) {
                    protocolVersion = ProtocolVersion.latest();
                }
                this.writeProtocolVersion(context, protocolVersion);
                this.activateProtocolVersion(context, new RemoteServerConnection(NettyMessagingService.this.handlers, context.channel()), protocolVersion);
            });
        }

        @Override
        void activateProtocolVersion(ChannelHandlerContext context, Connection<ProtocolRequest> connection, ProtocolVersion protocolVersion) {
            NettyMessagingService.this.log.debug("Activating server protocol version {} for connection to {}", (Object)protocolVersion, (Object)context.channel().remoteAddress());
            super.activateProtocolVersion(context, connection, protocolVersion);
        }
    }

    private class ClientHandshakeHandlerAdapter
    extends HandshakeHandlerAdapter<ProtocolReply> {
        private final CompletableFuture<Channel> future;

        ClientHandshakeHandlerAdapter(CompletableFuture<Channel> future) {
            this.future = future;
        }

        public void channelActive(ChannelHandlerContext context) throws Exception {
            NettyMessagingService.this.log.debug("Writing client protocol version {} for connection to {}", (Object)NettyMessagingService.this.protocolVersion, (Object)context.channel().remoteAddress());
            this.writeProtocolVersion(context, NettyMessagingService.this.protocolVersion);
        }

        public void channelRead(ChannelHandlerContext context, Object message) throws Exception {
            this.readProtocolVersion(context, (ByteBuf)message).ifPresent(version -> {
                ProtocolVersion protocolVersion = ProtocolVersion.valueOf(version);
                if (protocolVersion != null) {
                    this.activateProtocolVersion(context, NettyMessagingService.this.getOrCreateClientConnection(context.channel()), protocolVersion);
                } else {
                    NettyMessagingService.this.log.error("Failed to negotiate protocol version");
                    context.close();
                }
            });
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            this.future.completeExceptionally(cause);
        }

        @Override
        void activateProtocolVersion(ChannelHandlerContext context, Connection<ProtocolReply> connection, ProtocolVersion protocolVersion) {
            NettyMessagingService.this.log.debug("Activating client protocol version {} for connection to {}", (Object)protocolVersion, (Object)context.channel().remoteAddress());
            super.activateProtocolVersion(context, connection, protocolVersion);
            this.future.complete(context.channel());
        }
    }

    private abstract class HandshakeHandlerAdapter<M extends ProtocolMessage>
    extends ChannelInboundHandlerAdapter {
        private HandshakeHandlerAdapter() {
        }

        void writeProtocolVersion(ChannelHandlerContext context, ProtocolVersion version) {
            ByteBuf buffer = context.alloc().buffer(6);
            buffer.writeInt(NettyMessagingService.this.preamble);
            buffer.writeShort((int)version.version());
            context.writeAndFlush((Object)buffer);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        OptionalInt readProtocolVersion(ChannelHandlerContext context, ByteBuf buffer) {
            try {
                int preamble = buffer.readInt();
                if (preamble != NettyMessagingService.this.preamble) {
                    NettyMessagingService.this.log.warn("Received invalid handshake, closing connection");
                    context.close();
                    OptionalInt optionalInt = OptionalInt.empty();
                    return optionalInt;
                }
                OptionalInt optionalInt = OptionalInt.of(buffer.readShort());
                return optionalInt;
            }
            finally {
                buffer.release();
            }
        }

        void activateProtocolVersion(ChannelHandlerContext context, Connection<M> connection, ProtocolVersion protocolVersion) {
            MessagingProtocol protocol = protocolVersion.createProtocol(NettyMessagingService.this.advertisedAddress);
            context.pipeline().remove((ChannelHandler)this);
            context.pipeline().addLast("encoder", protocol.newEncoder());
            context.pipeline().addLast("decoder", (ChannelHandler)protocol.newDecoder());
            context.pipeline().addLast("handler", new MessageDispatcher<M>(connection));
        }
    }
}

