/*
 * Decompiled with CFR 0.152.
 */
package org.mariadb.r2dbc.client;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import io.r2dbc.spi.R2dbcTransientResourceException;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import org.mariadb.r2dbc.ExceptionFactory;
import org.mariadb.r2dbc.MariadbConnectionConfiguration;
import org.mariadb.r2dbc.client.Client;
import org.mariadb.r2dbc.client.CmdElement;
import org.mariadb.r2dbc.client.Context;
import org.mariadb.r2dbc.client.DecoderState;
import org.mariadb.r2dbc.client.MariadbPacketDecoder;
import org.mariadb.r2dbc.client.MariadbPacketEncoder;
import org.mariadb.r2dbc.client.ServerVersion;
import org.mariadb.r2dbc.message.client.ClientMessage;
import org.mariadb.r2dbc.message.client.QueryPacket;
import org.mariadb.r2dbc.message.client.QuitPacket;
import org.mariadb.r2dbc.message.client.SslRequestPacket;
import org.mariadb.r2dbc.message.server.InitialHandshakePacket;
import org.mariadb.r2dbc.message.server.ServerMessage;
import org.mariadb.r2dbc.util.PrepareCache;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.Queues;

/**
 * Common base for {@link Client} implementations built on a reactor-netty {@link Connection}.
 *
 * <p>On construction it installs the packet decoder and encoder on the connection, and subscribes
 * to the inbound stream so that connection errors and completion fail every pending response
 * receiver. Subclasses supply the actual command dispatch through
 * {@link #sendCommand(ClientMessage, DecoderState, String)} and {@link #sendNext()}.
 */
public abstract class ClientBase implements Client {
    /** Server status bit tested to decide whether a transaction is currently active. */
    private static final int STATUS_IN_TRANSACTION = 0x0001;
    /** Server status bit tested by {@link #isAutoCommit()}. */
    private static final int STATUS_AUTOCOMMIT = 0x0002;
    /** Server status bit tested by {@link #noBackslashEscapes()}. */
    private static final int STATUS_NO_BACKSLASH_ESCAPES = 0x0200;

    private static final Logger logger = Loggers.getLogger(ClientBase.class);

    /** Lock taken by {@link LockAction} on construction and released by its {@code close()}. */
    protected final ReentrantLock lock = new ReentrantLock();
    private final MariadbConnectionConfiguration configuration;
    protected final Connection connection;
    /** Pending response receivers, drained with an error when the connection fails or ends. */
    protected final Queue<CmdElement> responseReceivers = Queues.<CmdElement>unbounded().get();
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final MariadbPacketDecoder mariadbPacketDecoder;
    private final MariadbPacketEncoder mariadbPacketEncoder = new MariadbPacketEncoder();
    /** Set by {@link #setContext(InitialHandshakePacket)}; {@code null} until then. */
    private volatile Context context;
    /** {@code null} when the configuration does not enable server-side prepared statements. */
    private final PrepareCache prepareCache;

    /**
     * Wires the packet decoder/encoder into the connection and starts listening for connection
     * errors and completion.
     *
     * @param connection the established reactor-netty connection
     * @param configuration the connection configuration
     */
    protected ClientBase(Connection connection, MariadbConnectionConfiguration configuration) {
        this.connection = connection;
        this.configuration = configuration;
        this.prepareCache = this.configuration.useServerPrepStmts()
            ? new PrepareCache(this.configuration.getPrepareCacheSize(), this)
            : null;
        this.mariadbPacketDecoder = new MariadbPacketDecoder(this.responseReceivers, this);
        connection.addHandler(this.mariadbPacketDecoder);
        connection.addHandler(this.mariadbPacketEncoder);
        if (logger.isTraceEnabled()) {
            connection.addHandlerFirst(
                LoggingHandler.class.getSimpleName(),
                new LoggingHandler(ClientBase.class, LogLevel.TRACE));
        }
        // Any error or completion of the inbound stream fails all pending receivers.
        connection.inbound()
            .receive()
            .doOnError(this::handleConnectionError)
            .doOnComplete(this::closedServlet)
            .then()
            .subscribe();
    }

    /**
     * Applies the socket-related settings of {@code configuration} to {@code tcpClient}.
     *
     * <p>Connect timeout and socket timeout are set only when configured (non-null); keep-alive
     * and abortive close (linger of 0) are set only when enabled.
     *
     * @param configuration the connection configuration to read options from
     * @param tcpClient the client to configure
     * @return the client with the options applied
     * @throws ArithmeticException if a configured timeout in milliseconds does not fit in an int
     */
    public static TcpClient setSocketOption(MariadbConnectionConfiguration configuration, TcpClient tcpClient) {
        TcpClient client = tcpClient;
        if (configuration.getConnectTimeout() != null) {
            client = client.option(
                ChannelOption.CONNECT_TIMEOUT_MILLIS,
                Math.toIntExact(configuration.getConnectTimeout().toMillis()));
        }
        if (configuration.getSocketTimeout() != null) {
            client = client.option(
                ChannelOption.SO_TIMEOUT,
                Math.toIntExact(configuration.getSocketTimeout().toMillis()));
        }
        if (configuration.isTcpKeepAlive()) {
            client = client.option(ChannelOption.SO_KEEPALIVE, true);
        }
        if (configuration.isTcpAbortiveClose()) {
            client = client.option(ChannelOption.SO_LINGER, 0);
        }
        return client;
    }

    /**
     * Marks the client closed (if it was not already), logs the failure and propagates it to the
     * decoder and to every pending receiver.
     */
    private void handleConnectionError(Throwable throwable) {
        // The message differs depending on whether this error is what closed the client.
        String message = this.isClosed.compareAndSet(false, true)
            ? "Connection unexpected error"
            : "Connection error";
        R2dbcNonTransientResourceException err =
            new R2dbcNonTransientResourceException(message, "08000", throwable);
        logger.error(message, throwable);
        this.clearWaitingListWithError(err);
    }

    /**
     * Closes the client. Only the first subscription that flips the closed flag does any work;
     * later ones complete empty.
     *
     * <p>If the channel is still open a {@link QuitPacket} is written before the connection is
     * disposed; otherwise the connection is disposed directly.
     *
     * @return a {@link Mono} completing once the connection has been disposed
     */
    @Override
    public Mono<Void> close() {
        return Mono.defer(() -> {
            if (!this.isClosed.compareAndSet(false, true)) {
                return Mono.empty();
            }
            if (!this.connection.channel().isOpen()) {
                this.connection.dispose();
                return this.connection.onDispose();
            }
            return Flux.just(QuitPacket.INSTANCE)
                .doOnNext(message -> this.connection.channel().writeAndFlush(message))
                .then()
                .doOnSuccess(v -> this.connection.dispose())
                .then(this.connection.onDispose());
        });
    }

    /** Sends {@code message} with the decoder starting in {@link DecoderState#QUERY_RESPONSE}. */
    @Override
    public Flux<ServerMessage> sendCommand(ClientMessage message) {
        return this.sendCommand(message, DecoderState.QUERY_RESPONSE);
    }

    /**
     * Writes the SSL request packet, then installs an {@link SslHandler} at the head of the
     * pipeline.
     *
     * @param sslRequest the SSL request packet to send before the handshake
     * @param configuration the configuration providing the SSL context and host name verifier
     * @return a {@link Mono} backed by the future handed to the host name verifier listener; it
     *     fails if creating the SSL engine or listener throws
     */
    @Override
    public Mono<Void> sendSslRequest(SslRequestPacket sslRequest, MariadbConnectionConfiguration configuration) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        try {
            SSLEngine engine = configuration.getSslConfig()
                .getSslContext()
                .newEngine(this.connection.channel().alloc());
            SslHandler sslHandler = new SslHandler(engine);
            GenericFutureListener<Future<? super Channel>> listener = configuration.getSslConfig()
                .getHostNameVerifier(result, configuration.getHost(), this.context.getThreadId(), engine);
            sslHandler.handshakeFuture().addListener(listener);
            // The request packet is written before the SSL handler is added to the pipeline.
            this.connection.channel().writeAndFlush(sslRequest);
            this.connection.addHandlerFirst(sslHandler);
            return Mono.fromFuture(result);
        }
        catch (R2dbcTransientResourceException | SSLException e) {
            result.completeExceptionally(e);
            return Mono.fromFuture(result);
        }
    }

    /** Sends {@code message} with the given initial decoder state and no associated SQL. */
    @Override
    public Flux<ServerMessage> sendCommand(ClientMessage message, DecoderState initialState) {
        return this.sendCommand(message, initialState, null);
    }

    @Override
    public abstract Flux<ServerMessage> sendCommand(ClientMessage var1, DecoderState var2, String var3);

    /**
     * Registers a receiver for server messages without sending anything.
     *
     * @param initialState the decoder state associated with the new receiver
     * @return a {@link Flux} whose sink is queued in {@link #responseReceivers} on subscription
     */
    @Override
    public Flux<ServerMessage> receive(DecoderState initialState) {
        return Flux.create(sink -> this.responseReceivers.add(new CmdElement(sink, initialState)));
    }

    /**
     * Builds the connection {@link Context} from the initial handshake and shares it with the
     * packet decoder and encoder.
     *
     * @param handshake the initial handshake packet received from the server
     */
    @Override
    public void setContext(InitialHandshakePacket handshake) {
        this.context = new Context(
            handshake.getServerVersion(),
            handshake.getThreadId(),
            handshake.getSeed(),
            handshake.getCapabilities(),
            handshake.getServerStatus(),
            handshake.isMariaDBServer());
        this.mariadbPacketDecoder.setContext(this.context);
        this.mariadbPacketEncoder.setContext(this.context);
    }

    /**
     * Creates a {@link LockAction}, which acquires {@link #lock} immediately; the caller must
     * close it to release the lock.
     */
    @Override
    public LockAction getLockAction() {
        return new LockAction();
    }

    /** Returns whether the autocommit bit is set in the context's server status. */
    @Override
    public boolean isAutoCommit() {
        return (this.context.getServerStatus() & STATUS_AUTOCOMMIT) != 0;
    }

    /** Returns whether the no-backslash-escapes bit is set in the context's server status. */
    @Override
    public boolean noBackslashEscapes() {
        return (this.context.getServerStatus() & STATUS_NO_BACKSLASH_ESCAPES) != 0;
    }

    /** Returns the server version, or {@link ServerVersion#UNKNOWN_VERSION} before the context is set. */
    @Override
    public ServerVersion getVersion() {
        return this.context != null ? this.context.getVersion() : ServerVersion.UNKNOWN_VERSION;
    }

    /** Returns {@code true} only if the client is not marked closed and the channel is open. */
    @Override
    public boolean isConnected() {
        return !this.isClosed.get() && this.connection.channel().isOpen();
    }

    /**
     * Invoked when the inbound stream completes: marks the client closed (if it was not already)
     * and fails every pending receiver.
     */
    private void closedServlet() {
        // The message differs depending on whether the close was initiated by this client.
        String message = this.isClosed.compareAndSet(false, true)
            ? "Connection unexpectedly closed"
            : "Connection closed";
        this.clearWaitingListWithError(new R2dbcNonTransientResourceException(message));
    }

    /** Notifies the decoder of {@code exception}, then drains the receiver queue, failing each sink. */
    private void clearWaitingListWithError(Throwable exception) {
        this.mariadbPacketDecoder.connectionError(exception);
        for (CmdElement pending = this.responseReceivers.poll();
                pending != null;
                pending = this.responseReceivers.poll()) {
            pending.getSink().error(exception);
        }
    }

    /** Wraps {@code name} in backticks, doubling any backtick it contains. */
    private static String quoteIdentifier(String name) {
        return "`" + name.replace("`", "``") + "`";
    }

    @Override
    public abstract void sendNext();

    /** Returns the prepare cache, or {@code null} when server-side prepared statements are disabled. */
    @Override
    public PrepareCache getPrepareCache() {
        return this.prepareCache;
    }

    @Override
    public String toString() {
        return "Client{isClosed=" + this.isClosed + ", context=" + this.context + '}';
    }

    /**
     * Transaction-control commands executed while holding the enclosing client's {@link #lock}.
     *
     * <p>The lock is acquired in the constructor and released by {@link #close()}, so instances
     * are meant to be used in a try-with-resources statement.
     */
    public class LockAction implements AutoCloseable {
        /** Acquires the enclosing client's lock. */
        public LockAction() {
            ClientBase.this.lock.lock();
        }

        /**
         * Returns whether a transaction may be active: either responses are still pending (so the
         * recorded server status may be stale) or the in-transaction status bit is set.
         */
        private boolean mayBeInTransaction() {
            return !ClientBase.this.responseReceivers.isEmpty()
                || (ClientBase.this.context.getServerStatus() & STATUS_IN_TRANSACTION) != 0;
        }

        /** Sends {@code ROLLBACK} unless it is known that no transaction is active. */
        public Mono<Void> rollbackTransaction() {
            if (this.mayBeInTransaction()) {
                return this.exchange("ROLLBACK").then();
            }
            logger.debug("Skipping rollback because no active transaction");
            return Mono.empty();
        }

        /**
         * Sends {@code RELEASE SAVEPOINT} for the given savepoint.
         *
         * @param name the savepoint name; backticks in it are escaped
         */
        public Mono<Void> releaseSavepoint(String name) {
            return this.exchange("RELEASE SAVEPOINT " + quoteIdentifier(name)).then();
        }

        /** Sends {@code BEGIN} unless it is known that a transaction is already active. */
        public Mono<Void> beginTransaction() {
            if (!ClientBase.this.responseReceivers.isEmpty()
                    || (ClientBase.this.context.getServerStatus() & STATUS_IN_TRANSACTION) == 0) {
                return this.exchange("BEGIN").then();
            }
            logger.debug("Skipping begin transaction because already in transaction");
            return Mono.empty();
        }

        /** Sends {@code COMMIT} unless it is known that no transaction is active. */
        public Mono<Void> commitTransaction() {
            if (this.mayBeInTransaction()) {
                return this.exchange("COMMIT").then();
            }
            logger.debug("Skipping commit transaction because no active transaction");
            return Mono.empty();
        }

        /**
         * Sends {@code sql} as a query packet and routes each server message through an
         * {@link ExceptionFactory} bound to that SQL.
         */
        private Flux<ServerMessage> exchange(String sql) {
            ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql);
            return ClientBase.this.sendCommand(new QueryPacket(sql))
                .handle(exceptionFactory::handleErrorResponse);
        }

        /**
         * Sends {@code SAVEPOINT} for the given name.
         *
         * @param name the savepoint name; backticks in it are escaped
         */
        public Mono<Void> createSavepoint(String name) {
            return this.exchange("SAVEPOINT " + quoteIdentifier(name)).then();
        }

        /**
         * Sends {@code ROLLBACK TO SAVEPOINT} unless it is known that no transaction is active.
         *
         * @param name the savepoint name; backticks in it are escaped
         */
        public Mono<Void> rollbackTransactionToSavepoint(String name) {
            if (this.mayBeInTransaction()) {
                return this.exchange("ROLLBACK TO SAVEPOINT " + quoteIdentifier(name)).then();
            }
            logger.debug("Skipping rollback to savepoint: no active transaction");
            return Mono.empty();
        }

        /**
         * Sends {@code SET autocommit=0|1} when responses are pending or the requested value
         * differs from the current autocommit status; otherwise does nothing.
         *
         * @param autoCommit the desired autocommit mode
         */
        public Mono<Void> setAutoCommit(boolean autoCommit) {
            if (!ClientBase.this.responseReceivers.isEmpty() || autoCommit != ClientBase.this.isAutoCommit()) {
                return this.exchange("SET autocommit=" + (autoCommit ? "1" : "0")).then();
            }
            return Mono.empty();
        }

        /** Releases the enclosing client's lock. */
        @Override
        public void close() {
            ClientBase.this.lock.unlock();
        }
    }
}

