/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.core;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.Lifecycle;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.util.Assert;

/**
 * {@link ProducerFactory} implementation that hands out Kafka {@link Producer}s.
 *
 * <p>When no transaction id prefix has been set, a single shared producer is
 * created lazily and returned from every {@link #createProducer()} call. When a
 * transaction id prefix has been set, producers are taken from (and, on
 * {@code close()}, returned to) an internal cache; a new transactional producer
 * is created whenever the cache is empty.
 *
 * <p>Producers handed out by this factory are wrapped so that a caller's
 * {@code close()} never physically closes the underlying producer; physical
 * close happens only in {@link #destroy()} / {@link #stop()}.
 *
 * @param <K> the key type
 * @param <V> the value type
 */
public class DefaultKafkaProducerFactory<K, V>
implements ProducerFactory<K, V>,
Lifecycle,
DisposableBean {
    /** Default number of seconds to wait when physically closing a producer. */
    private static final int DEFAULT_PHYSICAL_CLOSE_TIMEOUT = 30;
    private static final Log logger = LogFactory.getLog(DefaultKafkaProducerFactory.class);
    /** Private copy of the configuration supplied at construction time. */
    private final Map<String, Object> configs;
    /** Source of the numeric suffix appended to the transaction id prefix. */
    private final AtomicInteger transactionIdSuffix = new AtomicInteger();
    /** Idle transactional producers available for reuse. */
    private final BlockingQueue<CloseSafeProducer<K, V>> cache = new LinkedBlockingQueue<CloseSafeProducer<K, V>>();
    /** Lazily created shared (non-transactional) producer. */
    private volatile CloseSafeProducer<K, V> producer;
    private Serializer<K> keySerializer;
    private Serializer<V> valueSerializer;
    private int physicalCloseTimeout = DEFAULT_PHYSICAL_CLOSE_TIMEOUT;
    private String transactionIdPrefix;
    private volatile boolean running;

    /**
     * Create a factory with the given configuration and no explicit serializers.
     *
     * @param configs the producer configuration; copied, so later changes to the
     *                supplied map are not seen by this factory
     */
    public DefaultKafkaProducerFactory(Map<String, Object> configs) {
        this(configs, null, null);
    }

    /**
     * Create a factory with the given configuration and serializers.
     *
     * @param configs         the producer configuration; copied, so later changes
     *                        to the supplied map are not seen by this factory
     * @param keySerializer   the key serializer passed to each created producer (may be {@code null})
     * @param valueSerializer the value serializer passed to each created producer (may be {@code null})
     */
    public DefaultKafkaProducerFactory(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this.configs = new HashMap<String, Object>(configs);
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
    }

    /**
     * Set the key serializer passed to producers created after this call.
     *
     * @param keySerializer the key serializer
     */
    public void setKeySerializer(Serializer<K> keySerializer) {
        this.keySerializer = keySerializer;
    }

    /**
     * Set the value serializer passed to producers created after this call.
     *
     * @param valueSerializer the value serializer
     */
    public void setValueSerializer(Serializer<V> valueSerializer) {
        this.valueSerializer = valueSerializer;
    }

    /**
     * Set how long to wait when physically closing a producer in {@link #destroy()}.
     *
     * @param physicalCloseTimeout the timeout, in seconds
     */
    public void setPhysicalCloseTimeout(int physicalCloseTimeout) {
        this.physicalCloseTimeout = physicalCloseTimeout;
    }

    /**
     * Set the prefix used to build the {@code transactional.id} of each
     * transactional producer; setting it makes this factory transaction capable
     * and turns on {@code enable.idempotence} unless it is already configured.
     *
     * @param transactionIdPrefix the prefix; must not be {@code null}
     */
    public void setTransactionIdPrefix(String transactionIdPrefix) {
        Assert.notNull(transactionIdPrefix, "'transactionIdPrefix' cannot be null");
        this.transactionIdPrefix = transactionIdPrefix;
        this.enableIdempotentBehaviour();
    }

    /**
     * Default {@code enable.idempotence} to {@code true} when the user has not
     * configured it; an existing value is left untouched, and a debug message is
     * logged when that existing value disables idempotence.
     */
    private void enableIdempotentBehaviour() {
        Object previousValue = this.configs.putIfAbsent("enable.idempotence", true);
        // The property may be supplied as a Boolean or as a String such as "false",
        // so compare on the textual form rather than only against Boolean.FALSE.
        boolean explicitlyDisabled = previousValue != null
                && "false".equalsIgnoreCase(previousValue.toString().trim());
        if (explicitlyDisabled && logger.isDebugEnabled()) {
            logger.debug("The 'enable.idempotence' is set to false, may result in duplicate messages");
        }
    }

    /**
     * Return a read-only view of this factory's configuration.
     *
     * @return an unmodifiable view of the configuration map
     */
    public Map<String, Object> getConfigurationProperties() {
        return Collections.unmodifiableMap(this.configs);
    }

    /**
     * @return {@code true} when a transaction id prefix has been set
     */
    @Override
    public boolean transactionCapable() {
        return this.transactionIdPrefix != null;
    }

    /**
     * Physically close the shared producer (if one was created) and every
     * producer currently sitting in the transactional cache.
     *
     * <p>The cache is drained even when closing the shared producer fails, so a
     * failure there does not leave cached producers open.
     *
     * @throws Exception if closing the shared producer fails; failures while
     *                   closing cached producers are logged and not rethrown
     */
    @Override
    public void destroy() throws Exception {
        CloseSafeProducer<K, V> shared = this.producer;
        this.producer = null;
        try {
            if (shared != null) {
                shared.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
            }
        }
        finally {
            this.closeCachedProducers();
        }
    }

    /** Drain the transactional cache, physically closing each producer and logging any failure. */
    private void closeCachedProducers() {
        CloseSafeProducer<K, V> cached = this.cache.poll();
        while (cached != null) {
            try {
                cached.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
            }
            catch (Exception e) {
                logger.error("Exception while closing producer", e);
            }
            cached = this.cache.poll();
        }
    }

    /** Mark this factory as running. */
    @Override
    public void start() {
        this.running = true;
    }

    /**
     * Close all producers via {@link #destroy()} and mark this factory as no
     * longer running. A failure while closing is logged, not propagated.
     */
    @Override
    public void stop() {
        try {
            this.destroy();
        }
        catch (Exception e) {
            logger.error("Exception while closing producer", e);
        }
        finally {
            // destroy() has already dropped the producer references, so the factory
            // is stopped regardless of whether the physical close succeeded.
            this.running = false;
        }
    }

    /**
     * @return {@code true} between a call to {@link #start()} and the next {@link #stop()}
     */
    @Override
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Return a producer: a transactional one when a transaction id prefix has
     * been set, otherwise the shared producer, which is created on first use.
     *
     * @return the producer; calling {@code close()} on it does not physically close it
     */
    @Override
    public Producer<K, V> createProducer() {
        if (this.transactionIdPrefix != null) {
            return this.createTransactionalProducer();
        }
        // Work on a local copy so that a concurrent destroy(), which nulls the
        // field, cannot make this method return null after the check has passed.
        CloseSafeProducer<K, V> shared = this.producer;
        if (shared == null) {
            synchronized (this) {
                shared = this.producer;
                if (shared == null) {
                    shared = new CloseSafeProducer<K, V>(this.createKafkaProducer());
                    this.producer = shared;
                }
            }
        }
        return shared;
    }

    /**
     * Create the underlying Kafka producer used as the shared producer.
     * Subclasses may override this to supply a different {@link Producer}.
     *
     * @return a new producer built from this factory's configuration and serializers
     */
    protected Producer<K, V> createKafkaProducer() {
        return new KafkaProducer<K, V>(this.configs, this.keySerializer, this.valueSerializer);
    }

    /**
     * Return a transactional producer, reusing a cached one when available.
     *
     * <p>When the cache is empty a new producer is created with
     * {@code transactional.id} set to the prefix followed by the next value of an
     * incrementing counter, {@code initTransactions()} is called on it, and it is
     * wrapped so that {@code close()} returns it to the cache.
     *
     * @return a transactional producer
     */
    protected Producer<K, V> createTransactionalProducer() {
        Producer<K, V> cached = this.cache.poll();
        if (cached != null) {
            return cached;
        }
        Map<String, Object> txConfigs = new HashMap<String, Object>(this.configs);
        txConfigs.put("transactional.id", this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());
        Producer<K, V> created = new KafkaProducer<K, V>(txConfigs, this.keySerializer, this.valueSerializer);
        created.initTransactions();
        return new CloseSafeProducer<K, V>(created, this.cache);
    }

    /**
     * {@link Producer} wrapper that forwards every operation to a delegate except
     * {@code close}: closing either returns the wrapper to a cache (when one was
     * supplied) or does nothing, leaving the delegate open.
     *
     * @param <K> the key type
     * @param <V> the value type
     */
    private static class CloseSafeProducer<K, V>
    implements Producer<K, V> {
        private final Producer<K, V> delegate;
        /** Queue this wrapper is offered to on close; {@code null} for the shared producer. */
        private final BlockingQueue<CloseSafeProducer<K, V>> cache;

        /**
         * Wrap a producer whose {@code close()} is a no-op.
         *
         * @param delegate the producer to forward to
         */
        CloseSafeProducer(Producer<K, V> delegate) {
            this(delegate, null);
        }

        /**
         * Wrap a producer whose {@code close()} returns the wrapper to {@code cache}.
         *
         * @param delegate the producer to forward to
         * @param cache    the queue to offer this wrapper to on close, or {@code null} for none
         */
        CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache) {
            this.delegate = delegate;
            this.cache = cache;
        }

        @Override
        public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
            return this.delegate.send(record);
        }

        @Override
        public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
            return this.delegate.send(record, callback);
        }

        @Override
        public void flush() {
            this.delegate.flush();
        }

        @Override
        public List<PartitionInfo> partitionsFor(String topic) {
            return this.delegate.partitionsFor(topic);
        }

        @Override
        public Map<MetricName, ? extends Metric> metrics() {
            return this.delegate.metrics();
        }

        @Override
        public void initTransactions() {
            this.delegate.initTransactions();
        }

        @Override
        public void beginTransaction() throws ProducerFencedException {
            this.delegate.beginTransaction();
        }

        @Override
        public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
            this.delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
        }

        @Override
        public void commitTransaction() throws ProducerFencedException {
            this.delegate.commitTransaction();
        }

        @Override
        public void abortTransaction() throws ProducerFencedException {
            this.delegate.abortTransaction();
        }

        /**
         * Does not close the delegate. When a cache was supplied, this wrapper is
         * offered back to it for reuse; otherwise the call has no effect.
         */
        @Override
        public void close() {
            if (this.cache != null) {
                this.cache.offer(this);
            }
        }

        /**
         * Same as {@link #close()}; the timeout arguments are ignored.
         *
         * @param timeout ignored
         * @param unit    ignored
         */
        @Override
        public void close(long timeout, TimeUnit unit) {
            this.close();
        }

        @Override
        public String toString() {
            return "CloseSafeProducer [delegate=" + this.delegate + "]";
        }
    }
}

