/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.kafka.internal.connection.provider;

import com.mulesoft.connectors.commons.template.connection.ConnectorConnection;
import com.mulesoft.connectors.commons.template.connection.provider.ConnectorConnectionProvider;
import com.mulesoft.connectors.kafka.internal.error.exception.InvalidConfigurationException;
import com.mulesoft.connectors.kafka.internal.metadata.EndpointIdentificationAlgorithmValueProvider;
import com.mulesoft.connectors.kafka.internal.model.serializer.InputStreamDeserializer;
import java.io.InputStream;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.Provider;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.meta.ExpressionSupport;
import org.mule.runtime.api.tls.TlsContextFactory;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.extension.api.annotation.Expression;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.param.display.Example;
import org.mule.runtime.extension.api.annotation.param.display.Placement;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.annotation.values.OfValues;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base connection provider for Kafka connections.
 *
 * <p>On {@link #initialise()} it translates the connector parameters (bootstrap servers, optional
 * TLS context and endpoint identification algorithm) into a Kafka {@link Properties} object, lets
 * the subclass add its own settings through {@link #initialise(Properties)}, and on
 * {@link #connect()} first probes the cluster with a throw-away consumer before delegating the
 * creation of the real connection to {@link #connect(Properties)}.
 *
 * @param <T> the connection type produced by the concrete provider
 */
public abstract class KafkaConnectionProvider<T extends ConnectorConnection>
implements ConnectorConnectionProvider<T>,
Initialisable,
Disposable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConnectionProvider.class);

    /** Replacement text written to the log instead of secret values. */
    private static final String MASK = "********";

    /**
     * Settings stripped from the shared properties before the connectivity-test consumer is
     * created; they look like producer-side settings that a consumer has no use for.
     */
    private static final List<String> NON_CONSUMER_PROPERTIES = Arrays.asList(
            "compression.type",
            "enable.idempotence",
            "delivery.timeout.ms",
            "buffer.memory",
            "key.serializer",
            "max.block.ms",
            "max.in.flight.requests.per.connection",
            "acks",
            "batch.size",
            "retries",
            "max.request.size",
            "value.serializer",
            "linger.ms",
            "metadata.max.idle.ms",
            "partitioner.class",
            "transaction.timeout.ms",
            "transactional.id");

    @Parameter
    @Summary(value="The urls that the consumer can try to use to connect to the kafka cluster.")
    @Example(value="localhost:9092,1.2.3.4:9092,abc.def.com:9092")
    @DisplayName(value="Bootstrap Server URLs")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    @Placement(order=10)
    private List<String> bootstrapServers;
    @Parameter
    @Placement(tab="Security", order=1)
    @DisplayName(value="TLS Configuration")
    @org.mule.runtime.extension.api.annotation.param.Optional
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private TlsContextFactory tlsContext;
    @Parameter
    @Placement(tab="Security", order=100)
    @DisplayName(value="Endpoint identification algorithm")
    @org.mule.runtime.extension.api.annotation.param.Optional
    @Summary(value="The endpoint identification algorithm used by clients to validate server host name.")
    @OfValues(value=EndpointIdentificationAlgorithmValueProvider.class)
    private String endpointIdentificationAlgorithm;
    /** Value of {@code security.protocol} when no TLS context is configured. */
    private final SecurityProtocol plainProtocol;
    /** Value of {@code security.protocol} when a TLS context is configured. */
    private final SecurityProtocol sslProtocol;
    /** Kafka client properties, built by {@link #initialise()}. */
    private Properties properties;
    /** Factory used to create the consumer for the connectivity test. */
    private Function<Properties, Consumer<InputStream, InputStream>> kafkaConsumerFactory = KafkaConsumer::new;

    /**
     * Creates the provider.
     *
     * @param plainProtocol security protocol to announce when no TLS context is configured
     * @param sslProtocol security protocol to announce when a TLS context is configured
     */
    public KafkaConnectionProvider(SecurityProtocol plainProtocol, SecurityProtocol sslProtocol) {
        this.plainProtocol = plainProtocol;
        this.sslProtocol = sslProtocol;
    }

    /**
     * @return the factory that builds the consumer used by {@link #testConnectivity(Properties)}
     */
    protected Function<Properties, Consumer<InputStream, InputStream>> getKafkaConsumerFunction() {
        return this.kafkaConsumerFactory;
    }

    /**
     * Checks that the cluster is reachable by creating a temporary consumer from a copy of the
     * given properties and listing the topics.
     *
     * <p>The passed properties are not modified. The temporary consumer is always closed, also
     * when listing the topics fails.
     *
     * @param properties the connection properties to test
     * @throws ConnectionException if listing the topics or closing the consumer raises a
     *         {@link KafkaException}
     */
    protected void testConnectivity(Properties properties) throws ConnectionException {
        Properties consumerProperties = (Properties)properties.clone();
        consumerProperties.setProperty("key.deserializer", InputStreamDeserializer.class.getName());
        consumerProperties.setProperty("value.deserializer", InputStreamDeserializer.class.getName());
        consumerProperties.setProperty("group.id", "connectivity");
        for (String propertyName : NON_CONSUMER_PROPERTIES) {
            consumerProperties.remove(propertyName);
        }
        // Created outside the try block on purpose: failures while building the consumer keep
        // propagating unwrapped, exactly as before.
        Consumer<InputStream, InputStream> client = this.getKafkaConsumerFunction().apply(consumerProperties);
        try (Consumer<InputStream, InputStream> probe = client) {
            probe.listTopics();
        }
        catch (KafkaException ex) {
            throw new ConnectionException((Throwable)ex);
        }
    }

    /**
     * Builds the Kafka client properties from the configured parameters and hands them to
     * {@link #initialise(Properties)} so the subclass can complete them.
     *
     * @throws InitialisationException if the TLS context or the subclass fails to initialise
     */
    public void initialise() throws InitialisationException {
        this.properties = new Properties();
        this.setPropertyAsString("bootstrap.servers", String.join(",", this.bootstrapServers));
        if (this.tlsContext != null) {
            this.configureTls();
        }
        SecurityProtocol protocol = this.tlsContext == null ? this.plainProtocol : this.sslProtocol;
        this.setPropertyAsString("security.protocol", protocol.name());
        this.initialise(this.properties);
        if (logger.isTraceEnabled()) {
            logger.trace("Logging Kafka Consumer connection properties:");
            this.properties.forEach((key, value) -> logger.trace("Key: '{}', Value: '{}'", key, (Object)KafkaConnectionProvider.maskSecrets(key, value)));
        }
    }

    /**
     * Copies the key store, trust store, cipher suite and protocol settings of the configured
     * TLS context into the {@code ssl.*} client properties.
     */
    private void configureTls() throws InitialisationException {
        LifecycleUtils.initialiseIfNeeded((Object)this.tlsContext);
        if (this.tlsContext.isKeyStoreConfigured()) {
            this.setPropertyAsString("ssl.key.password", this.tlsContext.getKeyStoreConfiguration().getKeyPassword());
            this.setPropertyAsString("ssl.keystore.location", this.tlsContext.getKeyStoreConfiguration().getPath());
            // The store itself is protected by the store password, not by the key password.
            this.setPropertyAsString("ssl.keystore.password", this.tlsContext.getKeyStoreConfiguration().getPassword());
            this.setPropertyAsString("ssl.keystore.type", this.tlsContext.getKeyStoreConfiguration().getType());
            this.setPropertyAsString("ssl.keymanager.algorithm", this.tlsContext.getKeyStoreConfiguration().getAlgorithm());
        }
        if (this.tlsContext.isTrustStoreConfigured()) {
            this.setPropertyAsString("ssl.truststore.location", this.tlsContext.getTrustStoreConfiguration().getPath());
            this.setPropertyAsString("ssl.truststore.password", this.tlsContext.getTrustStoreConfiguration().getPassword());
            this.setPropertyAsString("ssl.truststore.type", this.tlsContext.getTrustStoreConfiguration().getType());
            this.setPropertyAsString("ssl.trustmanager.algorithm", this.tlsContext.getTrustStoreConfiguration().getAlgorithm());
        }
        SSLContext sslContext;
        try {
            sslContext = this.tlsContext.createSslContext();
        }
        catch (KeyManagementException | NoSuchAlgorithmException e) {
            throw new MuleRuntimeException((Throwable)e);
        }
        SSLParameters defaultSSLParameters = sslContext.getDefaultSSLParameters();
        this.setProperties("ssl.cipher.suites", defaultSSLParameters.getCipherSuites());
        this.setProperties("ssl.enabled.protocols", this.tlsContext.getEnabledProtocols());
        this.setPropertyAsString("ssl.protocol", sslContext.getProtocol());
        // The literal "disabled" is turned into an empty algorithm name.
        this.setPropertyAsString("ssl.endpoint.identification.algorithm", this.endpointIdentificationAlgorithm != null ? this.endpointIdentificationAlgorithm.replace("disabled", "") : null);
        this.setPropertyAsString("ssl.provider", Optional.ofNullable(sslContext.getProvider()).map(Provider::getName).orElse(null));
    }

    /**
     * Produces the loggable form of a property value. Values of properties whose key contains
     * {@code password} are hidden completely; in any other value everything following
     * {@code password=} up to the end of the line is hidden.
     */
    private static String maskSecrets(Object key, Object value) {
        if (key.toString().contains("password")) {
            return MASK;
        }
        return value.toString().replaceAll("password=.*", "password=" + MASK);
    }

    /**
     * Lets the concrete provider add its own settings to the client properties.
     *
     * @param var1 the properties built so far by {@link #initialise()}
     * @throws InitialisationException if the subclass cannot complete the configuration
     */
    protected abstract void initialise(Properties var1) throws InitialisationException;

    /**
     * Tests connectivity with the current properties and then creates the connection.
     *
     * @return the connection created by {@link #connect(Properties)}
     * @throws ConnectionException if the connectivity test or the connection creation fails
     */
    public T connect() throws ConnectionException {
        this.testConnectivity(this.properties);
        return this.connect(this.properties);
    }

    /**
     * Creates the actual connection from the fully built client properties.
     *
     * @param var1 the Kafka client properties
     * @return the new connection
     * @throws ConnectionException if the connection cannot be established
     */
    protected abstract T connect(Properties var1) throws ConnectionException;

    /**
     * Stores {@code value.toString()} under the given name; a {@code null} value is ignored.
     *
     * @param propertyName the property key
     * @param value the value to store, may be {@code null}
     */
    protected void setPropertyAsString(String propertyName, Object value) {
        if (value != null) {
            this.properties.put(propertyName, value.toString());
        }
    }

    /**
     * Stores the values as a list under the given name; {@code null} or empty arrays are ignored.
     */
    private void setProperties(String propertyName, String[] values) {
        if (values != null && values.length > 0) {
            this.properties.put(propertyName, Arrays.asList(values));
        }
    }

    /**
     * Rethrows the given Kafka failure as an {@link InvalidConfigurationException}.
     *
     * @param kafkaException the failure reported by the Kafka client
     */
    protected void handleConnectionException(KafkaException kafkaException) {
        throw new InvalidConfigurationException("The provided configuration is invalid!", kafkaException);
    }
}

