/*
 * Decompiled with CFR 0.152.
 */
package io.r2dbc.postgresql;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.unix.DomainSocketAddress;
import io.r2dbc.postgresql.DefaultPortalNameSupplier;
import io.r2dbc.postgresql.DefaultPostgresqlReplicationConnection;
import io.r2dbc.postgresql.ExceptionFactory;
import io.r2dbc.postgresql.Extensions;
import io.r2dbc.postgresql.IndefiniteStatementCache;
import io.r2dbc.postgresql.PostgresqlConnection;
import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
import io.r2dbc.postgresql.PostgresqlConnectionFactoryMetadata;
import io.r2dbc.postgresql.SimpleQueryPostgresqlStatement;
import io.r2dbc.postgresql.api.PostgresqlReplicationConnection;
import io.r2dbc.postgresql.authentication.AuthenticationHandler;
import io.r2dbc.postgresql.authentication.PasswordAuthenticationHandler;
import io.r2dbc.postgresql.authentication.SASLAuthenticationHandler;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.ReactorNettyClient;
import io.r2dbc.postgresql.client.SSLConfig;
import io.r2dbc.postgresql.client.SSLMode;
import io.r2dbc.postgresql.client.StartupMessageFlow;
import io.r2dbc.postgresql.codec.DefaultCodecs;
import io.r2dbc.postgresql.extension.CodecRegistrar;
import io.r2dbc.postgresql.message.backend.AuthenticationMessage;
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryMetadata;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.resources.ConnectionProvider;
import reactor.util.annotation.Nullable;

public final class PostgresqlConnectionFactory
implements ConnectionFactory {
    private static final String REPLICATION_OPTION = "replication";
    private static final String REPLICATION_DATABASE = "database";
    private final Function<SSLConfig, Mono<? extends Client>> clientFactory;
    private final PostgresqlConnectionConfiguration configuration;
    private final SocketAddress endpoint;
    private final Extensions extensions;

    public PostgresqlConnectionFactory(PostgresqlConnectionConfiguration configuration) {
        this.configuration = Assert.requireNonNull(configuration, "configuration must not be null");
        this.endpoint = PostgresqlConnectionFactory.createSocketAddress(configuration);
        this.clientFactory = sslConfig -> ReactorNettyClient.connect(ConnectionProvider.newConnection(), this.endpoint, configuration.getConnectTimeout(), sslConfig).cast(Client.class);
        this.extensions = PostgresqlConnectionFactory.getExtensions(configuration);
    }

    PostgresqlConnectionFactory(Function<SSLConfig, Mono<? extends Client>> clientFactory, PostgresqlConnectionConfiguration configuration) {
        this.configuration = Assert.requireNonNull(configuration, "configuration must not be null");
        this.endpoint = PostgresqlConnectionFactory.createSocketAddress(configuration);
        this.clientFactory = Assert.requireNonNull(clientFactory, "clientFactory must not be null");
        this.extensions = PostgresqlConnectionFactory.getExtensions(configuration);
    }

    private static SocketAddress createSocketAddress(PostgresqlConnectionConfiguration configuration) {
        if (!configuration.isUseSocket()) {
            return InetSocketAddress.createUnresolved(configuration.getRequiredHost(), configuration.getPort());
        }
        if (configuration.isUseSocket()) {
            return new DomainSocketAddress(configuration.getRequiredSocket());
        }
        throw new IllegalArgumentException("Cannot create SocketAddress for " + configuration);
    }

    private static Extensions getExtensions(PostgresqlConnectionConfiguration configuration) {
        Extensions extensions = Extensions.from(configuration.getExtensions());
        if (configuration.isAutodetectExtensions()) {
            extensions = extensions.mergeWith(Extensions.autodetect());
        }
        return extensions;
    }

    public Mono<io.r2dbc.postgresql.api.PostgresqlConnection> create() {
        if (this.isReplicationConnection()) {
            throw new UnsupportedOperationException("Cannot create replication connection through create(). Use replication() method instead.");
        }
        return this.doCreateConnection(false, this.configuration.getOptions()).cast(io.r2dbc.postgresql.api.PostgresqlConnection.class);
    }

    public Mono<PostgresqlReplicationConnection> replication() {
        Map<String, String> options = this.configuration.getOptions();
        options = options == null ? new HashMap<String, String>() : new HashMap<String, String>(options);
        options.put(REPLICATION_OPTION, REPLICATION_DATABASE);
        return this.doCreateConnection(true, options).map(DefaultPostgresqlReplicationConnection::new);
    }

    private Mono<PostgresqlConnection> doCreateConnection(boolean forReplication, @Nullable Map<String, String> options) {
        SSLConfig sslConfig = this.configuration.getSslConfig();
        Predicate<Throwable> isAuthSpecificationError = e -> e instanceof ExceptionFactory.PostgresqlAuthenticationFailure;
        return this.tryConnectWithConfig(sslConfig, options).onErrorResume(isAuthSpecificationError.and(e -> sslConfig.getSslMode() == SSLMode.ALLOW), e -> this.tryConnectWithConfig(sslConfig.mutateMode(SSLMode.REQUIRE), options).onErrorResume(sslAuthError -> {
            e.addSuppressed((Throwable)sslAuthError);
            return Mono.error((Throwable)e);
        })).onErrorResume(isAuthSpecificationError.and(e -> sslConfig.getSslMode() == SSLMode.PREFER), e -> this.tryConnectWithConfig(sslConfig.mutateMode(SSLMode.DISABLE), options).onErrorResume(sslAuthError -> {
            e.addSuppressed((Throwable)sslAuthError);
            return Mono.error((Throwable)e);
        })).flatMap(client -> {
            DefaultCodecs codecs = new DefaultCodecs(client.getByteBufAllocator());
            Mono<IsolationLevel> isolationLevelMono = Mono.just((Object)IsolationLevel.READ_COMMITTED);
            if (!forReplication) {
                isolationLevelMono = this.getIsolationLevel((Client)client, codecs);
            }
            return isolationLevelMono.map(it -> new PostgresqlConnection((Client)client, codecs, DefaultPortalNameSupplier.INSTANCE, new IndefiniteStatementCache((Client)client), (IsolationLevel)it, this.configuration.isForceBinary())).delayUntil(connection -> this.prepareConnection((PostgresqlConnection)connection, client.getByteBufAllocator(), codecs)).onErrorResume(throwable -> this.closeWithError((Client)client, (Throwable)throwable));
        }).onErrorMap(this::cannotConnect);
    }

    private boolean isReplicationConnection() {
        Map<String, String> options = this.configuration.getOptions();
        return options != null && REPLICATION_DATABASE.equalsIgnoreCase(options.get(REPLICATION_OPTION));
    }

    private Mono<Client> tryConnectWithConfig(SSLConfig sslConfig, @Nullable Map<String, String> options) {
        return this.clientFactory.apply(sslConfig).delayUntil(client -> StartupMessageFlow.exchange(this.configuration.getApplicationName(), this::getAuthenticationHandler, client, this.configuration.getDatabase(), this.configuration.getUsername(), options).handle(ExceptionFactory.INSTANCE::handleErrorResponse)).cast(Client.class);
    }

    private Publisher<?> prepareConnection(PostgresqlConnection connection, ByteBufAllocator byteBufAllocator, DefaultCodecs codecs) {
        ArrayList<Mono<Void>> publishers = new ArrayList<Mono<Void>>();
        publishers.add(this.setSchema(connection));
        this.extensions.forEach(CodecRegistrar.class, it -> publishers.add((Mono<Void>)it.register(connection, byteBufAllocator, codecs)));
        return Flux.concat(publishers).then();
    }

    private Mono<PostgresqlConnection> closeWithError(Client client, Throwable throwable) {
        return client.close().then(Mono.error((Throwable)throwable));
    }

    private Throwable cannotConnect(Throwable throwable) {
        if (throwable instanceof R2dbcException) {
            return throwable;
        }
        return new PostgresConnectionException(String.format("Cannot connect to %s", this.endpoint), throwable);
    }

    public ConnectionFactoryMetadata getMetadata() {
        return PostgresqlConnectionFactoryMetadata.INSTANCE;
    }

    PostgresqlConnectionConfiguration getConfiguration() {
        return this.configuration;
    }

    public String toString() {
        return "PostgresqlConnectionFactory{clientFactory=" + this.clientFactory + ", configuration=" + this.configuration + ", extensions=" + this.extensions + '}';
    }

    private AuthenticationHandler getAuthenticationHandler(AuthenticationMessage message) {
        if (PasswordAuthenticationHandler.supports(message)) {
            CharSequence password = Assert.requireNonNull(this.configuration.getPassword(), "Password must not be null");
            return new PasswordAuthenticationHandler(password, this.configuration.getUsername());
        }
        if (SASLAuthenticationHandler.supports(message)) {
            CharSequence password = Assert.requireNonNull(this.configuration.getPassword(), "Password must not be null");
            return new SASLAuthenticationHandler(password, this.configuration.getUsername());
        }
        throw new IllegalStateException(String.format("Unable to provide AuthenticationHandler capable of handling %s", message));
    }

    private Mono<IsolationLevel> getIsolationLevel(Client client, DefaultCodecs codecs) {
        return new SimpleQueryPostgresqlStatement(client, codecs, "SHOW TRANSACTION ISOLATION LEVEL").execute().flatMap(it -> it.map((row, rowMetadata) -> {
            String level = (String)row.get(0, String.class);
            if (level == null) {
                return IsolationLevel.READ_COMMITTED;
            }
            return IsolationLevel.valueOf((String)level.toUpperCase(Locale.US));
        })).defaultIfEmpty((Object)IsolationLevel.READ_COMMITTED).last();
    }

    private Mono<Void> setSchema(PostgresqlConnection connection) {
        if (this.configuration.getSchema() == null) {
            return Mono.empty();
        }
        return connection.createStatement(String.format("SET SCHEMA '%s'", this.configuration.getSchema())).execute().then();
    }

    static class PostgresConnectionException
    extends R2dbcNonTransientResourceException {
        public PostgresConnectionException(String msg, @Nullable Throwable cause) {
            super(msg, cause);
        }
    }
}

