/*
 * Decompiled with CFR 0.152.
 */
package io.r2dbc.mssql;

import io.r2dbc.mssql.ExceptionFactory;
import io.r2dbc.mssql.LoginConfiguration;
import io.r2dbc.mssql.client.Client;
import io.r2dbc.mssql.client.ssl.SslState;
import io.r2dbc.mssql.message.ClientMessage;
import io.r2dbc.mssql.message.Message;
import io.r2dbc.mssql.message.TDSVersion;
import io.r2dbc.mssql.message.tds.ProtocolException;
import io.r2dbc.mssql.message.token.DoneToken;
import io.r2dbc.mssql.message.token.ErrorToken;
import io.r2dbc.mssql.message.token.Login7;
import io.r2dbc.mssql.message.token.Prelogin;
import io.r2dbc.mssql.util.Assert;
import io.r2dbc.mssql.util.PredicateUtils;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

final class LoginFlow {
    private LoginFlow() {
    }

    static Flux<Message> exchange(Client client, LoginConfiguration login) {
        Assert.requireNonNull(client, "client must not be null");
        Assert.requireNonNull(login, "Login must not be null");
        EmitterProcessor requestProcessor = EmitterProcessor.create();
        FluxSink requests = requestProcessor.sink();
        Prelogin.Builder builder = Prelogin.builder();
        if (login.getConnectionId() != null) {
            builder.withConnectionId(login.getConnectionId());
        }
        if (login.useSsl()) {
            builder.withEncryptionEnabled();
        }
        AtomicReference preloginResponse = new AtomicReference();
        Prelogin request = builder.build();
        Predicate[] predicateArray = new Predicate[4];
        predicateArray[0] = Prelogin.class::isInstance;
        predicateArray[1] = SslState.class::isInstance;
        predicateArray[2] = DoneToken.class::isInstance;
        predicateArray[3] = ErrorToken.class::isInstance;
        return client.exchange((Publisher<? extends ClientMessage>)requestProcessor.startWith((Object[])new ClientMessage[]{request}), DoneToken::isDone).filter(PredicateUtils.or(predicateArray)).handle((message, sink) -> {
            try {
                if (message instanceof Prelogin) {
                    Prelogin response = (Prelogin)message;
                    preloginResponse.set(response);
                    Prelogin.Encryption encryption = response.getRequiredToken(Prelogin.Encryption.class);
                    if (!encryption.requiresSslHandshake()) {
                        requests.next((Object)LoginFlow.createLoginMessage(login, response));
                    }
                    return;
                }
                if (message instanceof SslState && message == SslState.NEGOTIATED) {
                    Prelogin prelogin = (Prelogin)preloginResponse.get();
                    requests.next((Object)LoginFlow.createLoginMessage(login, prelogin));
                    return;
                }
                if (DoneToken.isDone(message)) {
                    sink.next(message);
                    sink.complete();
                    return;
                }
                if (message instanceof ErrorToken) {
                    sink.error((Throwable)ExceptionFactory.createException((ErrorToken)message, ""));
                    client.close().subscribe();
                    return;
                }
                throw ProtocolException.unsupported(String.format("Unexpected login flow message: %s", message));
            }
            catch (Exception e) {
                requests.error((Throwable)e);
                sink.error((Throwable)e);
                return;
            }
        });
    }

    private static Login7 createLoginMessage(LoginConfiguration login, Prelogin prelogin) {
        Prelogin.Version serverVersion = prelogin.getRequiredToken(Prelogin.Version.class);
        TDSVersion tdsVersion = LoginFlow.getTdsVersion(serverVersion.getVersion());
        return login.asBuilder().tdsVersion(tdsVersion).build();
    }

    private static TDSVersion getTdsVersion(int serverVersion) {
        if (serverVersion >= 11) {
            return TDSVersion.VER_DENALI;
        }
        if (serverVersion >= 10) {
            return TDSVersion.VER_KATMAI;
        }
        if (serverVersion >= 9) {
            return TDSVersion.VER_YUKON;
        }
        throw ProtocolException.unsupported("Unsupported server version: " + serverVersion);
    }
}

