/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.ConfirmationStatus;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.impl.Clock;
import com.rabbitmq.stream.impl.MessageAccumulator;
import com.rabbitmq.stream.impl.StreamProducer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.ToLongFunction;

class SimpleMessageAccumulator
implements MessageAccumulator {
    protected final BlockingQueue<MessageAccumulator.AccumulatedEntity> messages;
    protected final Clock clock;
    private final int capacity;
    private final Codec codec;
    private final int maxFrameSize;
    private final ToLongFunction<Message> publishSequenceFunction;

    SimpleMessageAccumulator(int capacity, Codec codec, int maxFrameSize, ToLongFunction<Message> publishSequenceFunction, Clock clock) {
        this.capacity = capacity;
        this.messages = new LinkedBlockingQueue<MessageAccumulator.AccumulatedEntity>(capacity);
        this.codec = codec;
        this.maxFrameSize = maxFrameSize;
        this.publishSequenceFunction = publishSequenceFunction;
        this.clock = clock;
    }

    @Override
    public boolean add(Message message, ConfirmationHandler confirmationHandler) {
        Codec.EncodedMessage encodedMessage = this.codec.encode(message);
        Client.checkMessageFitsInFrame(this.maxFrameSize, encodedMessage);
        long publishingId = this.publishSequenceFunction.applyAsLong(message);
        try {
            boolean offered = this.messages.offer(new SimpleAccumulatedEntity(this.clock.time(), publishingId, encodedMessage, new SimpleConfirmationCallback(message, confirmationHandler)), 60L, TimeUnit.SECONDS);
            if (!offered) {
                throw new StreamException("Could not accumulate outbound message");
            }
        }
        catch (InterruptedException e) {
            throw new StreamException("Error while accumulating outbound message", e);
        }
        return this.messages.size() == this.capacity;
    }

    @Override
    public MessageAccumulator.AccumulatedEntity get() {
        return (MessageAccumulator.AccumulatedEntity)this.messages.poll();
    }

    @Override
    public boolean isEmpty() {
        return this.messages.isEmpty();
    }

    @Override
    public int size() {
        return this.messages.size();
    }

    private static final class SimpleConfirmationCallback
    implements StreamProducer.ConfirmationCallback {
        private final Message message;
        private final ConfirmationHandler confirmationHandler;

        private SimpleConfirmationCallback(Message message, ConfirmationHandler confirmationHandler) {
            this.message = message;
            this.confirmationHandler = confirmationHandler;
        }

        @Override
        public int handle(boolean confirmed, short code) {
            this.confirmationHandler.handle(new ConfirmationStatus(this.message, confirmed, code));
            return 1;
        }
    }

    private static final class SimpleAccumulatedEntity
    implements MessageAccumulator.AccumulatedEntity {
        private final long time;
        private final long publishingId;
        private final Codec.EncodedMessage encodedMessage;
        private final StreamProducer.ConfirmationCallback confirmationCallback;

        private SimpleAccumulatedEntity(long time, long publishingId, Codec.EncodedMessage encodedMessage, StreamProducer.ConfirmationCallback confirmationCallback) {
            this.time = time;
            this.publishingId = publishingId;
            this.encodedMessage = encodedMessage;
            this.confirmationCallback = confirmationCallback;
        }

        @Override
        public long publishindId() {
            return this.publishingId;
        }

        @Override
        public Object encodedEntity() {
            return this.encodedMessage;
        }

        @Override
        public long time() {
            return this.time;
        }

        @Override
        public StreamProducer.ConfirmationCallback confirmationCallback() {
            return this.confirmationCallback;
        }
    }
}

