/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.AmqpSendLink;
import com.azure.core.amqp.implementation.ErrorContextProvider;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorConnectionCache;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusErrorSource;
import com.azure.messaging.servicebus.ServiceBusException;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusMessageBatch;
import com.azure.messaging.servicebus.ServiceBusSenderInstrumentation;
import com.azure.messaging.servicebus.ServiceBusTransactionContext;
import com.azure.messaging.servicebus.implementation.Messages;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusReactorAmqpConnection;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import com.azure.messaging.servicebus.models.CreateMessageBatchOptions;
import java.nio.ByteBuffer;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.StreamSupport;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@ServiceClient(builder=ServiceBusClientBuilder.class, isAsync=true)
public final class ServiceBusSenderAsyncClient
implements AutoCloseable {
    static final int MAX_MESSAGE_LENGTH_BYTES = 262144;
    private static final String TRANSACTION_LINK_NAME = "coordinator";
    private static final CreateMessageBatchOptions DEFAULT_BATCH_OPTIONS = new CreateMessageBatchOptions();
    private static final ClientLogger LOGGER = new ClientLogger(ServiceBusSenderAsyncClient.class);
    private final AtomicReference<String> linkName = new AtomicReference();
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final MessageSerializer messageSerializer;
    private final AmqpRetryOptions retryOptions;
    private final AmqpRetryPolicy retryPolicy;
    private final MessagingEntityType entityType;
    private final Runnable onClientClose;
    private final String entityName;
    private final ReactorConnectionCache<ServiceBusReactorAmqpConnection> connectionCache;
    private final String viaEntityName;
    private final String identifier;
    private final ServiceBusSenderInstrumentation instrumentation;
    private final ServiceBusTracer tracer;

    ServiceBusSenderAsyncClient(String entityName, MessagingEntityType entityType, ReactorConnectionCache<ServiceBusReactorAmqpConnection> connectionCache, AmqpRetryOptions retryOptions, ServiceBusSenderInstrumentation instrumentation, MessageSerializer messageSerializer, Runnable onClientClose, String viaEntityName, String identifier) {
        this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
        this.retryOptions = Objects.requireNonNull(retryOptions, "'retryOptions' cannot be null.");
        this.entityName = Objects.requireNonNull(entityName, "'entityPath' cannot be null.");
        this.connectionCache = Objects.requireNonNull(connectionCache, "'connectionCache' cannot be null.");
        this.instrumentation = Objects.requireNonNull(instrumentation, "'instrumentation' cannot be null.");
        this.tracer = instrumentation.getTracer();
        this.retryPolicy = RetryUtil.getRetryPolicy((AmqpRetryOptions)retryOptions);
        this.entityType = entityType;
        this.viaEntityName = viaEntityName;
        this.onClientClose = onClientClose;
        this.identifier = identifier;
    }

    public String getFullyQualifiedNamespace() {
        return this.connectionCache.getFullyQualifiedNamespace();
    }

    public String getEntityPath() {
        return this.entityName;
    }

    public String getIdentifier() {
        return this.identifier;
    }

    public Mono<Void> sendMessage(ServiceBusMessage message) {
        if (Objects.isNull(message)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'message' cannot be null."));
        }
        return this.sendInternal((Flux<ServiceBusMessage>)Flux.just((Object)message), null);
    }

    public Mono<Void> sendMessage(ServiceBusMessage message, ServiceBusTransactionContext transactionContext) {
        if (Objects.isNull(transactionContext)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'transactionContext' cannot be null."));
        }
        if (Objects.isNull(transactionContext.getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        return this.sendInternal((Flux<ServiceBusMessage>)Flux.just((Object)message), transactionContext);
    }

    public Mono<Void> sendMessages(Iterable<ServiceBusMessage> messages, ServiceBusTransactionContext transactionContext) {
        if (Objects.isNull(transactionContext)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'transactionContext' cannot be null."));
        }
        if (Objects.isNull(transactionContext.getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        return this.sendIterable(messages, transactionContext);
    }

    public Mono<Void> sendMessages(Iterable<ServiceBusMessage> messages) {
        return this.sendIterable(messages, null);
    }

    public Mono<Void> sendMessages(ServiceBusMessageBatch batch) {
        return this.sendInternal(batch, null);
    }

    public Mono<Void> sendMessages(ServiceBusMessageBatch batch, ServiceBusTransactionContext transactionContext) {
        if (Objects.isNull(transactionContext)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'transactionContext' cannot be null."));
        }
        if (Objects.isNull(transactionContext.getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        return this.sendInternal(batch, transactionContext);
    }

    public Mono<ServiceBusMessageBatch> createMessageBatch() {
        return this.createMessageBatch(DEFAULT_BATCH_OPTIONS);
    }

    public Mono<ServiceBusMessageBatch> createMessageBatch(CreateMessageBatchOptions options) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_SENDER, "createMessageBatch")));
        }
        if (Objects.isNull(options)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'options' cannot be null."));
        }
        int maxSize = options.getMaximumSizeInBytes();
        return this.getSendLink().flatMap(link -> link.getLinkSize().flatMap(size -> {
            int maximumLinkSize;
            int n = maximumLinkSize = size > 0 ? size : 262144;
            if (maxSize > maximumLinkSize) {
                return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalArgumentException(String.format(Locale.US, "CreateMessageBatchOptions.getMaximumSizeInBytes (%s bytes) is larger than the link size (%s bytes).", maxSize, maximumLinkSize)));
            }
            int batchSize = maxSize > 0 ? maxSize : maximumLinkSize;
            return Mono.just((Object)new ServiceBusMessageBatch(batchSize, () -> ((AmqpSendLink)link).getErrorContext(), this.tracer, this.messageSerializer));
        })).onErrorMap(this::mapError);
    }

    public Mono<Long> scheduleMessage(ServiceBusMessage message, OffsetDateTime scheduledEnqueueTime, ServiceBusTransactionContext transactionContext) {
        if (Objects.isNull(transactionContext)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'transactionContext' cannot be null."));
        }
        if (Objects.isNull(transactionContext.getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        return this.scheduleMessageInternal(message, scheduledEnqueueTime, transactionContext);
    }

    public Mono<Long> scheduleMessage(ServiceBusMessage message, OffsetDateTime scheduledEnqueueTime) {
        return this.scheduleMessageInternal(message, scheduledEnqueueTime, null);
    }

    public Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetDateTime scheduledEnqueueTime) {
        return this.scheduleMessages(messages, scheduledEnqueueTime, null);
    }

    public Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetDateTime scheduledEnqueueTime, ServiceBusTransactionContext transactionContext) {
        if (this.isDisposed.get()) {
            return FluxUtil.fluxError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_SENDER, "scheduleMessages")));
        }
        if (Objects.isNull(messages)) {
            return FluxUtil.fluxError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'messages' cannot be null."));
        }
        if (Objects.isNull(scheduledEnqueueTime)) {
            return FluxUtil.fluxError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'scheduledEnqueueTime' cannot be null."));
        }
        return this.createMessageBatch().map(messageBatch -> {
            int index = 0;
            for (ServiceBusMessage message : messages) {
                if (!messageBatch.tryAddMessage(message)) {
                    String error = String.format(Locale.US, "Messages exceed max allowed size for all the messages together. Failed to add message at index '%s'.", index);
                    throw LOGGER.logExceptionAsError((RuntimeException)new IllegalArgumentException(error));
                }
                ++index;
            }
            return messageBatch;
        }).flatMapMany(messageBatch -> this.tracer.traceFluxWithLinks("ServiceBus.scheduleMessages", this.connectionCache.get().flatMap(connection -> connection.getManagementNode(this.entityName, this.entityType)).flatMapMany(managementNode -> managementNode.schedule(messageBatch.getMessages(), scheduledEnqueueTime, messageBatch.getMaxSizeInBytes(), this.linkName.get(), transactionContext)), messageBatch.getMessages(), ServiceBusMessage::getContext)).onErrorMap(this::mapError);
    }

    public Mono<Void> cancelScheduledMessage(long sequenceNumber) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_SENDER, "cancelScheduledMessage")));
        }
        if (sequenceNumber < 0L) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalArgumentException("'sequenceNumber' cannot be negative."));
        }
        return this.tracer.traceMono("ServiceBus.cancelScheduledMessage", this.connectionCache.get().flatMap(connection -> connection.getManagementNode(this.entityName, this.entityType)).flatMap(managementNode -> managementNode.cancelScheduledMessages(Collections.singletonList(sequenceNumber), this.linkName.get()))).onErrorMap(this::mapError);
    }

    public Mono<Void> cancelScheduledMessages(Iterable<Long> sequenceNumbers) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_SENDER, "cancelScheduledMessages")));
        }
        if (Objects.isNull(sequenceNumbers)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'messages' cannot be null."));
        }
        return this.tracer.traceMono("ServiceBus.cancelScheduledMessages", this.connectionCache.get().flatMap(connection -> connection.getManagementNode(this.entityName, this.entityType)).flatMap(managementNode -> managementNode.cancelScheduledMessages(sequenceNumbers, this.linkName.get()))).onErrorMap(this::mapError);
    }

    public Mono<ServiceBusTransactionContext> createTransaction() {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_SENDER, "createTransaction")));
        }
        return this.tracer.traceMono("ServiceBus.createTransaction", this.connectionCache.get().flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME)).flatMap(transactionSession -> transactionSession.createTransaction()).map(transaction -> new ServiceBusTransactionContext(transaction.getTransactionId()))).onErrorMap(this::mapError);
    }

    public Mono<Void> commitTransaction(ServiceBusTransactionContext transactionContext) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_SENDER, "commitTransaction")));
        }
        if (Objects.isNull(transactionContext)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'transactionContext' cannot be null."));
        }
        if (Objects.isNull(transactionContext.getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        return this.tracer.traceMono("ServiceBus.commitTransaction", this.connectionCache.get().flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME)).flatMap(transactionSession -> transactionSession.commitTransaction(new AmqpTransaction(transactionContext.getTransactionId())))).onErrorMap(this::mapError);
    }

    public Mono<Void> rollbackTransaction(ServiceBusTransactionContext transactionContext) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_SENDER, "rollbackTransaction")));
        }
        if (Objects.isNull(transactionContext)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'transactionContext' cannot be null."));
        }
        if (Objects.isNull(transactionContext.getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        return this.tracer.traceMono("ServiceBus.rollbackTransaction", this.connectionCache.get().flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME)).flatMap(transactionSession -> transactionSession.rollbackTransaction(new AmqpTransaction(transactionContext.getTransactionId())))).onErrorMap(this::mapError);
    }

    @Override
    public void close() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        this.onClientClose.run();
    }

    private Mono<Void> sendIterable(Iterable<ServiceBusMessage> messages, ServiceBusTransactionContext transaction) {
        if (Objects.isNull(messages)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'messages' cannot be null."));
        }
        return this.createMessageBatch().flatMap(messageBatch -> {
            StreamSupport.stream(messages.spliterator(), false).forEach(message -> messageBatch.tryAddMessage((ServiceBusMessage)message));
            return this.sendInternal((ServiceBusMessageBatch)messageBatch, transaction);
        }).onErrorMap(this::mapError);
    }

    private Mono<Long> scheduleMessageInternal(ServiceBusMessage message, OffsetDateTime scheduledEnqueueTime, ServiceBusTransactionContext transactionContext) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_SENDER, "scheduleMessage")));
        }
        if (Objects.isNull(message)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'message' cannot be null."));
        }
        if (Objects.isNull(scheduledEnqueueTime)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'scheduledEnqueueTime' cannot be null."));
        }
        return this.tracer.traceMonoWithLink("ServiceBus.scheduleMessage", this.getSendLink().flatMap(link -> link.getLinkSize().flatMap(size -> {
            int maxSize = size > 0 ? size : 262144;
            return this.connectionCache.get().flatMap(connection -> connection.getManagementNode(this.entityName, this.entityType)).flatMap(managementNode -> managementNode.schedule(Arrays.asList(message), scheduledEnqueueTime, maxSize, link.getLinkName(), transactionContext).next());
        })), message, message.getContext()).onErrorMap(this::mapError);
    }

    private Mono<Void> sendInternal(ServiceBusMessageBatch batch, ServiceBusTransactionContext transactionContext) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_SENDER, "sendMessages")));
        }
        if (Objects.isNull(batch)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'batch' cannot be null."));
        }
        if (batch.getMessages().isEmpty()) {
            LOGGER.info("Cannot send an EventBatch that is empty.");
            return Mono.empty();
        }
        LOGGER.atInfo().addKeyValue("batchSize", (long)batch.getCount()).log("Sending batch.");
        List messages = Collections.synchronizedList(new ArrayList());
        batch.getMessages().forEach(serviceBusMessage -> {
            Message message = this.messageSerializer.serialize(serviceBusMessage);
            MessageAnnotations messageAnnotations = message.getMessageAnnotations() == null ? new MessageAnnotations(new HashMap()) : message.getMessageAnnotations();
            message.setMessageAnnotations(messageAnnotations);
            messages.add(message);
        });
        Mono sendMessage = this.getSendLink().flatMap(link -> {
            if (transactionContext != null && transactionContext.getTransactionId() != null) {
                TransactionalState deliveryState = new TransactionalState();
                deliveryState.setTxnId(Binary.create((ByteBuffer)transactionContext.getTransactionId()));
                return messages.size() == 1 ? link.send((Message)messages.get(0), (DeliveryState)deliveryState) : link.send(messages, (DeliveryState)deliveryState);
            }
            return messages.size() == 1 ? link.send((Message)messages.get(0)) : link.send(messages);
        });
        Mono sendWithRetry = RetryUtil.withRetry((Mono)sendMessage, (AmqpRetryOptions)this.retryOptions, (String)String.format("entityPath[%s], partitionId[%s]: Sending messages timed out.", this.entityName, batch.getCount())).onErrorMap(this::mapError);
        return this.instrumentation.instrumentSendBatch("ServiceBus.send", sendWithRetry, batch.getMessages());
    }

    private Mono<Void> sendInternal(Flux<ServiceBusMessage> messages, ServiceBusTransactionContext transactionContext) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_SENDER, "sendMessage")));
        }
        return RetryUtil.withRetry(this.getSendLink(), (AmqpRetryOptions)this.retryOptions, (String)("Failed to create send link " + this.linkName)).flatMap(link -> link.getLinkSize().flatMap(size -> {
            int batchSize = size > 0 ? size : 262144;
            CreateMessageBatchOptions batchOptions = new CreateMessageBatchOptions().setMaximumSizeInBytes(batchSize);
            return messages.collect((Collector)new AmqpMessageCollector(batchOptions, 1, () -> ((AmqpSendLink)link).getErrorContext(), this.tracer, this.messageSerializer));
        }).flatMap(list -> this.sendInternalBatch((Flux<ServiceBusMessageBatch>)Flux.fromIterable((Iterable)list), transactionContext))).onErrorMap(this::mapError);
    }

    private Mono<Void> sendInternalBatch(Flux<ServiceBusMessageBatch> eventBatches, ServiceBusTransactionContext transactionContext) {
        return eventBatches.flatMap(messageBatch -> this.sendInternal((ServiceBusMessageBatch)messageBatch, transactionContext)).then().doOnError(error -> LOGGER.error("Error sending batch.", new Object[]{error}));
    }

    private Mono<AmqpSendLink> getSendLink() {
        return this.connectionCache.get().flatMap(connection -> {
            if (!CoreUtils.isNullOrEmpty((CharSequence)this.viaEntityName)) {
                return connection.createSendLink("VIA-".concat(this.viaEntityName), this.viaEntityName, this.retryOptions, this.entityName, this.identifier);
            }
            return connection.createSendLink(this.entityName, this.entityName, this.retryOptions, null, this.identifier);
        }).doOnNext(next -> this.linkName.compareAndSet(null, next.getLinkName()));
    }

    private Throwable mapError(Throwable throwable) {
        if (!(throwable instanceof ServiceBusException)) {
            return new ServiceBusException(throwable, ServiceBusErrorSource.SEND);
        }
        return throwable;
    }

    private static class AmqpMessageCollector
    implements Collector<ServiceBusMessage, List<ServiceBusMessageBatch>, List<ServiceBusMessageBatch>> {
        private final int maxMessageSize;
        private final Integer maxNumberOfBatches;
        private final ErrorContextProvider contextProvider;
        private final ServiceBusTracer tracer;
        private final MessageSerializer serializer;
        private volatile ServiceBusMessageBatch currentBatch;

        AmqpMessageCollector(CreateMessageBatchOptions options, Integer maxNumberOfBatches, ErrorContextProvider contextProvider, ServiceBusTracer tracer, MessageSerializer serializer) {
            this.maxNumberOfBatches = maxNumberOfBatches;
            this.maxMessageSize = options.getMaximumSizeInBytes() > 0 ? options.getMaximumSizeInBytes() : 262144;
            this.contextProvider = contextProvider;
            this.tracer = tracer;
            this.serializer = serializer;
            this.currentBatch = new ServiceBusMessageBatch(this.maxMessageSize, contextProvider, tracer, serializer);
        }

        @Override
        public Supplier<List<ServiceBusMessageBatch>> supplier() {
            return ArrayList::new;
        }

        @Override
        public BiConsumer<List<ServiceBusMessageBatch>, ServiceBusMessage> accumulator() {
            return (list, event) -> {
                ServiceBusMessageBatch batch = this.currentBatch;
                if (batch.tryAddMessage((ServiceBusMessage)event)) {
                    return;
                }
                if (this.maxNumberOfBatches != null && list.size() == this.maxNumberOfBatches.intValue()) {
                    String message = String.format(Locale.US, "EventData does not fit into maximum number of batches. '%s'", this.maxNumberOfBatches);
                    throw new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, message, this.contextProvider.getErrorContext());
                }
                this.currentBatch = new ServiceBusMessageBatch(this.maxMessageSize, this.contextProvider, this.tracer, this.serializer);
                this.currentBatch.tryAddMessage((ServiceBusMessage)event);
                list.add(batch);
            };
        }

        @Override
        public BinaryOperator<List<ServiceBusMessageBatch>> combiner() {
            return (existing, another) -> {
                existing.addAll(another);
                return existing;
            };
        }

        @Override
        public Function<List<ServiceBusMessageBatch>, List<ServiceBusMessageBatch>> finisher() {
            return list -> {
                ServiceBusMessageBatch batch = this.currentBatch;
                this.currentBatch = null;
                if (batch != null) {
                    list.add(batch);
                }
                return list;
            };
        }

        @Override
        public Set<Collector.Characteristics> characteristics() {
            return Collections.emptySet();
        }
    }
}

