/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.pulsar.core;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionBuilder;
import org.jspecify.annotations.Nullable;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.pulsar.PulsarException;
import org.springframework.pulsar.core.CachingPulsarProducerFactory;
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.DefaultTopicResolver;
import org.springframework.pulsar.core.ProducerBuilderCustomizer;
import org.springframework.pulsar.core.ProducerUtils;
import org.springframework.pulsar.core.PulsarOperations;
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.core.TransactionProperties;
import org.springframework.pulsar.core.TypedMessageBuilderCustomizer;
import org.springframework.pulsar.observation.DefaultPulsarTemplateObservationConvention;
import org.springframework.pulsar.observation.PulsarMessageSenderContext;
import org.springframework.pulsar.observation.PulsarTemplateObservation;
import org.springframework.pulsar.observation.PulsarTemplateObservationConvention;
import org.springframework.pulsar.support.internal.logging.LambdaCustomizerWarnLogger;
import org.springframework.pulsar.transaction.PulsarResourceHolder;
import org.springframework.pulsar.transaction.PulsarTransactionUtils;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

public class PulsarTemplate<T>
implements PulsarOperations<T>,
ApplicationContextAware,
BeanNameAware,
SmartInitializingSingleton {
    /** Logger bound to the runtime class of this template. */
    private final LogAccessor logger = new LogAccessor(this.getClass());

    /** Factory used to create producers and to obtain the Pulsar client and default topic. */
    private final PulsarProducerFactory<T> producerFactory;

    /** Resolves the schema for a message when the caller does not pass one. */
    private final SchemaResolver schemaResolver;

    /** Resolves the destination topic for a send operation. */
    private final TopicResolver topicResolver;

    /** Producer customizers adapted from the constructor's interceptors; {@code null} when none were given. */
    private final @Nullable List<ProducerBuilderCustomizer<T>> interceptorsCustomizers;

    /**
     * Transactions started by {@code executeInTransaction}, keyed by the thread that started them.
     * Entries are added, read and removed from whichever thread calls the template, so the map
     * must tolerate concurrent structural modification; a plain {@code HashMap} does not.
     */
    private final Map<Thread, Transaction> threadBoundTransactions = new ConcurrentHashMap<>();

    /** Whether {@link #producerFactory} is a {@link CachingPulsarProducerFactory}. */
    private final boolean isProducerFactoryCaching;

    /** Whether observation lookup is attempted once singletons are instantiated. */
    private final boolean observationEnabled;

    /** Registry used to create observations; while {@code null}, sends use {@code Observation.NOOP}. */
    private @Nullable ObservationRegistry observationRegistry;

    /** Optional custom observation convention looked up from the application context. */
    private @Nullable PulsarTemplateObservationConvention observationConvention;

    /** Application context used to look up observation beans. */
    private @Nullable ApplicationContext applicationContext;

    /** Bean name of this template, passed to the sender context of each send. */
    private String beanName = "";

    /** Logger consulted for caller-supplied producer customizers; {@code null} disables it. */
    private @Nullable LambdaCustomizerWarnLogger lambdaLogger;

    /** Transaction settings for this template, exposed via {@code transactions()}. */
    private final TransactionProperties transactionProps = new TransactionProperties();

    /**
     * Creates a template that uses the given producer factory and an empty interceptor list.
     * @param producerFactory the factory used to create producers
     */
    public PulsarTemplate(PulsarProducerFactory<T> producerFactory) {
        this(producerFactory, Collections.emptyList());
    }

    /**
     * Creates a template with the given producer factory and interceptors, using a
     * {@link DefaultSchemaResolver}, a {@link DefaultTopicResolver} and observations enabled.
     * @param producerFactory the factory used to create producers
     * @param interceptors the interceptors to apply to created producers
     */
    public PulsarTemplate(PulsarProducerFactory<T> producerFactory, List<ProducerInterceptor> interceptors) {
        this(producerFactory, interceptors, new DefaultSchemaResolver(), new DefaultTopicResolver(), true);
    }

    /**
     * Creates a fully configured template.
     * @param producerFactory the factory used to create producers; must not be {@code null}
     * @param interceptors the interceptors to apply to created producers; a {@code null} or
     * empty list results in no interceptor customizers
     * @param schemaResolver resolves the schema for a message; must not be {@code null}
     * @param topicResolver resolves the destination topic; must not be {@code null}
     * @param observationEnabled whether observation beans are looked up after singleton creation
     * @throws NullPointerException if any of the required collaborators is {@code null}
     */
    public PulsarTemplate(PulsarProducerFactory<T> producerFactory, List<ProducerInterceptor> interceptors, SchemaResolver schemaResolver, TopicResolver topicResolver, boolean observationEnabled) {
        this.producerFactory = Objects.requireNonNull(producerFactory, "producerFactory must not be null");
        this.schemaResolver = Objects.requireNonNull(schemaResolver, "schemaResolver must not be null");
        this.topicResolver = Objects.requireNonNull(topicResolver, "topicResolver must not be null");
        this.observationEnabled = observationEnabled;
        if (CollectionUtils.isEmpty(interceptors)) {
            this.interceptorsCustomizers = null;
        }
        else {
            // Each interceptor is wrapped so it can be applied like any other producer customizer.
            this.interceptorsCustomizers = interceptors.stream()
                .map(interceptor -> this.adaptInterceptorToCustomizer(interceptor))
                .toList();
        }
        this.isProducerFactoryCaching = this.producerFactory instanceof CachingPulsarProducerFactory;
        this.lambdaLogger = new LambdaCustomizerWarnLogger(this.logger, 1000L);
    }

    /**
     * Wraps a producer interceptor in a customizer that registers it on the producer builder.
     * @param interceptor the interceptor to register
     * @return a customizer that calls {@code intercept} on the builder with the interceptor
     */
    private ProducerBuilderCustomizer<T> adaptInterceptorToCustomizer(ProducerInterceptor interceptor) {
        return producerBuilder -> producerBuilder.intercept(interceptor);
    }

    /**
     * Creates a lambda-customizer warning logger backed by this template's logger.
     * @param frequency the frequency value passed to the {@link LambdaCustomizerWarnLogger} constructor
     * @return the new logger
     */
    private LambdaCustomizerWarnLogger newLambdaWarnLogger(long frequency) {
        return new LambdaCustomizerWarnLogger(this.logger, frequency);
    }

    /**
     * Stores the application context, which is later used to look up observation beans.
     * @param applicationContext the application context
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    /**
     * Returns the transaction settings of this template.
     * @return the transaction properties instance held by this template
     */
    public TransactionProperties transactions() {
        return this.transactionProps;
    }

    /**
     * Replaces or disables the logger consulted for caller-supplied producer customizers.
     * @param frequency a positive value installs a new {@link LambdaCustomizerWarnLogger}
     * created with that frequency; zero or a negative value disables the logger
     */
    public void logWarningForLambdaCustomizer(long frequency) {
        if (frequency > 0L) {
            this.lambdaLogger = this.newLambdaWarnLogger(frequency);
        }
        else {
            this.lambdaLogger = null;
        }
    }

    /**
     * Looks up the observation registry and observation convention from the application context.
     * <p>Does nothing (beyond logging) when observations are disabled or no application
     * context has been set. For each lookup, the currently held value is supplied as the
     * fallback to {@code getIfUnique}.
     */
    @Override
    public void afterSingletonsInstantiated() {
        if (!this.observationEnabled) {
            this.logger.debug(() -> "Observations are not enabled - not recording");
            return;
        }
        if (this.applicationContext == null) {
            this.logger.warn(() -> "Observations enabled but application context null - not recording");
            return;
        }
        this.observationRegistry = this.applicationContext.getBeanProvider(ObservationRegistry.class)
            .getIfUnique(() -> this.observationRegistry);
        this.observationConvention = this.applicationContext.getBeanProvider(PulsarTemplateObservationConvention.class)
            .getIfUnique(() -> this.observationConvention);
    }

    /**
     * Sends a message and blocks until the send completes, passing no explicit topic or schema.
     * @param message the message to send
     * @return the id of the sent message
     */
    @Override
    public MessageId send(@Nullable T message) {
        return this.doSend(null, message, null, null, null, null);
    }

    /**
     * Sends a message with the given schema and blocks until the send completes.
     * @param message the message to send
     * @param schema the schema to use, or {@code null} to have it resolved
     * @return the id of the sent message
     */
    @Override
    public MessageId send(@Nullable T message, @Nullable Schema<T> schema) {
        return this.doSend(null, message, schema, null, null, null);
    }

    /**
     * Sends a message to the given topic and blocks until the send completes.
     * @param topic the topic to send to, or {@code null} to have it resolved
     * @param message the message to send
     * @return the id of the sent message
     */
    @Override
    public MessageId send(@Nullable String topic, @Nullable T message) {
        return this.doSend(topic, message, null, null, null, null);
    }

    /**
     * Sends a message to the given topic with the given schema and blocks until the send completes.
     * @param topic the topic to send to, or {@code null} to have it resolved
     * @param message the message to send
     * @param schema the schema to use, or {@code null} to have it resolved
     * @return the id of the sent message
     */
    @Override
    public MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema) {
        return this.doSend(topic, message, schema, null, null, null);
    }

    /**
     * Sends a message asynchronously, passing no explicit topic or schema.
     * @param message the message to send
     * @return a future that completes with the id of the sent message
     */
    @Override
    public CompletableFuture<MessageId> sendAsync(@Nullable T message) {
        return this.doSendAsync(null, message, null, null, null, null);
    }

    /**
     * Sends a message asynchronously with the given schema.
     * @param message the message to send
     * @param schema the schema to use, or {@code null} to have it resolved
     * @return a future that completes with the id of the sent message
     */
    @Override
    public CompletableFuture<MessageId> sendAsync(@Nullable T message, @Nullable Schema<T> schema) {
        return this.doSendAsync(null, message, schema, null, null, null);
    }

    /**
     * Sends a message asynchronously to the given topic.
     * @param topic the topic to send to, or {@code null} to have it resolved
     * @param message the message to send
     * @return a future that completes with the id of the sent message
     */
    @Override
    public CompletableFuture<MessageId> sendAsync(@Nullable String topic, @Nullable T message) {
        return this.doSendAsync(topic, message, null, null, null, null);
    }

    /**
     * Sends a message asynchronously to the given topic with the given schema.
     * @param topic the topic to send to, or {@code null} to have it resolved
     * @param message the message to send
     * @param schema the schema to use, or {@code null} to have it resolved
     * @return a future that completes with the id of the sent message
     */
    @Override
    public CompletableFuture<MessageId> sendAsync(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema) {
        return this.doSendAsync(topic, message, schema, null, null, null);
    }

    /**
     * Starts a fluent send operation for the given message.
     * @param message the message to send
     * @return a builder bound to this template and the message
     */
    @Override
    public PulsarOperations.SendMessageBuilder<T> newMessage(@Nullable T message) {
        return new SendMessageBuilderImpl<T>(this, message);
    }

    /**
     * Stores the bean name of this template; it is passed to the sender context of each send.
     * @param beanName the bean name
     */
    @Override
    public void setBeanName(String beanName) {
        this.beanName = beanName;
    }

    /**
     * Sends a message and blocks on the resulting future.
     * <p>A {@link PulsarException} is rethrown unchanged. Any other exception is passed through
     * {@code PulsarClientException.unwrap} and the result is wrapped in a new {@link PulsarException}.
     * @param topic the topic to send to, or {@code null}
     * @param message the message to send
     * @param schema the schema to use, or {@code null}
     * @param encryptionKeys the encryption keys to use, or {@code null}
     * @param typedMessageBuilderCustomizer customizer applied to the message builder, or {@code null}
     * @param producerCustomizer customizer applied to the producer builder, or {@code null}
     * @return the id of the sent message
     */
    private MessageId doSend(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema, @Nullable Collection<String> encryptionKeys, @Nullable TypedMessageBuilderCustomizer<T> typedMessageBuilderCustomizer, @Nullable ProducerBuilderCustomizer<T> producerCustomizer) {
        try {
            // The async call stays inside the try so failures raised before a future exists
            // go through the same translation as failures reported by the future.
            CompletableFuture<MessageId> pendingSend = this.doSendAsync(topic, message, schema, encryptionKeys,
                    typedMessageBuilderCustomizer, producerCustomizer);
            return pendingSend.get();
        }
        catch (PulsarException alreadyTranslated) {
            throw alreadyTranslated;
        }
        catch (Exception failure) {
            throw new PulsarException(PulsarClientException.unwrap(failure));
        }
    }

    /**
     * Resolves the topic, prepares a producer, builds the message and sends it asynchronously.
     * <p>The send is wrapped in an observation that is started before the producer is prepared
     * and stopped when the send completes or fails. The producer is closed asynchronously once
     * the send future completes, or immediately if a {@link RuntimeException} is thrown before
     * a future is returned.
     * @param topic the topic to send to, or {@code null} to have it resolved
     * @param message the message to send
     * @param schema the schema to use, or {@code null} to have it resolved
     * @param encryptionKeys the encryption keys to use, or {@code null}
     * @param typedMessageBuilderCustomizer customizer applied to the message builder, or {@code null}
     * @param producerCustomizer customizer applied to the producer builder, or {@code null}
     * @return a future that completes with the id of the sent message
     */
    private CompletableFuture<MessageId> doSendAsync(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema, @Nullable Collection<String> encryptionKeys, @Nullable TypedMessageBuilderCustomizer<T> typedMessageBuilderCustomizer, @Nullable ProducerBuilderCustomizer<T> producerCustomizer) {
        String defaultTopic = Objects.toString(this.producerFactory.getDefaultTopic(), null);
        String topicName = this.topicResolver.resolveTopic(topic, message, () -> defaultTopic).orElseThrow();
        Assert.notNull(topicName, "The resolvedTopic must not be null");
        this.logger.trace(() -> "Sending msg to '%s' topic".formatted(topicName));
        PulsarMessageSenderContext senderContext = PulsarMessageSenderContext.newContext(topicName, this.beanName);
        Observation observation = this.newObservation(senderContext);
        Producer<T> producer = null;
        try {
            observation.start();
            producer = this.prepareProducerForSend(topicName, message, schema, encryptionKeys, producerCustomizer);
            Transaction txn = this.getTransaction();
            TypedMessageBuilder<T> emptyBuilder = txn != null ? producer.newMessage(txn) : producer.newMessage();
            // Assigned exactly once so the builder is effectively final and fully typed.
            TypedMessageBuilder<T> messageBuilder = emptyBuilder.value(message);
            if (typedMessageBuilderCustomizer != null) {
                typedMessageBuilderCustomizer.customize(messageBuilder);
            }
            // Copy the sender context properties onto the outgoing message.
            senderContext.properties().forEach(messageBuilder::property);
            Producer<T> finalProducer = producer;
            return messageBuilder.sendAsync().whenComplete((msgId, ex) -> {
                if (ex == null) {
                    this.logger.trace(() -> "Sent msg to '%s' topic".formatted(topicName));
                    observation.stop();
                }
                else {
                    this.logger.error(ex, () -> "Failed to send msg to '%s' topic".formatted(topicName));
                    observation.error(ex);
                    observation.stop();
                }
                ProducerUtils.closeProducerAsync(finalProducer, this.logger);
            });
        }
        catch (RuntimeException ex) {
            // No future was returned, so the completion callback will never run: clean up here.
            if (producer != null) {
                ProducerUtils.closeProducerAsync(producer, this.logger);
            }
            observation.error(ex);
            observation.stop();
            throw ex;
        }
    }

    /**
     * Creates the observation for a send operation.
     * @param senderContext the context describing the send
     * @return an observation created from the registry, or {@code Observation.NOOP} when no
     * registry is set
     */
    private Observation newObservation(PulsarMessageSenderContext senderContext) {
        if (this.observationRegistry != null) {
            return PulsarTemplateObservation.TEMPLATE_OBSERVATION.observation(this.observationConvention,
                    DefaultPulsarTemplateObservationConvention.INSTANCE, () -> senderContext,
                    this.observationRegistry);
        }
        return Observation.NOOP;
    }

    /**
     * Determines the transaction, if any, that a send should take part in.
     * <p>The transaction bound to the current thread by this template takes precedence;
     * otherwise the transaction is taken from the resource holder obtained through
     * {@link PulsarTransactionUtils}.
     * @return the transaction to use, or {@code null} when transactions are disabled or when
     * none is active and transactions are not required
     * @throws IllegalStateException if transactions are required but none is active
     */
    private @Nullable Transaction getTransaction() {
        if (!this.transactions().isEnabled()) {
            return null;
        }
        boolean txnOptional = !this.transactions().isRequired();
        boolean txnActive = this.inTransaction();
        Assert.state(txnOptional || txnActive, "No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record");
        if (!txnActive) {
            this.logger.trace(() -> "No txn found but allowNonTransactional is true - returning null");
            return null;
        }
        Transaction localTxn = this.threadBoundTransactions.get(Thread.currentThread());
        if (localTxn == null) {
            // No template-local transaction on this thread: use the resource holder's.
            PulsarResourceHolder holder = PulsarTransactionUtils.obtainResourceHolder(
                    this.producerFactory.getPulsarClient(), this.transactions().getTimeout());
            return holder.getTransaction();
        }
        this.logger.trace(() -> "Found local template txn [%s]".formatted(localTxn));
        return localTxn;
    }

    /**
     * Reports whether a transaction is active for the current thread.
     * @return {@code true} when transactions are enabled and either this template holds a
     * transaction for the current thread or {@link PulsarTransactionUtils} reports one for the
     * factory's client; {@code false} otherwise
     */
    private boolean inTransaction() {
        if (!this.transactions().isEnabled()) {
            return false;
        }
        if (this.threadBoundTransactions.get(Thread.currentThread()) != null) {
            return true;
        }
        return PulsarTransactionUtils.inTransaction(this.producerFactory.getPulsarClient());
    }

    /**
     * Resolves the schema and creates a producer for a send operation.
     * <p>The interceptor customizers (if any) are applied first, followed by the caller's
     * producer customizer (if any).
     * @param topic the topic the producer will send to
     * @param message the message, used to resolve the schema when none is given
     * @param schema the schema to use, or {@code null} to have it resolved from the message
     * @param encryptionKeys the encryption keys to use, or {@code null}
     * @param producerCustomizer customizer applied to the producer builder, or {@code null}
     * @return the producer obtained from the producer factory
     */
    private Producer<T> prepareProducerForSend(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema, @Nullable Collection<String> encryptionKeys, @Nullable ProducerBuilderCustomizer<T> producerCustomizer) {
        Schema<T> resolvedSchema = schema != null ? schema : this.schemaResolver.resolveSchema(message).orElseThrow();
        Assert.notNull(resolvedSchema, "The resolvedSchema must not be null");
        // Typed list instead of a raw ArrayList, so the factory call is checked by the compiler.
        List<ProducerBuilderCustomizer<T>> customizers = new ArrayList<>();
        if (!CollectionUtils.isEmpty(this.interceptorsCustomizers)) {
            customizers.addAll(this.interceptorsCustomizers);
        }
        if (producerCustomizer != null) {
            this.possiblyLogWarningOnUsingLambdaCustomizers(producerCustomizer);
            customizers.add(producerCustomizer);
        }
        return this.producerFactory.createProducer(resolvedSchema, topic, encryptionKeys, customizers);
    }

    /**
     * Hands the customizer to the lambda warning logger, but only when that logger is set and
     * the producer factory is a caching one.
     * @param producerCustomizer the caller-supplied producer customizer
     */
    private void possiblyLogWarningOnUsingLambdaCustomizers(ProducerBuilderCustomizer<T> producerCustomizer) {
        if (this.lambdaLogger == null || !this.isProducerFactoryCaching) {
            return;
        }
        this.lambdaLogger.maybeLog(producerCustomizer);
    }

    /**
     * Runs the callback inside a new transaction bound to the current thread.
     * <p>The transaction is committed after the callback returns. If the callback or the commit
     * throws, the transaction is aborted and the exception is rethrown via
     * {@code PulsarException.unwrap}. The thread binding is always removed afterwards.
     * @param <R> the callback result type
     * @param callback the work to perform with this template; must not be {@code null}
     * @return the callback's result
     * @throws IllegalStateException if transactions are not enabled on this template or the
     * current thread is already inside {@code executeInTransaction}
     */
    public <R> @Nullable R executeInTransaction(TemplateCallback<T, R> callback) {
        Assert.notNull(callback, "callback must not be null");
        Assert.state(this.transactions().isEnabled(), "This template does not support transactions");
        Thread owner = Thread.currentThread();
        Transaction alreadyBound = this.threadBoundTransactions.get(owner);
        Assert.state(alreadyBound == null, "Nested calls to 'executeInTransaction' are not allowed");
        Transaction txn = this.newPulsarTransaction();
        this.threadBoundTransactions.put(owner, txn);
        try {
            R outcome = callback.doWithTemplate(this);
            txn.commit().get();
            return outcome;
        }
        catch (Exception failure) {
            if (txn != null) {
                PulsarTransactionUtils.abort(txn);
            }
            throw PulsarException.unwrap(failure);
        }
        finally {
            this.threadBoundTransactions.remove(owner);
        }
    }

    /**
     * Builds a new transaction from the producer factory's Pulsar client and waits for it.
     * <p>When a timeout is configured on the transaction properties it is applied to the
     * builder in whole seconds.
     * @return the newly created transaction
     * @throws PulsarException (via {@code PulsarException.unwrap}) if building the transaction fails
     */
    private Transaction newPulsarTransaction() {
        try {
            TransactionBuilder builder = this.producerFactory.getPulsarClient().newTransaction();
            if (this.transactions().getTimeout() != null) {
                builder.withTransactionTimeout(this.transactions().getTimeout().toSeconds(), TimeUnit.SECONDS);
            }
            return builder.build().get();
        }
        catch (Exception failure) {
            throw PulsarException.unwrap(failure);
        }
    }

    /**
     * Fluent builder that collects the options of a single send operation and then delegates
     * to the owning template's {@code doSend} / {@code doSendAsync}.
     *
     * @param <T> the message payload type
     */
    public static class SendMessageBuilderImpl<T>
    implements PulsarOperations.SendMessageBuilder<T> {
        // The template that performs the send.
        private final PulsarTemplate<T> template;
        // The message to send.
        private final @Nullable T message;
        // Optional settings; each stays null until the matching with* method is called.
        private @Nullable String topic;
        private @Nullable Schema<T> schema;
        private @Nullable Collection<String> encryptionKeys;
        private @Nullable TypedMessageBuilderCustomizer<T> messageCustomizer;
        private @Nullable ProducerBuilderCustomizer<T> producerCustomizer;

        /**
         * Creates a builder bound to a template and a message.
         * @param template the template that will perform the send
         * @param message the message to send
         */
        SendMessageBuilderImpl(PulsarTemplate<T> template, @Nullable T message) {
            this.template = template;
            this.message = message;
        }

        /**
         * Sets the topic to send to.
         * @param topic the topic
         * @return this builder
         */
        @Override
        public PulsarOperations.SendMessageBuilder<T> withTopic(String topic) {
            this.topic = topic;
            return this;
        }

        /**
         * Sets the schema to use for the message.
         * @param schema the schema
         * @return this builder
         */
        @Override
        public PulsarOperations.SendMessageBuilder<T> withSchema(Schema<T> schema) {
            this.schema = schema;
            return this;
        }

        /**
         * Sets the encryption keys to use.
         * @param encryptionKeys the encryption keys
         * @return this builder
         */
        @Override
        public PulsarOperations.SendMessageBuilder<T> withEncryptionKeys(Collection<String> encryptionKeys) {
            this.encryptionKeys = encryptionKeys;
            return this;
        }

        /**
         * Sets the customizer applied to the message builder.
         * @param messageCustomizer the message customizer
         * @return this builder
         */
        @Override
        public PulsarOperations.SendMessageBuilder<T> withMessageCustomizer(TypedMessageBuilderCustomizer<T> messageCustomizer) {
            this.messageCustomizer = messageCustomizer;
            return this;
        }

        /**
         * Sets the customizer applied to the producer builder.
         * @param producerCustomizer the producer customizer
         * @return this builder
         */
        @Override
        public PulsarOperations.SendMessageBuilder<T> withProducerCustomizer(ProducerBuilderCustomizer<T> producerCustomizer) {
            this.producerCustomizer = producerCustomizer;
            return this;
        }

        /**
         * Sends the message with the collected options via the template's {@code doSend}.
         * @return the id of the sent message
         */
        @Override
        public MessageId send() {
            return this.template.doSend(this.topic, this.message, this.schema, this.encryptionKeys, this.messageCustomizer, this.producerCustomizer);
        }

        /**
         * Sends the message with the collected options via the template's {@code doSendAsync}.
         * @return a future that completes with the id of the sent message
         */
        @Override
        public CompletableFuture<MessageId> sendAsync() {
            return this.template.doSendAsync(this.topic, this.message, this.schema, this.encryptionKeys, this.messageCustomizer, this.producerCustomizer);
        }
    }

    /**
     * Callback run by {@code executeInTransaction} with the template as its argument.
     *
     * @param <T> the message payload type of the template
     * @param <R> the result type of the callback
     */
    @FunctionalInterface
    public interface TemplateCallback<T, R> {
        /**
         * Performs work with the given template.
         * @param template the template to use
         * @return the result of the work, possibly {@code null}
         */
        @Nullable R doWithTemplate(PulsarTemplate<T> template);
    }
}

