/*
 * Decompiled with CFR 0.152.
 */
package io.r2dbc.postgresql.client;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.socket.DatagramChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.SSLConfig;
import io.r2dbc.postgresql.client.SSLMode;
import io.r2dbc.postgresql.client.SSLSessionHandlerAdapter;
import io.r2dbc.postgresql.client.TransactionStatus;
import io.r2dbc.postgresql.client.Version;
import io.r2dbc.postgresql.message.backend.BackendKeyData;
import io.r2dbc.postgresql.message.backend.BackendMessage;
import io.r2dbc.postgresql.message.backend.BackendMessageDecoder;
import io.r2dbc.postgresql.message.backend.ErrorResponse;
import io.r2dbc.postgresql.message.backend.Field;
import io.r2dbc.postgresql.message.backend.NoticeResponse;
import io.r2dbc.postgresql.message.backend.NotificationResponse;
import io.r2dbc.postgresql.message.backend.ParameterStatus;
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
import io.r2dbc.postgresql.message.frontend.Terminate;
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import io.r2dbc.spi.R2dbcTransientResourceException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.net.ssl.SSLException;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.netty.Connection;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpResources;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

public final class ReactorNettyClient
implements Client {
    // Shared logger for connection-level diagnostics.
    private static final Logger logger = Loggers.getLogger(ReactorNettyClient.class);
    // Snapshot of the debug level taken once at class initialization; it is not re-evaluated afterwards.
    private static final boolean DEBUG_ENABLED = logger.isDebugEnabled();
    // Error factory used by handleClose() when it is the first to flip isClosed, i.e. close() had not run yet.
    private static final Supplier<PostgresConnectionClosedException> UNEXPECTED = () -> new PostgresConnectionClosedException("Connection unexpectedly closed");
    // Error factory used by close() and by handleClose() when the closed flag was already set.
    private static final Supplier<PostgresConnectionClosedException> EXPECTED = () -> new PostgresConnectionClosedException("Connection closed");
    // Allocator obtained from the connection's outbound side; used to encode frontend messages.
    private final ByteBufAllocator byteBufAllocator;
    // Underlying Reactor Netty connection.
    private final Connection connection;
    // Carries publishers of frontend messages; the constructor concatenates them and writes them to the connection.
    private final EmitterProcessor<Publisher<FrontendMessage>> requestProcessor = EmitterProcessor.create((boolean)false);
    // Producer-side handle used by exchange() and send() to enqueue request publishers.
    private final FluxSink<Publisher<FrontendMessage>> requests = this.requestProcessor.sink();
    // Fans NotificationResponse messages out to listeners registered via addNotificationListener().
    private final DirectProcessor<NotificationResponse> notificationProcessor = DirectProcessor.create();
    // Set to true by whichever of close() / handleClose() runs first.
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    // Receives decoded backend messages and routes them to the queued conversations.
    private final BackendMessageSubscriber messageSubscriber = new BackendMessageSubscriber();
    // Populated from BackendKeyData in consumeMessage(); null until such a message has been seen.
    private volatile Integer processId;
    // Populated from BackendKeyData in consumeMessage(); null until such a message has been seen.
    private volatile Integer secretKey;
    // Updated from every ReadyForQuery message; starts as IDLE.
    private volatile TransactionStatus transactionStatus = TransactionStatus.IDLE;
    // Updated from ParameterStatus messages; starts with an empty version string and number 0.
    private volatile Version version = new Version("", 0);

    /**
     * Creates a client on top of an established connection.
     *
     * <p>Installs the frame decoder and the close-tracking channel handler, subscribes
     * {@link #messageSubscriber} to the decoded inbound stream, and subscribes the pipeline
     * that writes queued frontend messages to the connection.
     *
     * @param connection the connection to wrap; checked with {@code Assert.requireNonNull}
     */
    private ReactorNettyClient(Connection connection) {
        Assert.requireNonNull(connection, "Connection must not be null");
        // Netty argument order: maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip.
        connection.addHandler((ChannelHandler)new LengthFieldBasedFrameDecoder(0x7FFFFFFA, 1, 4, -4, 0));
        // Completes the request processor and triggers handleClose() once the channel is unregistered.
        connection.addHandler((ChannelHandler)new EnsureSubscribersCompleteChannelHandler(this.requestProcessor));
        this.connection = connection;
        this.byteBufAllocator = connection.outbound().alloc();
        // NOTE(review): this reference is written below but never read anywhere in this constructor.
        AtomicReference receiveError = new AtomicReference();
        // Inbound side: decode each frame, fail pending work on errors, and forward only the
        // messages that consumeMessage() did not fully handle.
        connection.inbound().receive().map(BackendMessageDecoder::decode).doOnError(throwable -> {
            receiveError.set(throwable);
            this.handleConnectionError((Throwable)throwable);
        }).handle((backendMessage, sink) -> {
            if (this.consumeMessage((BackendMessage)backendMessage)) {
                return;
            }
            sink.next(backendMessage);
        }).subscribe((CoreSubscriber)this.messageSubscriber);
        // Outbound side: request publishers are concatenated in order and each message is sent
        // through flatMap with a concurrency of 1.
        Mono request = this.requestProcessor.concatMap(Function.identity()).flatMap(message -> {
            if (DEBUG_ENABLED) {
                logger.debug("Request:  {}", new Object[]{message});
            }
            return connection.outbound().send(message.encode(this.byteBufAllocator));
        }, 1).then();
        // Errors are routed through resumeError(); handleClose() runs after the pipeline terminates.
        request.onErrorResume(this::resumeError).doAfterTerminate(this::handleClose).subscribe();
    }

    /**
     * Closes this client lazily, on subscription.
     *
     * <p>The notification processor is completed if still active and pending conversations are
     * failed with the "Connection closed" error. If this call is the one that flips the closed
     * flag, the connection is disposed: directly when it is no longer connected or no backend
     * process id was ever received, otherwise after sending {@code Terminate}.
     *
     * @return a {@link Mono} that completes once the connection is disposed, or immediately when
     *         the client was already marked closed
     */
    @Override
    public Mono<Void> close() {
        return Mono.defer(() -> {
            if (!this.notificationProcessor.isTerminated()) {
                this.notificationProcessor.onComplete();
            }
            this.drainError(EXPECTED);

            // Sample the state before flipping the flag: isConnected() reports false once isClosed is set.
            boolean wasConnected = this.isConnected();
            if (!this.isClosed.compareAndSet(false, true)) {
                return Mono.empty();
            }

            if (!wasConnected || this.processId == null) {
                this.connection.dispose();
                return this.connection.onDispose();
            }

            return Flux.just(Terminate.INSTANCE)
                .doOnNext(message -> logger.debug("Request:  {}", new Object[]{message}))
                .concatMap(message -> this.connection.outbound().send(message.encode(this.connection.outbound().alloc())))
                .then()
                .doOnSuccess(v -> this.connection.dispose())
                .then(this.connection.onDispose());
        });
    }

    /**
     * Registers a conversation: the given requests are enqueued for sending and the returned
     * flux receives backend messages until {@code takeUntil} matches one.
     *
     * @param takeUntil predicate identifying the message that ends the conversation
     * @param requests  publisher of the frontend messages to send
     * @return the backend messages belonging to this conversation
     */
    @Override
    public Flux<BackendMessage> exchange(Predicate<BackendMessage> takeUntil, Publisher<FrontendMessage> requests) {
        Assert.requireNonNull(takeUntil, "takeUntil must not be null");
        Assert.requireNonNull(requests, "requests must not be null");

        Consumer<Flux<FrontendMessage>> sender = this.requests::next;
        Supplier<Boolean> connectionState = this::isConnected;
        return this.messageSubscriber.addConversation(takeUntil, requests, sender, connectionState);
    }

    /**
     * Enqueues a single frontend message for sending without registering a conversation
     * for the response.
     *
     * @param message the message to send; checked with {@code Assert.requireNonNull}
     */
    @Override
    public void send(FrontendMessage message) {
        // The failure text previously named "requests", which is not a parameter of this method.
        Assert.requireNonNull(message, "message must not be null");
        this.requests.next(Mono.just(message));
    }

    /**
     * Error fallback for the outbound pipeline: fails pending work with the given error,
     * completes the request processor, logs the error and closes the client.
     *
     * @param throwable the error raised by the outbound pipeline
     * @return the result of {@link #close()}
     */
    private Mono<Void> resumeError(Throwable throwable) {
        this.handleConnectionError(throwable);
        this.requestProcessor.onComplete();

        // SSL-related failures are logged at debug level only; everything else is an error.
        boolean sslFailure = ReactorNettyClient.isSslException(throwable);
        if (!sslFailure) {
            logger.error("Connection Error", throwable);
        } else {
            logger.debug("Connection Error", throwable);
        }
        return this.close();
    }

    /**
     * Tells whether the given error is an {@link SSLException} or is directly caused by one.
     * Only the immediate cause is inspected, not the whole cause chain.
     *
     * @param throwable the error to inspect
     * @return {@code true} if the error or its direct cause is an {@link SSLException}
     */
    private static boolean isSslException(Throwable throwable) {
        if (throwable instanceof SSLException) {
            return true;
        }
        Throwable directCause = throwable.getCause();
        return directCause instanceof SSLException;
    }

    /**
     * Applies connection-level handling to a decoded backend message.
     *
     * <p>Notices are logged, backend key data is stored, and notifications are published to the
     * notification processor; these three are consumed here. Error responses, parameter status
     * and ready-for-query messages update logs or state but are still passed on.
     *
     * @param message the decoded backend message
     * @return {@code true} if the message was fully handled here and must not be forwarded
     */
    private boolean consumeMessage(BackendMessage message) {
        if (DEBUG_ENABLED) {
            logger.debug("Response: {}", new Object[]{message});
        }

        // Exact class comparison is used throughout, so at most one branch below can match.
        Class<?> type = message.getClass();

        if (type == NoticeResponse.class) {
            NoticeResponse notice = (NoticeResponse) message;
            logger.warn("Notice: {}", new Object[]{ReactorNettyClient.toString(notice.getFields())});
            return true;
        }
        if (type == BackendKeyData.class) {
            BackendKeyData keyData = (BackendKeyData) message;
            this.processId = keyData.getProcessId();
            this.secretKey = keyData.getSecretKey();
            return true;
        }
        if (type == NotificationResponse.class) {
            this.notificationProcessor.onNext((NotificationResponse) message);
            return true;
        }

        if (type == ErrorResponse.class) {
            ErrorResponse error = (ErrorResponse) message;
            logger.warn("Error: {}", new Object[]{ReactorNettyClient.toString(error.getFields())});
        } else if (type == ParameterStatus.class) {
            this.handleParameterStatus((ParameterStatus) message);
        } else if (type == ReadyForQuery.class) {
            ReadyForQuery ready = (ReadyForQuery) message;
            this.transactionStatus = TransactionStatus.valueOf(ready.getTransactionStatus());
        }
        return false;
    }

    /**
     * Updates {@link #version} from a parameter status message.
     *
     * <p>{@code server_version_num} replaces the numeric version; {@code server_version} replaces
     * the version string and, while the numeric version is still 0, derives it from that string.
     * A new {@link Version} is stored for every message, including unrelated parameters.
     *
     * @param message the parameter status message
     */
    private void handleParameterStatus(ParameterStatus message) {
        Version current = this.version;
        String versionText = current.getVersion();
        int versionNumber = current.getVersionNumber();

        String parameter = message.getName();
        if (parameter.equals("server_version_num")) {
            versionNumber = Integer.parseInt(message.getValue());
        } else if (parameter.equals("server_version")) {
            versionText = message.getValue();
            if (versionNumber == 0) {
                versionNumber = Version.parseServerVersionStr(versionText);
            }
        }

        this.version = new Version(versionText, versionNumber);
    }

    /**
     * Connects to the given host and port with SSL disabled and no connect timeout.
     *
     * @param host the host to connect to
     * @param port the port to connect to
     * @return a {@link Mono} emitting the connected client
     */
    public static Mono<ReactorNettyClient> connect(String host, int port) {
        Assert.requireNonNull(host, "host must not be null");

        SSLConfig sslDisabled = new SSLConfig(SSLMode.DISABLE, null, null);
        return ReactorNettyClient.connect(host, port, null, sslDisabled);
    }

    /**
     * Connects to the given host and port using a new, non-pooled connection provider.
     * The host is passed on as an unresolved socket address.
     *
     * @param host           the host to connect to; must not be {@code null}
     * @param port           the port to connect to
     * @param connectTimeout the connect timeout, or {@code null} to leave the default untouched
     * @param sslConfig      the SSL configuration; must not be {@code null}
     * @return a {@link Mono} emitting the connected client
     */
    public static Mono<ReactorNettyClient> connect(String host, int port, @Nullable Duration connectTimeout, SSLConfig sslConfig) {
        // Validate arguments up front, the same way the two-argument overload does for host.
        Assert.requireNonNull(host, "host must not be null");
        Assert.requireNonNull(sslConfig, "sslConfig must not be null");
        return ReactorNettyClient.connect(ConnectionProvider.newConnection(), InetSocketAddress.createUnresolved(host, port), connectTimeout, sslConfig);
    }

    /**
     * Connects to the given socket address using the supplied connection provider.
     *
     * <p>Addresses that are not {@link InetSocketAddress} instances are run on
     * {@link SocketLoopResources}. When trace logging is enabled for this class, a Netty
     * {@link LoggingHandler} is added at the head of the pipeline. The SSL handler is
     * registered if the SSL mode asks for it, and the client is emitted once that step completes.
     *
     * @param connectionProvider the connection provider; must not be {@code null}
     * @param socketAddress      the address to connect to; must not be {@code null}
     * @param connectTimeout     the connect timeout, or {@code null} to leave the default untouched
     * @param sslConfig          the SSL configuration; must not be {@code null}
     * @return a {@link Mono} emitting the connected client
     * @throws ArithmeticException if the timeout in milliseconds does not fit into an {@code int}
     */
    public static Mono<ReactorNettyClient> connect(ConnectionProvider connectionProvider, SocketAddress socketAddress, @Nullable Duration connectTimeout, SSLConfig sslConfig) {
        Assert.requireNonNull(connectionProvider, "connectionProvider must not be null");
        Assert.requireNonNull(socketAddress, "socketAddress must not be null");
        // Reject a missing SSL configuration before any connection is opened. Otherwise the
        // failure would only surface after the TCP connection exists, leaving it undisposed.
        Assert.requireNonNull(sslConfig, "sslConfig must not be null");

        TcpClient tcpClient = TcpClient.create(connectionProvider).addressSupplier(() -> socketAddress);
        if (!(socketAddress instanceof InetSocketAddress)) {
            tcpClient = tcpClient.runOn(new SocketLoopResources(), true);
        }
        if (connectTimeout != null) {
            tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.toMillis()));
        }

        return tcpClient.connect().flatMap(it -> {
            ChannelPipeline pipeline = it.channel().pipeline();
            // Netty's internal logger decides whether wire-level tracing is installed.
            InternalLogger nettyLogger = InternalLoggerFactory.getInstance(ReactorNettyClient.class);
            if (nettyLogger.isTraceEnabled()) {
                pipeline.addFirst(LoggingHandler.class.getSimpleName(), new LoggingHandler(ReactorNettyClient.class, LogLevel.TRACE));
            }
            return ReactorNettyClient.registerSslHandler(sslConfig, it).thenReturn(new ReactorNettyClient(it));
        });
    }

    /**
     * Installs the SSL session handler at the head of the connection's handlers when the
     * configured SSL mode asks for SSL to be started.
     *
     * @param sslConfig the SSL configuration
     * @param it        the connection to register the handler on
     * @return the handler's handshake {@link Mono}, or an empty {@link Mono} when SSL is not started
     */
    private static Mono<? extends Void> registerSslHandler(SSLConfig sslConfig, Connection it) {
        if (!sslConfig.getSslMode().startSsl()) {
            return Mono.empty();
        }

        SSLSessionHandlerAdapter sslHandler = new SSLSessionHandlerAdapter(it.outbound().alloc(), sslConfig);
        it.addHandlerFirst(sslHandler);
        return sslHandler.getHandshake();
    }

    /**
     * Subscribes the given consumer to notification responses.
     *
     * @param consumer callback invoked for each notification
     * @return a handle that cancels the subscription
     */
    @Override
    public Disposable addNotificationListener(Consumer<NotificationResponse> consumer) {
        Disposable subscription = this.notificationProcessor.subscribe(consumer);
        return subscription;
    }

    /**
     * Subscribes the given subscriber to notification responses by forwarding each signal
     * (next, error, complete, subscribe) to it.
     *
     * @param consumer subscriber receiving the notification signals
     * @return a handle that cancels the subscription
     */
    @Override
    public Disposable addNotificationListener(Subscriber<NotificationResponse> consumer) {
        return this.notificationProcessor.subscribe(
            notification -> consumer.onNext(notification),
            failure -> consumer.onError(failure),
            () -> consumer.onComplete(),
            subscription -> consumer.onSubscribe(subscription));
    }

    /**
     * Returns the allocator taken from the connection's outbound side at construction time.
     *
     * @return the buffer allocator of this client
     */
    @Override
    public ByteBufAllocator getByteBufAllocator() {
        return byteBufAllocator;
    }

    /**
     * Returns the backend process id recorded from {@code BackendKeyData}.
     *
     * @return the process id, or an empty {@link Optional} if none has been recorded yet
     */
    @Override
    public Optional<Integer> getProcessId() {
        Integer current = this.processId;
        return Optional.ofNullable(current);
    }

    /**
     * Returns the secret key recorded from {@code BackendKeyData}.
     *
     * @return the secret key, or an empty {@link Optional} if none has been recorded yet
     */
    @Override
    public Optional<Integer> getSecretKey() {
        Integer current = this.secretKey;
        return Optional.ofNullable(current);
    }

    /**
     * Returns the transaction status taken from the most recent {@code ReadyForQuery} message,
     * or {@code IDLE} if none has been received.
     *
     * @return the current transaction status
     */
    @Override
    public TransactionStatus getTransactionStatus() {
        return transactionStatus;
    }

    /**
     * Returns the server version assembled from the parameter status messages seen so far.
     *
     * @return the current server version
     */
    @Override
    public Version getVersion() {
        return version;
    }

    /**
     * Reports whether this client can still be used.
     *
     * @return {@code true} only if the client is not marked closed, the request processor is
     *         not disposed and the underlying channel is open
     */
    @Override
    public boolean isConnected() {
        // Short-circuit evaluation keeps the checks in their original order.
        return !this.isClosed.get()
            && !this.requestProcessor.isDisposed()
            && this.connection.channel().isOpen();
    }

    /**
     * Renders fields as a comma-separated list of {@code TYPE=value} pairs for logging.
     *
     * @param fields the fields to render
     * @return the rendered text; empty when there are no fields
     */
    private static String toString(List<Field> fields) {
        StringJoiner rendered = new StringJoiner(", ");
        fields.forEach(field -> rendered.add(field.getType().name() + "=" + field.getValue()));
        return rendered.toString();
    }

    /**
     * Marks the client closed and fails all pending work. If this call is the one that flips
     * the closed flag, the "unexpectedly closed" error is used; otherwise the regular
     * "closed" error.
     */
    private void handleClose() {
        boolean closedHere = this.isClosed.compareAndSet(false, true);
        Supplier<PostgresConnectionClosedException> reason = closedHere ? UNEXPECTED : EXPECTED;
        this.drainError(reason);
    }

    /**
     * Fails all pending work with a {@link PostgresConnectionException} wrapping the given error.
     * A new exception instance is created for each recipient.
     *
     * @param error the underlying connection error
     */
    private void handleConnectionError(Throwable error) {
        Supplier<PostgresConnectionException> wrapped = () -> new PostgresConnectionException(error);
        this.drainError(wrapped);
    }

    /**
     * Closes the message subscriber with errors from the given supplier and, unless the
     * notification processor has already terminated, fails it as well.
     *
     * @param supplier produces the error to deliver
     */
    private void drainError(Supplier<? extends Throwable> supplier) {
        this.messageSubscriber.close(supplier);

        if (this.notificationProcessor.isTerminated()) {
            return;
        }
        this.notificationProcessor.onError(supplier.get());
    }

    /**
     * Subscriber for the decoded inbound message stream. It keeps a queue of conversations and
     * hands each incoming message to the conversation at the head of that queue, buffering
     * messages while the head conversation has no demand.
     */
    private class BackendMessageSubscriber
    implements CoreSubscriber<BackendMessage> {
        // Batch size for upstream requests; demandMore() uses the same value as a literal.
        private static final int DEMAND = 256;
        // Pending conversations; the head is the one currently receiving messages.
        private final Queue<Conversation> conversations = (Queue)Queues.small().get();
        // Messages received while the head conversation could not take them.
        private final Queue<BackendMessage> buffer = (Queue)Queues.get((int)256).get();
        // Outstanding upstream demand: set to 256 when requesting, decremented on every onNext.
        private final AtomicLong demand = new AtomicLong(0L);
        // Flag ensuring only one caller at a time executes the body of drainLoop().
        private final AtomicBoolean drain = new AtomicBoolean();
        // Set once the subscriber completed, failed or was closed; later messages are dropped.
        private volatile boolean terminated;
        private Subscription upstream;

        private BackendMessageSubscriber() {
        }

        /**
         * Creates a flux that, on subscription, queues a new conversation and hands the request
         * messages to {@code sender}.
         *
         * <p>The sink is failed instead when the conversation queue rejects the offer or when
         * {@code isConnected} reports {@code false}.
         *
         * @param takeUntil   predicate identifying the message that completes the conversation
         * @param requests    frontend messages to send for this conversation
         * @param sender      receives the request flux for sending
         * @param isConnected reports the current connection state
         * @return the flux of backend messages for the conversation
         */
        public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil, Publisher<FrontendMessage> requests, Consumer<Flux<FrontendMessage>> sender, Supplier<Boolean> isConnected) {
            return Flux.create(sink -> {
                Conversation conversation = new Conversation(takeUntil, (FluxSink)sink);
                Queue<Conversation> queue = this.conversations;
                // Queueing the conversation and handing over its requests happen under one lock.
                synchronized (queue) {
                    if (this.conversations.offer(conversation)) {
                        sink.onRequest(value -> this.onRequest(conversation, value));
                        if (!((Boolean)isConnected.get()).booleanValue()) {
                            sink.error((Throwable)((Object)new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed")));
                            return;
                        }
                        // Each request message re-checks the connection state before it is passed on.
                        Flux requestMessages = Flux.from((Publisher)requests).doOnNext(arg_0 -> BackendMessageSubscriber.lambda$null$1((Supplier)isConnected, sink, arg_0));
                        sender.accept(requestMessages);
                    } else {
                        sink.error((Throwable)((Object)new RequestQueueException("Cannot exchange messages because the request queue limit is exceeded")));
                    }
                }
            });
        }

        /**
         * Records additional downstream demand for a conversation and drains buffered messages
         * while the head conversation can accept them.
         */
        public void onRequest(Conversation conversation, long n) {
            conversation.incrementDemand(n);
            while (this.hasBufferedItems() && this.hasDownstreamDemand()) {
                this.drainLoop();
            }
        }

        /**
         * Requests another batch of 256 from upstream, but only when the buffer is empty and
         * the outstanding demand counter is exactly 0.
         */
        private void demandMore() {
            if (!this.hasBufferedItems() && this.demand.compareAndSet(0L, 256L)) {
                this.upstream.request(256L);
            }
        }

        /** Stores the upstream subscription and issues the initial request. */
        public void onSubscribe(Subscription s) {
            this.upstream = s;
            this.demandMore();
        }

        /** Returns whether a head conversation exists and has outstanding demand. */
        private boolean hasDownstreamDemand() {
            Conversation conversation = this.conversations.peek();
            return conversation != null && conversation.hasDemand();
        }

        /**
         * Delivers a message directly to the head conversation when nothing is buffered and it
         * has demand; otherwise buffers the message and drains what can be delivered. After
         * termination the message is released and reported as dropped.
         */
        public void onNext(BackendMessage message) {
            Conversation conversation;
            if (this.terminated) {
                ReferenceCountUtil.release((Object)message);
                Operators.onNextDropped((Object)message, (Context)this.currentContext());
                return;
            }
            this.demand.decrementAndGet();
            // Fast path: nothing buffered and the head conversation can take the message right away.
            if (this.buffer.isEmpty() && (conversation = this.conversations.peek()) != null && conversation.hasDemand()) {
                this.emit(conversation, message);
                this.potentiallyDemandMore(conversation);
                return;
            }
            // A rejected offer is treated as a connection-level error.
            if (!this.buffer.offer(message)) {
                ReferenceCountUtil.release((Object)message);
                Operators.onNextDropped((Object)message, (Context)this.currentContext());
                this.onError((Throwable)((Object)new ResponseQueueException("Response queue is full")));
                return;
            }
            while (this.hasBufferedItems() && this.hasDownstreamDemand()) {
                this.drainLoop();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Moves buffered messages to the head conversation until the buffer is empty, there is
         * no conversation, or the head conversation has no demand. Callers that do not win the
         * drain flag skip the loop but still run the demand check at the end.
         */
        private void drainLoop() {
            Conversation lastConversation = null;
            if (this.drain.compareAndSet(false, true)) {
                try {
                    while (this.hasBufferedItems()) {
                        Conversation conversation;
                        lastConversation = conversation = this.conversations.peek();
                        if (conversation == null) {
                            break;
                        }
                        if (!conversation.hasDemand()) break;
                        BackendMessage item = this.buffer.poll();
                        if (item == null) {
                            break;
                        }
                        this.emit(conversation, item);
                    }
                }
                finally {
                    this.drain.compareAndSet(true, false);
                }
            }
            this.potentiallyDemandMore(lastConversation);
        }

        /**
         * Calls demandMore() when there was no conversation, or when the given conversation
         * still has demand or was cancelled.
         */
        private void potentiallyDemandMore(@Nullable Conversation lastConversation) {
            if (lastConversation == null || lastConversation.hasDemand() || lastConversation.isCancelled()) {
                this.demandMore();
            }
        }

        /**
         * Hands a message to the conversation. A message matching the conversation's completion
         * predicate removes the head of the queue and completes the conversation instead of
         * being emitted.
         */
        private void emit(Conversation conversation, BackendMessage item) {
            if (conversation.canComplete(item)) {
                this.conversations.poll();
                conversation.complete(item);
            } else {
                conversation.emit(item);
            }
        }

        private boolean hasBufferedItems() {
            return !this.buffer.isEmpty();
        }

        /**
         * Handles an inbound error: fails pending work, completes the request processor, logs
         * the error and closes the client. Errors arriving after termination are only reported
         * as dropped.
         */
        public void onError(Throwable throwable) {
            if (this.terminated) {
                Operators.onErrorDropped((Throwable)throwable, (Context)this.currentContext());
                return;
            }
            ReactorNettyClient.this.handleConnectionError(throwable);
            ReactorNettyClient.this.requestProcessor.onComplete();
            this.terminated = true;
            // SSL-related failures are logged at debug level only.
            if (ReactorNettyClient.isSslException(throwable)) {
                logger.debug("Connection Error", throwable);
            } else {
                logger.error("Connection Error", throwable);
            }
            ReactorNettyClient.this.close().subscribe();
        }

        /** Marks the subscriber terminated and runs the client's close handling. */
        public void onComplete() {
            this.terminated = true;
            ReactorNettyClient.this.handleClose();
        }

        /** Returns the context of the head conversation's sink, or an empty context if none is queued. */
        public Context currentContext() {
            Conversation receiver = this.conversations.peek();
            if (receiver != null) {
                return receiver.sink.currentContext();
            }
            return Context.empty();
        }

        /**
         * Terminates the subscriber: every queued conversation is failed with a freshly
         * supplied error and every buffered message is released.
         */
        public void close(Supplier<? extends Throwable> supplier) {
            Conversation receiver;
            this.terminated = true;
            while ((receiver = this.conversations.poll()) != null) {
                receiver.onError(supplier.get());
            }
            while (!this.buffer.isEmpty()) {
                ReferenceCountUtil.release((Object)this.buffer.poll());
            }
        }

        // Compiler-generated lambda body used by addConversation(): fails the sink when a
        // request message is observed while the connection is reported as not connected.
        private static /* synthetic */ void lambda$null$1(Supplier isConnected, FluxSink sink, FrontendMessage m) {
            if (!((Boolean)isConnected.get()).booleanValue()) {
                sink.error((Throwable)((Object)new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed")));
            }
        }
    }

    /**
     * A single request/response exchange: the downstream sink receiving backend messages, the
     * predicate that recognises the final message, and the demand signalled by the downstream.
     */
    private static class Conversation {
        private static final AtomicLongFieldUpdater<Conversation> DEMAND_UPDATER = AtomicLongFieldUpdater.newUpdater(Conversation.class, "demand");
        private final Predicate<BackendMessage> takeUntil;
        private final FluxSink<BackendMessage> sink;
        // Updated only through DEMAND_UPDATER.
        private volatile long demand;

        private Conversation(Predicate<BackendMessage> takeUntil, FluxSink<BackendMessage> sink) {
            this.sink = sink;
            this.takeUntil = takeUntil;
        }

        /** Adds {@code -1} to the demand through {@code Operators.addCap} and returns its result. */
        private long decrementDemand() {
            return Operators.addCap(DEMAND_UPDATER, this, -1L);
        }

        /**
         * Tests whether the given message ends this conversation.
         *
         * @param item the message to test
         * @return the result of the {@code takeUntil} predicate
         */
        public boolean canComplete(BackendMessage item) {
            return this.takeUntil.test(item);
        }

        /**
         * Completes the conversation. The terminating message is released and never emitted;
         * the sink is completed unless it was cancelled.
         *
         * @param item the terminating message
         */
        public void complete(BackendMessage item) {
            ReferenceCountUtil.release(item);
            if (!this.sink.isCancelled()) {
                this.sink.complete();
            }
        }

        /**
         * Consumes one unit of demand and passes the message downstream. When the sink is
         * already cancelled the message is released and not passed on.
         *
         * @param item the message to emit
         */
        public void emit(BackendMessage item) {
            boolean cancelled = this.sink.isCancelled();
            if (cancelled) {
                ReferenceCountUtil.release(item);
            }
            // Demand is consumed in both cases so the caller's accounting stays unchanged.
            this.decrementDemand();
            if (cancelled) {
                // The message has just been released; handing it to the sink would expose a
                // released object to downstream drop/discard handling.
                return;
            }
            this.sink.next(item);
        }

        /**
         * Fails the sink with the given error unless it was cancelled.
         *
         * @param throwable the error to signal
         */
        public void onError(Throwable throwable) {
            if (!this.sink.isCancelled()) {
                this.sink.error(throwable);
            }
        }

        /** Returns whether the recorded demand is greater than zero. */
        public boolean hasDemand() {
            return DEMAND_UPDATER.get(this) > 0L;
        }

        /** Returns whether the downstream sink was cancelled. */
        public boolean isCancelled() {
            return this.sink.isCancelled();
        }

        /**
         * Adds {@code n} to the recorded demand through {@code Operators.addCap}.
         *
         * @param n the amount of additional demand
         */
        public void incrementDemand(long n) {
            Operators.addCap(DEMAND_UPDATER, this, n);
        }
    }

    static class SocketLoopResources
    implements LoopResources {
        @Nullable
        private static final Class<? extends Channel> EPOLL_SOCKET = SocketLoopResources.findClass("io.netty.channel.epoll.EpollDomainSocketChannel");
        @Nullable
        private static final Class<? extends Channel> KQUEUE_SOCKET = SocketLoopResources.findClass("io.netty.channel.kqueue.KQueueDomainSocketChannel");
        private static final boolean kqueue;
        private static final boolean epoll;
        private final LoopResources delegate = TcpResources.get();

        SocketLoopResources() {
        }

        private static Class<? extends Channel> findClass(String className) {
            try {
                return SocketLoopResources.class.getClassLoader().loadClass(className);
            }
            catch (ClassNotFoundException e) {
                return null;
            }
        }

        public Class<? extends Channel> onChannel(EventLoopGroup group) {
            if (epoll && EPOLL_SOCKET != null) {
                return EPOLL_SOCKET;
            }
            if (kqueue && KQUEUE_SOCKET != null) {
                return KQUEUE_SOCKET;
            }
            return this.delegate.onChannel(group);
        }

        public EventLoopGroup onClient(boolean useNative) {
            return this.delegate.onClient(useNative);
        }

        public Class<? extends DatagramChannel> onDatagramChannel(EventLoopGroup group) {
            return this.delegate.onDatagramChannel(group);
        }

        public EventLoopGroup onServer(boolean useNative) {
            return this.delegate.onServer(useNative);
        }

        public Class<? extends ServerChannel> onServerChannel(EventLoopGroup group) {
            return this.delegate.onServerChannel(group);
        }

        public EventLoopGroup onServerSelect(boolean useNative) {
            return this.delegate.onServerSelect(useNative);
        }

        public boolean preferNative() {
            return this.delegate.preferNative();
        }

        public boolean daemon() {
            return this.delegate.daemon();
        }

        public void dispose() {
            this.delegate.dispose();
        }

        public Mono<Void> disposeLater() {
            return this.delegate.disposeLater();
        }

        static {
            boolean kqueueCheck = false;
            try {
                Class.forName("io.netty.channel.kqueue.KQueue");
                kqueueCheck = KQueue.isAvailable();
            }
            catch (ClassNotFoundException classNotFoundException) {
                // empty catch block
            }
            kqueue = kqueueCheck;
            boolean epollCheck = false;
            try {
                Class.forName("io.netty.channel.epoll.Epoll");
                epollCheck = Epoll.isAvailable();
            }
            catch (ClassNotFoundException classNotFoundException) {
                // empty catch block
            }
            epoll = epollCheck;
        }
    }

    /**
     * Raised by the message subscriber when the response buffer rejects an incoming message.
     */
    static class ResponseQueueException extends R2dbcNonTransientResourceException {

        /**
         * @param message the detail message
         */
        public ResponseQueueException(String message) {
            super(message);
        }
    }

    /**
     * Raised when a new conversation cannot be queued because the conversation queue
     * rejects it.
     */
    static class RequestQueueException extends R2dbcTransientResourceException {

        /**
         * @param message the detail message
         */
        public RequestQueueException(String message) {
            super(message);
        }
    }

    /**
     * Wraps an error raised by the underlying connection so it can be signalled to pending
     * conversations and notification listeners.
     */
    static class PostgresConnectionException extends R2dbcNonTransientResourceException {

        /**
         * @param cause the underlying connection error
         */
        public PostgresConnectionException(Throwable cause) {
            super(cause);
        }
    }

    /**
     * Signals that work could not proceed because the connection is closed.
     */
    static class PostgresConnectionClosedException extends R2dbcNonTransientResourceException {

        /**
         * @param reason description of why the connection is considered closed
         */
        public PostgresConnectionClosedException(String reason) {
            super(reason);
        }
    }

    private final class EnsureSubscribersCompleteChannelHandler
    extends ChannelDuplexHandler {
        private final EmitterProcessor<Publisher<FrontendMessage>> requestProcessor;

        private EnsureSubscribersCompleteChannelHandler(EmitterProcessor<Publisher<FrontendMessage>> requestProcessor) {
            this.requestProcessor = requestProcessor;
        }

        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            super.channelInactive(ctx);
        }

        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            super.channelUnregistered(ctx);
            this.requestProcessor.onComplete();
            ReactorNettyClient.this.handleClose();
        }
    }
}

