/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.core;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serializer;
import org.jspecify.annotations.Nullable;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.core.env.EnvironmentCapable;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.DefaultTransactionIdSuffixStrategy;
import org.springframework.kafka.core.KafkaResourceFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.ProducerPostProcessor;
import org.springframework.kafka.core.TransactionIdSuffixStrategy;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

public class DefaultKafkaProducerFactory<K, V>
extends KafkaResourceFactory
implements ProducerFactory<K, V>,
ApplicationContextAware,
BeanNameAware,
ApplicationListener<ContextStoppedEvent>,
DisposableBean,
SmartLifecycle {
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DefaultKafkaProducerFactory.class));
    private final ReentrantLock globalLock = new ReentrantLock();
    private final Map<String, Object> configs;
    private final Map<String, BlockingQueue<CloseSafeProducer<K, V>>> cache = new ConcurrentHashMap<String, BlockingQueue<CloseSafeProducer<K, V>>>();
    private final Map<Thread, CloseSafeProducer<K, V>> threadBoundProducers = new ConcurrentHashMap<Thread, CloseSafeProducer<K, V>>();
    private final AtomicInteger epoch = new AtomicInteger();
    private final AtomicInteger clientIdCounter = new AtomicInteger();
    private final List<ProducerFactory.Listener<K, V>> listeners = new ArrayList<ProducerFactory.Listener<K, V>>();
    private final List<ProducerPostProcessor<K, V>> postProcessors = new ArrayList<ProducerPostProcessor<K, V>>();
    private final AtomicBoolean running = new AtomicBoolean();
    private TransactionIdSuffixStrategy transactionIdSuffixStrategy = new DefaultTransactionIdSuffixStrategy(0);
    private @Nullable Supplier<Serializer<K>> keySerializerSupplier;
    private @Nullable Supplier<Serializer<V>> valueSerializerSupplier;
    private @Nullable Supplier<Serializer<K>> rawKeySerializerSupplier;
    private @Nullable Supplier<Serializer<V>> rawValueSerializerSupplier;
    private Duration physicalCloseTimeout = DEFAULT_PHYSICAL_CLOSE_TIMEOUT;
    private @Nullable ApplicationContext applicationContext;
    private String beanName = "not.managed.by.Spring";
    private boolean producerPerThread;
    private long maxAge;
    private boolean configureSerializers = true;
    private volatile @Nullable String transactionIdPrefix;
    private volatile @Nullable String clientIdPrefix;
    private volatile @Nullable CloseSafeProducer<K, V> producer;

    public DefaultKafkaProducerFactory(Map<String, Object> configs) {
        this(configs, () -> null, () -> null);
    }

    public DefaultKafkaProducerFactory(Map<String, Object> configs, @Nullable Serializer<K> keySerializer, @Nullable Serializer<V> valueSerializer) {
        this(configs, () -> keySerializer, () -> valueSerializer, true);
    }

    public DefaultKafkaProducerFactory(Map<String, Object> configs, @Nullable Serializer<K> keySerializer, @Nullable Serializer<V> valueSerializer, boolean configureSerializers) {
        this(configs, () -> keySerializer, () -> valueSerializer, configureSerializers);
    }

    public DefaultKafkaProducerFactory(Map<String, Object> configs, @Nullable Supplier<Serializer<K>> keySerializerSupplier, @Nullable Supplier<Serializer<V>> valueSerializerSupplier) {
        this(configs, keySerializerSupplier, valueSerializerSupplier, true);
    }

    public DefaultKafkaProducerFactory(Map<String, Object> configs, @Nullable Supplier<Serializer<K>> keySerializerSupplier, @Nullable Supplier<Serializer<V>> valueSerializerSupplier, boolean configureSerializers) {
        String txId;
        this.configs = new ConcurrentHashMap<String, Object>(configs);
        this.configureSerializers = configureSerializers;
        this.keySerializerSupplier = this.keySerializerSupplier(keySerializerSupplier);
        this.valueSerializerSupplier = this.valueSerializerSupplier(valueSerializerSupplier);
        if (this.clientIdPrefix == null && configs.get("client.id") instanceof String) {
            this.clientIdPrefix = (String)configs.get("client.id");
        }
        if (StringUtils.hasText((String)(txId = (String)this.configs.get("transactional.id")))) {
            this.setTransactionIdPrefix(txId);
            this.configs.remove("transactional.id");
        }
    }

    private @Nullable Supplier<Serializer<K>> keySerializerSupplier(@Nullable Supplier<Serializer<K>> keySerializerSupplier) {
        this.rawKeySerializerSupplier = keySerializerSupplier;
        if (!this.configureSerializers) {
            return keySerializerSupplier;
        }
        return keySerializerSupplier == null ? () -> null : () -> {
            Serializer serializer = (Serializer)keySerializerSupplier.get();
            if (serializer != null) {
                serializer.configure(this.configs, true);
            }
            return serializer;
        };
    }

    private @Nullable Supplier<Serializer<V>> valueSerializerSupplier(@Nullable Supplier<Serializer<V>> valueSerializerSupplier) {
        this.rawValueSerializerSupplier = valueSerializerSupplier;
        if (!this.configureSerializers) {
            return valueSerializerSupplier;
        }
        return valueSerializerSupplier == null ? () -> null : () -> {
            Serializer serializer = (Serializer)valueSerializerSupplier.get();
            if (serializer != null) {
                serializer.configure(this.configs, false);
            }
            return serializer;
        };
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public void setBeanName(String name) {
        this.beanName = name;
    }

    public void setKeySerializer(@Nullable Serializer<K> keySerializer) {
        this.keySerializerSupplier = this.keySerializerSupplier(() -> keySerializer);
    }

    public void setValueSerializer(@Nullable Serializer<V> valueSerializer) {
        this.valueSerializerSupplier = this.valueSerializerSupplier(() -> valueSerializer);
    }

    public void setKeySerializerSupplier(Supplier<Serializer<K>> keySerializerSupplier) {
        this.keySerializerSupplier = this.keySerializerSupplier(keySerializerSupplier);
    }

    public void setValueSerializerSupplier(Supplier<Serializer<V>> valueSerializerSupplier) {
        this.valueSerializerSupplier = this.valueSerializerSupplier(valueSerializerSupplier);
    }

    public void setTransactionIdSuffixStrategy(TransactionIdSuffixStrategy transactionIdSuffixStrategy) {
        Assert.notNull((Object)transactionIdSuffixStrategy, (String)"'transactionIdSuffixStrategy' cannot be null");
        this.transactionIdSuffixStrategy = transactionIdSuffixStrategy;
    }

    public boolean isConfigureSerializers() {
        return this.configureSerializers;
    }

    public void setConfigureSerializers(boolean configureSerializers) {
        this.configureSerializers = configureSerializers;
    }

    public void setPhysicalCloseTimeout(int physicalCloseTimeout) {
        this.physicalCloseTimeout = Duration.ofSeconds(physicalCloseTimeout);
    }

    @Override
    public Duration getPhysicalCloseTimeout() {
        return this.physicalCloseTimeout;
    }

    public final void setTransactionIdPrefix(String transactionIdPrefix) {
        Assert.notNull((Object)transactionIdPrefix, (String)"'transactionIdPrefix' cannot be null");
        this.transactionIdPrefix = transactionIdPrefix;
        this.enableIdempotentBehaviour();
    }

    @Override
    public @Nullable String getTransactionIdPrefix() {
        return this.transactionIdPrefix;
    }

    public void setProducerPerThread(boolean producerPerThread) {
        this.producerPerThread = producerPerThread;
    }

    @Override
    public boolean isProducerPerThread() {
        return this.producerPerThread;
    }

    @Override
    public @Nullable Serializer<K> getKeySerializer() {
        return this.keySerializerSupplier == null ? null : this.keySerializerSupplier.get();
    }

    @Override
    public @Nullable Serializer<V> getValueSerializer() {
        return this.valueSerializerSupplier == null ? null : this.valueSerializerSupplier.get();
    }

    @Override
    public @Nullable Supplier<Serializer<K>> getKeySerializerSupplier() {
        return this.rawKeySerializerSupplier;
    }

    @Override
    public @Nullable Supplier<Serializer<V>> getValueSerializerSupplier() {
        return this.rawValueSerializerSupplier;
    }

    @Override
    public Map<String, Object> getConfigurationProperties() {
        HashMap<String, Object> configs2 = new HashMap<String, Object>(this.configs);
        this.checkBootstrap(configs2);
        return Collections.unmodifiableMap(configs2);
    }

    @Override
    public List<ProducerFactory.Listener<K, V>> getListeners() {
        return Collections.unmodifiableList(this.listeners);
    }

    @Override
    public List<ProducerPostProcessor<K, V>> getPostProcessors() {
        return Collections.unmodifiableList(this.postProcessors);
    }

    public void setMaxAge(Duration maxAge) {
        this.maxAge = maxAge.toMillis();
    }

    public void start() {
        this.running.set(true);
    }

    public void stop() {
        this.running.set(false);
        this.destroy();
    }

    public boolean isRunning() {
        return this.running.get();
    }

    public int getPhase() {
        return Integer.MIN_VALUE;
    }

    @Override
    public ProducerFactory<K, V> copyWithConfigurationOverride(@Nullable Map<String, Object> overrideProperties) {
        Map<String, Object> producerProperties = new HashMap<String, Object>(this.getConfigurationProperties());
        if (overrideProperties != null) {
            producerProperties.putAll(overrideProperties);
        }
        producerProperties = this.ensureExistingTransactionIdPrefixInProperties(producerProperties);
        DefaultKafkaProducerFactory<K, V> newFactory = new DefaultKafkaProducerFactory<K, V>(producerProperties, this.getKeySerializerSupplier(), this.getValueSerializerSupplier(), this.isConfigureSerializers());
        newFactory.setPhysicalCloseTimeout((int)this.getPhysicalCloseTimeout().getSeconds());
        newFactory.setProducerPerThread(this.isProducerPerThread());
        for (ProducerPostProcessor<K, V> producerPostProcessor : this.getPostProcessors()) {
            newFactory.addPostProcessor(producerPostProcessor);
        }
        for (ProducerFactory.Listener listener : this.getListeners()) {
            newFactory.addListener(listener);
        }
        return newFactory;
    }

    private Map<String, Object> ensureExistingTransactionIdPrefixInProperties(Map<String, Object> producerProperties) {
        String txIdPrefix = this.getTransactionIdPrefix();
        if (StringUtils.hasText((String)txIdPrefix) && !producerProperties.containsKey("transactional.id")) {
            HashMap<String, Object> producerPropertiesWithTxnId = new HashMap<String, Object>(producerProperties);
            producerPropertiesWithTxnId.put("transactional.id", txIdPrefix);
            return producerPropertiesWithTxnId;
        }
        return producerProperties;
    }

    @Override
    public void addListener(ProducerFactory.Listener<K, V> listener) {
        Assert.notNull(listener, (String)"'listener' cannot be null");
        this.listeners.add(listener);
    }

    @Override
    public void addListener(int index, ProducerFactory.Listener<K, V> listener) {
        Assert.notNull(listener, (String)"'listener' cannot be null");
        if (index >= this.listeners.size()) {
            this.listeners.add(listener);
        } else {
            this.listeners.add(index, listener);
        }
    }

    @Override
    public boolean removeListener(ProducerFactory.Listener<K, V> listener) {
        return this.listeners.remove(listener);
    }

    @Override
    public void addPostProcessor(ProducerPostProcessor<K, V> postProcessor) {
        Assert.notNull(postProcessor, (String)"'postProcessor' cannot be null");
        this.postProcessors.add(postProcessor);
    }

    @Override
    public boolean removePostProcessor(ProducerPostProcessor<K, V> postProcessor) {
        return this.postProcessors.remove(postProcessor);
    }

    @Override
    public void updateConfigs(Map<String, Object> updates) {
        updates.forEach((key, value) -> {
            if (key == null) {
                return;
            }
            if (key.equals("transactional.id")) {
                Assert.isTrue((value == null || value instanceof String ? 1 : 0) != 0, () -> "'transactional.id' must be null or a String, not a " + value.getClass().getName());
                Assert.isTrue((this.transactionIdPrefix != null == (value != null) ? 1 : 0) != 0, (String)"Cannot change transactional capability");
                this.transactionIdPrefix = (String)value;
            } else if (key.equals("client.id")) {
                Assert.isTrue((value == null || value instanceof String ? 1 : 0) != 0, () -> "'client.id' must be null or a String, not a " + value.getClass().getName());
                this.clientIdPrefix = (String)value;
            } else if (value != null) {
                this.configs.put((String)key, value);
            }
        });
    }

    @Override
    public void removeConfig(String configKey) {
        this.configs.remove(configKey);
    }

    private void enableIdempotentBehaviour() {
        Object previousValue = this.configs.putIfAbsent("enable.idempotence", true);
        if (Boolean.FALSE.equals(previousValue)) {
            LOGGER.debug(() -> "The 'enable.idempotence' is set to false, may result in duplicate messages");
        }
    }

    @Override
    public boolean transactionCapable() {
        return this.transactionIdPrefix != null;
    }

    public void destroy() {
        CloseSafeProducer<K, V> producerToClose;
        this.globalLock.lock();
        try {
            producerToClose = this.producer;
            this.producer = null;
        }
        finally {
            this.globalLock.unlock();
        }
        if (producerToClose != null) {
            try {
                producerToClose.closeDelegate(this.physicalCloseTimeout);
            }
            catch (Exception e) {
                LOGGER.error((Throwable)e, (CharSequence)"Exception while closing producer");
            }
        }
        this.cache.values().forEach(queue -> {
            CloseSafeProducer next = (CloseSafeProducer)queue.poll();
            while (next != null) {
                try {
                    next.closeDelegate(this.physicalCloseTimeout);
                }
                catch (Exception e) {
                    LOGGER.error((Throwable)e, (CharSequence)"Exception while closing producer");
                }
                next = (CloseSafeProducer)queue.poll();
            }
        });
        this.cache.clear();
        this.threadBoundProducers.values().forEach(prod -> {
            try {
                prod.closeDelegate(this.physicalCloseTimeout);
            }
            catch (Exception e) {
                LOGGER.error((Throwable)e, (CharSequence)"Exception while closing producer");
            }
        });
        this.threadBoundProducers.clear();
        this.epoch.incrementAndGet();
    }

    public void onApplicationEvent(ContextStoppedEvent event) {
        if (event.getApplicationContext().equals((Object)this.applicationContext)) {
            this.reset();
        }
    }

    @Override
    public void reset() {
        try {
            this.destroy();
        }
        catch (Exception e) {
            LOGGER.error((Throwable)e, (CharSequence)"Exception while closing producer");
        }
    }

    @Override
    public Producer<K, V> createProducer() {
        return this.createProducer(this.transactionIdPrefix);
    }

    @Override
    public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {
        String txIdPrefix = txIdPrefixArg == null ? this.transactionIdPrefix : txIdPrefixArg;
        return this.doCreateProducer(txIdPrefix);
    }

    @Override
    public Producer<K, V> createNonTransactionalProducer() {
        return this.doCreateProducer(null);
    }

    private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
        if (txIdPrefix != null) {
            return this.createTransactionalProducer(txIdPrefix);
        }
        if (this.producerPerThread) {
            return this.getOrCreateThreadBoundProducer();
        }
        this.globalLock.lock();
        try {
            if (this.producer != null && this.producer.closed) {
                this.producer.closeDelegate(this.physicalCloseTimeout);
                this.producer = null;
            }
            if (this.producer != null && this.expire(this.producer)) {
                this.producer = null;
            }
            if (this.producer == null) {
                this.producer = new CloseSafeProducer<K, V>(this.createKafkaProducer(), this::removeProducer, this.physicalCloseTimeout, this.beanName, this.epoch.get());
                this.listeners.forEach(listener -> listener.producerAdded(this.producer.clientId, this.producer));
            }
            CloseSafeProducer<K, V> closeSafeProducer = this.producer;
            return closeSafeProducer;
        }
        finally {
            this.globalLock.unlock();
        }
    }

    private Producer<K, V> getOrCreateThreadBoundProducer() {
        CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get(Thread.currentThread());
        if (tlProducer != null && (tlProducer.closed || this.epoch.get() != tlProducer.epoch || this.expire(tlProducer))) {
            this.closeThreadBoundProducer();
            tlProducer = null;
        }
        if (tlProducer == null) {
            tlProducer = new CloseSafeProducer<K, V>(this.createKafkaProducer(), this::removeProducer, this.physicalCloseTimeout, this.beanName, this.epoch.get());
            for (ProducerFactory.Listener<K, V> listener : this.listeners) {
                listener.producerAdded(tlProducer.clientId, tlProducer);
            }
            this.threadBoundProducers.put(Thread.currentThread(), tlProducer);
        }
        return tlProducer;
    }

    protected Producer<K, V> createKafkaProducer() {
        return this.createRawProducer(this.getProducerConfigs());
    }

    protected final boolean removeProducer(CloseSafeProducer<K, V> producerToRemove, Duration timeout) {
        if (producerToRemove.closed) {
            this.listeners.forEach(listener -> listener.producerRemoved(producerToRemove.clientId, producerToRemove));
        }
        return producerToRemove.closed;
    }

    protected Producer<K, V> createTransactionalProducer() {
        return this.createTransactionalProducer(this.transactionIdPrefix);
    }

    protected Producer<K, V> createTransactionalProducer(@Nullable String txIdPrefix) {
        BlockingQueue<CloseSafeProducer<K, V>> queue = this.getCache(txIdPrefix);
        Assert.notNull(queue, () -> "No cache found for " + txIdPrefix);
        CloseSafeProducer cachedProducer = (CloseSafeProducer)queue.poll();
        while (cachedProducer != null && this.expire(cachedProducer)) {
            cachedProducer = (CloseSafeProducer)queue.poll();
        }
        if (cachedProducer == null) {
            String suffix = this.transactionIdSuffixStrategy.acquireSuffix(txIdPrefix);
            return this.doCreateTxProducer(txIdPrefix, suffix, this::cacheReturner);
        }
        return cachedProducer;
    }

    private boolean expire(CloseSafeProducer<K, V> producer) {
        boolean expired;
        boolean bl = expired = this.maxAge > 0L && System.currentTimeMillis() - producer.created > this.maxAge;
        if (expired) {
            producer.closeDelegate(this.physicalCloseTimeout);
        }
        return expired;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout) {
        if (producerToRemove.closed) {
            this.removeTransactionProducer(producerToRemove, timeout, this.listeners);
            return true;
        }
        this.globalLock.lock();
        try {
            if (producerToRemove.epoch != this.epoch.get()) {
                this.removeTransactionProducer(producerToRemove, timeout, this.listeners);
                boolean bl = true;
                return bl;
            }
            BlockingQueue<CloseSafeProducer<K, V>> txIdCache = this.getCache(producerToRemove.txIdPrefix);
            if (producerToRemove.epoch != this.epoch.get() || txIdCache != null && !txIdCache.contains(producerToRemove) && !txIdCache.offer(producerToRemove)) {
                this.removeTransactionProducer(producerToRemove, timeout, this.listeners);
                boolean bl = true;
                return bl;
            }
        }
        finally {
            this.globalLock.unlock();
        }
        return false;
    }

    private void removeTransactionProducer(CloseSafeProducer<K, V> producer, Duration timeout, List<ProducerFactory.Listener<K, V>> listeners) {
        this.transactionIdSuffixStrategy.releaseSuffix(producer.txIdPrefix, producer.txIdSuffix);
        listeners.forEach(listener -> listener.producerRemoved(producer.clientId, producer));
    }

    private CloseSafeProducer<K, V> doCreateTxProducer(@Nullable String prefix, String suffix, BiPredicate<CloseSafeProducer<K, V>, Duration> remover) {
        Producer<K, V> newProducer = this.createRawProducer(this.getTxProducerConfigs(prefix + suffix));
        try {
            newProducer.initTransactions();
        }
        catch (RuntimeException ex) {
            try {
                newProducer.close(this.physicalCloseTimeout);
            }
            catch (RuntimeException ex2) {
                KafkaException newEx = new KafkaException("initTransactions() failed and then close() failed", ex);
                newEx.addSuppressed(ex2);
                throw newEx;
            }
            finally {
                this.transactionIdSuffixStrategy.releaseSuffix(prefix, suffix);
            }
            throw new KafkaException("initTransactions() failed", ex);
        }
        CloseSafeProducer closeSafeProducer = new CloseSafeProducer(newProducer, remover, prefix, suffix, this.physicalCloseTimeout, this.beanName, this.epoch.get());
        this.listeners.forEach(listener -> listener.producerAdded(closeSafeProducer.clientId, closeSafeProducer));
        return closeSafeProducer;
    }

    protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
        KafkaProducer kafkaProducer = new KafkaProducer(rawConfigs, this.keySerializerSupplier == null ? null : this.keySerializerSupplier.get(), this.valueSerializerSupplier == null ? null : this.valueSerializerSupplier.get());
        for (ProducerPostProcessor<K, V> pp : this.postProcessors) {
            kafkaProducer = (Producer)pp.apply(kafkaProducer);
        }
        return kafkaProducer;
    }

    protected @Nullable BlockingQueue<CloseSafeProducer<K, V>> getCache() {
        return this.getCache(this.transactionIdPrefix);
    }

    protected @Nullable BlockingQueue<CloseSafeProducer<K, V>> getCache(@Nullable String txIdPrefix) {
        if (txIdPrefix == null) {
            return null;
        }
        return this.cache.computeIfAbsent(txIdPrefix, txId -> new LinkedBlockingQueue());
    }

    @Override
    public void closeThreadBoundProducer() {
        CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.remove(Thread.currentThread());
        if (tlProducer != null) {
            tlProducer.closeDelegate(this.physicalCloseTimeout);
        }
    }

    protected Map<String, Object> getProducerConfigs() {
        HashMap<String, Object> newProducerConfigs = new HashMap<String, Object>(this.configs);
        this.checkBootstrap(newProducerConfigs);
        String prefix = this.clientIdPrefix != null ? this.clientIdPrefix : (String)Optional.ofNullable(this.applicationContext).map(EnvironmentCapable::getEnvironment).map(environment -> environment.getProperty("spring.application.name")).map(applicationName -> applicationName + "-producer").orElse(null);
        if (prefix != null) {
            newProducerConfigs.put("client.id", prefix + "-" + this.clientIdCounter.incrementAndGet());
        }
        return newProducerConfigs;
    }

    protected Map<String, Object> getTxProducerConfigs(String transactionId) {
        Map<String, Object> newProducerConfigs = this.getProducerConfigs();
        newProducerConfigs.put("transactional.id", transactionId);
        return newProducerConfigs;
    }

    protected static class CloseSafeProducer<K, V>
    implements Producer<K, V> {
        private static final Duration CLOSE_TIMEOUT_AFTER_TX_TIMEOUT = Duration.ofMillis(0L);
        private final Producer<K, V> delegate;
        private final BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer;
        final @Nullable String txIdPrefix;
        final @Nullable String txIdSuffix;
        final long created;
        private final Duration closeTimeout;
        final String clientId;
        final int epoch;
        private volatile @Nullable Exception producerFailed;
        volatile boolean closed;

        CloseSafeProducer(Producer<K, V> delegate, BiPredicate<CloseSafeProducer<K, V>, Duration> removeConsumerProducer, Duration closeTimeout, String factoryName, int epoch) {
            this(delegate, removeConsumerProducer, null, closeTimeout, factoryName, epoch);
        }

        CloseSafeProducer(Producer<K, V> delegate, BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer, @Nullable String txIdPrefix, Duration closeTimeout, String factoryName, int epoch) {
            this(delegate, removeProducer, txIdPrefix, null, closeTimeout, factoryName, epoch);
        }

        CloseSafeProducer(Producer<K, V> delegate, BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer, @Nullable String txIdPrefix, @Nullable String txIdSuffix, Duration closeTimeout, String factoryName, int epoch) {
            Assert.isTrue((!(delegate instanceof CloseSafeProducer) ? 1 : 0) != 0, (String)"Cannot double-wrap a producer");
            this.delegate = delegate;
            this.removeProducer = removeProducer;
            this.txIdPrefix = txIdPrefix;
            this.txIdSuffix = txIdSuffix;
            this.closeTimeout = closeTimeout;
            Map metrics = delegate.metrics();
            Iterator metricIterator = metrics.keySet().iterator();
            String id = metricIterator.hasNext() ? (String)((MetricName)metricIterator.next()).tags().get("client-id") : "unknown";
            this.clientId = factoryName + "." + id;
            this.created = System.currentTimeMillis();
            this.epoch = epoch;
            LOGGER.debug(() -> "Created new Producer: " + String.valueOf(this));
        }

        Producer<K, V> getDelegate() {
            return this.delegate;
        }

        public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
            LOGGER.trace(() -> this.toString() + " send(" + String.valueOf(record) + ")");
            return this.delegate.send(record);
        }

        public Future<RecordMetadata> send(ProducerRecord<K, V> record, final Callback callback) {
            LOGGER.trace(() -> this.toString() + " send(" + String.valueOf(record) + ")");
            return this.delegate.send(record, new Callback(){
                final /* synthetic */ CloseSafeProducer this$0;
                {
                    this.this$0 = this$0;
                }

                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception instanceof OutOfOrderSequenceException) {
                        this.this$0.producerFailed = exception;
                        this.this$0.close(this.this$0.closeTimeout);
                    }
                    callback.onCompletion(metadata, exception);
                }
            });
        }

        public void flush() {
            LOGGER.trace(() -> this.toString() + " flush()");
            this.delegate.flush();
        }

        public List<PartitionInfo> partitionsFor(String topic) {
            return this.delegate.partitionsFor(topic);
        }

        public Map<MetricName, ? extends Metric> metrics() {
            return this.delegate.metrics();
        }

        public Uuid clientInstanceId(Duration timeout) {
            return this.delegate.clientInstanceId(timeout);
        }

        public void initTransactions() {
            this.delegate.initTransactions();
        }

        public void beginTransaction() throws ProducerFencedException {
            LOGGER.debug(() -> this.toString() + " beginTransaction()");
            try {
                this.delegate.beginTransaction();
            }
            catch (RuntimeException e) {
                LOGGER.error((Throwable)e, () -> "beginTransaction failed: " + String.valueOf(this));
                this.producerFailed = e;
                throw e;
            }
        }

        public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
            LOGGER.trace(() -> this.toString() + " sendOffsetsToTransaction(" + String.valueOf(offsets) + ", " + consumerGroupId + ")");
            this.delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
        }

        public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata) throws ProducerFencedException {
            LOGGER.trace(() -> this.toString() + " sendOffsetsToTransaction(" + String.valueOf(offsets) + ", " + String.valueOf(groupMetadata) + ")");
            this.delegate.sendOffsetsToTransaction(offsets, groupMetadata);
        }

        public void commitTransaction() throws ProducerFencedException {
            LOGGER.debug(() -> this.toString() + " commitTransaction()");
            try {
                this.delegate.commitTransaction();
            }
            catch (RuntimeException e) {
                LOGGER.error((Throwable)e, () -> "commitTransaction failed: " + String.valueOf(this));
                this.producerFailed = e;
                throw e;
            }
        }

        public void abortTransaction() throws ProducerFencedException {
            LOGGER.debug(() -> this.toString() + " abortTransaction()");
            if (this.producerFailed != null) {
                LOGGER.debug(() -> {
                    String message = this.producerFailed == null ? "" : this.producerFailed.getMessage();
                    return "abortTransaction ignored - previous txFailed: " + message + ": " + String.valueOf(this);
                });
            } else {
                try {
                    this.delegate.abortTransaction();
                }
                catch (RuntimeException e) {
                    LOGGER.error((Throwable)e, () -> "Abort failed: " + String.valueOf(this));
                    this.producerFailed = e;
                    throw e;
                }
            }
        }

        public void close() {
            this.close(null);
        }

        public void close(@Nullable Duration timeout) {
            LOGGER.trace(() -> this.toString() + " close(" + String.valueOf(timeout == null ? "null" : timeout) + ")");
            if (!this.closed) {
                if (this.producerFailed != null) {
                    LOGGER.warn(() -> "Error during some operation; producer removed from cache: " + String.valueOf(this));
                    this.closed = true;
                    this.removeProducer.test(this, this.producerFailed instanceof TimeoutException ? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT : timeout);
                    this.delegate.close(timeout == null ? this.closeTimeout : (this.producerFailed instanceof TimeoutException ? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT : timeout));
                } else {
                    this.closed = this.removeProducer.test(this, timeout);
                    if (this.closed) {
                        this.delegate.close(timeout == null ? this.closeTimeout : timeout);
                    }
                }
            }
        }

        void closeDelegate(Duration timeout) {
            try {
                if (!this.closed) {
                    this.delegate.close(timeout == null ? this.closeTimeout : timeout);
                    this.closed = true;
                    this.removeProducer.test(this, timeout == null ? this.closeTimeout : timeout);
                }
            }
            catch (Exception ex) {
                LOGGER.warn((Throwable)ex, () -> "Failed to close " + String.valueOf(this.delegate));
            }
        }

        public String toString() {
            return "CloseSafeProducer [delegate=" + String.valueOf(this.delegate) + "]";
        }
    }
}

