/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop;

import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.SystemTopicClient;
import io.streamnative.pulsar.handlers.kop.schemaregistry.DummyOptionsCORSProcessor;
import io.streamnative.pulsar.handlers.kop.schemaregistry.HttpRequestProcessor;
import io.streamnative.pulsar.handlers.kop.schemaregistry.SchemaRegistryChannelInitializer;
import io.streamnative.pulsar.handlers.kop.schemaregistry.SchemaRegistryHandler;
import io.streamnative.pulsar.handlers.kop.schemaregistry.SchemaRegistryRequestAuthenticator;
import io.streamnative.pulsar.handlers.kop.schemaregistry.model.SchemaStorageAccessor;
import io.streamnative.pulsar.handlers.kop.schemaregistry.model.impl.PulsarSchemaStorageAccessor;
import io.streamnative.pulsar.handlers.kop.schemaregistry.model.impl.SchemaStorageException;
import io.streamnative.pulsar.handlers.kop.schemaregistry.resources.CompatibilityResource;
import io.streamnative.pulsar.handlers.kop.schemaregistry.resources.ConfigResource;
import io.streamnative.pulsar.handlers.kop.schemaregistry.resources.SchemaResource;
import io.streamnative.pulsar.handlers.kop.schemaregistry.resources.SubjectResource;
import io.streamnative.pulsar.handlers.kop.security.KafkaPrincipal;
import io.streamnative.pulsar.handlers.kop.security.auth.Authorizer;
import io.streamnative.pulsar.handlers.kop.security.auth.Resource;
import io.streamnative.pulsar.handlers.kop.security.auth.ResourceType;
import io.streamnative.pulsar.handlers.kop.security.auth.SimpleAclAuthorizer;
import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import javax.naming.AuthenticationException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Assembles the KoP schema registry HTTP endpoint.
 *
 * <p>The manager owns the {@link PulsarClient} handed to the schema storage, builds the
 * {@link SchemaRegistryChannelInitializer} that serves the registry resources, and supplies the
 * {@link HttpRequestAuthenticator} used by those resources to authenticate incoming requests.
 */
public class SchemaRegistryManager {
    private static final Logger log = LoggerFactory.getLogger(SchemaRegistryManager.class);
    private final KafkaServiceConfiguration kafkaConfig;
    private final PulsarService pulsar;
    private final SchemaRegistryRequestAuthenticator schemaRegistryRequestAuthenticator;
    private final PulsarClient pulsarClient;

    /**
     * Creates the manager, its Pulsar client and its request authenticator.
     *
     * @param kafkaConfig           KoP configuration read for all schema registry settings
     * @param pulsar                broker service used to create the client, the admin client and the authorizer
     * @param authenticationService source of the authentication provider used to validate tokens
     */
    public SchemaRegistryManager(KafkaServiceConfiguration kafkaConfig, PulsarService pulsar, AuthenticationService authenticationService) {
        this.kafkaConfig = kafkaConfig;
        // No extra client customization is applied, hence the no-op callback.
        this.pulsarClient = SystemTopicClient.createPulsarClient(pulsar, kafkaConfig, ignored -> {});
        this.pulsar = pulsar;
        SimpleAclAuthorizer authorizer = new SimpleAclAuthorizer(pulsar);
        this.schemaRegistryRequestAuthenticator = new HttpRequestAuthenticator(this.kafkaConfig, authenticationService, authorizer);
    }

    /**
     * Returns the wildcard socket address on the configured schema registry port.
     *
     * @return a new address built from {@code getKopSchemaRegistryPort()}
     */
    public InetSocketAddress getAddress() {
        return new InetSocketAddress(this.kafkaConfig.getKopSchemaRegistryPort());
    }

    /**
     * Builds the Netty channel initializer serving the schema registry.
     *
     * @return the initializer, or an empty {@link Optional} when the schema registry is disabled in
     *         the configuration
     * @throws Exception if the admin client cannot be obtained or a resource fails to register
     */
    public Optional<SchemaRegistryChannelInitializer> build() throws Exception {
        if (!this.kafkaConfig.isKopSchemaRegistryEnable()) {
            return Optional.empty();
        }
        PulsarAdmin pulsarAdmin = this.pulsar.getAdminClient();
        SchemaRegistryHandler handler = new SchemaRegistryHandler();
        // The callback runs per tenant: it makes sure the registry metadata exists for that tenant
        // and then hands back the shared client.
        PulsarSchemaStorageAccessor schemaStorage = new PulsarSchemaStorageAccessor(tenant -> {
            try {
                BrokerService brokerService = this.pulsar.getBrokerService();
                PulsarService brokerPulsar = brokerService.getPulsar();
                ClusterData clusterData = ClusterData.builder()
                        .serviceUrl(brokerPulsar.getWebServiceAddress())
                        .serviceUrlTls(brokerPulsar.getWebServiceAddressTls())
                        .brokerServiceUrl(brokerPulsar.getBrokerServiceUrl())
                        .brokerServiceUrlTls(brokerPulsar.getBrokerServiceUrlTls())
                        .build();
                MetadataUtils.createSchemaRegistryMetadataIfMissing(tenant, pulsarAdmin, clusterData, this.kafkaConfig);
                return this.pulsarClient;
            } catch (Exception err) {
                // The callback cannot throw checked exceptions; surface the failure unchecked.
                throw new IllegalStateException(err);
            }
        }, this.kafkaConfig.getKopSchemaRegistryNamespace(), this.kafkaConfig.getKopSchemaRegistryTopicName());
        new SchemaResource(schemaStorage, this.schemaRegistryRequestAuthenticator).register(handler);
        new SubjectResource(schemaStorage, this.schemaRegistryRequestAuthenticator).register(handler);
        new ConfigResource(schemaStorage, this.schemaRegistryRequestAuthenticator).register(handler);
        new CompatibilityResource(schemaStorage, this.schemaRegistryRequestAuthenticator).register(handler);
        handler.addProcessor(new DummyOptionsCORSProcessor());
        return Optional.of(new SchemaRegistryChannelInitializer(handler));
    }

    /**
     * Closes the Pulsar client owned by this manager. A failure is logged and not rethrown.
     */
    public void close() {
        try {
            this.pulsarClient.close();
        } catch (PulsarClientException err) {
            log.error("Error while shutting down", err);
        }
    }

    /**
     * Authenticates schema registry HTTP requests using HTTP Basic credentials whose password has
     * the form {@code token:<token>}; the token is validated by the {@code "token"} authentication
     * provider.
     */
    public static class HttpRequestAuthenticator
    implements SchemaRegistryRequestAuthenticator {
        private static final String BASIC_PREFIX = "Basic ";
        private static final String TOKEN_PREFIX = "token:";

        private final KafkaServiceConfiguration kafkaConfig;
        private final AuthenticationService authenticationService;
        private final Authorizer authorizer;

        /**
         * Authenticates the request and resolves the tenant to use for its data.
         *
         * <p>With authentication disabled the configured Kafka metadata tenant is returned without
         * inspecting the request. Otherwise the tenant is the Basic-auth username when multi-tenant
         * metadata is enabled, and the Kafka metadata tenant when it is not.
         *
         * @param request the incoming HTTP request; only its AUTHORIZATION header is read
         * @return the tenant name
         * @throws SchemaStorageException if the header is missing or malformed, the token provider is
         *                                not configured, the token is rejected, or authorization fails
         */
        public String authenticate(FullHttpRequest request) throws SchemaStorageException {
            if (!this.kafkaConfig.isAuthenticationEnabled()) {
                return this.kafkaConfig.getKafkaMetadataTenant();
            }
            String authenticationHeader = request.headers().get(HttpHeaderNames.AUTHORIZATION, "");
            UsernamePasswordPair usernamePasswordPair = this.parseUsernamePassword(authenticationHeader);
            String username = usernamePasswordPair.username;
            String password = usernamePasswordPair.password;
            AuthenticationProvider authenticationProvider = this.authenticationService.getAuthenticationProvider("token");
            if (authenticationProvider == null) {
                throw new SchemaStorageException("Pulsar is not configured for Token auth");
            }
            try {
                AuthenticationState authState = authenticationProvider.newAuthState(
                        AuthData.of(password.getBytes(StandardCharsets.UTF_8)), null, null);
                String role = authState.getAuthRole();
                String tenant;
                if (this.kafkaConfig.isKafkaEnableMultiTenantMetadata()) {
                    tenant = username;
                    log.debug("SchemaRegistry Authenticated username {} role {} using tenant {} for data", username, role, tenant);
                } else {
                    tenant = this.kafkaConfig.getKafkaMetadataTenant();
                    log.debug("SchemaRegistry Authenticated username {} role {} using system tenant {} for data", username, role, tenant);
                }
                this.performAuthorizationValidation(username, role, tenant);
                return tenant;
            } catch (AuthenticationException err) {
                throw new SchemaStorageException(err);
            }
        }

        /**
         * Verifies that the authenticated role may use the tenant's schema registry topic.
         *
         * <p>The check runs only when both authorization and multi-tenant metadata are enabled. It
         * first asks the authorizer whether the principal can access the tenant, then whether it can
         * produce to the tenant's schema registry topic.
         *
         * @throws SchemaStorageException with a FORBIDDEN status when either check is denied, or
         *                                wrapping the failure when an authorizer call fails or the
         *                                waiting thread is interrupted
         */
        private void performAuthorizationValidation(String username, String role, String tenant) throws SchemaStorageException {
            if (!this.kafkaConfig.isAuthorizationEnabled() || !this.kafkaConfig.isKafkaEnableMultiTenantMetadata()) {
                return;
            }
            KafkaPrincipal kafkaPrincipal = new KafkaPrincipal("User", role, username, null);
            String topicName = MetadataUtils.constructSchemaRegistryTopicName(tenant, this.kafkaConfig);
            try {
                Boolean tenantExists = this.authorizer.canAccessTenantAsync(kafkaPrincipal, Resource.of(ResourceType.TENANT, tenant)).get();
                if (tenantExists == null || !tenantExists.booleanValue()) {
                    // Argument list now matches the three placeholders of the message.
                    log.debug("SchemaRegistry username {} role {} tenant {} does not exist", username, role, tenant);
                    throw new SchemaStorageException("Role " + role + " cannot access topic " + topicName + " tenant " + tenant + " does not exist (wrong username?)", HttpResponseStatus.FORBIDDEN.code());
                }
                Boolean hasPermission = this.authorizer.canProduceAsync(kafkaPrincipal, Resource.of(ResourceType.TOPIC, topicName)).get();
                if (hasPermission == null || !hasPermission.booleanValue()) {
                    log.debug("SchemaRegistry username {} role {} tenant {} cannot access topic {}", username, role, tenant, topicName);
                    throw new SchemaStorageException("Role " + role + " cannot access topic " + topicName, HttpResponseStatus.FORBIDDEN.code());
                }
            } catch (ExecutionException err) {
                throw new SchemaStorageException(err.getCause());
            } catch (InterruptedException err) {
                // Restore the interrupt status so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new SchemaStorageException(err);
            }
        }

        /**
         * Extracts the username and token from a {@code Basic} AUTHORIZATION header value.
         *
         * <p>The decoded credentials must look like {@code <username>:token:<token>} with a
         * non-empty username.
         *
         * @param authenticationHeader raw header value; empty when the header was absent
         * @return the username paired with the token (the {@code token:} prefix removed)
         * @throws SchemaStorageException with UNAUTHORIZED when the header is missing, is not Basic,
         *                                or the password lacks the {@code token:} prefix; with
         *                                BAD_REQUEST when the payload is not valid Base64 or has no
         *                                {@code username:} part
         */
        private UsernamePasswordPair parseUsernamePassword(String authenticationHeader) throws SchemaStorageException {
            if (authenticationHeader.isEmpty()) {
                throw new SchemaStorageException("Missing AUTHORIZATION header", HttpResponseStatus.UNAUTHORIZED.code());
            }
            if (!authenticationHeader.startsWith(BASIC_PREFIX)) {
                throw new SchemaStorageException("Bad authentication scheme, only Basic is supported", HttpResponseStatus.UNAUTHORIZED.code());
            }
            String strippedAuthenticationHeader = authenticationHeader.substring(BASIC_PREFIX.length());
            byte[] decoded;
            try {
                decoded = Base64.getDecoder().decode(strippedAuthenticationHeader);
            } catch (IllegalArgumentException err) {
                // Client-supplied garbage must be reported as a bad request rather than escaping as
                // an unchecked exception. The header itself is not logged because it holds credentials.
                log.debug("SchemaRegistry received an AUTHORIZATION header that is not valid Base64", err);
                throw new SchemaStorageException("Bad authentication header", HttpResponseStatus.BAD_REQUEST.code());
            }
            String usernamePassword = new String(decoded, StandardCharsets.UTF_8);
            int colon = usernamePassword.indexOf(":");
            if (colon <= 0) {
                // Covers both a missing separator and an empty username.
                throw new SchemaStorageException("Bad authentication header", HttpResponseStatus.BAD_REQUEST.code());
            }
            String rawUsername = usernamePassword.substring(0, colon);
            String rawPassword = usernamePassword.substring(colon + 1);
            if (!rawPassword.startsWith(TOKEN_PREFIX)) {
                throw new SchemaStorageException("Password must start with 'token:'", HttpResponseStatus.UNAUTHORIZED.code());
            }
            String token = rawPassword.substring(TOKEN_PREFIX.length());
            return new UsernamePasswordPair(rawUsername, token);
        }

        /**
         * @param kafkaConfig           configuration consulted for the authentication, authorization
         *                              and multi-tenancy switches
         * @param authenticationService source of the {@code "token"} authentication provider
         * @param authorizer            authorizer used for the tenant and topic permission checks
         */
        public HttpRequestAuthenticator(KafkaServiceConfiguration kafkaConfig, AuthenticationService authenticationService, Authorizer authorizer) {
            this.kafkaConfig = kafkaConfig;
            this.authenticationService = authenticationService;
            this.authorizer = authorizer;
        }
    }

    /** Immutable holder for the credentials parsed from a Basic AUTHORIZATION header. */
    private static final class UsernamePasswordPair {
        final String username;
        /** The token, i.e. the Basic password with its {@code token:} prefix removed. */
        final String password;

        public UsernamePasswordPair(String username, String password) {
            this.username = username;
            this.password = password;
        }
    }
}

