/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.pulsar.reactive.core;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.DefaultTopicResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.reactive.core.ReactiveMessageSenderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarSenderFactory;
import org.springframework.pulsar.reactive.core.RestartableComponentSupport;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/**
 * Default {@link ReactivePulsarSenderFactory} that builds {@link ReactiveMessageSender}
 * instances from a {@link ReactivePulsarClient}.
 *
 * <p>Instances are obtained through the {@link Builder} returned by one of the
 * {@code builderFor} factory methods; the constructor is private.
 *
 * <p>For every sender created, the factory resolves the target topic through the
 * configured {@link TopicResolver}, applies the factory-wide default customizers, attaches
 * the optional {@link ReactiveMessageSenderCache}, and finally applies the customizers
 * supplied by the caller.
 *
 * @param <T> the message payload type
 */
public final class DefaultReactivePulsarSenderFactory<T>
        implements ReactivePulsarSenderFactory<T>, RestartableComponentSupport {

    /** Lifecycle phase reported by {@link #getPhase()}; evaluates to {@code -1073741924}. */
    private static final int LIFECYCLE_PHASE = (Integer.MIN_VALUE / 2) - 100;

    private final LogAccessor logger = new LogAccessor(this.getClass());

    private final AtomicReference<RestartableComponentSupport.State> currentState =
            RestartableComponentSupport.initialState();

    private final ReactivePulsarClient reactivePulsarClient;

    private final TopicResolver topicResolver;

    @Nullable
    private final ReactiveMessageSenderCache reactiveMessageSenderCache;

    @Nullable
    private final String defaultTopic;

    @Nullable
    private final List<ReactiveMessageSenderBuilderCustomizer<T>> defaultConfigCustomizers;

    private DefaultReactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient,
            TopicResolver topicResolver, @Nullable ReactiveMessageSenderCache reactiveMessageSenderCache,
            @Nullable String defaultTopic,
            @Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> defaultConfigCustomizers) {
        this.reactivePulsarClient = reactivePulsarClient;
        this.topicResolver = topicResolver;
        this.reactiveMessageSenderCache = reactiveMessageSenderCache;
        this.defaultTopic = defaultTopic;
        this.defaultConfigCustomizers = defaultConfigCustomizers;
    }

    /**
     * Starts building a factory on top of an existing reactive client.
     *
     * @param reactivePulsarClient the reactive client used to create senders; must not be {@code null}
     * @param <T> the message payload type
     * @return a new builder
     */
    public static <T> Builder<T> builderFor(ReactivePulsarClient reactivePulsarClient) {
        return new Builder<>(reactivePulsarClient);
    }

    /**
     * Starts building a factory on top of a plain {@link PulsarClient}, which is adapted to a
     * {@link ReactivePulsarClient} via {@link AdaptedReactivePulsarClientFactory#create}.
     *
     * @param pulsarClient the client to adapt
     * @param <T> the message payload type
     * @return a new builder
     */
    public static <T> Builder<T> builderFor(PulsarClient pulsarClient) {
        return new Builder<>(AdaptedReactivePulsarClientFactory.create(pulsarClient));
    }

    /**
     * Creates a sender without any per-call customization.
     *
     * @param schema the schema of the messages to send; must not be {@code null}
     * @param topic the topic to send to, or {@code null} to fall back to the default topic
     * @return the built sender
     */
    @Override
    public ReactiveMessageSender<T> createSender(Schema<T> schema, @Nullable String topic) {
        return this.doCreateReactiveMessageSender(schema, topic, null);
    }

    /**
     * Creates a sender, applying at most one per-call customizer.
     *
     * @param schema the schema of the messages to send; must not be {@code null}
     * @param topic the topic to send to, or {@code null} to fall back to the default topic
     * @param customizer optional customizer applied after the factory defaults
     * @return the built sender
     */
    @Override
    public ReactiveMessageSender<T> createSender(Schema<T> schema, @Nullable String topic,
            @Nullable ReactiveMessageSenderBuilderCustomizer<T> customizer) {
        List<ReactiveMessageSenderBuilderCustomizer<T>> customizers =
                customizer != null ? Collections.singletonList(customizer) : null;
        return this.doCreateReactiveMessageSender(schema, topic, customizers);
    }

    /**
     * Creates a sender, applying the given per-call customizers in list order.
     *
     * @param schema the schema of the messages to send; must not be {@code null}
     * @param topic the topic to send to, or {@code null} to fall back to the default topic
     * @param customizers optional customizers applied after the factory defaults
     * @return the built sender
     */
    @Override
    public ReactiveMessageSender<T> createSender(Schema<T> schema, @Nullable String topic,
            @Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> customizers) {
        return this.doCreateReactiveMessageSender(schema, topic, customizers);
    }

    /**
     * Shared implementation behind all {@code createSender} overloads.
     *
     * <p>Order of operations: resolve the topic, apply the default customizers, set the topic,
     * attach the cache (if any), apply the per-call customizers, set the topic again, build.
     *
     * @param schema the schema of the messages to send
     * @param topic the requested topic, or {@code null} to use the default topic
     * @param customizers optional per-call customizers
     * @return the built sender
     * @throws NullPointerException if {@code schema} is {@code null}
     */
    private ReactiveMessageSender<T> doCreateReactiveMessageSender(Schema<T> schema, @Nullable String topic,
            @Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> customizers) {
        Objects.requireNonNull(schema, "Schema must be specified");
        // orElseThrow() propagates the resolver's failure when no topic can be determined.
        String resolvedTopic = (String) this.topicResolver.resolveTopic(topic, this::getDefaultTopic).orElseThrow();
        this.logger.trace(() -> "Creating reactive message sender for '%s' topic".formatted(resolvedTopic));

        ReactiveMessageSenderBuilder<T> sender = this.reactivePulsarClient.messageSender(schema);

        // Factory-wide defaults go first so per-call customizers can build on or override them.
        if (!CollectionUtils.isEmpty(this.defaultConfigCustomizers)) {
            this.defaultConfigCustomizers.forEach(customizer -> customizer.customize(sender));
        }
        // Set the topic after the defaults so the resolved topic is the one in effect.
        sender.topic(resolvedTopic);

        if (this.reactiveMessageSenderCache != null) {
            sender.cache(this.reactiveMessageSenderCache);
        }

        if (!CollectionUtils.isEmpty(customizers)) {
            customizers.forEach(customizer -> customizer.customize(sender));
        }
        // Set the topic once more: a per-call customizer may have changed it on the builder.
        sender.topic(resolvedTopic);

        return sender.build();
    }

    /**
     * Returns the default topic configured on the builder.
     *
     * @return the default topic, or {@code null} if none was configured
     */
    @Override
    @Nullable
    public String getDefaultTopic() {
        return this.defaultTopic;
    }

    /**
     * Returns the lifecycle phase of this component.
     *
     * @return the fixed phase value {@code -1073741924}
     */
    public int getPhase() {
        return LIFECYCLE_PHASE;
    }

    /**
     * Returns the holder of this component's current {@link RestartableComponentSupport.State}.
     *
     * @return the state reference owned by this instance
     */
    @Override
    public AtomicReference<RestartableComponentSupport.State> currentState() {
        return this.currentState;
    }

    /**
     * Returns the logger of this component.
     *
     * @return the logger created for this instance's class
     */
    @Override
    public LogAccessor logger() {
        return this.logger;
    }

    /**
     * Closes the configured {@link ReactiveMessageSenderCache}, if there is one.
     *
     * @throws RuntimeException wrapping any exception thrown while closing the cache
     */
    @Override
    public void doStop() {
        if (this.reactiveMessageSenderCache == null) {
            return;
        }
        try {
            this.reactiveMessageSenderCache.close();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Fluent builder for {@link DefaultReactivePulsarSenderFactory}.
     *
     * @param <T> the message payload type
     */
    public static final class Builder<T> {

        private final ReactivePulsarClient reactivePulsarClient;

        private TopicResolver topicResolver = new DefaultTopicResolver();

        @Nullable
        private ReactiveMessageSenderCache messageSenderCache;

        @Nullable
        private String defaultTopic;

        @Nullable
        private List<ReactiveMessageSenderBuilderCustomizer<T>> defaultConfigCustomizers;

        private Builder(ReactivePulsarClient reactivePulsarClient) {
            Assert.notNull(reactivePulsarClient, "Reactive client is required");
            this.reactivePulsarClient = reactivePulsarClient;
        }

        /**
         * Replaces the topic resolver; a {@link DefaultTopicResolver} is used when this is not called.
         *
         * @param topicResolver the resolver to use; checked for {@code null} in {@link #build()}
         * @return this builder
         */
        public Builder<T> withTopicResolver(TopicResolver topicResolver) {
            this.topicResolver = topicResolver;
            return this;
        }

        /**
         * Sets the cache that is attached to every sender the factory creates.
         *
         * @param messageSenderCache the cache to attach
         * @return this builder
         */
        public Builder<T> withMessageSenderCache(ReactiveMessageSenderCache messageSenderCache) {
            this.messageSenderCache = messageSenderCache;
            return this;
        }

        /**
         * Sets the topic offered to the topic resolver as the fallback.
         *
         * @param defaultTopic the default topic
         * @return this builder
         */
        public Builder<T> withDefaultTopic(String defaultTopic) {
            this.defaultTopic = defaultTopic;
            return this;
        }

        /**
         * Sets a single default customizer, replacing any previously configured defaults.
         *
         * @param customizer the customizer applied to every sender; must not be {@code null}
         * @return this builder
         * @throws NullPointerException if {@code customizer} is {@code null}
         */
        public Builder<T> withDefaultConfigCustomizer(ReactiveMessageSenderBuilderCustomizer<T> customizer) {
            this.defaultConfigCustomizers = List.of(customizer);
            return this;
        }

        /**
         * Sets the default customizers, replacing any previously configured defaults.
         * The list is stored as given (no copy is made).
         *
         * @param customizers the customizers applied, in order, to every sender
         * @return this builder
         */
        public Builder<T> withDefaultConfigCustomizers(List<ReactiveMessageSenderBuilderCustomizer<T>> customizers) {
            this.defaultConfigCustomizers = customizers;
            return this;
        }

        /**
         * Creates the factory from the current builder state.
         *
         * @return a new factory
         * @throws IllegalArgumentException if the topic resolver was set to {@code null}
         */
        public DefaultReactivePulsarSenderFactory<T> build() {
            Assert.notNull(this.topicResolver, "Topic resolver is required");
            return new DefaultReactivePulsarSenderFactory<>(this.reactivePulsarClient, this.topicResolver,
                    this.messageSenderCache, this.defaultTopic, this.defaultConfigCustomizers);
        }
    }
}

