/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.extensions.kafka.eventhandling.producer;

import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.extensions.kafka.eventhandling.producer.ConfirmationMode;
import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ProducerFactory} implementation that builds Kafka {@link Producer} instances from a configuration map.
 * <p>
 * When the configured {@link ConfirmationMode} is not transactional, a single producer is created lazily and handed
 * out to every caller; calling {@code close} on that shared instance is a no-op, and the underlying producer is only
 * closed by {@link #shutDown()}.
 * <p>
 * When the {@link ConfirmationMode} is transactional, producers are kept in a bounded pool. Calling {@code close} on
 * such a producer offers it back to the pool, and it is only really closed when the pool is full. Each newly created
 * transactional producer gets a {@code transactional.id} made of the configured prefix and an incrementing counter.
 *
 * @param <K> the key type of the records produced
 * @param <V> the value type of the records produced
 */
public class DefaultProducerFactory<K, V> implements ProducerFactory<K, V> {

    private static final Logger logger = LoggerFactory.getLogger(DefaultProducerFactory.class);

    private final Duration closeTimeout;
    private final BlockingQueue<PoolableProducer<K, V>> cache;
    private final Map<String, Object> configuration;
    private final ConfirmationMode confirmationMode;
    private final String transactionIdPrefix;
    private final AtomicInteger transactionIdSuffix;
    private final AtomicReference<ShareableProducer<K, V>> nonTransactionalProducer = new AtomicReference<>();

    /**
     * Instantiates a factory from the given {@code builder}, after calling {@link Builder#validate()} on it.
     *
     * @param builder the {@link Builder} holding the settings for this factory
     */
    protected DefaultProducerFactory(Builder<K, V> builder) {
        builder.validate();
        this.closeTimeout = builder.closeTimeout;
        this.cache = new ArrayBlockingQueue<>(builder.producerCacheSize);
        this.configuration = builder.configuration;
        this.confirmationMode = builder.confirmationMode;
        this.transactionIdPrefix = builder.transactionIdPrefix;
        this.transactionIdSuffix = new AtomicInteger();
    }

    /**
     * Creates a new {@link Builder} for a {@link DefaultProducerFactory}.
     *
     * @param <K> the key type of the records produced
     * @param <V> the value type of the records produced
     * @return a fresh {@link Builder}
     */
    public static <K, V> Builder<K, V> builder() {
        return new Builder<>();
    }

    /**
     * Returns a transactional producer (pooled or newly created) when the confirmation mode is transactional, and the
     * shared non-transactional producer otherwise. The shared producer is created on first use.
     *
     * @return a {@link Producer} matching the configured {@link ConfirmationMode}
     */
    @Override
    public Producer<K, V> createProducer() {
        if (this.confirmationMode.isTransactional()) {
            return this.createTransactionalProducer();
        }
        // The producer is deliberately NOT created inside AtomicReference#updateAndGet: that function may be
        // re-applied under contention, which would create Kafka producers that are never stored nor closed.
        while (true) {
            ShareableProducer<K, V> existing = this.nonTransactionalProducer.get();
            if (existing != null) {
                return existing;
            }
            logger.debug("Creating a non-transactional Producer.");
            Producer<K, V> kafkaProducer = this.createKafkaProducer(this.configuration);
            ShareableProducer<K, V> candidate = new ShareableProducer<>(kafkaProducer);
            if (this.nonTransactionalProducer.compareAndSet(null, candidate)) {
                return candidate;
            }
            // Another thread published its producer first; release ours and re-read the reference.
            kafkaProducer.close(this.closeTimeout);
        }
    }

    /**
     * @return the {@link ConfirmationMode} this factory was built with
     */
    @Override
    public ConfirmationMode confirmationMode() {
        return this.confirmationMode;
    }

    /**
     * @return a read-only view of the configuration used to create producers
     */
    public Map<String, Object> configurationProperties() {
        return Collections.unmodifiableMap(this.configuration);
    }

    /**
     * @return the prefix used for the {@code transactional.id} of transactional producers; {@code null} if none
     * was configured
     */
    public String transactionIdPrefix() {
        return this.transactionIdPrefix;
    }

    /**
     * Closes the shared non-transactional producer (if one was created) and every producer currently sitting in the
     * pool, using the configured close timeout. An exception thrown while closing the shared producer is propagated,
     * but only after the pool has been drained. Exceptions from closing pooled producers are logged and do not stop
     * the remaining producers from being closed.
     */
    @Override
    public void shutDown() {
        logger.debug("Shutting down this Producer factory.");
        ProducerDecorator<K, V> shared = this.nonTransactionalProducer.getAndSet(null);
        try {
            if (shared != null) {
                shared.delegate.close(this.closeTimeout);
            }
        } finally {
            // Always drain the pool, even when closing the shared producer failed.
            this.closePooledProducers();
        }
    }

    /**
     * Polls the pool until it is empty, closing the delegate of each pooled producer and logging any failure.
     */
    private void closePooledProducers() {
        ProducerDecorator<K, V> pooled = this.cache.poll();
        while (pooled != null) {
            try {
                pooled.delegate.close(this.closeTimeout);
            } catch (Exception e) {
                logger.error("Exception closing producer", e);
            }
            pooled = this.cache.poll();
        }
    }

    /**
     * Takes a producer from the pool, or creates and initializes a new transactional producer when the pool is
     * empty. If {@code initTransactions()} fails, the freshly created producer is closed before the failure is
     * rethrown, since no reference to it would otherwise remain.
     */
    private Producer<K, V> createTransactionalProducer() {
        PoolableProducer<K, V> pooled = this.cache.poll();
        if (pooled != null) {
            return pooled;
        }
        logger.debug("Creating a transactional Producer.");
        Map<String, Object> configs = new HashMap<>(this.configuration);
        configs.put("transactional.id", this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());
        Producer<K, V> kafkaProducer = this.createKafkaProducer(configs);
        try {
            kafkaProducer.initTransactions();
        } catch (RuntimeException | Error e) {
            try {
                kafkaProducer.close(this.closeTimeout);
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        return new PoolableProducer<>(kafkaProducer, this.cache, this.closeTimeout);
    }

    /**
     * Creates the actual {@link KafkaProducer} from the given {@code configs}.
     */
    private Producer<K, V> createKafkaProducer(Map<String, Object> configs) {
        return new KafkaProducer<>(configs);
    }

    /**
     * Builder for a {@link DefaultProducerFactory}. The close timeout defaults to 30 seconds, the producer cache
     * size to {@code 10} and the {@link ConfirmationMode} to {@link ConfirmationMode#NONE}. The configuration has
     * no default and must be provided.
     *
     * @param <K> the key type of the records produced
     * @param <V> the value type of the records produced
     */
    public static final class Builder<K, V> {

        private Duration closeTimeout = Duration.ofSeconds(30L);
        private int producerCacheSize = 10;
        private Map<String, Object> configuration;
        private ConfirmationMode confirmationMode = ConfirmationMode.NONE;
        private String transactionIdPrefix;

        /**
         * Sets the close timeout from an amount and a unit; delegates to {@link #closeTimeout(Duration)}.
         *
         * @param timeout      the amount of the given {@code temporalUnit}
         * @param temporalUnit the unit of the timeout, asserted to be non-null
         * @return this builder, for method chaining
         */
        public Builder<K, V> closeTimeout(int timeout, TemporalUnit temporalUnit) {
            BuilderUtils.assertNonNull(temporalUnit, "The temporalUnit may not be null");
            return this.closeTimeout(Duration.of(timeout, temporalUnit));
        }

        /**
         * Sets the timeout used when closing producers.
         *
         * @param closeTimeout the timeout, asserted to be non-null and not negative (zero is accepted)
         * @return this builder, for method chaining
         */
        public Builder<K, V> closeTimeout(Duration closeTimeout) {
            // The null assertion must run first: the predicate below dereferences the value.
            BuilderUtils.assertNonNull(closeTimeout, "The closeTimeout may not be null");
            BuilderUtils.assertThat(
                    closeTimeout,
                    timeoutDuration -> !timeoutDuration.isNegative(),
                    "The closeTimeout should be a positive duration"
            );
            this.closeTimeout = closeTimeout;
            return this;
        }

        /**
         * Sets the capacity of the pool that holds transactional producers.
         *
         * @param producerCacheSize the pool capacity, asserted to be greater than zero
         * @return this builder, for method chaining
         */
        public Builder<K, V> producerCacheSize(int producerCacheSize) {
            BuilderUtils.assertThat(
                    producerCacheSize, size -> size > 0, "The producerCacheSize should be a positive number"
            );
            this.producerCacheSize = producerCacheSize;
            return this;
        }

        /**
         * Sets the configuration used to create producers. The given map is copied and the copy is stored as an
         * unmodifiable map, so later changes to the argument have no effect on this builder.
         *
         * @param configuration the producer configuration, asserted to be non-null
         * @return this builder, for method chaining
         */
        public Builder<K, V> configuration(Map<String, Object> configuration) {
            BuilderUtils.assertNonNull(configuration, "The configuration may not be null");
            this.configuration = Collections.unmodifiableMap(new HashMap<>(configuration));
            return this;
        }

        /**
         * Sets the {@link ConfirmationMode}. Note that this method does not touch the transactional id prefix; use
         * {@link #transactionalIdPrefix(String)} to configure a transactional factory together with its prefix.
         *
         * @param confirmationMode the confirmation mode, asserted to be non-null
         * @return this builder, for method chaining
         */
        public Builder<K, V> confirmationMode(ConfirmationMode confirmationMode) {
            BuilderUtils.assertNonNull(confirmationMode, "ConfirmationMode may not be null");
            this.confirmationMode = confirmationMode;
            return this;
        }

        /**
         * Sets the prefix for the {@code transactional.id} of created producers and switches the confirmation mode
         * to {@link ConfirmationMode#TRANSACTIONAL}.
         *
         * @param transactionIdPrefix the prefix, asserted to be non-null
         * @return this builder, for method chaining
         */
        public Builder<K, V> transactionalIdPrefix(String transactionIdPrefix) {
            BuilderUtils.assertNonNull(transactionIdPrefix, "The transactionalIdPrefix may not be null");
            this.transactionIdPrefix = transactionIdPrefix;
            return this.confirmationMode(ConfirmationMode.TRANSACTIONAL);
        }

        /**
         * Builds a {@link DefaultProducerFactory} from the current settings.
         *
         * @return the configured {@link DefaultProducerFactory}
         */
        public DefaultProducerFactory<K, V> build() {
            return new DefaultProducerFactory<>(this);
        }

        /**
         * Verifies that the mandatory settings are present.
         *
         * @throws AxonConfigurationException if no configuration has been provided
         */
        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(
                    this.configuration, "The configuration is a hard requirement and should be provided"
            );
        }
    }

    /**
     * Decorator for the shared non-transactional producer. Both {@code close} variants are no-ops so that callers
     * cannot close the instance other callers are using; the factory closes the delegate in {@code shutDown()}.
     */
    private static final class ShareableProducer<K, V> extends ProducerDecorator<K, V> {

        ShareableProducer(Producer<K, V> delegate) {
            super(delegate);
        }

        @Override
        public void close() {
            // Intentionally empty: the shared producer is closed by the factory only.
        }

        @Override
        public void close(Duration timeout) {
            // Intentionally empty: the shared producer is closed by the factory only.
        }
    }

    /**
     * Decorator for transactional producers that returns itself to the pool on {@code close}. The delegate is only
     * closed when the pool refuses the offer, i.e. when it is full.
     */
    private static final class PoolableProducer<K, V> extends ProducerDecorator<K, V> {

        private final BlockingQueue<PoolableProducer<K, V>> pool;
        private final Duration closeTimeout;

        PoolableProducer(Producer<K, V> delegate,
                         BlockingQueue<PoolableProducer<K, V>> pool,
                         Duration closeTimeout) {
            super(delegate);
            this.pool = pool;
            this.closeTimeout = closeTimeout;
        }

        @Override
        public void close() {
            this.close(this.closeTimeout);
        }

        @Override
        public void close(Duration timeout) {
            boolean isAdded = this.pool.offer(this);
            if (!isAdded) {
                // No room left in the pool: really close the underlying producer.
                super.close(timeout);
            }
        }
    }

    /**
     * Base class that forwards every {@link Producer} call to a wrapped delegate, so subclasses only need to
     * override the methods whose behavior they change.
     */
    private abstract static class ProducerDecorator<K, V> implements Producer<K, V> {

        private final Producer<K, V> delegate;

        ProducerDecorator(Producer<K, V> delegate) {
            this.delegate = delegate;
        }

        @Override
        public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
            return this.delegate.send(producerRecord);
        }

        @Override
        public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, Callback callback) {
            return this.delegate.send(producerRecord, callback);
        }

        @Override
        public void flush() {
            this.delegate.flush();
        }

        @Override
        public List<PartitionInfo> partitionsFor(String topic) {
            return this.delegate.partitionsFor(topic);
        }

        @Override
        public Map<MetricName, ? extends Metric> metrics() {
            return this.delegate.metrics();
        }

        @Override
        public void initTransactions() {
            this.delegate.initTransactions();
        }

        @Override
        public void beginTransaction() throws ProducerFencedException {
            this.delegate.beginTransaction();
        }

        @Override
        @Deprecated
        public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                             String consumerGroupId) throws ProducerFencedException {
            this.delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
        }

        @Override
        public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                             ConsumerGroupMetadata consumerGroupMetadata)
                throws ProducerFencedException {
            this.delegate.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
        }

        @Override
        public void commitTransaction() throws ProducerFencedException {
            this.delegate.commitTransaction();
        }

        @Override
        public void abortTransaction() throws ProducerFencedException {
            this.delegate.abortTransaction();
        }

        @Override
        public void close() {
            this.delegate.close();
        }

        @Override
        public void close(Duration timeout) {
            this.delegate.close(timeout);
        }

        @Override
        public String toString() {
            return this.getClass().getSimpleName() + " [delegate=" + this.delegate + "]";
        }
    }
}

