/*
 * Decompiled with CFR 0.152.
 */
package io.asyncer.r2dbc.mysql.client;

import io.asyncer.r2dbc.mysql.ConnectionContext;
import io.asyncer.r2dbc.mysql.MySqlSslConfiguration;
import io.asyncer.r2dbc.mysql.client.Client;
import io.asyncer.r2dbc.mysql.client.ClientExceptions;
import io.asyncer.r2dbc.mysql.client.EnvelopeSlicer;
import io.asyncer.r2dbc.mysql.client.FluxExchangeable;
import io.asyncer.r2dbc.mysql.client.Lifecycle;
import io.asyncer.r2dbc.mysql.client.MessageDuplexCodec;
import io.asyncer.r2dbc.mysql.client.RequestQueue;
import io.asyncer.r2dbc.mysql.client.RequestTask;
import io.asyncer.r2dbc.mysql.client.SslBridgeHandler;
import io.asyncer.r2dbc.mysql.client.SslState;
import io.asyncer.r2dbc.mysql.message.client.ClientMessage;
import io.asyncer.r2dbc.mysql.message.client.ExitMessage;
import io.asyncer.r2dbc.mysql.message.server.ServerMessage;
import io.asyncer.r2dbc.mysql.message.server.WarningMessage;
import io.asyncer.r2dbc.mysql.util.AssertUtils;
import io.asyncer.r2dbc.mysql.util.OperatorUtils;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.ReferenceCounted;
import io.r2dbc.spi.R2dbcException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.SynchronousSink;
import reactor.netty.Connection;
import reactor.netty.FutureMono;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

/**
 * {@link Client} implementation backed by a Reactor Netty {@link Connection}.
 *
 * <p>Outgoing {@link ClientMessage}s are pushed into the {@link #requests} sink and written to the
 * connection's outbound one after another. Everything received from the inbound is re-published through
 * {@link #responseProcessor}, which every exchange subscribes to. Exchanges and the graceful close are
 * not started directly: they are wrapped in a {@link RequestTask} and handed to {@link #requestQueue}.
 * How queued tasks are scheduled is decided by {@code RequestQueue}, which is not visible in this file.
 */
final class ReactorNettyClient implements Client {

    private static final Logger logger = Loggers.getLogger(ReactorNettyClient.class);

    /** Sampled once when the class is initialised; later log-level changes are not observed. */
    private static final boolean DEBUG_ENABLED = logger.isDebugEnabled();

    /** Sampled once when the class is initialised; later log-level changes are not observed. */
    private static final boolean INFO_ENABLED = logger.isInfoEnabled();

    /** Discard hook that releases reference-counted messages which never reach a consumer. */
    private static final Consumer<ReferenceCounted> RELEASE = ReferenceCounted::release;

    private final Connection connection;

    private final ConnectionContext context;

    /** Outgoing messages; drained by the pipeline set up in the constructor. */
    private final Sinks.Many<ClientMessage> requests = Sinks.many().unicast().onBackpressureBuffer();

    /** Incoming messages; multicast with a buffer of 512 and auto-cancel switched off. */
    private final Sinks.Many<ServerMessage> responseProcessor =
        Sinks.many().multicast().onBackpressureBuffer(512, false);

    private final RequestQueue requestQueue = new RequestQueue();

    /** Set once by {@link #close()} or {@link #handleClose()}; also shared with the duplex codec. */
    private final AtomicBoolean closing = new AtomicBoolean();

    /**
     * Installs the protocol handlers on {@code connection} and subscribes the inbound and outbound
     * pipelines.
     *
     * @param connection the Reactor Netty connection to drive
     * @param ssl        SSL settings; an SSL bridge handler is installed first in the pipeline when
     *                   {@code ssl.getSslMode().startSsl()} is true
     * @param context    connection context passed to the codec and the SSL bridge handler
     */
    ReactorNettyClient(Connection connection, MySqlSslConfiguration ssl, ConnectionContext context) {
        AssertUtils.requireNonNull(connection, "connection must not be null");
        AssertUtils.requireNonNull(context, "context must not be null");
        AssertUtils.requireNonNull(ssl, "ssl must not be null");
        // ResponseSubscriber.onSubscribe forwards the upstream Subscription to this Flux by casting it
        // to Subscriber, so verify up front that the cast will succeed.
        AssertUtils.require(this.responseProcessor.asFlux() instanceof Subscriber,
            "responseProcessor(" + this.responseProcessor + ") must be a Subscriber");

        this.connection = connection;
        this.context = context;

        connection.addHandlerLast("R2dbcMySqlEnvelopeSlicer", new EnvelopeSlicer())
            .addHandlerLast("R2dbcMySqlMessageDuplexCodec",
                new MessageDuplexCodec(context, this.closing, this.requestQueue));

        if (ssl.getSslMode().startSsl()) {
            connection.addHandlerFirst("R2dbcMySqlSslBridgeHandler", new SslBridgeHandler(context, ssl));
        }

        if (logger.isTraceEnabled()) {
            logger.debug("Connection tracking logging is enabled");
            connection.addHandlerFirst(LoggingHandler.class.getSimpleName(),
                new LoggingHandler(ReactorNettyClient.class, LogLevel.TRACE));
        }

        ResponseSink sink = new ResponseSink();

        // Inbound: every ServerMessage is pushed into responseProcessor through the sink; anything else
        // is a protocol violation and fails the pipeline.
        connection.inbound().receiveObject()
            .doOnNext(it -> {
                if (!(it instanceof ServerMessage)) {
                    throw ClientExceptions.unsupportedProtocol(it.getClass().getTypeName());
                }
                if (it instanceof ReferenceCounted) {
                    // Take an extra reference before handing the message on; the RELEASE discard
                    // hooks on the exchange pipelines drop it again when a message is discarded.
                    ((ReferenceCounted) it).retain();
                }
                sink.next((ServerMessage) it);
            })
            .onErrorResume(this::resumeError)
            .subscribe(new ResponseSubscriber(sink));

        // Outbound: write requests one at a time, in emission order.
        this.requests.asFlux()
            .concatMap(message -> {
                if (DEBUG_ENABLED) {
                    logger.debug("Request: {}", message);
                }
                return connection.outbound().sendObject(message);
            })
            .onErrorResume(this::resumeError)
            .doAfterTerminate(this::handleClose)
            .subscribe();
    }

    /**
     * Sends a single request and maps the server messages that follow through {@code handler}.
     *
     * <p>If the client is not connected, the request is disposed (when it is {@link Disposable}) and
     * the returned flux fails with {@code ClientExceptions.exchangeClosed()}. Otherwise the exchange is
     * submitted to the request queue; the request itself is emitted when the response flux is
     * subscribed, and the queue is run again when that flux terminates.
     *
     * @param request the message to send
     * @param handler receives each server message and decides what to emit and when to complete
     * @param <T>     element type produced by {@code handler}
     * @return the handled responses
     */
    @Override
    public <T> Flux<T> exchange(ClientMessage request, BiConsumer<ServerMessage, SynchronousSink<T>> handler) {
        AssertUtils.requireNonNull(request, "request must not be null");

        return Mono.<Flux<T>>create(sink -> {
            if (!isConnected()) {
                if (request instanceof Disposable) {
                    ((Disposable) request).dispose();
                }
                sink.error(ClientExceptions.exchangeClosed());
                return;
            }

            Flux<T> responses = OperatorUtils.discardOnCancel(this.responseProcessor.asFlux()
                    .doOnSubscribe(ignored -> emitNextRequest(request))
                    .handle(handler)
                    .doOnTerminate(this.requestQueue))
                .doOnDiscard(ReferenceCounted.class, RELEASE);

            this.requestQueue.submit(RequestTask.wrap(request, sink, responses));
        }).flatMapMany(identity());
    }

    /**
     * Runs a multi-message exchange: {@code exchangeable} supplies the requests and also handles the
     * responses.
     *
     * <p>If the client is not connected, {@code exchangeable} is still subscribed so that each request
     * it produces can be disposed, and the returned flux fails with
     * {@code ClientExceptions.exchangeClosed()}. Otherwise the exchange is submitted to the request
     * queue; {@code exchangeable} is disposed when the responses terminate or are cancelled.
     *
     * @param exchangeable source of requests and handler of responses
     * @param <T>          element type produced by {@code exchangeable}
     * @return the handled responses
     */
    @Override
    public <T> Flux<T> exchange(FluxExchangeable<T> exchangeable) {
        AssertUtils.requireNonNull(exchangeable, "exchangeable must not be null");

        return Mono.<Flux<T>>create(sink -> {
            if (!isConnected()) {
                exchangeable.subscribe(request -> {
                    if (request instanceof Disposable) {
                        ((Disposable) request).dispose();
                    }
                }, e -> this.requests.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST));
                sink.error(ClientExceptions.exchangeClosed());
                return;
            }

            Flux<T> responses = this.responseProcessor.asFlux()
                .doOnSubscribe(ignored -> exchangeable.subscribe(
                    this::emitNextRequest,
                    e -> this.requests.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST)))
                .handle(exchangeable)
                .doOnTerminate(() -> {
                    exchangeable.dispose();
                    this.requestQueue.run();
                });

            this.requestQueue.submit(RequestTask.wrap(exchangeable, sink,
                OperatorUtils.discardOnCancel(responses)
                    .doOnDiscard(ReferenceCounted.class, RELEASE)
                    .doOnCancel(() -> exchangeable.dispose())));
        }).flatMapMany(identity());
    }

    /**
     * Closes the client gracefully.
     *
     * <p>The first subscriber to flip the closing flag submits a task that tries to emit
     * {@link ExitMessage#INSTANCE}; a failed emission or any error is only logged. In every case,
     * including when the client was already closing, the channel is then closed via
     * {@link #forceClose()}.
     *
     * @return a mono that completes once the channel has been closed
     */
    @Override
    public Mono<Void> close() {
        return Mono.<Mono<Void>>create(sink -> {
            if (!this.closing.compareAndSet(false, true)) {
                // Already closing or closed: nothing to send, go straight to forceClose().
                sink.success();
                return;
            }

            this.requestQueue.submit(RequestTask.wrap(sink, Mono.fromRunnable(() -> {
                Sinks.EmitResult result = this.requests.tryEmitNext(ExitMessage.INSTANCE);
                if (result != Sinks.EmitResult.OK) {
                    logger.error("Exit message sending failed due to {}, force closing", result);
                }
            })));
        }).flatMap(identity()).onErrorResume(e -> {
            logger.error("Exit message sending failed, force closing", e);
            return Mono.empty();
        }).then(forceClose());
    }

    /**
     * Closes the underlying channel without sending an exit message.
     *
     * @return a mono wrapping the channel's close future, created lazily by the supplier
     */
    @Override
    public Mono<Void> forceClose() {
        return FutureMono.deferFuture(() -> this.connection.channel().close());
    }

    /** @return the allocator of the connection's outbound. */
    @Override
    public ByteBufAllocator getByteBufAllocator() {
        return this.connection.outbound().alloc();
    }

    /** @return {@code true} while the closing flag is unset and the channel is open. */
    @Override
    public boolean isConnected() {
        return !this.closing.get() && this.connection.channel().isOpen();
    }

    /** Fires {@link SslState#UNSUPPORTED} as a user event through the channel pipeline. */
    @Override
    public void sslUnsupported() {
        this.connection.channel().pipeline().fireUserEventTriggered(SslState.UNSUPPORTED);
    }

    /** Fires {@link Lifecycle#COMMAND} as a user event through the channel pipeline. */
    @Override
    public void loginSuccess() {
        this.connection.channel().pipeline().fireUserEventTriggered(Lifecycle.COMMAND);
    }

    @Override
    public String toString() {
        return String.format("ReactorNettyClient(%s){connectionId=%d}",
            this.closing.get() ? "closing or closed" : "activating", this.context.getConnectionId());
    }

    /**
     * Emits {@code request} to the outbound sink, or disposes it (when {@link Disposable}) if the
     * client is not connected or the emission does not return {@code OK}.
     */
    private void emitNextRequest(ClientMessage request) {
        if (isConnected() && this.requests.tryEmitNext(request) == Sinks.EmitResult.OK) {
            return;
        }
        if (request instanceof Disposable) {
            ((Disposable) request).dispose();
        }
    }

    /**
     * Error fallback shared by the inbound and outbound pipelines: fails pending work with the wrapped
     * error, completes the request sink, logs the error and then closes the client.
     *
     * @param e   the pipeline error
     * @param <T> element type of the pipeline being resumed; no element is ever emitted
     * @return the result of {@link #close()}, viewed as a {@code Mono<T>}
     */
    private <T> Mono<T> resumeError(Throwable e) {
        drainError(ClientExceptions.wrap(e));

        this.requests.emitComplete((signalType, emitResult) -> {
            if (emitResult.isFailure()) {
                logger.error("Error: {}", emitResult);
            }
            // Never retry the completion signal.
            return false;
        });

        logger.error("Error: {}", e.getLocalizedMessage(), e);

        // close() yields Mono<Void>, which emits no element, so viewing it as Mono<T> is safe.
        @SuppressWarnings("unchecked")
        Mono<T> closed = (Mono<T>) close();
        return closed;
    }

    /** Disposes the request queue and fails the response processor with {@code e}. */
    private void drainError(R2dbcException e) {
        this.requestQueue.dispose();
        this.responseProcessor.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST);
    }

    /**
     * Invoked when either pipeline terminates. If the closing flag was not yet set, the close was not
     * initiated by this client: a warning is logged and pending work fails with
     * {@code unexpectedClosed()}; otherwise it fails with {@code expectedClosed()}.
     */
    private void handleClose() {
        if (this.closing.compareAndSet(false, true)) {
            logger.warn("Connection has been closed by peer");
            drainError(ClientExceptions.unexpectedClosed());
        } else {
            drainError(ClientExceptions.expectedClosed());
        }
    }

    /** Typed identity function used to flatten a {@code Mono} of publishers. */
    private static <T> Function<T, T> identity() {
        return Function.identity();
    }

    /**
     * Adapter that pushes inbound messages into {@link #responseProcessor}, logging each one first.
     * Only {@link #next} and {@link #error} are supported.
     */
    private final class ResponseSink implements SynchronousSink<ServerMessage> {

        private ResponseSink() {
        }

        /** Not supported; completion is handled by {@link ResponseSubscriber#onComplete()}. */
        @Override
        public void complete() {
            throw new UnsupportedOperationException();
        }

        @Override
        @Deprecated
        public Context currentContext() {
            return Context.empty();
        }

        @Override
        public ContextView contextView() {
            return Context.empty();
        }

        @Override
        public void error(Throwable e) {
            ReactorNettyClient.this.responseProcessor.emitError(ClientExceptions.wrap(e),
                Sinks.EmitFailureHandler.FAIL_FAST);
        }

        /**
         * Logs {@code message} and emits it to the response processor. A {@link WarningMessage} that
         * reports at least one warning is logged at info level; everything else at debug level.
         */
        @Override
        public void next(ServerMessage message) {
            if (message instanceof WarningMessage) {
                int warnings = ((WarningMessage) message).getWarnings();
                if (warnings == 0) {
                    if (DEBUG_ENABLED) {
                        logger.debug("Response: {}", message);
                    }
                } else if (INFO_ENABLED) {
                    logger.info("Response: {}, reports {} warning(s)", message, warnings);
                }
            } else if (DEBUG_ENABLED) {
                logger.debug("Response: {}", message);
            }

            ReactorNettyClient.this.responseProcessor.emitNext(message, Sinks.EmitFailureHandler.FAIL_FAST);
        }
    }

    /**
     * Terminal subscriber of the inbound pipeline. Elements are ignored here because the pipeline's
     * {@code doOnNext} stage has already pushed them into the {@link ResponseSink}.
     */
    private final class ResponseSubscriber implements CoreSubscriber<Object> {

        private final ResponseSink sink;

        private ResponseSubscriber(ResponseSink sink) {
            this.sink = sink;
        }

        /** Hands the inbound subscription to the response processor, which is also a Subscriber. */
        @Override
        public void onSubscribe(Subscription s) {
            ((Subscriber<?>) ReactorNettyClient.this.responseProcessor.asFlux()).onSubscribe(s);
        }

        @Override
        public void onNext(Object message) {
            // Intentionally empty: see the class comment.
        }

        @Override
        public void onError(Throwable t) {
            this.sink.error(t);
        }

        @Override
        public void onComplete() {
            ReactorNettyClient.this.handleClose();
        }
    }
}

