/*
 * Decompiled with CFR 0.152.
 */
package com.github.lhotari.reactive.pulsar.internal.adapter;

import com.github.lhotari.reactive.pulsar.adapter.MessageSpec;
import com.github.lhotari.reactive.pulsar.adapter.ProducerConfigurer;
import com.github.lhotari.reactive.pulsar.adapter.ReactiveMessageSender;
import com.github.lhotari.reactive.pulsar.internal.adapter.PulsarFutureAdapter;
import com.github.lhotari.reactive.pulsar.resourceadapter.PublisherTransformer;
import com.github.lhotari.reactive.pulsar.resourceadapter.ReactiveProducerAdapter;
import com.github.lhotari.reactive.pulsar.resourceadapter.ReactiveProducerAdapterFactory;
import com.github.lhotari.reactive.pulsar.resourceadapter.ReactiveProducerCache;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class DefaultReactiveMessageSender<T>
implements ReactiveMessageSender<T> {
    private final Schema<T> schema;
    private final ProducerConfigurer<T> producerConfigurer;
    private final String topicName;
    private final int maxConcurrency;
    private final ReactiveProducerAdapterFactory reactiveProducerAdapterFactory;
    private final ReactiveProducerCache producerCache;
    private final Supplier<PublisherTransformer> producerActionTransformer;

    public DefaultReactiveMessageSender(Schema<T> schema, ProducerConfigurer<T> producerConfigurer, String topicName, int maxConcurrency, ReactiveProducerAdapterFactory reactiveProducerAdapterFactory, ReactiveProducerCache producerCache, Supplier<PublisherTransformer> producerActionTransformer) {
        this.schema = schema;
        this.producerConfigurer = producerConfigurer;
        this.topicName = topicName;
        this.maxConcurrency = maxConcurrency;
        this.reactiveProducerAdapterFactory = reactiveProducerAdapterFactory;
        this.producerCache = producerCache;
        this.producerActionTransformer = producerActionTransformer;
    }

    ReactiveProducerAdapter<T> createReactiveProducerAdapter() {
        return this.reactiveProducerAdapterFactory.create(pulsarClient -> {
            ProducerBuilder producerBuilder = pulsarClient.newProducer(this.schema);
            if (this.topicName != null) {
                producerBuilder.topic(this.topicName);
            }
            if (this.producerConfigurer != null) {
                this.producerConfigurer.configure(producerBuilder);
            }
            return producerBuilder;
        }, this.producerCache, this.producerActionTransformer);
    }

    @Override
    public Mono<MessageId> sendMessage(Mono<MessageSpec<T>> messageSpec) {
        return this.createReactiveProducerAdapter().usingProducer(producer -> messageSpec.flatMap(m -> this.createMessageMono((MessageSpec<T>)m, (Producer<T>)producer)));
    }

    private Mono<MessageId> createMessageMono(MessageSpec<T> messageSpec, Producer<T> producer) {
        return PulsarFutureAdapter.adaptPulsarFuture(() -> {
            TypedMessageBuilder typedMessageBuilder = producer.newMessage();
            messageSpec.configure(typedMessageBuilder);
            return typedMessageBuilder.sendAsync();
        });
    }

    @Override
    public Flux<MessageId> sendMessages(Flux<MessageSpec<T>> messageSpecs) {
        return this.createReactiveProducerAdapter().usingProducerMany(producer -> messageSpecs.flatMapSequential(messageSpec -> this.createMessageMono((MessageSpec<T>)messageSpec, (Producer<T>)producer), this.maxConcurrency));
    }
}

