/*
 * Decompiled with CFR 0.152.
 */
package org.mariadb.r2dbc.client;

import io.r2dbc.spi.R2dbcNonTransientResourceException;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
import org.mariadb.r2dbc.MariadbConnectionConfiguration;
import org.mariadb.r2dbc.client.Client;
import org.mariadb.r2dbc.client.ClientBase;
import org.mariadb.r2dbc.client.CmdElement;
import org.mariadb.r2dbc.client.DecoderState;
import org.mariadb.r2dbc.message.ClientMessage;
import org.mariadb.r2dbc.message.ServerMessage;
import org.mariadb.r2dbc.message.client.ExecutePacket;
import org.mariadb.r2dbc.message.client.PreparePacket;
import org.mariadb.r2dbc.message.client.QueryPacket;
import org.mariadb.r2dbc.util.HostAddress;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.TcpClient;
import reactor.util.Logger;
import reactor.util.Loggers;

/**
 * {@link Client} implementation that pipelines commands: every command is written to the
 * channel as soon as it is submitted, without waiting for the responses of earlier commands.
 * Each submitted command registers a {@link CmdElement} in {@code responseReceivers}, in the
 * same order as the packets are written.
 */
public final class ClientPipelineImpl
extends ClientBase {
    private static final Logger logger = Loggers.getLogger(ClientPipelineImpl.class);

    private ClientPipelineImpl(Connection connection, MariadbConnectionConfiguration configuration, HostAddress hostAddress) {
        super(connection, configuration, hostAddress);
    }

    /**
     * Opens a TCP connection to {@code socketAddress} and wraps it in a pipelining client.
     *
     * @param connectionProvider reactor-netty connection provider used to create the TCP client
     * @param socketAddress remote address to connect to
     * @param hostAddress host description handed to the created client
     * @param configuration connection configuration (loop resources, socket options, ...)
     * @return a {@link Mono} emitting the client once the TCP connection is established
     */
    public static Mono<Client> connect(ConnectionProvider connectionProvider, SocketAddress socketAddress, HostAddress hostAddress, MariadbConnectionConfiguration configuration) {
        TcpClient tcpClient = TcpClient.create(connectionProvider).remoteAddress(() -> socketAddress).runOn(configuration.loopResources());
        tcpClient = ClientPipelineImpl.setSocketOption(configuration, tcpClient);
        return tcpClient.connect().flatMap(it -> Mono.just(new ClientPipelineImpl(it, configuration, hostAddress)));
    }

    /**
     * Writes {@code message} to the channel without registering any response receiver.
     *
     * @param message message to send
     */
    @Override
    public void sendCommandWithoutResult(ClientMessage message) {
        // Acquire before entering try so that finally never unlocks a lock that was not taken.
        this.lock.lock();
        try {
            this.connection.channel().writeAndFlush(message);
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Sends a prepare packet immediately followed by its execute packet, registering a single
     * receiver for the combined response.
     *
     * <p>The packets are written only on the first subscription of the returned flux; later
     * subscriptions do not send anything again.
     *
     * @param preparePacket prepare command; its SQL is recorded with the receiver
     * @param executePacket execute command written right after the prepare
     * @return flux of the server messages answering the two packets; it errors with
     *     {@link R2dbcNonTransientResourceException} when the client is not connected
     */
    @Override
    public Flux<ServerMessage> sendCommand(PreparePacket preparePacket, ExecutePacket executePacket) {
        AtomicBoolean alreadySent = new AtomicBoolean();
        return Flux.create(sink -> {
            if (!this.isConnected()) {
                sink.error(new R2dbcNonTransientResourceException("Connection is close. Cannot send anything"));
                return;
            }
            if (alreadySent.compareAndSet(false, true)) {
                // Receiver registration and both writes happen under the lock so that the
                // receiver order matches the order of the packets on the wire.
                this.lock.lock();
                try {
                    this.responseReceivers.add(new CmdElement(sink, DecoderState.PREPARE_AND_EXECUTE_RESPONSE, preparePacket.getSql()));
                    this.connection.channel().writeAndFlush(preparePacket);
                    this.connection.channel().writeAndFlush(executePacket);
                }
                finally {
                    this.lock.unlock();
                }
            }
        });
    }

    /**
     * Sends a single command and registers a receiver for its response.
     *
     * <p>The message is written only on the first subscription of the returned flux; later
     * subscriptions do not send anything again.
     *
     * @param message command to send
     * @param initialState decoder state to associate with the response
     * @param sql SQL text recorded with the receiver
     * @return flux of the server messages answering the command; it errors with
     *     {@link R2dbcNonTransientResourceException} when the client is not connected
     */
    @Override
    public Flux<ServerMessage> sendCommand(ClientMessage message, DecoderState initialState, String sql) {
        AtomicBoolean alreadySent = new AtomicBoolean();
        return Flux.create(sink -> {
            if (!this.isConnected()) {
                sink.error(new R2dbcNonTransientResourceException("Connection is close. Cannot send anything"));
                return;
            }
            if (alreadySent.compareAndSet(false, true)) {
                this.lock.lock();
                try {
                    this.responseReceivers.add(new CmdElement(sink, initialState, sql));
                    this.connection.channel().writeAndFlush(message);
                }
                finally {
                    this.lock.unlock();
                }
            }
        });
    }

    /**
     * Starts a transaction by sending {@code sql}, unless the connection is idle and the last
     * known server status already has the in-transaction bit (bit 0) set.
     *
     * <p>NOTE(review): this method does not take {@code lock} itself; presumably the caller in
     * {@code ClientBase} already holds it - confirm.
     *
     * @param sink sink completed directly when the command is skipped, otherwise registered
     *     as receiver of the response
     * @param sql statement that starts the transaction
     */
    @Override
    protected void begin(FluxSink<ServerMessage> sink, String sql) {
        // With commands still in flight the cached server status cannot be trusted, so send.
        if (!this.responseReceivers.isEmpty() || (this.context.getServerStatus() & 1) == 0) {
            this.responseReceivers.add(new CmdElement(sink, DecoderState.QUERY_RESPONSE, sql));
            this.connection.channel().writeAndFlush(new QueryPacket(sql));
        } else {
            logger.debug("Skipping start transaction because already in transaction");
            sink.complete();
        }
    }

    /**
     * Sends {@code SET autocommit=0|1}, unless the connection is idle and already in the
     * requested autocommit state.
     *
     * <p>NOTE(review): this method does not take {@code lock} itself; presumably the caller in
     * {@code ClientBase} already holds it - confirm.
     *
     * @param sink sink completed directly when the command is skipped, otherwise registered
     *     as receiver of the response
     * @param autoCommit requested autocommit state
     */
    @Override
    protected void executeAutoCommit(FluxSink<ServerMessage> sink, boolean autoCommit) {
        String cmd = "SET autocommit=" + (autoCommit ? '1' : '0');
        // Same rule as begin/executeWhenTransaction: the cached state is only relied upon
        // when nothing is pending; with commands in flight the command is always sent.
        if (!this.responseReceivers.isEmpty() || autoCommit != this.isAutoCommit()) {
            this.responseReceivers.add(new CmdElement(sink, DecoderState.QUERY_RESPONSE, cmd));
            this.connection.channel().writeAndFlush(new QueryPacket(cmd));
        } else {
            logger.debug("Skipping autocommit since already in that state");
            sink.complete();
        }
    }

    /**
     * Sends {@code cmd} only when a transaction may be active: either commands are still in
     * flight, or the last known server status has the in-transaction bit (bit 0) set.
     *
     * <p>NOTE(review): this method does not take {@code lock} itself; presumably the caller in
     * {@code ClientBase} already holds it - confirm.
     *
     * @param sink sink completed directly when the command is skipped, otherwise registered
     *     as receiver of the response
     * @param cmd command that is only meaningful inside a transaction
     */
    @Override
    protected void executeWhenTransaction(FluxSink<ServerMessage> sink, String cmd) {
        if (!this.responseReceivers.isEmpty() || (this.context.getServerStatus() & 1) > 0) {
            this.responseReceivers.add(new CmdElement(sink, DecoderState.QUERY_RESPONSE, cmd));
            this.connection.channel().writeAndFlush(new QueryPacket(cmd));
        } else {
            logger.debug("Skipping '{}' because no active transaction", cmd);
            sink.complete();
        }
    }

    /**
     * No-op: this implementation writes every command to the channel at submission time, so
     * there is never a held-back command to send.
     */
    @Override
    public void sendNext() {
    }
}

