/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cds.feature.messaging.kafka.utils;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sap.cds.feature.messaging.kafka.client.deserializer.ErrorHandlingStringDeserializer;
import com.sap.cds.feature.messaging.kafka.utils.KafkaServiceBinding;
import com.sap.cds.feature.messaging.kafka.utils.TrustStoreUtils;
import com.sap.cds.services.ErrorStatus;
import com.sap.cds.services.ErrorStatuses;
import com.sap.cds.services.ServiceException;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class KafkaUtils {
    private static final Logger logger = LoggerFactory.getLogger(KafkaUtils.class);
    private static final ObjectMapper mapper = new ObjectMapper();

    private KafkaUtils() {
    }

    public static byte[] toBytes(String str) {
        return Objects.requireNonNull(str).getBytes(StandardCharsets.UTF_8);
    }

    public static String toString(byte[] bytes) {
        return new String(Objects.requireNonNull(bytes), StandardCharsets.UTF_8);
    }

    public static Properties getAdminProperties(KafkaServiceBinding credentials) {
        Properties props = new Properties();
        props.put("bootstrap.servers", credentials.getBootstrapServersAuthSSL());
        props.putAll((Map<?, ?>)KafkaUtils.getSSLProperties(credentials));
        return props;
    }

    public static Properties createMessageProducerProperties(KafkaServiceBinding credentials, String groupId, Map<String, Object> config) {
        Properties props = new Properties();
        props.put("bootstrap.servers", credentials.getBootstrapServersAuthSSL());
        props.put("client.id", KafkaUtils.createClientId(groupId));
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");
        props.putAll((Map<?, ?>)KafkaUtils.getSSLProperties(credentials));
        KafkaUtils.getKafkaProperties("producer", config, props);
        return props;
    }

    public static Properties createMessageConsumerProperties(KafkaServiceBinding credentials, String consumerId, Map<String, Object> config) {
        Properties props = new Properties();
        props.put("bootstrap.servers", credentials.getBootstrapServersAuthSSL());
        props.put("group.id", consumerId);
        props.put("client.id", KafkaUtils.createClientId(consumerId));
        props.put("partition.assignment.strategy", RangeAssignor.class.getName());
        props.put("key.deserializer", ErrorHandlingStringDeserializer.class.getName());
        props.put("value.deserializer", ErrorHandlingStringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", (Object)false);
        props.put("max.poll.records", (Object)1);
        props.putAll((Map<?, ?>)KafkaUtils.getSSLProperties(credentials));
        KafkaUtils.getKafkaProperties("consumer", config, props);
        return props;
    }

    private static void getKafkaProperties(String key, Map<String, Object> config, Properties props) {
        if (config != null && !config.isEmpty() && config.containsKey(key)) {
            props.putAll((Map<?, ?>)((Map)config.get(key)));
        }
    }

    private static String createClientId(String id) {
        return id + "-" + UUID.randomUUID().toString();
    }

    private static Properties getSSLProperties(KafkaServiceBinding credentials) {
        Properties sslProps = new Properties();
        try {
            String trustStorePass = KafkaUtils.getRandomString(30);
            sslProps.put("security.protocol", "SASL_SSL");
            sslProps.put("sasl.mechanism", "PLAIN");
            if (StringUtils.isNotBlank((CharSequence)credentials.getTokenUrl())) {
                sslProps.put("sasl.jaas.config", KafkaUtils.getJaasString(credentials));
            } else if (StringUtils.isNotBlank((CharSequence)credentials.getPassword()) && StringUtils.isNotBlank((CharSequence)credentials.getUsername())) {
                sslProps.put("sasl.jaas.config", KafkaUtils.getJaasStringForBasicAuth(credentials));
            } else {
                throw new ServiceException((ErrorStatus)ErrorStatuses.SERVER_ERROR, "Failed to retrieve credentials from the Kafka service binding", new Object[0]);
            }
            if (StringUtils.isNotBlank((CharSequence)credentials.getRootCertUrl())) {
                sslProps.put("ssl.truststore.location", TrustStoreUtils.createTruststoreWithRootCertsFromUrl(trustStorePass, credentials.getRootCertUrl()));
                sslProps.put("ssl.truststore.password", trustStorePass);
                sslProps.put("ssl.endpoint.identification.algorithm", "");
            }
        }
        catch (IOException | KeyStoreException | NoSuchAlgorithmException | CertificateException e) {
            logger.error("Error while retrieving the SSL properties from the Kafka service binding.", (Throwable)e);
            throw new ServiceException((ErrorStatus)ErrorStatuses.SERVER_ERROR, "Failed to retrieve the SSL properties from the Kafka service binding", new Object[0]);
        }
        return sslProps;
    }

    private static String getRandomString(int length) {
        return RandomStringUtils.random((int)length, (int)32, (int)127, (boolean)false, (boolean)false, null, (Random)new SecureRandom());
    }

    private static String getJaasString(KafkaServiceBinding credentials) throws IOException {
        String base = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";
        String username = credentials.getUsername();
        String password = credentials.getPassword();
        String tokenUrl = credentials.getTokenUrl();
        String token = KafkaUtils.getToken(tokenUrl, username, password);
        return base.formatted(username, token);
    }

    private static String getJaasStringForBasicAuth(KafkaServiceBinding credentials) {
        String base = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";
        return base.formatted(credentials.getUsername(), credentials.getPassword());
    }

    private static String getToken(String tokenUrl, String user, String password) throws IOException {
        String resp;
        String userCredentials = user + ":" + password;
        String authHeaderValue = "Basic " + Base64.encodeBase64String((byte[])KafkaUtils.toBytes(userCredentials));
        String bodyParams = "grant_type=client_credentials";
        byte[] postData = KafkaUtils.toBytes(bodyParams);
        URL url = new URL(tokenUrl);
        HttpURLConnection conn = (HttpURLConnection)url.openConnection();
        conn.setRequestProperty("Authorization", authHeaderValue);
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
        conn.setRequestProperty("charset", "utf-8");
        conn.setRequestProperty("Content-Length", "" + postData.length);
        conn.setUseCaches(false);
        conn.setDoInput(true);
        conn.setDoOutput(true);
        try (DataOutputStream os = new DataOutputStream(conn.getOutputStream());){
            os.write(postData);
        }
        try (DataInputStream is = new DataInputStream(conn.getInputStream());
             BufferedReader br = new BufferedReader(new InputStreamReader(is));){
            resp = br.lines().collect(Collectors.joining("\n"));
        }
        conn.disconnect();
        return ((JsonNode)mapper.readValue(resp, JsonNode.class)).get("access_token").asText();
    }
}

