/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.AmqpSendLink;
import com.azure.core.amqp.implementation.ErrorContextProvider;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.ProcessKind;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusMessageBatch;
import com.azure.messaging.servicebus.ServiceBusTransactionContext;
import com.azure.messaging.servicebus.implementation.Messages;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor;
import com.azure.messaging.servicebus.models.CreateBatchOptions;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;

public final class ServiceBusSenderAsyncClient
implements AutoCloseable {
    static final int MAX_MESSAGE_LENGTH_BYTES = 262144;
    private static final String TRANSACTION_LINK_NAME = "coordinator";
    private static final CreateBatchOptions DEFAULT_BATCH_OPTIONS = new CreateBatchOptions();
    private final ClientLogger logger = new ClientLogger(ServiceBusSenderAsyncClient.class);
    private final AtomicReference<String> linkName = new AtomicReference();
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final TracerProvider tracerProvider;
    private final MessageSerializer messageSerializer;
    private final AmqpRetryOptions retryOptions;
    private final AmqpRetryPolicy retryPolicy;
    private final MessagingEntityType entityType;
    private final Runnable onClientClose;
    private final String entityName;
    private final ServiceBusConnectionProcessor connectionProcessor;

    ServiceBusSenderAsyncClient(String entityName, MessagingEntityType entityType, ServiceBusConnectionProcessor connectionProcessor, AmqpRetryOptions retryOptions, TracerProvider tracerProvider, MessageSerializer messageSerializer, Runnable onClientClose) {
        this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
        this.retryOptions = Objects.requireNonNull(retryOptions, "'retryOptions' cannot be null.");
        this.entityName = Objects.requireNonNull(entityName, "'entityPath' cannot be null.");
        this.connectionProcessor = Objects.requireNonNull(connectionProcessor, "'connectionProcessor' cannot be null.");
        this.tracerProvider = tracerProvider;
        this.retryPolicy = RetryUtil.getRetryPolicy((AmqpRetryOptions)retryOptions);
        this.entityType = entityType;
        this.onClientClose = onClientClose;
    }

    public String getFullyQualifiedNamespace() {
        return this.connectionProcessor.getFullyQualifiedNamespace();
    }

    public String getEntityPath() {
        return this.entityName;
    }

    public Mono<Void> send(ServiceBusMessage message) {
        if (Objects.isNull(message)) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'message' cannot be null."));
        }
        return this.sendInternal((Flux<ServiceBusMessage>)Flux.just((Object)message), null);
    }

    public Mono<Void> send(ServiceBusMessage message, ServiceBusTransactionContext transactionContext) {
        if (Objects.isNull(transactionContext)) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'transactionContext' cannot be null."));
        }
        if (Objects.isNull(transactionContext.getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        return this.sendInternal((Flux<ServiceBusMessage>)Flux.just((Object)message), transactionContext);
    }

    public Mono<Void> send(Iterable<ServiceBusMessage> messages, ServiceBusTransactionContext transactionContext) {
        if (Objects.isNull(transactionContext)) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'transactionContext' cannot be null."));
        }
        if (Objects.isNull(transactionContext.getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        return this.sendIterable(messages, transactionContext);
    }

    public Mono<Void> send(Iterable<ServiceBusMessage> messages) {
        return this.sendIterable(messages, null);
    }

    public Mono<Void> send(ServiceBusMessageBatch batch) {
        return this.sendInternal(batch, null);
    }

    public Mono<Void> send(ServiceBusMessageBatch batch, ServiceBusTransactionContext transactionContext) {
        if (Objects.isNull(transactionContext)) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'transactionContext' cannot be null."));
        }
        if (Objects.isNull(transactionContext.getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        return this.sendInternal(batch, transactionContext);
    }

    public Mono<ServiceBusMessageBatch> createBatch() {
        return this.createBatch(DEFAULT_BATCH_OPTIONS);
    }

    public Mono<ServiceBusMessageBatch> createBatch(CreateBatchOptions options) {
        if (Objects.isNull(options)) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'options' cannot be null."));
        }
        int maxSize = options.getMaximumSizeInBytes();
        return this.getSendLink().flatMap(link -> link.getLinkSize().flatMap(size -> {
            int maximumLinkSize;
            int n = maximumLinkSize = size > 0 ? size : 262144;
            if (maxSize > maximumLinkSize) {
                return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new IllegalArgumentException(String.format(Locale.US, "CreateBatchOptions.getMaximumSizeInBytes (%s bytes) is larger than the link size (%s bytes).", maxSize, maximumLinkSize)));
            }
            int batchSize = maxSize > 0 ? maxSize : maximumLinkSize;
            return Mono.just((Object)new ServiceBusMessageBatch(batchSize, () -> ((AmqpSendLink)link).getErrorContext(), this.tracerProvider, this.messageSerializer));
        }));
    }

    public Mono<Long> scheduleMessage(ServiceBusMessage message, Instant scheduledEnqueueTime, ServiceBusTransactionContext transactionContext) {
        if (Objects.isNull(transactionContext)) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'transactionContext' cannot be null."));
        }
        if (Objects.isNull(transactionContext.getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        return this.scheduleMessageInternal(message, scheduledEnqueueTime, transactionContext);
    }

    public Mono<Long> scheduleMessage(ServiceBusMessage message, Instant scheduledEnqueueTime) {
        return this.scheduleMessageInternal(message, scheduledEnqueueTime, null);
    }

    public Mono<Void> cancelScheduledMessage(long sequenceNumber) {
        if (sequenceNumber < 0L) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new IllegalArgumentException("'sequenceNumber' cannot be negative."));
        }
        return this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityName, this.entityType)).flatMap(managementNode -> managementNode.cancelScheduledMessage(sequenceNumber, this.linkName.get()));
    }

    public Mono<ServiceBusTransactionContext> createTransaction() {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "createTransaction")));
        }
        return this.connectionProcessor.flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME)).flatMap(transactionSession -> transactionSession.createTransaction()).map(transaction -> new ServiceBusTransactionContext(transaction.getTransactionId()));
    }

    public Mono<Void> commitTransaction(ServiceBusTransactionContext transactionContext) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "commitTransaction")));
        }
        return this.connectionProcessor.flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME)).flatMap(transactionSession -> transactionSession.commitTransaction(new AmqpTransaction(transactionContext.getTransactionId())));
    }

    public Mono<Void> rollbackTransaction(ServiceBusTransactionContext transactionContext) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "rollbackTransaction")));
        }
        return this.connectionProcessor.flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME)).flatMap(transactionSession -> transactionSession.rollbackTransaction(new AmqpTransaction(transactionContext.getTransactionId())));
    }

    @Override
    public void close() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        this.onClientClose.run();
    }

    private Mono<Void> sendIterable(Iterable<ServiceBusMessage> messages, ServiceBusTransactionContext transaction) {
        if (Objects.isNull(messages)) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'messages' cannot be null."));
        }
        return this.createBatch().flatMap(messageBatch -> {
            messages.forEach(message -> messageBatch.tryAdd((ServiceBusMessage)message));
            return this.sendInternal((ServiceBusMessageBatch)messageBatch, transaction);
        });
    }

    private Mono<Long> scheduleMessageInternal(ServiceBusMessage message, Instant scheduledEnqueueTime, ServiceBusTransactionContext transactionContext) {
        if (Objects.isNull(message)) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'message' cannot be null."));
        }
        if (Objects.isNull(scheduledEnqueueTime)) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'scheduledEnqueueTime' cannot be null."));
        }
        return this.getSendLink().flatMap(link -> link.getLinkSize().flatMap(size -> {
            int maxSize = size > 0 ? size : 262144;
            return this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityName, this.entityType)).flatMap(managementNode -> managementNode.schedule(message, scheduledEnqueueTime, maxSize, link.getLinkName(), transactionContext));
        }));
    }

    private Mono<Void> sendInternal(ServiceBusMessageBatch batch, ServiceBusTransactionContext transactionContext) {
        AtomicReference<Context> parentContext;
        if (Objects.isNull(batch)) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'batch' cannot be null."));
        }
        boolean isTracingEnabled = this.tracerProvider.isEnabled();
        AtomicReference<Context> atomicReference = parentContext = isTracingEnabled ? new AtomicReference<Context>(Context.NONE) : null;
        if (batch.getMessages().isEmpty()) {
            this.logger.info("Cannot send an EventBatch that is empty.");
            return Mono.empty();
        }
        this.logger.info("Sending batch with size[{}].", new Object[]{batch.getCount()});
        Context sharedContext = null;
        ArrayList<Message> messages = new ArrayList<Message>();
        for (int i = 0; i < batch.getMessages().size(); ++i) {
            Message message;
            ServiceBusMessage event = batch.getMessages().get(i);
            if (isTracingEnabled) {
                parentContext.set(event.getContext());
                if (i == 0) {
                    sharedContext = this.tracerProvider.getSharedSpanBuilder(parentContext.get());
                }
                this.tracerProvider.addSpanLinks(sharedContext.addData((Object)"span-context", (Object)event.getContext()));
            }
            MessageAnnotations messageAnnotations = (message = this.messageSerializer.serialize((Object)event)).getMessageAnnotations() == null ? new MessageAnnotations(new HashMap()) : message.getMessageAnnotations();
            message.setMessageAnnotations(messageAnnotations);
            messages.add(message);
        }
        Context finalSharedContext = sharedContext != null ? sharedContext : Context.NONE;
        return RetryUtil.withRetry((Mono)this.getSendLink().flatMap(link -> {
            if (isTracingEnabled) {
                Context entityContext = finalSharedContext.addData((Object)"entity-path", (Object)link.getEntityPath());
                parentContext.set(this.tracerProvider.startSpan(entityContext.addData((Object)"hostname", (Object)link.getHostname()), ProcessKind.SEND));
            }
            if (transactionContext != null && transactionContext.getTransactionId() != null) {
                TransactionalState deliveryState = new TransactionalState();
                deliveryState.setTxnId(new Binary(transactionContext.getTransactionId().array()));
                return messages.size() == 1 ? link.send((Message)messages.get(0), (DeliveryState)deliveryState) : link.send(messages, (DeliveryState)deliveryState);
            }
            return messages.size() == 1 ? link.send((Message)messages.get(0)) : link.send(messages);
        }).doOnEach(signal -> {
            if (isTracingEnabled) {
                this.tracerProvider.endSpan((Context)parentContext.get(), signal);
            }
        }).doOnError(error -> {
            if (isTracingEnabled) {
                this.tracerProvider.endSpan((Context)parentContext.get(), Signal.error((Throwable)error));
            }
        }), (Duration)this.retryOptions.getTryTimeout(), (AmqpRetryPolicy)this.retryPolicy);
    }

    private Mono<Void> sendInternal(Flux<ServiceBusMessage> messages, ServiceBusTransactionContext transactionContext) {
        return this.getSendLink().flatMap(link -> link.getLinkSize().flatMap(size -> {
            int batchSize = size > 0 ? size : 262144;
            CreateBatchOptions batchOptions = new CreateBatchOptions().setMaximumSizeInBytes(batchSize);
            return messages.collect((Collector)new AmqpMessageCollector(batchOptions, 1, () -> ((AmqpSendLink)link).getErrorContext(), this.tracerProvider, this.messageSerializer));
        }).flatMap(list -> this.sendInternalBatch((Flux<ServiceBusMessageBatch>)Flux.fromIterable((Iterable)list), transactionContext)));
    }

    private Mono<Void> sendInternalBatch(Flux<ServiceBusMessageBatch> eventBatches, ServiceBusTransactionContext transactionContext) {
        return eventBatches.flatMap(messageBatch -> this.sendInternal((ServiceBusMessageBatch)messageBatch, transactionContext)).then().doOnError(error -> this.logger.error("Error sending batch.", new Object[]{error}));
    }

    private Mono<AmqpSendLink> getSendLink() {
        return this.connectionProcessor.flatMap(connection -> connection.createSendLink(this.entityName, this.entityName, this.retryOptions)).doOnNext(next -> this.linkName.compareAndSet(null, next.getLinkName()));
    }

    private static class AmqpMessageCollector
    implements Collector<ServiceBusMessage, List<ServiceBusMessageBatch>, List<ServiceBusMessageBatch>> {
        private final int maxMessageSize;
        private final Integer maxNumberOfBatches;
        private final ErrorContextProvider contextProvider;
        private final TracerProvider tracerProvider;
        private final MessageSerializer serializer;
        private volatile ServiceBusMessageBatch currentBatch;

        AmqpMessageCollector(CreateBatchOptions options, Integer maxNumberOfBatches, ErrorContextProvider contextProvider, TracerProvider tracerProvider, MessageSerializer serializer) {
            this.maxNumberOfBatches = maxNumberOfBatches;
            this.maxMessageSize = options.getMaximumSizeInBytes() > 0 ? options.getMaximumSizeInBytes() : 262144;
            this.contextProvider = contextProvider;
            this.tracerProvider = tracerProvider;
            this.serializer = serializer;
            this.currentBatch = new ServiceBusMessageBatch(this.maxMessageSize, contextProvider, tracerProvider, serializer);
        }

        @Override
        public Supplier<List<ServiceBusMessageBatch>> supplier() {
            return ArrayList::new;
        }

        @Override
        public BiConsumer<List<ServiceBusMessageBatch>, ServiceBusMessage> accumulator() {
            return (list, event) -> {
                ServiceBusMessageBatch batch = this.currentBatch;
                if (batch.tryAdd((ServiceBusMessage)event)) {
                    return;
                }
                if (this.maxNumberOfBatches != null && list.size() == this.maxNumberOfBatches.intValue()) {
                    String message = String.format(Locale.US, "EventData does not fit into maximum number of batches. '%s'", this.maxNumberOfBatches);
                    throw new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, message, this.contextProvider.getErrorContext());
                }
                this.currentBatch = new ServiceBusMessageBatch(this.maxMessageSize, this.contextProvider, this.tracerProvider, this.serializer);
                this.currentBatch.tryAdd((ServiceBusMessage)event);
                list.add(batch);
            };
        }

        @Override
        public BinaryOperator<List<ServiceBusMessageBatch>> combiner() {
            return (existing, another) -> {
                existing.addAll(another);
                return existing;
            };
        }

        @Override
        public Function<List<ServiceBusMessageBatch>, List<ServiceBusMessageBatch>> finisher() {
            return list -> {
                ServiceBusMessageBatch batch = this.currentBatch;
                this.currentBatch = null;
                if (batch != null) {
                    list.add(batch);
                }
                return list;
            };
        }

        @Override
        public Set<Collector.Characteristics> characteristics() {
            return Collections.emptySet();
        }
    }
}

