/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.kafka.internal.connection;

import com.mulesoft.connectors.commons.template.connection.ConnectorConnection;
import com.mulesoft.connectors.kafka.internal.error.exception.AuthenticationException;
import com.mulesoft.connectors.kafka.internal.error.exception.InputTooLargeException;
import com.mulesoft.connectors.kafka.internal.error.exception.InvalidTopicPartitionException;
import com.mulesoft.connectors.kafka.internal.error.exception.KafkaProducerFencedException;
import com.mulesoft.connectors.kafka.internal.error.exception.OperationTimeoutException;
import com.mulesoft.connectors.kafka.internal.error.exception.UnexpectedException;
import com.mulesoft.connectors.kafka.internal.model.serializer.InputStreamSerializer;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.Serializer;
import org.mule.runtime.api.tx.TransactionException;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.runtime.extension.api.connectivity.TransactionalConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Connection that publishes records to Kafka through a {@link Producer}.
 *
 * <p>Two producers are involved: a default producer supplied at construction time and an
 * optional transactional producer that is created by {@link #begin()} and disposed of by
 * {@link #commit()} / {@link #rollback()}. While a transactional producer is held, every
 * call to {@link #publish} and {@link #validate()} goes through it; otherwise the default
 * producer is used.
 */
public class ProducerConnection
implements TransactionalConnection,
ConnectorConnection {
    private static final String PUBLISH = "publish";
    private static final String PRODUCER_FENCED_TEMPLATE = "The producer was fenced when beginning the transaction %s";
    private static final String CLUSTER_VERSION_NOT_SUPPORTING_TRANSACTIONS = "The Kafka version of the cluster does not support transactions";
    private static final Logger logger = LoggerFactory.getLogger(ProducerConnection.class);
    private final Producer<InputStream, InputStream> defaultProducer;
    private final Properties defaultProperties;
    private Producer<InputStream, InputStream> transactionProducer;
    private volatile String transactionId;
    // Builds the transactional producer from a set of properties. Left non-final and with raw
    // KafkaProducer/Serializer types exactly as in the original: nothing in this class reassigns
    // it, and the generic signature of InputStreamSerializer is not visible from this file.
    // NOTE(review): presumably replaced reflectively in tests - confirm before making it final.
    private Function<Properties, Producer<InputStream, InputStream>> producerFactory = props -> new KafkaProducer(props, (Serializer)new InputStreamSerializer(), (Serializer)new InputStreamSerializer());

    /**
     * @param defaultProducer   producer used whenever no transactional producer is held
     * @param defaultProperties base properties copied into every transactional producer's configuration
     */
    public ProducerConnection(Producer<InputStream, InputStream> defaultProducer, Properties defaultProperties) {
        this.defaultProducer = defaultProducer;
        this.defaultProperties = defaultProperties;
    }

    /**
     * Sends a record through the current producer and reports the outcome through the listeners.
     *
     * @param topic           destination topic
     * @param partition       destination partition, passed as-is to the record
     * @param key             record key
     * @param message         record value
     * @param headers         header name to header content; may be {@code null}
     * @param successListener receives the record metadata when the send callback reports no exception
     * @param errorListener   receives the connector exception the Kafka failure was translated to
     * @throws InvalidTopicPartitionException if building the record raises an {@link IllegalArgumentException}
     */
    public void publish(String topic, Integer partition, InputStream key, InputStream message, Map<String, InputStream> headers, Consumer<RecordMetadata> successListener, Consumer<Throwable> errorListener) {
        ProducerRecord<InputStream, InputStream> record = this.createProducerRecord(topic, partition, key, message, headers);
        this.getProducer().send(record, (metadata, exception) -> {
            if (exception == null) {
                successListener.accept(metadata);
                return;
            }
            logger.error("There was an error while publishing a message", exception);
            errorListener.accept(this.toConnectorError(exception));
        });
    }

    /** Maps a failure reported by the send callback to the connector exception handed to the error listener. */
    private Throwable toConnectorError(Exception failure) {
        if (failure instanceof org.apache.kafka.common.errors.AuthenticationException) {
            return new AuthenticationException(failure, this);
        }
        if (failure instanceof AuthorizationException) {
            return new com.mulesoft.connectors.kafka.internal.error.exception.AuthorizationException(failure);
        }
        if (failure instanceof TimeoutException) {
            return new OperationTimeoutException(PUBLISH, failure);
        }
        if (failure instanceof RecordTooLargeException) {
            return new InputTooLargeException(failure.getMessage(), failure);
        }
        return new UnexpectedException(PUBLISH, failure);
    }

    /**
     * Builds the record to send, stamped with the current system time.
     *
     * @throws InvalidTopicPartitionException if an {@link IllegalArgumentException} is raised while building it
     */
    private ProducerRecord<InputStream, InputStream> createProducerRecord(String topic, Integer partition, InputStream key, InputStream message, Map<String, InputStream> headers) {
        try {
            // Header conversion stays inside the try so an IllegalArgumentException raised there
            // is translated the same way as one raised by the ProducerRecord constructor.
            return new ProducerRecord<>(topic, partition, System.currentTimeMillis(), key, message, ProducerConnection.toRecordHeaders(headers));
        }
        catch (IllegalArgumentException e) {
            throw new InvalidTopicPartitionException("Failed to construct message. Partition or topic is invalid.", e);
        }
    }

    /** Reads every header stream fully into a {@link RecordHeader}; returns {@code null} when no header map is given. */
    private static List<Header> toRecordHeaders(Map<String, InputStream> headers) {
        if (headers == null) {
            return null;
        }
        return headers.entrySet().stream()
                .<Header>map(entry -> new RecordHeader(entry.getKey(), IOUtils.toByteArray(entry.getValue())))
                .collect(Collectors.toList());
    }

    /**
     * Starts a transaction: generates a fresh random transactional id and creates a transactional
     * producer for it.
     *
     * @throws TransactionException if the transactional producer cannot be created or started
     */
    public void begin() throws TransactionException {
        this.transactionId = UUID.randomUUID().toString();
        try {
            this.createTransactionalProducer();
        }
        catch (RuntimeException e) {
            this.wrapThrowTx(e);
        }
    }

    /**
     * Creates a producer configured with the default properties plus the current transactional id,
     * initialises it, begins a transaction on it and only then publishes it as the current
     * transactional producer. If any step fails the half-built producer is closed instead of leaked.
     */
    private void createTransactionalProducer() throws TransactionException {
        Producer<InputStream, InputStream> pending = null;
        try {
            Properties transactionProducerProperties = new Properties();
            transactionProducerProperties.putAll(this.defaultProperties);
            transactionProducerProperties.put("transactional.id", this.transactionId);
            pending = this.newProducerWithKafkaClassLoader(transactionProducerProperties);
            pending.initTransactions();
            pending.beginTransaction();
            this.transactionProducer = pending;
            // Ownership has moved to the field; nothing left to clean up in finally.
            pending = null;
        }
        catch (AuthorizationException e) {
            throw new com.mulesoft.connectors.kafka.internal.error.exception.AuthorizationException(e);
        }
        catch (org.apache.kafka.common.errors.AuthenticationException e) {
            throw new AuthenticationException(e, this);
        }
        catch (ProducerFencedException e) {
            throw new KafkaProducerFencedException(String.format(PRODUCER_FENCED_TEMPLATE, this.transactionId), e);
        }
        catch (UnsupportedVersionException e) {
            logger.error(CLUSTER_VERSION_NOT_SUPPORTING_TRANSACTIONS, e);
            throw new UnexpectedException(e);
        }
        catch (IllegalStateException | KafkaException e) {
            throw new TransactionException(e);
        }
        finally {
            if (pending != null) {
                ProducerConnection.closeQuietly(pending);
            }
        }
    }

    /**
     * Invokes the producer factory with the class loader of {@link KafkaProducer} installed as the
     * thread context class loader, and always restores the previous one afterwards - including
     * when the factory throws.
     */
    private Producer<InputStream, InputStream> newProducerWithKafkaClassLoader(Properties properties) {
        Thread currentThread = Thread.currentThread();
        ClassLoader previous = currentThread.getContextClassLoader();
        currentThread.setContextClassLoader(KafkaProducer.class.getClassLoader());
        try {
            return this.producerFactory.apply(properties);
        }
        finally {
            currentThread.setContextClassLoader(previous);
        }
    }

    /**
     * Commits the current transaction and releases the transactional producer.
     *
     * @throws TransactionException if no transaction is active or the commit fails
     */
    public void commit() throws TransactionException {
        try {
            this.handleTransactionalException(Producer::commitTransaction);
        }
        catch (RuntimeException e) {
            this.wrapThrowTx(e);
        }
    }

    /**
     * Aborts the current transaction and releases the transactional producer.
     *
     * @throws TransactionException if no transaction is active or the abort fails
     */
    public void rollback() throws TransactionException {
        try {
            this.handleTransactionalException(Producer::abortTransaction);
        }
        catch (RuntimeException e) {
            this.wrapThrowTx(e);
        }
    }

    /**
     * Runs a terminal transaction action (commit or abort) against the transactional producer,
     * translating Kafka failures into connector exceptions. Whatever the outcome, the transactional
     * producer is dropped and closed afterwards, so subsequent calls fall back to the default producer.
     *
     * @param transaction action to run against the transactional producer
     * @throws TransactionException if there is no transactional producer, or the action fails with an
     *                              {@link IllegalStateException} or a {@link KafkaException} not handled above
     */
    private void handleTransactionalException(Consumer<Producer<InputStream, InputStream>> transaction) throws TransactionException {
        Producer<InputStream, InputStream> active = this.transactionProducer;
        if (active == null) {
            // Previously surfaced as a NullPointerException wrapped by the caller; same exception
            // type for callers, but with a cause that says what actually went wrong.
            throw new TransactionException(new IllegalStateException("There is no active transaction on this connection"));
        }
        boolean completed = false;
        try {
            transaction.accept(active);
            completed = true;
        }
        catch (AuthorizationException e) {
            throw new com.mulesoft.connectors.kafka.internal.error.exception.AuthorizationException(e);
        }
        catch (org.apache.kafka.common.errors.AuthenticationException e) {
            throw new AuthenticationException(e, this);
        }
        catch (ProducerFencedException e) {
            throw new KafkaProducerFencedException(String.format(PRODUCER_FENCED_TEMPLATE, this.transactionId), e);
        }
        catch (UnsupportedVersionException e) {
            logger.error(CLUSTER_VERSION_NOT_SUPPORTING_TRANSACTIONS, e);
            throw new UnexpectedException(e);
        }
        catch (TimeoutException e) {
            throw new OperationTimeoutException(PUBLISH, e);
        }
        catch (IllegalStateException | KafkaException e) {
            throw new TransactionException(e);
        }
        finally {
            // Clear the field first so a failing close() cannot leave a dead producer installed.
            this.transactionProducer = null;
            if (completed) {
                // No primary failure: a close() error is the only error and must reach the caller.
                active.close();
            } else {
                // A failure is already propagating; do not let close() replace it.
                ProducerConnection.closeQuietly(active);
            }
        }
    }

    /** Closes the producer, logging instead of propagating any runtime failure raised by {@code close()}. */
    private static void closeQuietly(Producer<InputStream, InputStream> producer) {
        try {
            producer.close();
        }
        catch (RuntimeException closeFailure) {
            logger.warn("Failed to close the transactional Kafka producer", closeFailure);
        }
    }

    /** Closes the transactional producer if one is currently held; the default producer is not closed here. */
    public void disconnect() {
        Producer<InputStream, InputStream> active = this.transactionProducer;
        if (active != null) {
            active.close();
        }
    }

    /** Calls {@code metrics()} on the current producer; any exception it raises propagates to the caller. */
    public void validate() {
        this.getProducer().metrics();
    }

    /**
     * Rethrows the given failure wrapped in a {@link TransactionException}.
     *
     * @param exception failure to wrap
     * @throws TransactionException always
     */
    protected void wrapThrowTx(Throwable exception) throws TransactionException {
        throw new TransactionException(exception);
    }

    /** Returns the transactional producer when one is held, otherwise the default producer. */
    private Producer<InputStream, InputStream> getProducer() {
        return Optional.ofNullable(this.transactionProducer).orElse(this.defaultProducer);
    }
}

