/*
 * Decompiled with CFR 0.152.
 */
package com.azure.spring.eventhubs.core;

import com.azure.core.amqp.exception.AmqpException;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import com.azure.spring.eventhubs.core.producer.EventHubsProducerFactory;
import com.azure.spring.eventhubs.support.converter.EventHubsMessageConverter;
import com.azure.spring.messaging.PartitionSupplier;
import com.azure.spring.messaging.core.BatchSendOperation;
import com.azure.spring.messaging.core.SendOperation;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Sends Spring {@link Message}s to an Event Hub.
 *
 * <p>Messages are converted to {@link EventData} with the configured
 * {@link EventHubsMessageConverter}, packed into {@link EventDataBatch}es and sent through an
 * {@link EventHubProducerAsyncClient} obtained from the {@link EventHubsProducerFactory}.
 * All network work is performed lazily, when the returned {@link Mono} is subscribed, and
 * without blocking the calling thread.
 */
public class EventHubsTemplate implements SendOperation, BatchSendOperation {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubsTemplate.class);
    private final EventHubsProducerFactory producerFactory;
    private EventHubsMessageConverter messageConverter = new EventHubsMessageConverter();

    /**
     * Creates a template that obtains its producers from the given factory.
     *
     * @param producerFactory factory used to create a producer client per destination
     */
    public EventHubsTemplate(EventHubsProducerFactory producerFactory) {
        this.producerFactory = producerFactory;
    }

    /**
     * Converts the messages to {@link EventData} and sends them to the destination in batches.
     *
     * @param destination name passed to the producer factory to obtain the producer client
     * @param messages messages to convert and send
     * @param partitionSupplier optional source of partition id / partition key; may be {@code null}
     * @param <T> payload type of the messages
     * @return a {@link Mono} that completes when the last batch has been sent
     */
    public <T> Mono<Void> sendAsync(String destination, Collection<Message<T>> messages, PartitionSupplier partitionSupplier) {
        // Conversion happens eagerly so the pipeline works on a private snapshot of the input.
        List<EventData> eventData = messages.stream()
            .map(m -> (EventData) this.messageConverter.fromMessage(m, EventData.class))
            .collect(Collectors.toList());
        return this.doSend(destination, eventData, partitionSupplier);
    }

    /**
     * Sends a single message by delegating to the collection overload.
     *
     * @param destination name passed to the producer factory to obtain the producer client
     * @param message message to convert and send
     * @param partitionSupplier optional source of partition id / partition key; may be {@code null}
     * @param <T> payload type of the message
     * @return a {@link Mono} that completes when the message has been sent
     */
    public <T> Mono<Void> sendAsync(String destination, Message<T> message, PartitionSupplier partitionSupplier) {
        return this.sendAsync(destination, Collections.singleton(message), partitionSupplier);
    }

    /**
     * Builds the non-blocking send pipeline: create a batch, add the events one at a time
     * (flushing and replacing the batch whenever it is full), then send whatever remains.
     *
     * <p>The producer and batch options are resolved immediately; batch creation and sending
     * are deferred until subscription, and each subscription gets its own batch state.
     */
    private Mono<Void> doSend(String destination, List<EventData> events, PartitionSupplier partitionSupplier) {
        EventHubProducerAsyncClient producer = this.producerFactory.createProducer(destination);
        CreateBatchOptions options = this.buildCreateBatchOptions(partitionSupplier);
        return Mono.defer(() -> {
            AtomicReference<EventDataBatch> currentBatch = new AtomicReference<>();
            Mono<Void> sendRemaining = Mono.defer(() -> producer.send(currentBatch.getAndSet(null)));
            return producer.createBatch(options)
                .doOnNext(currentBatch::set)
                .thenMany(Flux.fromIterable(events))
                // concatMap, not flatMap: the next event must not be examined until a pending
                // flush has finished and the replacement batch is installed.
                .concatMap(event -> this.addOrFlush(producer, options, currentBatch, event))
                .then(sendRemaining);
        });
    }

    /**
     * Adds one event to the current batch. When the batch rejects the event, the batch is sent
     * and a new batch is created, installed as the current batch and offered the event.
     * An event that cannot be added is logged and dropped.
     */
    private Mono<Void> addOrFlush(EventHubProducerAsyncClient producer, CreateBatchOptions options,
                                  AtomicReference<EventDataBatch> currentBatch, EventData event) {
        EventDataBatch batch = currentBatch.get();
        try {
            if (batch.tryAdd(event)) {
                return Mono.empty();
            }
            LOGGER.warn("EventDataBatch is full in the collect process or the first event is too large to fit in an empty batch! Max size: {}", batch.getMaxSizeInBytes());
        } catch (AmqpException e) {
            LOGGER.error("Event is larger than maximum allowed size.", e);
            return Mono.empty();
        }
        Mono<EventDataBatch> replacement = producer.createBatch(options).doOnNext(newBatch -> {
            currentBatch.set(newBatch);
            try {
                if (!newBatch.tryAdd(event)) {
                    LOGGER.error("Event was too large to fit in an empty batch. Max size:{} ", newBatch.getMaxSizeInBytes());
                }
            } catch (AmqpException e) {
                LOGGER.error("Event was too large to fit in an empty batch. Max size:{}", newBatch.getMaxSizeInBytes(), e);
            }
        });
        // The full batch is held in a local, so sending it and creating its replacement
        // can safely proceed together.
        return Mono.when(producer.send(batch), replacement);
    }

    /**
     * Builds batch options from the partition supplier; both the partition id and the
     * partition key are left {@code null} when no supplier is given.
     */
    private CreateBatchOptions buildCreateBatchOptions(PartitionSupplier partitionSupplier) {
        String partitionId = partitionSupplier != null ? partitionSupplier.getPartitionId() : null;
        String partitionKey = partitionSupplier != null ? partitionSupplier.getPartitionKey() : null;
        return new CreateBatchOptions().setPartitionId(partitionId).setPartitionKey(partitionKey);
    }

    /**
     * Replaces the converter used to turn messages into {@link EventData}.
     *
     * @param messageConverter converter to use for subsequent sends
     */
    public void setMessageConverter(EventHubsMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }
}

