/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.azure.eventhubs.internal.client;

import com.azure.core.amqp.exception.AmqpException;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import com.azure.messaging.eventhubs.models.SendOptions;
import com.mulesoft.connectors.azure.eventhubs.internal.client.PublisherClient;
import com.mulesoft.connectors.azure.eventhubs.internal.client.mapper.EventDataMapper;
import com.mulesoft.connectors.azure.eventhubs.internal.client.mapper.SendOptionsMapper;
import com.mulesoft.connectors.azure.eventhubs.internal.domain.Event;
import com.mulesoft.connectors.azure.eventhubs.internal.error.AmqpErrorTranslator;
import com.mulesoft.connectors.azure.eventhubs.internal.error.exception.EventHubsException;
import com.mulesoft.connectors.azure.eventhubs.internal.error.exception.InvalidArgumentException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
 * {@link PublisherClient} implementation that publishes events through an
 * Azure {@link EventHubProducerClient}.
 *
 * <p>{@link AmqpException}s raised by the producer are routed through
 * {@link AmqpErrorTranslator} and, if not handled there, rethrown as
 * {@link EventHubsException}.
 */
public class DefaultPublisherClient
implements PublisherClient {
    private final EventHubProducerClient producer;

    /**
     * @param producer the producer client every publish operation is delegated to
     */
    public DefaultPublisherClient(EventHubProducerClient producer) {
        this.producer = producer;
    }

    /**
     * @return the underlying producer client
     */
    public EventHubProducerClient getProducer() {
        return this.producer;
    }

    /**
     * Publishes a single event, using send options derived from the event itself.
     *
     * @param event the event to publish
     * @throws EventHubsException if the producer fails with an AMQP error that the
     *         translator does not map to another exception
     */
    @Override
    public void publish(Event event) {
        EventData eventData = EventDataMapper.fromEvent(event);
        SendOptions sendOptions = SendOptionsMapper.fromEvent(event);
        try {
            this.producer.send(Collections.singletonList(eventData), sendOptions);
        }
        catch (AmqpException ex) {
            this.handleAmpqError(ex);
        }
    }

    /**
     * Publishes a list of events using size-limited batches.
     *
     * <p>Events are packed, in order, into a batch until it is full; the remaining
     * events then go into further batches created with the same options, so no
     * event is dropped when the list exceeds the capacity of one batch. All batches
     * are assembled before the first one is sent, which means an event that is too
     * large is rejected before anything is published. If sending a later batch
     * fails, batches sent before it remain published.
     *
     * @param events              the events to publish
     * @param partitionId         target partition id, or {@code null}; takes precedence
     *                            over {@code partitionKey}
     * @param partitionKey        partition key, or {@code null}; only used when
     *                            {@code partitionId} is {@code null}
     * @param maxBatchSizeInBytes maximum size of each batch, in bytes
     * @throws InvalidArgumentException if adding an event to a batch fails with an
     *         {@link AmqpException} (reported as the event being too long)
     * @throws EventHubsException if an event is refused by an empty batch, or the
     *         producer fails with an AMQP error the translator does not map to
     *         another exception
     */
    @Override
    public void bulkPublish(List<Event> events, String partitionId, String partitionKey, int maxBatchSizeInBytes) {
        try {
            CreateBatchOptions createBatchOptions = this.buildCreateBatchOptions(partitionId, partitionKey, maxBatchSizeInBytes);
            EventDataBatch currentBatch = this.producer.createBatch(createBatchOptions);
            // Map everything up front so a mapping failure aborts before any batch is built or sent.
            List<EventData> eventsToPublish = events.stream().map(EventDataMapper::fromEvent).collect(Collectors.toList());
            List<EventDataBatch> batches = new ArrayList<>();
            batches.add(currentBatch);
            for (EventData eventData : eventsToPublish) {
                if (this.tryAddToBatch(currentBatch, eventData, maxBatchSizeInBytes)) {
                    continue;
                }
                // tryAdd returned false: the current batch has no room left, so roll over to a new one.
                currentBatch = this.producer.createBatch(createBatchOptions);
                batches.add(currentBatch);
                if (!this.tryAddToBatch(currentBatch, eventData, maxBatchSizeInBytes)) {
                    // Guard: an event refused by an empty batch can never be sent.
                    throw new EventHubsException("Event does not fit in an empty batch. Max Size: " + maxBatchSizeInBytes);
                }
            }
            for (EventDataBatch batch : batches) {
                this.producer.send(batch);
            }
        }
        catch (AmqpException ex) {
            this.handleAmpqError(ex);
        }
    }

    /**
     * Closes the underlying producer client.
     */
    @Override
    public void close() {
        this.producer.close();
    }

    /**
     * Attempts to add one event to a batch.
     *
     * @return the result of {@link EventDataBatch#tryAdd(EventData)}; {@code false}
     *         means the event was not added to this batch
     * @throws InvalidArgumentException if the batch rejects the event with an
     *         {@link AmqpException}
     */
    private boolean tryAddToBatch(EventDataBatch batch, EventData eventData, int maxBatchSizeInBytes) {
        try {
            return batch.tryAdd(eventData);
        }
        catch (AmqpException ex) {
            throw new InvalidArgumentException("Event is too long. Max Size: " + maxBatchSizeInBytes, ex);
        }
    }

    /**
     * Builds the batch options: the size limit is always set, then the partition id
     * if present, otherwise the partition key if present.
     */
    private CreateBatchOptions buildCreateBatchOptions(String partitionId, String partitionKey, int maxBatchSizeInBytes) {
        CreateBatchOptions createBatchOptions = new CreateBatchOptions();
        createBatchOptions.setMaximumSizeInBytes(maxBatchSizeInBytes);
        if (Objects.nonNull(partitionId)) {
            return createBatchOptions.setPartitionId(partitionId);
        }
        if (Objects.nonNull(partitionKey)) {
            return createBatchOptions.setPartitionKey(partitionKey);
        }
        return createBatchOptions;
    }

    /**
     * Converts an AMQP failure into a connector exception; never returns normally.
     *
     * <p>The translator gets the first chance to handle the error (presumably by
     * throwing a more specific exception for known conditions - confirm against
     * {@link AmqpErrorTranslator}); otherwise a generic {@link EventHubsException}
     * carrying the original message is thrown.
     */
    private void handleAmpqError(AmqpException ex) {
        AmqpErrorTranslator.tryToHandleKnownAmqpException(ex);
        // NOTE(review): the original exception is not chained as the cause here; consider
        // chaining it if EventHubsException offers a (message, cause) constructor.
        throw new EventHubsException(ex.getMessage());
    }
}

