/*
 * Decompiled with CFR 0.152.
 */
package dev.miku.r2dbc.mysql.client;

import dev.miku.r2dbc.mysql.MySqlSslConfiguration;
import dev.miku.r2dbc.mysql.client.Client;
import dev.miku.r2dbc.mysql.client.EnvelopeSlicer;
import dev.miku.r2dbc.mysql.client.Lifecycle;
import dev.miku.r2dbc.mysql.client.MessageDuplexCodec;
import dev.miku.r2dbc.mysql.client.RequestQueue;
import dev.miku.r2dbc.mysql.client.RequestTask;
import dev.miku.r2dbc.mysql.client.SslBridgeHandler;
import dev.miku.r2dbc.mysql.client.SslState;
import dev.miku.r2dbc.mysql.message.client.ClientMessage;
import dev.miku.r2dbc.mysql.message.client.ExchangeableMessage;
import dev.miku.r2dbc.mysql.message.client.ExitMessage;
import dev.miku.r2dbc.mysql.message.client.SendOnlyMessage;
import dev.miku.r2dbc.mysql.message.server.ServerMessage;
import dev.miku.r2dbc.mysql.message.server.WarningMessage;
import dev.miku.r2dbc.mysql.util.AssertUtils;
import dev.miku.r2dbc.mysql.util.ConnectionContext;
import io.netty.channel.ChannelHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import reactor.netty.Connection;
import reactor.netty.FutureMono;

final class ReactorNettyClient
implements Client {
    private static final Logger logger = LoggerFactory.getLogger(ReactorNettyClient.class);
    private static final Consumer<ServerMessage> INFO_LOGGING = ReactorNettyClient::infoLogging;
    private static final Consumer<ServerMessage> DEBUG_LOGGING = message -> {
        logger.debug("Response: {}", message);
        ReactorNettyClient.infoLogging(message);
    };
    private static final BiConsumer<Object, SynchronousSink<ServerMessage>> INBOUND_HANDLE = ReactorNettyClient::inboundHandle;
    private final Connection connection;
    private final ConnectionContext context;
    private final EmitterProcessor<ServerMessage> responseProcessor = EmitterProcessor.create((boolean)false);
    private final RequestQueue requestQueue = new RequestQueue();
    private final AtomicBoolean closing = new AtomicBoolean();

    ReactorNettyClient(Connection connection, MySqlSslConfiguration ssl, ConnectionContext context) {
        AssertUtils.requireNonNull(connection, "connection must not be null");
        AssertUtils.requireNonNull(context, "context must not be null");
        AssertUtils.requireNonNull(ssl, "ssl must not be null");
        this.connection = connection;
        this.context = context;
        connection.addHandlerLast("R2dbcMySqlEnvelopeSlicer", (ChannelHandler)new EnvelopeSlicer()).addHandlerLast("R2dbcMySqlMessageDuplexCodec", (ChannelHandler)new MessageDuplexCodec(context, this.closing, this.requestQueue));
        if (ssl.getSslMode().startSsl()) {
            connection.addHandlerFirst("R2dbcMySqlSslBridgeHandler", (ChannelHandler)new SslBridgeHandler(context, ssl));
        }
        if (InternalLoggerFactory.getInstance(ReactorNettyClient.class).isTraceEnabled()) {
            logger.debug("Connection tracking logging is enabled");
            connection.addHandlerFirst(LoggingHandler.class.getSimpleName(), (ChannelHandler)new LoggingHandler(ReactorNettyClient.class, LogLevel.TRACE));
        }
        Flux inbound = connection.inbound().receiveObject().handle(INBOUND_HANDLE);
        if (logger.isDebugEnabled()) {
            inbound = inbound.doOnNext(DEBUG_LOGGING);
        } else if (logger.isInfoEnabled()) {
            inbound = inbound.doOnNext(INFO_LOGGING);
        }
        inbound.subscribe(arg_0 -> this.responseProcessor.onNext(arg_0), throwable -> {
            try {
                logger.error("Connection Error: {}", (Object)throwable.getMessage(), throwable);
                this.responseProcessor.onError(throwable);
            }
            finally {
                connection.dispose();
            }
        }, () -> this.responseProcessor.onComplete());
    }

    @Override
    public Flux<ServerMessage> exchange(ExchangeableMessage request, Predicate<ServerMessage> complete) {
        AssertUtils.requireNonNull(request, "request must not be null");
        return Mono.create(sink -> {
            if (!this.isConnected()) {
                if (request instanceof Disposable) {
                    ((Disposable)request).dispose();
                }
                sink.error((Throwable)new IllegalStateException("Cannot send messages because the connection is closed"));
                return;
            }
            this.requestQueue.submit(RequestTask.wrap(request, sink, () -> {
                boolean[] completed = new boolean[]{false};
                return this.send(request).thenMany(this.responseProcessor).handle((message, response) -> {
                    if (complete.test((ServerMessage)message)) {
                        completed[0] = true;
                        response.next(message);
                        response.complete();
                    } else {
                        response.next(message);
                    }
                }).doOnTerminate((Runnable)this.requestQueue).doOnCancel(() -> ReactorNettyClient.exchangeCancel(completed));
            }));
        }).flatMapMany(ReactorNettyClient.identity());
    }

    @Override
    public Mono<Void> sendOnly(SendOnlyMessage message) {
        AssertUtils.requireNonNull(message, "message must not be null");
        return Mono.create(sink -> {
            if (!this.isConnected()) {
                if (message instanceof Disposable) {
                    ((Disposable)message).dispose();
                }
                sink.error((Throwable)new IllegalStateException("Cannot send messages because the connection is closed"));
                return;
            }
            this.requestQueue.submit(RequestTask.wrap(message, sink, () -> this.send(message).doOnTerminate((Runnable)this.requestQueue)));
        }).flatMap(ReactorNettyClient.identity());
    }

    @Override
    public Mono<ServerMessage> receiveOnly() {
        return Mono.create(sink -> {
            if (!this.isConnected()) {
                sink.error((Throwable)new IllegalStateException("Cannot receive messages because the connection is closed"));
                return;
            }
            this.requestQueue.submit(RequestTask.wrap(sink, () -> {
                boolean[] completed = new boolean[]{false};
                return this.responseProcessor.next().doOnSuccess(ignored -> {
                    completed[0] = true;
                }).doOnTerminate((Runnable)this.requestQueue).doOnCancel(() -> ReactorNettyClient.exchangeCancel(completed));
            }));
        }).flatMap(ReactorNettyClient.identity());
    }

    @Override
    public Mono<Void> close() {
        return Mono.create(sink -> {
            if (!this.closing.compareAndSet(false, true)) {
                sink.success();
                return;
            }
            this.requestQueue.submit(RequestTask.wrap(sink, () -> this.send(ExitMessage.getInstance()).onErrorResume(e -> {
                logger.error("Exit message sending failed, force closing", e);
                return Mono.empty();
            }).then(this.forceClose())));
        }).flatMap(ReactorNettyClient.identity());
    }

    @Override
    public Mono<Void> forceClose() {
        return FutureMono.deferFuture(() -> this.connection.channel().close());
    }

    @Override
    public boolean isConnected() {
        return !this.closing.get() && this.connection.channel().isOpen();
    }

    @Override
    public void sslUnsupported() {
        this.connection.channel().pipeline().fireUserEventTriggered((Object)SslState.UNSUPPORTED);
    }

    @Override
    public void loginSuccess() {
        this.connection.channel().pipeline().fireUserEventTriggered((Object)Lifecycle.COMMAND);
    }

    public String toString() {
        return String.format("ReactorNettyClient(%s){connectionId=%d}", this.closing.get() ? "closing or closed" : "activating", this.context.getConnectionId());
    }

    private Mono<Void> send(ClientMessage message) {
        logger.debug("Request: {}", (Object)message);
        return FutureMono.from((Future)this.connection.channel().writeAndFlush((Object)message));
    }

    private static void inboundHandle(Object msg, SynchronousSink<ServerMessage> sink) {
        if (msg instanceof ServerMessage) {
            if (msg instanceof ReferenceCounted) {
                ((ReferenceCounted)msg).retain();
            }
            sink.next((Object)((ServerMessage)msg));
        } else {
            sink.error((Throwable)new IllegalStateException("Impossible inbound type: " + msg.getClass()));
        }
    }

    private static void exchangeCancel(boolean[] completed) {
        if (!completed[0]) {
            logger.error("Exchange cancelled while exchange is active. This is likely a bug leading to unpredictable outcome.");
        }
    }

    private static void infoLogging(ServerMessage message) {
        int warnings;
        if (message instanceof WarningMessage && (warnings = ((WarningMessage)message).getWarnings()) != 0) {
            logger.info("MySQL reports {} warning(s)", (Object)warnings);
        }
    }

    private static <T> Function<T, T> identity() {
        return Identity.INSTANCE;
    }

    private static final class Identity
    implements Function<Object, Object> {
        private static final Identity INSTANCE = new Identity();

        private Identity() {
        }

        @Override
        public Object apply(Object o) {
            return o;
        }
    }
}

