/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.ObservationCollector;
import com.rabbitmq.stream.compression.Compression;
import com.rabbitmq.stream.compression.CompressionCodec;
import com.rabbitmq.stream.impl.Clock;
import com.rabbitmq.stream.impl.DynamicBatch;
import com.rabbitmq.stream.impl.MessageAccumulator;
import com.rabbitmq.stream.impl.ProducerUtils;
import com.rabbitmq.stream.impl.StreamProducer;
import io.netty.buffer.ByteBufAllocator;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.ToLongFunction;

final class DynamicBatchMessageAccumulator
implements MessageAccumulator {
    private final DynamicBatch<Object> dynamicBatch;
    private final ObservationCollector<Object> observationCollector;
    private final StreamProducer producer;
    private final ProducerUtils.MessageAccumulatorHelper helper;

    DynamicBatchMessageAccumulator(int subEntrySize, int batchSize, int maxUnconfirmedMessages, Codec codec, int maxFrameSize, ToLongFunction<Message> publishSequenceFunction, Function<Message, String> filterValueExtractor, Clock clock, String stream, CompressionCodec compressionCodec, ByteBufAllocator byteBufAllocator, ObservationCollector<?> observationCollector, StreamProducer producer) {
        boolean shouldObserve;
        this.helper = new ProducerUtils.MessageAccumulatorHelper(codec, maxFrameSize, publishSequenceFunction, filterValueExtractor, clock, stream, observationCollector);
        this.producer = producer;
        this.observationCollector = observationCollector;
        boolean bl = shouldObserve = !this.observationCollector.isNoop();
        if (subEntrySize <= 1) {
            this.dynamicBatch = new DynamicBatch(items -> {
                boolean result = this.publish(items);
                if (result && shouldObserve) {
                    items.forEach(i -> {
                        ProducerUtils.AccumulatedEntity entity = (ProducerUtils.AccumulatedEntity)i;
                        this.observationCollector.published(entity.observationContext(), entity.confirmationCallback().message());
                    });
                }
                return result;
            }, batchSize, maxUnconfirmedMessages);
        } else {
            byte compressionCode = compressionCodec == null ? Compression.NONE.code() : compressionCodec.code();
            this.dynamicBatch = new DynamicBatch(items -> {
                boolean result;
                ArrayList<Object> subBatches = new ArrayList<Object>();
                int count = 0;
                ProducerUtils.Batch batch = this.helper.batch(byteBufAllocator, compressionCode, compressionCodec, subEntrySize);
                ProducerUtils.AccumulatedEntity lastMessageInBatch = null;
                for (Object msg : items) {
                    ProducerUtils.AccumulatedEntity message;
                    lastMessageInBatch = message = (ProducerUtils.AccumulatedEntity)msg;
                    batch.add((Codec.EncodedMessage)message.encodedEntity(), message.confirmationCallback());
                    if (++count != subEntrySize) continue;
                    batch.time = lastMessageInBatch.time();
                    batch.publishingId = lastMessageInBatch.publishingId();
                    batch.encodedMessageBatch.close();
                    subBatches.add(batch);
                    lastMessageInBatch = null;
                    batch = this.helper.batch(byteBufAllocator, compressionCode, compressionCodec, subEntrySize);
                    count = 0;
                }
                if (!batch.isEmpty() && count < subEntrySize) {
                    batch.time = lastMessageInBatch.time();
                    batch.publishingId = lastMessageInBatch.publishingId();
                    batch.encodedMessageBatch.close();
                    subBatches.add(batch);
                }
                if ((result = this.publish(subBatches)) && shouldObserve) {
                    for (Object msg : items) {
                        ProducerUtils.AccumulatedEntity message = (ProducerUtils.AccumulatedEntity)msg;
                        this.observationCollector.published(message.observationContext(), message.confirmationCallback().message());
                    }
                }
                return result;
            }, batchSize * subEntrySize, maxUnconfirmedMessages);
        }
    }

    @Override
    public void add(Message message, ConfirmationHandler confirmationHandler) {
        this.dynamicBatch.add(this.helper.entity(message, confirmationHandler));
    }

    @Override
    public int size() {
        return 0;
    }

    @Override
    public void flush(boolean force) {
    }

    private boolean publish(List<Object> entities) {
        if (this.producer.canSend()) {
            this.producer.publishInternal(entities);
            return true;
        }
        return false;
    }

    @Override
    public void close() {
        this.dynamicBatch.close();
    }
}

