/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kafka;

import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaEndpoint;
import org.apache.camel.component.kafka.KafkaProducerHealthCheck;
import org.apache.camel.component.kafka.KafkaTransactionSynchronization;
import org.apache.camel.component.kafka.producer.support.DelegatingCallback;
import org.apache.camel.component.kafka.producer.support.KafkaProducerCallBack;
import org.apache.camel.component.kafka.producer.support.KafkaProducerMetadataCallBack;
import org.apache.camel.component.kafka.producer.support.KeyValueHolderIterator;
import org.apache.camel.component.kafka.producer.support.ProducerUtil;
import org.apache.camel.component.kafka.producer.support.PropagatedHeadersProvider;
import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
import org.apache.camel.health.HealthCheck;
import org.apache.camel.health.HealthCheckHelper;
import org.apache.camel.health.WritableHealthCheckRepository;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.util.KeyValueHolder;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ReflectionHelper;
import org.apache.camel.util.URISupport;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaProducer
extends DefaultAsyncProducer {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer.class);
    private Producer kafkaProducer;
    private KafkaProducerHealthCheck producerHealthCheck;
    private WritableHealthCheckRepository healthCheckRepository;
    private String clientId;
    private String transactionId;
    private final KafkaEndpoint endpoint;
    private final KafkaConfiguration configuration;
    private ExecutorService workerPool;
    private boolean shutdownWorkerPool;
    private volatile boolean closeKafkaProducer;
    private final String endpointTopic;
    private final Integer configPartitionKey;
    private final String configKey;

    public KafkaProducer(KafkaEndpoint endpoint) {
        super((Endpoint)endpoint);
        this.endpoint = endpoint;
        this.configuration = endpoint.getConfiguration();
        this.endpointTopic = URISupport.extractRemainderPath((URI)URI.create(endpoint.getEndpointUri()), (boolean)true);
        this.configPartitionKey = this.configuration.getPartitionKey();
        this.configKey = this.configuration.getKey();
    }

    public KafkaEndpoint getEndpoint() {
        return (KafkaEndpoint)super.getEndpoint();
    }

    Properties getProps() {
        Properties props = this.configuration.createProducerProperties();
        this.endpoint.updateClassProperties(props);
        String brokers = this.endpoint.getKafkaClientFactory().getBrokers(this.configuration);
        if (brokers != null) {
            props.put("bootstrap.servers", brokers);
        }
        return props;
    }

    public boolean isReady() {
        boolean ready = true;
        try {
            if (this.kafkaProducer instanceof org.apache.kafka.clients.producer.KafkaProducer) {
                org.apache.kafka.clients.producer.KafkaProducer kp = (org.apache.kafka.clients.producer.KafkaProducer)this.kafkaProducer;
                Sender sender = (Sender)ReflectionHelper.getField((Field)kp.getClass().getDeclaredField("sender"), (Object)kp);
                NetworkClient nc = (NetworkClient)ReflectionHelper.getField((Field)sender.getClass().getDeclaredField("client"), (Object)sender);
                LOG.trace("Health-Check calling org.apache.kafka.clients.NetworkClient.hasReadyNode");
                ready = nc.hasReadyNodes(System.currentTimeMillis());
            }
        }
        catch (Exception e) {
            LOG.debug("Cannot check hasReadyNodes on KafkaProducer client (NetworkClient) due to " + e.getMessage() + ". This exception is ignored.", (Throwable)e);
        }
        return ready;
    }

    public Producer getKafkaProducer() {
        return this.kafkaProducer;
    }

    public void setKafkaProducer(Producer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    public ExecutorService getWorkerPool() {
        return this.workerPool;
    }

    public void setWorkerPool(ExecutorService workerPool) {
        this.workerPool = workerPool;
    }

    protected void doStart() throws Exception {
        Properties props = this.getProps();
        if (this.kafkaProducer == null) {
            this.createProducer(props);
        }
        this.transactionId = props.getProperty("transactional.id");
        if (this.transactionId != null) {
            this.kafkaProducer.initTransactions();
        }
        if (!this.configuration.isSynchronous() && this.workerPool == null) {
            if (this.configuration.getWorkerPool() != null) {
                this.workerPool = this.configuration.getWorkerPool();
                this.shutdownWorkerPool = false;
            } else {
                this.workerPool = this.endpoint.createProducerExecutor();
                this.shutdownWorkerPool = true;
            }
        }
        if (this.clientId == null) {
            this.clientId = this.getProps().getProperty("client.id");
            if (this.clientId == null) {
                try {
                    this.clientId = (String)ReflectionHelper.getField((Field)this.kafkaProducer.getClass().getDeclaredField("clientId"), (Object)this.kafkaProducer);
                }
                catch (Exception e) {
                    this.clientId = "";
                }
            }
        }
        this.healthCheckRepository = (WritableHealthCheckRepository)HealthCheckHelper.getHealthCheckRepository((CamelContext)this.endpoint.getCamelContext(), (String)"components", WritableHealthCheckRepository.class);
        if (this.healthCheckRepository != null) {
            this.producerHealthCheck = new KafkaProducerHealthCheck(this, this.clientId);
            this.healthCheckRepository.addHealthCheck((HealthCheck)this.producerHealthCheck);
        }
    }

    protected void doStop() throws Exception {
        if (this.healthCheckRepository != null && this.producerHealthCheck != null) {
            this.healthCheckRepository.removeHealthCheck((HealthCheck)this.producerHealthCheck);
            this.producerHealthCheck = null;
        }
        if (this.kafkaProducer != null && this.closeKafkaProducer) {
            LOG.debug("Closing KafkaProducer: {}", (Object)this.kafkaProducer);
            this.kafkaProducer.close();
            this.kafkaProducer = null;
        }
        if (this.shutdownWorkerPool && this.workerPool != null) {
            int timeout = this.configuration.getShutdownTimeout();
            LOG.debug("Shutting down Kafka producer worker threads with timeout {} millis", (Object)timeout);
            this.endpoint.getCamelContext().getExecutorServiceManager().shutdownGraceful(this.workerPool, (long)timeout);
            this.workerPool = null;
        }
    }

    private void createProducer(Properties props) {
        ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
            LOG.trace("Creating KafkaProducer");
            this.kafkaProducer = this.endpoint.getKafkaClientFactory().getProducer(props);
            this.closeKafkaProducer = true;
        }
        finally {
            Thread.currentThread().setContextClassLoader(threadClassLoader);
        }
        LOG.debug("Created KafkaProducer: {}", (Object)this.kafkaProducer);
    }

    protected Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> createRecordIterable(Exchange exchange, Message message) {
        String topic = this.evaluateTopic(message);
        PropagatedHeadersProvider propagatedHeadersProvider = new PropagatedHeadersProvider(this, this.configuration, exchange, message);
        Object body = message.getBody();
        Iterator<Object> iterator = this.getObjectIterator(body);
        return new KeyValueHolderIterator(iterator, exchange, this.configuration, topic, propagatedHeadersProvider);
    }

    protected ProducerRecord<Object, Object> createRecord(Exchange exchange, Message message) {
        String topic = this.evaluateTopic(message);
        Long timeStamp = null;
        Object overrideTimeStamp = message.removeHeader("kafka.OVERRIDE_TIMESTAMP");
        if (overrideTimeStamp != null) {
            timeStamp = (Long)exchange.getContext().getTypeConverter().convertTo(Long.class, exchange, overrideTimeStamp);
            LOG.debug("Using override TimeStamp: {}", overrideTimeStamp);
        }
        List<Header> propagatedHeaders = this.getPropagatedHeaders(exchange, message);
        Integer msgPartitionKey = this.getOverridePartitionKey(message);
        Object msgKey = this.getOverrideKey(message);
        if (msgKey != null) {
            msgKey = ProducerUtil.tryConvertToSerializedType(exchange, msgKey, this.configuration.getKeySerializer());
        }
        Object value = ProducerUtil.tryConvertToSerializedType(exchange, message.getBody(), this.configuration.getValueSerializer());
        return new ProducerRecord(topic, msgPartitionKey, timeStamp, msgKey, value, propagatedHeaders);
    }

    private Object getOverrideKey(Message message) {
        if (ObjectHelper.isEmpty((String)this.configKey)) {
            return message.getHeader("kafka.KEY");
        }
        return this.configKey;
    }

    private Integer getOverridePartitionKey(Message message) {
        if (ObjectHelper.isEmpty((Object)this.configPartitionKey)) {
            return (Integer)message.getHeader("kafka.PARTITION_KEY", Integer.class);
        }
        return this.configPartitionKey;
    }

    protected KeyValueHolder<Object, ProducerRecord<Object, Object>> createKeyValueHolder(Exchange exchange, Message message) {
        ProducerRecord<Object, Object> record = this.createRecord(exchange, message);
        return new KeyValueHolder((Object)exchange, record);
    }

    private String evaluateTopic(Message message) {
        Object overrideTopic = message.removeHeader("kafka.OVERRIDE_TOPIC");
        String overrideTopicString = (String)this.endpoint.getCamelContext().getTypeConverter().tryConvertTo(String.class, overrideTopic);
        if (overrideTopicString != null) {
            LOG.debug("Using override topic: {}", (Object)overrideTopicString);
            return overrideTopicString;
        }
        String topic = this.configuration.getTopic();
        if (topic != null) {
            return topic;
        }
        return this.endpointTopic;
    }

    private boolean isIterable(Object body) {
        return body instanceof Iterable || body instanceof Iterator;
    }

    private Iterator<Object> getObjectIterator(Object body) {
        Iterator iterator = null;
        if (body instanceof Iterable) {
            iterator = ((Iterable)body).iterator();
        } else if (body instanceof Iterator) {
            iterator = (Iterator)body;
        }
        return iterator;
    }

    public List<Header> getPropagatedHeaders(Exchange exchange, Message message) {
        Map messageHeaders = message.getHeaders();
        ArrayList<Header> propagatedHeaders = new ArrayList<Header>(messageHeaders.size());
        for (Map.Entry<String, Object> entry : messageHeaders.entrySet()) {
            RecordHeader recordHeader = this.getRecordHeader(entry, exchange);
            if (recordHeader == null) continue;
            propagatedHeaders.add((Header)recordHeader);
        }
        return propagatedHeaders;
    }

    private boolean shouldBeFiltered(String key, Object value, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
        return !headerFilterStrategy.applyFilterToCamelHeaders(key, value, exchange);
    }

    private RecordHeader getRecordHeader(Map.Entry<String, Object> entry, Exchange exchange) {
        Object value;
        HeaderFilterStrategy headerFilterStrategy = this.configuration.getHeaderFilterStrategy();
        String key = entry.getKey();
        if (this.shouldBeFiltered(key, value = entry.getValue(), exchange, headerFilterStrategy)) {
            KafkaHeaderSerializer headerSerializer = this.configuration.getHeaderSerializer();
            byte[] headerValue = headerSerializer.serialize(key, value);
            if (headerValue == null) {
                return null;
            }
            return new RecordHeader(key, headerValue);
        }
        return null;
    }

    public void process(Exchange exchange) throws Exception {
        Message message = exchange.getIn();
        if (this.transactionId != null) {
            this.startKafkaTransaction(exchange);
        }
        if (this.isIterable(message.getBody())) {
            this.processIterableSync(exchange, message);
        } else {
            this.processSingleMessageSync(exchange, message);
        }
    }

    private void processSingleMessageSync(Exchange exchange, Message message) throws InterruptedException, ExecutionException {
        ProducerRecord<Object, Object> producerRecord = this.createRecord(exchange, message);
        Future future = this.kafkaProducer.send(producerRecord);
        this.postProcessMetadata(exchange, future);
    }

    private void processIterableSync(Exchange exchange, Message message) throws ExecutionException, InterruptedException {
        ArrayList<KeyValueHolder<Object, Future<RecordMetadata>>> futures = new ArrayList<KeyValueHolder<Object, Future<RecordMetadata>>>();
        Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> recordIterable = this.createRecordIterable(exchange, message);
        ArrayList<RecordMetadata> recordMetadata = new ArrayList<RecordMetadata>();
        if (this.configuration.isRecordMetadata()) {
            exchange.getMessage().setHeader("org.apache.kafka.clients.producer.RecordMetadata", recordMetadata);
        }
        while (recordIterable.hasNext()) {
            KeyValueHolder<Object, ProducerRecord<Object, Object>> exchangeRecord = recordIterable.next();
            ProducerRecord rec = (ProducerRecord)exchangeRecord.getValue();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending message to topic: {}, partition: {}, key: {}", new Object[]{rec.topic(), rec.partition(), rec.key()});
            }
            futures.add((KeyValueHolder<Object, Future<RecordMetadata>>)new KeyValueHolder(exchangeRecord.getKey(), (Object)this.kafkaProducer.send(rec)));
        }
        this.postProcessMetadata(futures, recordMetadata);
    }

    private void postProcessMetadata(List<KeyValueHolder<Object, Future<RecordMetadata>>> futures, List<RecordMetadata> metadataList) throws InterruptedException, ExecutionException {
        for (KeyValueHolder<Object, Future<RecordMetadata>> f : futures) {
            metadataList.addAll(this.postProcessMetadata(f.getKey(), (Future)f.getValue()));
        }
    }

    private List<RecordMetadata> postProcessMetadata(Object key, Future<RecordMetadata> f) throws InterruptedException, ExecutionException {
        RecordMetadata metadata = f.get();
        if (this.configuration.isRecordMetadata()) {
            List<RecordMetadata> metadataList = Collections.singletonList(metadata);
            ProducerUtil.setRecordMetadata(key, metadataList);
            return metadataList;
        }
        return Collections.emptyList();
    }

    public boolean process(Exchange exchange, AsyncCallback callback) {
        KafkaProducerCallBack producerCallBack = new KafkaProducerCallBack(exchange, callback, this.workerPool, this.configuration.isRecordMetadata());
        Message message = exchange.getMessage();
        Object body = message.getBody();
        if (this.transactionId != null) {
            this.startKafkaTransaction(exchange);
        }
        try {
            if (this.isIterable(body)) {
                this.processIterableAsync(exchange, producerCallBack, message);
            } else {
                ProducerRecord<Object, Object> record = this.createRecord(exchange, message);
                this.doSend(exchange, record, producerCallBack);
            }
            return producerCallBack.allSent();
        }
        catch (Exception e) {
            exchange.setException((Throwable)e);
            callback.done(true);
            return true;
        }
    }

    private void processIterableAsync(Exchange exchange, KafkaProducerCallBack producerCallBack, Message message) {
        Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> c = this.createRecordIterable(exchange, message);
        while (c.hasNext()) {
            this.doSend(c, producerCallBack);
        }
    }

    private void doSend(Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> kvIterator, KafkaProducerCallBack cb) {
        this.doSend(kvIterator.next(), cb);
    }

    private void doSend(KeyValueHolder<Object, ProducerRecord<Object, Object>> exchangeRecord, KafkaProducerCallBack cb) {
        this.doSend(exchangeRecord.getKey(), (ProducerRecord<Object, Object>)((ProducerRecord)exchangeRecord.getValue()), cb);
    }

    private void doSend(Object key, ProducerRecord<Object, Object> record, KafkaProducerCallBack cb) {
        cb.increment();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending message to topic: {}, partition: {}, key: {}", new Object[]{record.topic(), record.partition(), record.key()});
        }
        if (key != null) {
            KafkaProducerMetadataCallBack metadataCallBack = new KafkaProducerMetadataCallBack(key, this.configuration.isRecordMetadata());
            DelegatingCallback delegatingCallback = new DelegatingCallback(cb, metadataCallBack);
            this.kafkaProducer.send(record, (Callback)delegatingCallback);
        } else {
            this.kafkaProducer.send(record, (Callback)cb);
        }
    }

    private void startKafkaTransaction(Exchange exchange) {
        exchange.getUnitOfWork().beginTransactedBy((Object)this.transactionId);
        this.kafkaProducer.beginTransaction();
        exchange.getUnitOfWork().addSynchronization((Synchronization)new KafkaTransactionSynchronization(this.transactionId, this.kafkaProducer));
    }
}

