/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka010.common.security.authenticator;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.security.auth.Subject;
import javax.security.auth.login.Configuration;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.kafka010.common.KafkaException;
import org.apache.kafka010.common.errors.AuthenticationException;
import org.apache.kafka010.common.errors.IllegalSaslStateException;
import org.apache.kafka010.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka010.common.errors.UnsupportedVersionException;
import org.apache.kafka010.common.network.Authenticator;
import org.apache.kafka010.common.network.Mode;
import org.apache.kafka010.common.network.NetworkReceive;
import org.apache.kafka010.common.network.NetworkSend;
import org.apache.kafka010.common.network.Send;
import org.apache.kafka010.common.network.TransportLayer;
import org.apache.kafka010.common.protocol.ApiKeys;
import org.apache.kafka010.common.protocol.Errors;
import org.apache.kafka010.common.protocol.Protocol;
import org.apache.kafka010.common.protocol.types.SchemaException;
import org.apache.kafka010.common.requests.AbstractRequest;
import org.apache.kafka010.common.requests.AbstractResponse;
import org.apache.kafka010.common.requests.ApiVersionsResponse;
import org.apache.kafka010.common.requests.RequestHeader;
import org.apache.kafka010.common.requests.SaslHandshakeRequest;
import org.apache.kafka010.common.requests.SaslHandshakeResponse;
import org.apache.kafka010.common.security.auth.AuthCallbackHandler;
import org.apache.kafka010.common.security.auth.KafkaPrincipal;
import org.apache.kafka010.common.security.auth.PrincipalBuilder;
import org.apache.kafka010.common.security.authenticator.CredentialCache;
import org.apache.kafka010.common.security.authenticator.SaslClientAuthenticator;
import org.apache.kafka010.common.security.authenticator.SaslServerCallbackHandler;
import org.apache.kafka010.common.security.kerberos.KerberosName;
import org.apache.kafka010.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka010.common.security.scram.ScramCredential;
import org.apache.kafka010.common.security.scram.ScramMechanism;
import org.apache.kafka010.common.security.scram.ScramServerCallbackHandler;
import org.ietf.jgss.GSSCredential;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.GSSManager;
import org.ietf.jgss.GSSName;
import org.ietf.jgss.Oid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SaslServerAuthenticator
implements Authenticator {
    private static final Logger LOG = LoggerFactory.getLogger(SaslServerAuthenticator.class);
    private final String node;
    private final Configuration jaasConfig;
    private final Subject subject;
    private final KerberosShortNamer kerberosNamer;
    private final int maxReceiveSize;
    private final String host;
    private final CredentialCache credentialCache;
    private SaslState saslState = SaslState.GSSAPI_OR_HANDSHAKE_REQUEST;
    private SaslState pendingSaslState = null;
    private SaslServer saslServer;
    private String saslMechanism;
    private AuthCallbackHandler callbackHandler;
    private TransportLayer transportLayer;
    private Set<String> enabledMechanisms;
    private Map<String, ?> configs;
    private NetworkReceive netInBuffer;
    private Send netOutBuffer;

    public SaslServerAuthenticator(String node, Configuration jaasConfig, Subject subject, KerberosShortNamer kerberosNameParser, String host, int maxReceiveSize, CredentialCache credentialCache) throws IOException {
        if (subject == null) {
            throw new IllegalArgumentException("subject cannot be null");
        }
        this.node = node;
        this.jaasConfig = jaasConfig;
        this.subject = subject;
        this.kerberosNamer = kerberosNameParser;
        this.maxReceiveSize = maxReceiveSize;
        this.host = host;
        this.credentialCache = credentialCache;
    }

    @Override
    public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) {
        this.transportLayer = transportLayer;
        this.configs = configs;
        List enabledMechanisms = (List)this.configs.get("sasl.enabled.mechanisms");
        if (enabledMechanisms == null || enabledMechanisms.isEmpty()) {
            throw new IllegalArgumentException("No SASL mechanisms are enabled");
        }
        this.enabledMechanisms = new HashSet<String>(enabledMechanisms);
    }

    private void createSaslServer(String mechanism) throws IOException {
        this.saslMechanism = mechanism;
        this.callbackHandler = !ScramMechanism.isScram(mechanism) ? new SaslServerCallbackHandler(this.jaasConfig, this.kerberosNamer) : new ScramServerCallbackHandler(this.credentialCache.cache(mechanism, ScramCredential.class));
        this.callbackHandler.configure(this.configs, Mode.SERVER, this.subject, this.saslMechanism);
        if (mechanism.equals("GSSAPI")) {
            this.saslServer = this.createSaslKerberosServer(this.callbackHandler, this.configs, this.subject);
        } else {
            try {
                this.saslServer = Subject.doAs(this.subject, new PrivilegedExceptionAction<SaslServer>(){

                    @Override
                    public SaslServer run() throws SaslException {
                        return Sasl.createSaslServer(SaslServerAuthenticator.this.saslMechanism, "kafka", SaslServerAuthenticator.this.host, SaslServerAuthenticator.this.configs, SaslServerAuthenticator.this.callbackHandler);
                    }
                });
            }
            catch (PrivilegedActionException e) {
                throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause());
            }
        }
    }

    private SaslServer createSaslKerberosServer(final AuthCallbackHandler saslServerCallbackHandler, final Map<String, ?> configs, Subject subject) throws IOException {
        KerberosName kerberosName;
        String servicePrincipal = SaslClientAuthenticator.firstPrincipal(subject);
        try {
            kerberosName = KerberosName.parse(servicePrincipal);
        }
        catch (IllegalArgumentException e) {
            throw new KafkaException("Principal has name with unexpected format " + servicePrincipal);
        }
        final String servicePrincipalName = kerberosName.serviceName();
        final String serviceHostname = kerberosName.hostName();
        LOG.debug("Creating SaslServer for {} with mechanism {}", (Object)kerberosName, (Object)this.saslMechanism);
        boolean usingNativeJgss = Boolean.getBoolean("sun.security.jgss.native");
        if (usingNativeJgss) {
            try {
                GSSManager manager = GSSManager.getInstance();
                Oid krb5Mechanism = new Oid("1.2.840.113554.1.2.2");
                GSSName gssName = manager.createName(servicePrincipalName + "@" + serviceHostname, GSSName.NT_HOSTBASED_SERVICE);
                GSSCredential cred = manager.createCredential(gssName, Integer.MAX_VALUE, krb5Mechanism, 2);
                subject.getPrivateCredentials().add(cred);
            }
            catch (GSSException ex) {
                LOG.warn("Cannot add private credential to subject; clients authentication may fail", ex);
            }
        }
        try {
            return Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>(){

                @Override
                public SaslServer run() throws SaslException {
                    return Sasl.createSaslServer(SaslServerAuthenticator.this.saslMechanism, servicePrincipalName, serviceHostname, configs, saslServerCallbackHandler);
                }
            });
        }
        catch (PrivilegedActionException e) {
            throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause());
        }
    }

    @Override
    public void authenticate() throws IOException {
        if (this.netOutBuffer != null && !this.flushNetOutBufferAndUpdateInterestOps()) {
            return;
        }
        if (this.saslServer != null && this.saslServer.isComplete()) {
            this.setSaslState(SaslState.COMPLETE);
            return;
        }
        if (this.netInBuffer == null) {
            this.netInBuffer = new NetworkReceive(this.maxReceiveSize, this.node);
        }
        this.netInBuffer.readFrom(this.transportLayer);
        if (this.netInBuffer.complete()) {
            this.netInBuffer.payload().rewind();
            byte[] clientToken = new byte[this.netInBuffer.payload().remaining()];
            this.netInBuffer.payload().get(clientToken, 0, clientToken.length);
            this.netInBuffer = null;
            try {
                switch (this.saslState) {
                    case HANDSHAKE_REQUEST: {
                        this.handleKafkaRequest(clientToken);
                        break;
                    }
                    case GSSAPI_OR_HANDSHAKE_REQUEST: {
                        if (this.handleKafkaRequest(clientToken)) break;
                    }
                    case AUTHENTICATE: {
                        byte[] response = this.saslServer.evaluateResponse(clientToken);
                        if (response != null) {
                            this.netOutBuffer = new NetworkSend(this.node, ByteBuffer.wrap(response));
                            this.flushNetOutBufferAndUpdateInterestOps();
                        }
                        if (!this.saslServer.isComplete()) break;
                        this.setSaslState(SaslState.COMPLETE);
                        break;
                    }
                }
            }
            catch (Exception e) {
                this.setSaslState(SaslState.FAILED);
                throw new IOException(e);
            }
        }
    }

    @Override
    public Principal principal() {
        return new KafkaPrincipal("User", this.saslServer.getAuthorizationID());
    }

    @Override
    public boolean complete() {
        return this.saslState == SaslState.COMPLETE;
    }

    @Override
    public void close() throws IOException {
        if (this.saslServer != null) {
            this.saslServer.dispose();
        }
        if (this.callbackHandler != null) {
            this.callbackHandler.close();
        }
    }

    private void setSaslState(SaslState saslState) {
        if (this.netOutBuffer != null && !this.netOutBuffer.completed()) {
            this.pendingSaslState = saslState;
        } else {
            this.pendingSaslState = null;
            this.saslState = saslState;
            LOG.debug("Set SASL server state to {}", (Object)saslState);
        }
    }

    private boolean flushNetOutBufferAndUpdateInterestOps() throws IOException {
        boolean flushedCompletely = this.flushNetOutBuffer();
        if (flushedCompletely) {
            this.transportLayer.removeInterestOps(4);
            if (this.pendingSaslState != null) {
                this.setSaslState(this.pendingSaslState);
            }
        } else {
            this.transportLayer.addInterestOps(4);
        }
        return flushedCompletely;
    }

    private boolean flushNetOutBuffer() throws IOException {
        if (!this.netOutBuffer.completed()) {
            this.netOutBuffer.writeTo(this.transportLayer);
        }
        return this.netOutBuffer.completed();
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private boolean handleKafkaRequest(byte[] requestBytes) throws IOException, AuthenticationException {
        boolean isKafkaRequest = false;
        String clientMechanism = null;
        try {
            ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
            RequestHeader requestHeader = RequestHeader.parse(requestBuffer);
            ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
            this.setSaslState(SaslState.HANDSHAKE_REQUEST);
            isKafkaRequest = true;
            if (!Protocol.apiVersionSupported(requestHeader.apiKey(), requestHeader.apiVersion())) {
                if (apiKey != ApiKeys.API_VERSIONS) throw new UnsupportedVersionException("Version " + requestHeader.apiVersion() + " is not supported for apiKey " + (Object)((Object)apiKey));
                this.sendKafkaResponse(requestHeader, ApiVersionsResponse.fromError(Errors.UNSUPPORTED_VERSION));
            } else {
                AbstractRequest request = AbstractRequest.getRequest(requestHeader.apiKey(), requestHeader.apiVersion(), requestBuffer);
                LOG.debug("Handle Kafka request {}", (Object)apiKey);
                switch (apiKey) {
                    case API_VERSIONS: {
                        this.handleApiVersionsRequest(requestHeader);
                        break;
                    }
                    case SASL_HANDSHAKE: {
                        clientMechanism = this.handleHandshakeRequest(requestHeader, (SaslHandshakeRequest)request);
                        break;
                    }
                    default: {
                        throw new IllegalSaslStateException("Unexpected Kafka request of type " + (Object)((Object)apiKey) + " during SASL handshake.");
                    }
                }
            }
        }
        catch (IllegalArgumentException | SchemaException e) {
            if (this.saslState != SaslState.GSSAPI_OR_HANDSHAKE_REQUEST) throw e;
            if (LOG.isDebugEnabled()) {
                StringBuilder tokenBuilder = new StringBuilder();
                for (byte b : requestBytes) {
                    tokenBuilder.append(String.format("%02x", b));
                    if (tokenBuilder.length() >= 20) break;
                }
                LOG.debug("Received client packet of length {} starting with bytes 0x{}, process as GSSAPI packet", (Object)requestBytes.length, (Object)tokenBuilder);
            }
            if (!this.enabledMechanisms.contains("GSSAPI")) throw new UnsupportedSaslMechanismException("Exception handling first SASL packet from client, GSSAPI is not supported by server", e);
            LOG.debug("First client packet is not a SASL mechanism request, using default mechanism GSSAPI");
            clientMechanism = "GSSAPI";
        }
        if (clientMechanism == null) return isKafkaRequest;
        this.createSaslServer(clientMechanism);
        this.setSaslState(SaslState.AUTHENTICATE);
        return isKafkaRequest;
    }

    private String handleHandshakeRequest(RequestHeader requestHeader, SaslHandshakeRequest handshakeRequest) throws IOException, UnsupportedSaslMechanismException {
        String clientMechanism = handshakeRequest.mechanism();
        if (this.enabledMechanisms.contains(clientMechanism)) {
            LOG.debug("Using SASL mechanism '{}' provided by client", (Object)clientMechanism);
            this.sendKafkaResponse(requestHeader, new SaslHandshakeResponse(0, this.enabledMechanisms));
            return clientMechanism;
        }
        LOG.debug("SASL mechanism '{}' requested by client is not supported", (Object)clientMechanism);
        this.sendKafkaResponse(requestHeader, new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM.code(), this.enabledMechanisms));
        throw new UnsupportedSaslMechanismException("Unsupported SASL mechanism " + clientMechanism);
    }

    private void handleApiVersionsRequest(RequestHeader requestHeader) throws IOException, UnsupportedSaslMechanismException {
        this.sendKafkaResponse(requestHeader, ApiVersionsResponse.API_VERSIONS_RESPONSE);
    }

    private void sendKafkaResponse(RequestHeader requestHeader, AbstractResponse response) throws IOException {
        this.netOutBuffer = response.toSend(this.node, requestHeader);
        this.flushNetOutBufferAndUpdateInterestOps();
    }

    public static enum SaslState {
        GSSAPI_OR_HANDSHAKE_REQUEST,
        HANDSHAKE_REQUEST,
        AUTHENTICATE,
        COMPLETE,
        FAILED;

    }
}

