/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.security;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.security.KafkaPrincipal;
import io.streamnative.pulsar.handlers.kop.security.PlainSaslServer;
import io.streamnative.pulsar.handlers.kop.security.Session;
import io.streamnative.pulsar.handlers.kop.security.oauth.KopOAuthBearerSaslServer;
import io.streamnative.pulsar.handlers.kop.security.oauth.KopOAuthBearerUnsecuredValidatorCallbackHandler;
import io.streamnative.pulsar.handlers.kop.utils.KafkaResponseUtils;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import lombok.NonNull;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.KopResponseUtils;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.SaslAuthenticateRequest;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SaslAuthenticator {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(SaslAuthenticator.class);
    // Name of the negotiated SaslServer property read when building the KafkaPrincipal.
    public static final String USER_NAME_PROP = "username";
    // Name of the negotiated SaslServer property holding the AuthenticationDataSource.
    public static final String AUTH_DATA_SOURCE_PROP = "authDataSource";
    // Config key under which the AuthenticationService is handed to the OAuth2 callback handler.
    public static final String AUTHENTICATION_SERVER_OBJ = "authenticationServerObj";
    // Response token used when SaslServer.evaluateResponse returns null.
    private static final byte[] EMPTY_BUFFER = new byte[0];
    private final AuthenticationService authenticationService;
    private final PulsarAdmin admin;
    // Mechanisms a client may request in SASL_HANDSHAKE.
    private final Set<String> allowedMechanisms;
    private final Set<String> proxyRoles;
    // Null unless "OAUTHBEARER" is among the allowed mechanisms.
    private final AuthenticateCallbackHandler oauth2CallbackHandler;
    // Current position in the handshake/authenticate state machine.
    private State state = State.HANDSHAKE_OR_VERSIONS_REQUEST;
    // Created once the client's mechanism is accepted; cleared by reset().
    private SaslServer saslServer;
    private Session session;
    // Switched on when the handshake request version is >= 1; when off, incoming bytes in the
    // AUTHENTICATE state are treated as a raw SASL token without a Kafka request header.
    private boolean enableKafkaSaslAuthenticateHeaders;
    // Serialized failure response waiting to be flushed by sendAuthenticationFailureResponse.
    private ByteBuf authenticationFailureResponse = null;
    // Channel context captured on every authenticate() call.
    private ChannelHandlerContext ctx = null;
    // Passed to KopOAuthBearerSaslServer when the OAUTHBEARER mechanism is selected.
    private String defaultKafkaMetadataTenant;

    /**
     * Looks up a negotiated property on the given SASL server and returns it typed as {@code T}.
     *
     * NOTE(review): the cast to {@code T} is unchecked, so a type mismatch will most likely
     * surface at the call site rather than in the catch below - confirm before relying on it.
     *
     * @param saslServer   server whose negotiated properties are queried
     * @param propertyName name of the property to read
     * @return the non-null property value
     * @throws NoExpectedPropertyException if the property is absent or a ClassCastException is raised
     */
    private static <T> T safeGetProperty(SaslServer saslServer, String propertyName) {
        try {
            final Object negotiated = saslServer.getNegotiatedProperty(propertyName);
            if (negotiated != null) {
                return (T)negotiated;
            }
        }
        catch (ClassCastException castFailure) {
            throw new NoExpectedPropertyException(propertyName, castFailure.getMessage());
        }
        throw new NoExpectedPropertyException(propertyName, "property not found");
    }

    /**
     * Serializes a failure response and parks it in {@code authenticationFailureResponse}
     * until {@link #sendAuthenticationFailureResponse} flushes it.
     *
     * @param header           header of the request being answered
     * @param request          the parsed request (used to derive an error response when needed)
     * @param abstractResponse explicit response to serialize, or null to derive one from {@code e}
     * @param e                failure cause, may be null when {@code abstractResponse} is given
     */
    private void buildResponseOnAuthenticateFailure(RequestHeader header, AbstractRequest request, AbstractResponse abstractResponse, Exception e) {
        final ByteBuf serialized = SaslAuthenticator.buildKafkaResponse(header, request, abstractResponse, e);
        this.authenticationFailureResponse = serialized;
    }

    /**
     * Flushes the pending authentication failure response, if any, and then forgets it.
     *
     * @param listener invoked with the write future once the response has been written, or
     *                 invoked immediately with {@code null} when no failure response is pending
     */
    public void sendAuthenticationFailureResponse(Consumer<Future<? super Void>> listener) {
        if (this.authenticationFailureResponse != null) {
            final GenericFutureListener<Future<? super Void>> onWritten = listener::accept;
            this.sendKafkaResponse(this.authenticationFailureResponse, onWritten);
            this.authenticationFailureResponse = null;
        } else {
            listener.accept(null);
        }
    }

    public SaslAuthenticator(PulsarService pulsarService, Set<String> allowedMechanisms, KafkaServiceConfiguration config) throws PulsarServerException {
        this.authenticationService = pulsarService.getBrokerService().getAuthenticationService();
        this.admin = pulsarService.getAdminClient();
        this.allowedMechanisms = allowedMechanisms;
        this.proxyRoles = config.getProxyRoles();
        this.oauth2CallbackHandler = allowedMechanisms.contains("OAUTHBEARER") ? this.createOAuth2CallbackHandler(config) : null;
        this.enableKafkaSaslAuthenticateHeaders = false;
        this.defaultKafkaMetadataTenant = config.getKafkaMetadataTenant();
    }

    /**
     * Builds an authenticator from an explicit admin client and authentication service.
     *
     * @param admin                 Pulsar admin client handed to the PLAIN SASL server
     * @param authenticationService service used to validate credentials
     * @param allowedMechanisms     SASL mechanisms clients may request
     * @param config                KoP configuration (proxy roles, OAuth2 settings, metadata tenant)
     * @throws PulsarServerException declared for signature compatibility
     */
    public SaslAuthenticator(PulsarAdmin admin, AuthenticationService authenticationService, Set<String> allowedMechanisms, KafkaServiceConfiguration config) throws PulsarServerException {
        this.authenticationService = authenticationService;
        this.proxyRoles = config.getProxyRoles();
        this.admin = admin;
        this.allowedMechanisms = allowedMechanisms;
        this.oauth2CallbackHandler = allowedMechanisms.contains("OAUTHBEARER") ? this.createOAuth2CallbackHandler(config) : null;
        this.enableKafkaSaslAuthenticateHeaders = false;
        // Keep both constructors consistent: without this the OAUTHBEARER server created later
        // would be given a null default tenant.
        this.defaultKafkaMetadataTenant = config.getKafkaMetadataTenant();
    }

    /**
     * Feeds one inbound frame into the SASL state machine.
     *
     * Depending on the current state the frame is handled as an ApiVersions/SaslHandshake
     * request or as a SASL token; the state moves to COMPLETE once the SASL server reports
     * completion.
     *
     * @param ctx                             channel context, remembered for later responses
     * @param requestBuf                      inbound frame; must have readable bytes
     * @param registerRequestParseLatency     callback receiving the parse start time
     * @param registerRequestLatency          callback receiving the API key and processing start time
     * @param tenantAccessValidationFunction  decides whether the authenticated session may proceed
     * @throws AuthenticationException if the request is unexpected or authentication fails
     */
    public void authenticate(ChannelHandlerContext ctx, ByteBuf requestBuf, BiConsumer<Long, Throwable> registerRequestParseLatency, BiConsumer<ApiKeys, Long> registerRequestLatency, Function<Session, Boolean> tenantAccessValidationFunction) throws AuthenticationException {
        Preconditions.checkArgument(requestBuf.readableBytes() > 0);
        if (log.isDebugEnabled()) {
            log.debug("Authenticate {} {} {}", ctx, this.saslServer, this.state);
        }
        this.ctx = ctx;

        final boolean negotiationFinished = this.saslServer != null && this.saslServer.isComplete();
        if (negotiationFinished) {
            this.setState(State.COMPLETE);
            return;
        }

        switch (this.state) {
            case HANDSHAKE_OR_VERSIONS_REQUEST:
            case HANDSHAKE_REQUEST:
                this.handleKafkaRequest(ctx, requestBuf, registerRequestParseLatency, registerRequestLatency);
                return;
            case AUTHENTICATE:
                this.handleSaslToken(ctx, requestBuf, registerRequestParseLatency, registerRequestLatency, tenantAccessValidationFunction);
                if (this.saslServer.isComplete()) {
                    this.setState(State.COMPLETE);
                }
                return;
            default:
                // COMPLETE: nothing left to do.
                return;
        }
    }

    /**
     * @return true once the state machine has reached COMPLETE
     */
    public boolean complete() {
        return State.COMPLETE == this.state;
    }

    /**
     * @return the stored session when a SASL server exists and authentication is complete,
     *         otherwise null
     */
    public Session session() {
        final boolean authenticated = this.saslServer != null && this.complete();
        return authenticated ? this.session : null;
    }

    /**
     * Returns the authenticator to its initial state so a new handshake can start.
     *
     * Besides rewinding the state machine and disposing the SASL server, this clears the
     * per-handshake header flag and the stored session: otherwise a v0 handshake following a
     * v1+ handshake would be parsed with the wrong framing, and a session from an earlier
     * authentication could be visible until a later one assigns its own.
     */
    public void reset() {
        this.state = State.HANDSHAKE_OR_VERSIONS_REQUEST;
        this.enableKafkaSaslAuthenticateHeaders = false;
        this.session = null;
        if (this.saslServer != null) {
            try {
                this.saslServer.dispose();
            }
            catch (SaslException e) {
                // Disposal is best-effort; record the failure instead of hiding it.
                log.debug("Failed to dispose SaslServer during reset", e);
            }
            finally {
                this.saslServer = null;
            }
        }
    }

    /**
     * Moves the state machine to {@code state} and traces the transition at debug level.
     *
     * @param state the new state
     */
    private void setState(State state) {
        this.state = state;
        if (!log.isDebugEnabled()) {
            return;
        }
        log.debug("Set SaslAuthenticator's state to {}", (Object)state);
    }

    /**
     * Creates and configures the callback handler used for the OAUTHBEARER mechanism.
     *
     * When the configuration names a handler class it is loaded reflectively through its
     * no-arg constructor; otherwise {@link KopOAuthBearerUnsecuredValidatorCallbackHandler}
     * is used. The handler is configured with the authentication service and a JAAS entry
     * built from the OAuth2 properties of {@code config}.
     *
     * @param config KoP configuration; must not be null
     * @return the configured handler
     * @throws NullPointerException if {@code config} is null
     * @throws RuntimeException     if the configured class cannot be loaded, instantiated or cast
     *                              (the original exception is kept as the cause)
     */
    @NonNull
    private AuthenticateCallbackHandler createOAuth2CallbackHandler(@NonNull KafkaServiceConfiguration config) {
        if (config == null) {
            throw new NullPointerException("config is marked non-null but is null");
        }
        // Declared with the interface type: both the reflective and the default branch assign it.
        final AuthenticateCallbackHandler handler;
        final String className = config.getKopOauth2AuthenticateCallbackHandler();
        if (className != null) {
            try {
                Class<?> clazz = Class.forName(className);
                handler = (AuthenticateCallbackHandler)clazz.getDeclaredConstructor().newInstance();
            }
            catch (ClassNotFoundException e) {
                throw new RuntimeException("Failed to load class " + className + ": " + e.getMessage(), e);
            }
            catch (ReflectiveOperationException e) {
                throw new RuntimeException("Failed to create new instance of " + className + ": " + e.getMessage(), e);
            }
            catch (ClassCastException e) {
                throw new RuntimeException("Failed to cast " + className + ": " + e.getMessage(), e);
            }
        } else {
            handler = new KopOAuthBearerUnsecuredValidatorCallbackHandler();
        }
        // JAAS options must be String-to-String, so stringify every property.
        HashMap<String, String> oauth2Configs = new HashMap<>();
        config.getKopOauth2Properties().forEach((key, value) -> oauth2Configs.put(key.toString(), value.toString()));
        AppConfigurationEntry appConfigurationEntry = new AppConfigurationEntry("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule", AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, oauth2Configs);
        HashMap<String, AuthenticationService> configs = new HashMap<>();
        configs.put(AUTHENTICATION_SERVER_OBJ, this.getAuthenticationService());
        handler.configure(configs, "OAUTHBEARER", Collections.singletonList(appConfigurationEntry));
        return handler;
    }

    /**
     * Instantiates the SASL server matching the mechanism chosen by the client.
     *
     * @param mechanism mechanism name from the handshake request
     * @throws AuthenticationException  if the mechanism is neither PLAIN nor OAUTHBEARER
     * @throws IllegalArgumentException if OAUTHBEARER is requested but no callback handler exists
     */
    private void createSaslServer(String mechanism) throws AuthenticationException {
        switch (mechanism) {
            case "PLAIN":
                this.saslServer = new PlainSaslServer(this.authenticationService, this.admin, this.proxyRoles);
                return;
            case "OAUTHBEARER":
                if (this.oauth2CallbackHandler == null) {
                    throw new IllegalArgumentException("No OAuth2CallbackHandler found when mechanism is OAUTHBEARER");
                }
                this.saslServer = new KopOAuthBearerSaslServer((CallbackHandler)this.oauth2CallbackHandler, this.defaultKafkaMetadataTenant);
                return;
            default:
                throw new AuthenticationException("KoP doesn't support '" + mechanism + "' mechanism");
        }
    }

    /**
     * @param header request header to inspect
     * @return true when the header is an ApiVersions request whose version is not supported
     */
    private static boolean isUnsupportedApiVersionsRequest(RequestHeader header) {
        if (header.apiKey() != ApiKeys.API_VERSIONS) {
            return false;
        }
        return !ApiKeys.API_VERSIONS.isVersionSupported(header.apiVersion());
    }

    /**
     * Parses the request body that follows an already-parsed header.
     *
     * An ApiVersions request with an unsupported version is not parsed; an empty
     * ApiVersionsRequest carrying the client's version is returned instead.
     *
     * @param header    parsed request header
     * @param nioBuffer buffer positioned at the request body
     * @return the parsed (or substituted) request
     */
    private AbstractRequest parseRequest(RequestHeader header, ByteBuffer nioBuffer) {
        if (!SaslAuthenticator.isUnsupportedApiVersionsRequest(header)) {
            final ApiKeys key = header.apiKey();
            final short requestVersion = header.apiVersion();
            return AbstractRequest.parseRequest((ApiKeys)key, (short)requestVersion, (ByteBuffer)nioBuffer).request;
        }
        return new ApiVersionsRequest(new ApiVersionsRequestData(), header.apiVersion());
    }

    /**
     * Handles a frame received before SASL token exchange: either ApiVersions or SaslHandshake.
     *
     * The API key is validated right after the header is parsed and before the body is parsed,
     * so an unauthenticated peer cannot make the broker parse arbitrary request schemas.
     * On a successful handshake the SASL server is created and the state moves to AUTHENTICATE.
     *
     * @param ctx                         channel context used to write responses
     * @param requestBuf                  inbound frame
     * @param registerRequestParseLatency callback receiving the parse start time
     * @param registerRequestLatency      callback receiving the API key and processing start time
     * @throws AuthenticationException if the request type is unexpected, the mechanism is
     *                                 rejected, or the SASL server cannot be created
     */
    private void handleKafkaRequest(ChannelHandlerContext ctx, ByteBuf requestBuf, BiConsumer<Long, Throwable> registerRequestParseLatency, BiConsumer<ApiKeys, Long> registerRequestLatency) throws AuthenticationException {
        long beforeParseTime = MathUtils.nowInNano();
        ByteBuffer nioBuffer = requestBuf.nioBuffer();
        RequestHeader header = RequestHeader.parse((ByteBuffer)nioBuffer);
        ApiKeys apiKey = header.apiKey();
        // Reject anything that cannot be handled at this layer prior to parsing its body.
        if (apiKey != ApiKeys.API_VERSIONS && apiKey != ApiKeys.SASL_HANDSHAKE) {
            registerRequestParseLatency.accept(beforeParseTime, null);
            throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");
        }
        AbstractRequest body = this.parseRequest(header, nioBuffer);
        registerRequestParseLatency.accept(beforeParseTime, null);
        if (log.isDebugEnabled()) {
            log.debug("Handling Kafka request header {}, body {}", (Object)header, (Object)body);
        }
        long startProcessRequestTime = MathUtils.nowInNano();
        if (apiKey == ApiKeys.API_VERSIONS) {
            this.handleApiVersionsRequest(ctx, header, (ApiVersionsRequest)body, startProcessRequestTime, registerRequestLatency);
            return;
        }
        String clientMechanism = this.handleHandshakeRequest(ctx, header, (SaslHandshakeRequest)body, startProcessRequestTime, registerRequestLatency);
        try {
            this.createSaslServer(clientMechanism);
        }
        catch (AuthenticationException e) {
            // Keep the failure response around so the caller can flush it before closing.
            this.buildResponseOnAuthenticateFailure(header, body, null, e);
            throw e;
        }
        this.setState(State.AUTHENTICATE);
    }

    /**
     * Serializes a response and schedules its write-and-flush on the channel's event loop.
     *
     * @param ctx              channel context to write to
     * @param header           header of the request being answered
     * @param request          the parsed request
     * @param abstractResponse explicit response, or null to derive one from {@code e}
     * @param e                failure cause, may be null
     */
    private static void sendKafkaResponse(ChannelHandlerContext ctx, RequestHeader header, AbstractRequest request, AbstractResponse abstractResponse, Exception e) {
        final ByteBuf serialized = SaslAuthenticator.buildKafkaResponse(header, request, abstractResponse, e);
        final Runnable writeTask = () -> ctx.channel().writeAndFlush((Object)serialized);
        ctx.channel().eventLoop().execute(writeTask);
    }

    /**
     * Schedules an already-serialized response for write-and-flush on the remembered channel
     * and attaches {@code listener} to the resulting write future.
     *
     * @param response serialized response bytes
     * @param listener notified when the write completes
     */
    private void sendKafkaResponse(ByteBuf response, GenericFutureListener<? extends Future<? super Void>> listener) {
        final Runnable writeTask = () -> this.ctx.channel().writeAndFlush((Object)response).addListener(listener);
        this.ctx.channel().eventLoop().execute(writeTask);
    }

    /**
     * Serializes the response for a request.
     *
     * When no explicit response is given but an exception is, the request's own error response
     * for that exception is used. An ApiVersions request with an unsupported version is
     * answered using the oldest ApiVersions version.
     *
     * @param header           header of the request being answered
     * @param request          the parsed request
     * @param abstractResponse explicit response, or null
     * @param e                failure cause, or null
     * @return the serialized response
     */
    private static ByteBuf buildKafkaResponse(RequestHeader header, AbstractRequest request, AbstractResponse abstractResponse, Exception e) {
        short responseVersion = header.apiVersion();
        final ApiKeys key = header.apiKey();

        AbstractResponse payload = abstractResponse;
        if (abstractResponse == null && e != null) {
            payload = request.getErrorResponse((Throwable)e);
        }

        final boolean unsupportedApiVersions = key == ApiKeys.API_VERSIONS && !ApiKeys.API_VERSIONS.isVersionSupported(responseVersion);
        if (unsupportedApiVersions) {
            responseVersion = ApiKeys.API_VERSIONS.oldestVersion();
        }
        return KopResponseUtils.serializeResponse(responseVersion, header.toResponseHeader(), payload);
    }

    /**
     * Processes one SASL token in the AUTHENTICATE state.
     *
     * Clients that performed a v0 handshake send raw tokens without a Kafka header; those are
     * delegated to {@link #handleRawSaslToken}. Otherwise the frame must be a SaslAuthenticate
     * request. The session is only stored after the tenant access check has passed, so a
     * rejected session is never retained on this authenticator.
     *
     * @param ctx                             channel context used to write responses
     * @param requestBuf                      inbound frame
     * @param registerRequestParseLatency     callback receiving the parse start time
     * @param registerRequestLatency          callback receiving the API key and processing start time
     * @param tenantAccessValidationFunction  decides whether the authenticated session may proceed
     * @throws AuthenticationException if the request is unexpected, its version is unsupported,
     *                                 authentication fails, or tenant access is denied
     */
    private void handleSaslToken(ChannelHandlerContext ctx, ByteBuf requestBuf, BiConsumer<Long, Throwable> registerRequestParseLatency, BiConsumer<ApiKeys, Long> registerRequestLatency, Function<Session, Boolean> tenantAccessValidationFunction) throws AuthenticationException {
        long timeBeforeParse = MathUtils.nowInNano();
        ByteBuffer nioBuffer = requestBuf.nioBuffer();
        if (!this.enableKafkaSaslAuthenticateHeaders) {
            this.handleRawSaslToken(ctx, nioBuffer, tenantAccessValidationFunction);
            return;
        }
        RequestHeader header = RequestHeader.parse((ByteBuffer)nioBuffer);
        ApiKeys apiKey = header.apiKey();
        short version = header.apiVersion();
        AbstractRequest request = AbstractRequest.parseRequest((ApiKeys)apiKey, (short)version, (ByteBuffer)nioBuffer).request;
        registerRequestParseLatency.accept(timeBeforeParse, null);
        long startProcessTime = MathUtils.nowInNano();
        if (apiKey != ApiKeys.SASL_AUTHENTICATE) {
            AuthenticationException e = new AuthenticationException("Unexpected Kafka request of type " + apiKey + " during SASL authentication");
            registerRequestLatency.accept(apiKey, startProcessTime);
            this.buildResponseOnAuthenticateFailure(header, request, null, e);
            throw e;
        }
        if (!apiKey.isVersionSupported(version)) {
            throw new AuthenticationException("Version " + version + " is not supported for apiKey " + apiKey);
        }
        SaslAuthenticateRequest saslAuthenticateRequest = (SaslAuthenticateRequest)request;
        try {
            byte[] responseToken = this.saslServer.evaluateResponse(saslAuthenticateRequest.data().authBytes());
            byte[] responseBuf = responseToken == null ? EMPTY_BUFFER : responseToken;
            if (this.saslServer.isComplete()) {
                String pulsarRole = this.saslServer.getAuthorizationID();
                // Build the session locally; it is published only after validation succeeds.
                Session newSession = new Session(new KafkaPrincipal("User", pulsarRole, (String)SaslAuthenticator.safeGetProperty(this.saslServer, USER_NAME_PROP), (AuthenticationDataSource)SaslAuthenticator.safeGetProperty(this.saslServer, AUTH_DATA_SOURCE_PROP)), header.clientId());
                if (log.isDebugEnabled()) {
                    log.debug("Authenticate successfully for client, header {}, request {}, session {} username {}, authDataSource {}", new Object[]{header, saslAuthenticateRequest, newSession, this.saslServer.getNegotiatedProperty(USER_NAME_PROP), this.saslServer.getNegotiatedProperty(AUTH_DATA_SOURCE_PROP)});
                }
                if (!tenantAccessValidationFunction.apply(newSession).booleanValue()) {
                    AuthenticationException e = new AuthenticationException("User is not allowed to access this tenant");
                    registerRequestLatency.accept(apiKey, startProcessTime);
                    this.buildResponseOnAuthenticateFailure(header, request, null, e);
                    throw e;
                }
                this.session = newSession;
            }
            registerRequestLatency.accept(apiKey, startProcessTime);
            SaslAuthenticator.sendKafkaResponse(ctx, header, request, (AbstractResponse)KafkaResponseUtils.newSaslAuthenticate(responseBuf), null);
        }
        catch (SaslAuthenticationException e) {
            // Register latency here too, as every other failure branch of this method does.
            registerRequestLatency.accept(apiKey, startProcessTime);
            this.buildResponseOnAuthenticateFailure(header, request, (AbstractResponse)KafkaResponseUtils.newSaslAuthenticate(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage()), e);
            throw e;
        }
        catch (SaslException e) {
            registerRequestLatency.accept(apiKey, startProcessTime);
            this.buildResponseOnAuthenticateFailure(header, request, (AbstractResponse)KafkaResponseUtils.newSaslAuthenticate(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage()), e);
            // This branch answers the client right away instead of propagating the failure.
            this.sendAuthenticationFailureResponse(ignored -> {});
            if (log.isDebugEnabled()) {
                log.debug("Authenticate failed for client, header {}, request {}, reason {}", new Object[]{header, saslAuthenticateRequest, e.getMessage(), e});
            }
        }
    }

    /**
     * Handles a raw SASL token from a client that used SASL_HANDSHAKE v0 (no Kafka headers).
     *
     * The server's response bytes, when present, are written back unframed; the session is
     * assigned only once that write succeeds. A SaslException is logged at debug level and
     * not propagated.
     *
     * @param ctx                             channel context used to write the response
     * @param nioBuffer                       buffer holding the raw client token
     * @param tenantAccessValidationFunction  decides whether the authenticated session may proceed
     * @throws AuthenticationException if tenant access is denied or a negotiated property is missing
     */
    private void handleRawSaslToken(ChannelHandlerContext ctx, ByteBuffer nioBuffer, Function<Session, Boolean> tenantAccessValidationFunction) throws AuthenticationException {
        try {
            byte[] clientToken = new byte[nioBuffer.remaining()];
            nioBuffer.get(clientToken, 0, clientToken.length);
            byte[] response = this.saslServer.evaluateResponse(clientToken);
            if (response == null) {
                return;
            }
            final Session newSession;
            if (this.saslServer.isComplete()) {
                newSession = new Session(new KafkaPrincipal("User", this.saslServer.getAuthorizationID(), (String)SaslAuthenticator.safeGetProperty(this.saslServer, USER_NAME_PROP), (AuthenticationDataSource)SaslAuthenticator.safeGetProperty(this.saslServer, AUTH_DATA_SOURCE_PROP)), "old-clientId");
                if (!tenantAccessValidationFunction.apply(newSession).booleanValue()) {
                    throw new AuthenticationException("User is not allowed to access this tenant");
                }
            } else {
                newSession = null;
            }
            ByteBuf byteBuf = Unpooled.wrappedBuffer((byte[])response);
            ctx.channel().writeAndFlush((Object)byteBuf).addListener(future -> {
                if (!future.isSuccess()) {
                    // The trailing Throwable is logged as the exception, so it needs no placeholder.
                    log.error("[{}] Failed to write SASL response", (Object)ctx.channel(), (Object)future.cause());
                } else {
                    this.session = newSession;
                    if (log.isDebugEnabled()) {
                        log.debug("Send sasl response to SASL_HANDSHAKE v0 old client {} successfully, session {}", (Object)ctx.channel(), (Object)this.session);
                    }
                }
            });
        }
        catch (SaslException e) {
            if (log.isDebugEnabled()) {
                log.debug("Authenticate failed for SASL_HANDSHAKE v0 old client, reason {}", (Object)e.getMessage());
            }
        }
    }

    /**
     * Answers an ApiVersions request received before the SASL handshake.
     *
     * An unsupported request version gets an UNSUPPORTED_VERSION error response and leaves the
     * state untouched; otherwise the default broker ApiVersions response is sent and the state
     * advances to HANDSHAKE_REQUEST.
     *
     * @param ctx                    channel context used to write the response
     * @param header                 request header
     * @param request                parsed ApiVersions request
     * @param startProcessTime       processing start time passed to the latency callback
     * @param registerRequestLatency callback receiving the API key and processing start time
     * @throws AuthenticationException if the current state is not HANDSHAKE_OR_VERSIONS_REQUEST
     */
    private void handleApiVersionsRequest(ChannelHandlerContext ctx, RequestHeader header, ApiVersionsRequest request, Long startProcessTime, BiConsumer<ApiKeys, Long> registerRequestLatency) throws AuthenticationException {
        if (this.state != State.HANDSHAKE_OR_VERSIONS_REQUEST) {
            throw new IllegalStateException("Receive ApiVersions request", this.state, State.HANDSHAKE_OR_VERSIONS_REQUEST);
        }

        if (request.hasUnsupportedRequestVersion()) {
            registerRequestLatency.accept(header.apiKey(), startProcessTime);
            final AbstractResponse unsupported = (AbstractResponse)request.getErrorResponse(0, (Throwable)Errors.UNSUPPORTED_VERSION.exception());
            SaslAuthenticator.sendKafkaResponse(ctx, header, (AbstractRequest)request, unsupported, null);
            return;
        }

        final ApiVersionsResponse supportedVersions = ApiVersionsResponse.defaultApiVersionsResponse((ApiMessageType.ListenerType)ApiMessageType.ListenerType.BROKER);
        registerRequestLatency.accept(header.apiKey(), startProcessTime);
        SaslAuthenticator.sendKafkaResponse(ctx, header, (AbstractRequest)request, (AbstractResponse)supportedVersions, null);
        this.setState(State.HANDSHAKE_REQUEST);
    }

    /**
     * Handles a SaslHandshake request and returns the mechanism the client selected.
     *
     * A handshake version of 1 or higher switches on Kafka-framed SaslAuthenticate requests.
     * An allowed mechanism is acknowledged immediately; a disallowed one leaves an
     * UNSUPPORTED_SASL_MECHANISM response pending and raises an exception.
     *
     * @param ctx                    channel context used to write the response
     * @param header                 request header
     * @param request                parsed handshake request
     * @param startProcessTime       processing start time passed to the latency callback
     * @param registerRequestLatency callback receiving the API key and processing start time
     * @return the mechanism requested by the client
     * @throws AuthenticationException if the mechanism is null or not allowed
     */
    @NonNull
    private String handleHandshakeRequest(ChannelHandlerContext ctx, RequestHeader header, SaslHandshakeRequest request, Long startProcessTime, BiConsumer<ApiKeys, Long> registerRequestLatency) throws AuthenticationException {
        final String requested = request.data().mechanism();
        if (requested == null) {
            AuthenticationException missing = new AuthenticationException("client's mechanism is null");
            registerRequestLatency.accept(header.apiKey(), startProcessTime);
            SaslAuthenticator.sendKafkaResponse(ctx, header, (AbstractRequest)request, null, (Exception)((Object)missing));
            throw missing;
        }

        if (header.apiVersion() >= 1) {
            this.enableKafkaSaslAuthenticateHeaders = true;
        }

        if (!this.allowedMechanisms.contains(requested)) {
            if (log.isDebugEnabled()) {
                log.debug("SASL mechanism '{}' requested by client is not supported", (Object)requested);
            }
            registerRequestLatency.accept(header.apiKey(), startProcessTime);
            this.buildResponseOnAuthenticateFailure(header, (AbstractRequest)request, (AbstractResponse)KafkaResponseUtils.newSaslHandshake(Errors.UNSUPPORTED_SASL_MECHANISM, this.allowedMechanisms), null);
            throw new UnsupportedSaslMechanismException(requested);
        }

        if (log.isDebugEnabled()) {
            log.debug("Using SASL mechanism '{}' provided by client", (Object)requested);
        }
        registerRequestLatency.accept(header.apiKey(), startProcessTime);
        SaslAuthenticator.sendKafkaResponse(ctx, header, (AbstractRequest)request, (AbstractResponse)KafkaResponseUtils.newSaslHandshake(Errors.NONE, this.allowedMechanisms), null);
        return requested;
    }

    /**
     * @return the authentication service this authenticator was constructed with
     */
    public AuthenticationService getAuthenticationService() {
        return authenticationService;
    }

    /**
     * Raised when a negotiated SASL property is missing or cannot be used as expected.
     */
    public static class NoExpectedPropertyException extends AuthenticationException {
        /**
         * @param propertyName name of the property that was looked up
         * @param msg          detail describing why it could not be used
         */
        public NoExpectedPropertyException(String propertyName, String msg) {
            super(String.format("No expected property for %s: %s", propertyName, msg));
        }
    }

    private static enum State {
        HANDSHAKE_OR_VERSIONS_REQUEST,
        HANDSHAKE_REQUEST,
        AUTHENTICATE,
        COMPLETE;

    }

    /**
     * Raised when a request arrives while the state machine is in an unexpected state.
     */
    public static class IllegalStateException extends AuthenticationException {
        /**
         * @param msg           description of what was received
         * @param actualState   state the authenticator was in
         * @param expectedState state required for the request
         */
        public IllegalStateException(String msg, State actualState, State expectedState) {
            super(String.format("%s actual state: %s expected state: %s", msg, actualState, expectedState));
        }
    }

    /**
     * Raised when the client asks for a SASL mechanism that is not in the allowed set.
     */
    public static class UnsupportedSaslMechanismException extends AuthenticationException {
        /**
         * @param mechanism mechanism name the client requested
         */
        public UnsupportedSaslMechanismException(String mechanism) {
            super(String.format("SASL mechanism '%s' requested by client is not supported", mechanism));
        }
    }
}

