/*
 * Decompiled with CFR 0.152.
 */
package com.github.fridujo.rabbitmq.mock;

import com.github.fridujo.rabbitmq.mock.AmqArguments;
import com.github.fridujo.rabbitmq.mock.DeadLettering;
import com.github.fridujo.rabbitmq.mock.Message;
import com.github.fridujo.rabbitmq.mock.MessageComparator;
import com.github.fridujo.rabbitmq.mock.MockChannel;
import com.github.fridujo.rabbitmq.mock.MockConnection;
import com.github.fridujo.rabbitmq.mock.Receiver;
import com.github.fridujo.rabbitmq.mock.ReceiverPointer;
import com.github.fridujo.rabbitmq.mock.ReceiverRegistry;
import com.github.fridujo.rabbitmq.mock.tool.Exceptions;
import com.github.fridujo.rabbitmq.mock.tool.NamedThreadFactory;
import com.github.fridujo.rabbitmq.mock.tool.RestartableExecutorService;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In-memory queue used by the RabbitMQ mock.
 *
 * <p>Published messages are kept in a {@link PriorityBlockingQueue} ordered by a
 * {@link MessageComparator}. A single-threaded executor runs a delivery loop that dead-letters
 * expired messages and hands the remaining ones to the registered consumers in a rolling fashion.
 * Messages handed out without auto-ack are remembered by delivery tag until they are acked,
 * rejected, recovered or their consumer is cancelled.
 *
 * <p>NOTE(review): the consumer and unacked maps are plain {@link LinkedHashMap}s that are touched
 * both by the delivery thread and by caller threads; this class does not synchronize those accesses.
 */
public class MockQueue
implements Receiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(MockQueue.class);
    /** Pause, in milliseconds, between two delivery rounds once nothing more could be delivered. */
    private static final long SLEEPING_TIME_BETWEEN_SUBMISSIONS_TO_CONSUMERS = 30L;
    private final String name;
    private final ReceiverPointer pointer;
    private final AmqArguments arguments;
    private final ReceiverRegistry receiverRegistry;
    /** Messages ready to be delivered. */
    private final Queue<Message> messages;
    private final RestartableExecutorService executorService;
    private final Map<String, ConsumerAndTag> consumersByTag = new LinkedHashMap<>();
    /** Ever-increasing counter used to pick the next consumer. */
    private final AtomicInteger consumerRollingSequence = new AtomicInteger();
    private final AtomicInteger messageSequence = new AtomicInteger();
    /** Messages handed out but not yet acknowledged, in hand-out order. */
    private final Map<Long, Message> unackedMessagesByDeliveryTag = new LinkedHashMap<>();
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Map<String, Set<Long>> unackedDeliveryTagsByConsumerTag = new LinkedHashMap<>();

    /**
     * Creates the queue and immediately starts its delivery loop.
     *
     * @param name queue name, also used to name the delivery thread
     * @param arguments queue arguments (limits, TTL, dead-lettering, overflow policy)
     * @param receiverRegistry registry used to resolve the dead-letter exchange
     */
    public MockQueue(String name, AmqArguments arguments, ReceiverRegistry receiverRegistry) {
        this.name = name;
        this.pointer = new ReceiverPointer(ReceiverPointer.Type.QUEUE, name);
        this.arguments = arguments;
        this.receiverRegistry = receiverRegistry;
        this.messages = new PriorityBlockingQueue<Message>(11, new MessageComparator(arguments));
        this.executorService = new RestartableExecutorService(() -> Executors.newFixedThreadPool(1, new NamedThreadFactory(i -> name + "_queue_consuming")));
        this.start();
    }

    /**
     * Submits the delivery loop: deliver as long as something can be delivered, then sleep
     * {@link #SLEEPING_TIME_BETWEEN_SUBMISSIONS_TO_CONSUMERS} ms, until {@code running} is cleared.
     */
    private void start() {
        this.executorService.submit(() -> {
            while (this.running.get()) {
                while (this.deliverToConsumerIfPossible()) {
                    // keep going while progress is being made
                }
                Exceptions.runAndTransformExceptions(() -> TimeUnit.MILLISECONDS.sleep(SLEEPING_TIME_BETWEEN_SUBMISSIONS_TO_CONSUMERS), e -> new RuntimeException("Queue " + this.name + " consumer Thread have been interrupted", (Throwable)e));
            }
        });
    }

    /**
     * Performs one delivery step: dead-letters the head message if it is expired, then hands the
     * next message to the next consumer, if there is one of each.
     *
     * @return {@code true} if a message was dead-lettered as expired by the first step or was
     *         handed to a consumer without an {@link IOException}; {@code false} otherwise
     */
    private boolean deliverToConsumerIfPossible() {
        if (!this.running.get()) {
            LOGGER.debug(this.localized("shutting down"));
            return false;
        }
        boolean delivered = this.deadLetterFirstMessageIfExpired();

        // Work on a single snapshot so that the index computed below always matches the list it is
        // applied to, even if a consumer is added or removed in the meantime.
        List<ConsumerAndTag> consumers = new ArrayList<>(this.consumersByTag.values());
        if (consumers.isEmpty()) {
            LOGGER.trace(this.localized("no consumer to deliver message to"));
            return delivered;
        }

        Message message = this.messages.poll();
        if (message == null) {
            return delivered;
        }
        if (message.isExpired()) {
            this.deadLetterWithReason(message, DeadLettering.ReasonType.EXPIRED);
            return delivered;
        }

        // floorMod keeps the index non-negative once the sequence wraps past Integer.MAX_VALUE.
        int index = Math.floorMod(this.consumerRollingSequence.incrementAndGet(), consumers.size());
        ConsumerAndTag nextConsumer = consumers.get(index);
        long deliveryTag = nextConsumer.deliveryTagSupplier.get();
        // Registered as unacked before the hand-off so that a failure can be rejected by tag.
        this.unackedMessagesByDeliveryTag.put(deliveryTag, message);
        this.unackedDeliveryTagsByConsumerTag.computeIfAbsent(nextConsumer.tag, cTag -> new LinkedHashSet<>()).add(deliveryTag);
        Envelope envelope = new Envelope(deliveryTag, message.redelivered, message.exchangeName, message.routingKey);
        try {
            LOGGER.debug(this.localized("delivering message to consumer"));
            nextConsumer.mockChannel.getMetricsCollector().consumedMessage((Channel)nextConsumer.mockChannel, deliveryTag, nextConsumer.tag);
            nextConsumer.consumer.handleDelivery(nextConsumer.tag, envelope, message.props, message.body);
            if (nextConsumer.autoAck) {
                this.internal_removeFromUnacked(deliveryTag);
            }
            delivered = true;
        }
        catch (IOException e) {
            LOGGER.warn(this.localized("Unable to deliver message to consumer [" + nextConsumer.tag + "]"));
            // Put the message back in the queue, flagged as redelivered.
            this.basicReject(deliveryTag, true);
        }
        return delivered;
    }

    /**
     * Dead-letters the head of the queue when it is expired.
     *
     * @return {@code true} if the head message was expired and has been dead-lettered
     */
    private boolean deadLetterFirstMessageIfExpired() {
        Message potentiallyExpired = this.messages.peek();
        if (potentiallyExpired == null || !potentiallyExpired.isExpired()) {
            return false;
        }
        // Only act if the polled message is the very instance that was peeked.
        // NOTE(review): when another message is polled instead, it is not put back.
        Message expiredMessage = this.messages.poll();
        if (potentiallyExpired != expiredMessage) {
            return false;
        }
        this.deadLetterWithReason(expiredMessage, DeadLettering.ReasonType.EXPIRED);
        return true;
    }

    /**
     * Stores a message in this queue.
     *
     * <p>When a length limit is reached and the overflow policy is {@code REJECT_PUBLISH}, the
     * message is silently dropped. With any other policy the message is stored and the head of the
     * queue is dead-lettered with reason {@code MAX_LEN} to make room.
     *
     * @return always {@code true}, including when the message was dropped by {@code REJECT_PUBLISH}
     */
    @Override
    public boolean publish(String exchangeName, String routingKey, AMQP.BasicProperties props, byte[] body) {
        boolean queueLengthLimitReached = this.queueLengthLimitReached() || this.queueLengthBytesLimitReached();
        if (queueLengthLimitReached && this.arguments.overflow() == AmqArguments.Overflow.REJECT_PUBLISH) {
            return true;
        }
        Message message = new Message(this.messageSequence.incrementAndGet(), exchangeName, routingKey, props, body, this.computeExpiryTime(props));
        if (message.expiryTime != -1L) {
            LOGGER.debug(this.localized("Message published expiring at " + Instant.ofEpochMilli(message.expiryTime)) + ": " + message);
        } else {
            LOGGER.debug(this.localized("Message published: " + message));
        }
        this.messages.offer(message);
        if (queueLengthLimitReached) {
            // The head may already have been taken by the delivery thread; nothing to drop then.
            Message dropped = this.messages.poll();
            if (dropped != null) {
                this.deadLetterWithReason(dropped, DeadLettering.ReasonType.MAX_LEN);
            }
        }
        return true;
    }

    @Override
    public ReceiverPointer pointer() {
        return this.pointer;
    }

    /**
     * Registers a consumer under {@code consumerTag} (replacing any consumer already using that
     * tag) and notifies it through {@code handleConsumeOk}.
     */
    public void basicConsume(String consumerTag, com.rabbitmq.client.Consumer consumer, boolean autoAck, Supplier<Long> deliveryTagSupplier, MockConnection mockConnection, MockChannel mockChannel) {
        LOGGER.debug(this.localized("registering consumer"));
        this.consumersByTag.put(consumerTag, new ConsumerAndTag(consumerTag, consumer, autoAck, deliveryTagSupplier, mockConnection, mockChannel));
        consumer.handleConsumeOk(consumerTag);
    }

    /**
     * Synchronously fetches the next available message.
     *
     * <p>A delivery tag is drawn from {@code deliveryTagSupplier} on every call, whether or not a
     * message is available.
     *
     * @param autoAck when {@code false}, the message is tracked as unacked under its delivery tag
     * @return the message, or {@code null} when the queue is empty or the polled message was
     *         expired (in which case it is dead-lettered)
     */
    public GetResponse basicGet(boolean autoAck, Supplier<Long> deliveryTagSupplier) {
        long deliveryTag = deliveryTagSupplier.get();
        Message message = this.messages.poll();
        if (message == null) {
            LOGGER.debug(this.localized("basic_get no message available"));
            return null;
        }
        if (message.isExpired()) {
            this.deadLetterWithReason(message, DeadLettering.ReasonType.EXPIRED);
            return null;
        }
        if (!autoAck) {
            this.unackedMessagesByDeliveryTag.put(deliveryTag, message);
        }
        // Propagate the redelivered flag, as the consumer delivery path does.
        Envelope envelope = new Envelope(deliveryTag, message.redelivered, message.exchangeName, message.routingKey);
        LOGGER.debug(this.localized("basic_get a message"));
        return new GetResponse(envelope, message.props, message.body, this.messages.size());
    }

    /**
     * Acknowledges one delivery, or every unacked delivery up to and including
     * {@code deliveryTag} when {@code multiple} is set.
     */
    public void basicAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            this.doWithUnackedUntil(deliveryTag, this::internal_removeFromUnacked);
        } else {
            this.internal_removeFromUnacked(deliveryTag);
        }
    }

    /**
     * Rejects one delivery, or every unacked delivery up to and including {@code deliveryTag}
     * when {@code multiple} is set. See {@link #basicReject(long, boolean)}.
     */
    public void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
        if (multiple) {
            this.doWithUnackedUntil(deliveryTag, relevantDeliveryTag -> this.basicReject(relevantDeliveryTag, requeue));
        } else {
            this.basicReject(deliveryTag, requeue);
        }
    }

    /**
     * Rejects an unacked delivery: the message is either put back in the queue flagged as
     * redelivered, or dead-lettered with reason {@code REJECTED}. Unknown tags are ignored.
     */
    public void basicReject(long deliveryTag, boolean requeue) {
        Message nacked = this.internal_removeFromUnacked(deliveryTag);
        if (nacked == null) {
            return;
        }
        if (requeue) {
            this.messages.offer(nacked.asRedelivered());
        } else {
            this.deadLetterWithReason(nacked, DeadLettering.ReasonType.REJECTED);
        }
    }

    /**
     * Forgets an unacked delivery in both tracking maps.
     *
     * @return the message that was tracked under {@code deliveryTag}, or {@code null} if none
     */
    private Message internal_removeFromUnacked(long deliveryTag) {
        Message message = this.unackedMessagesByDeliveryTag.remove(deliveryTag);
        this.unackedDeliveryTagsByConsumerTag.forEach((ctag, deliveryTags) -> deliveryTags.remove(deliveryTag));
        return message;
    }

    /** Prefixes a log message with this queue's name. */
    private String localized(String message) {
        return "[Q " + this.name + "] " + message;
    }

    /**
     * Unregisters a consumer, notifies it through {@code handleCancelOk} and requeues every
     * delivery still unacked for it. Unknown consumer tags are ignored.
     */
    public void basicCancel(String consumerTag) {
        ConsumerAndTag cancelled = this.consumersByTag.remove(consumerTag);
        if (cancelled == null) {
            return;
        }
        cancelled.consumer.handleCancelOk(consumerTag);
        // Drop the per-consumer entry instead of leaving (or inserting) a placeholder set: an
        // immutable placeholder would make a later delivery to a re-registered tag fail on add().
        Set<Long> pendingDeliveryTags = this.unackedDeliveryTagsByConsumerTag.remove(consumerTag);
        if (pendingDeliveryTags != null) {
            // Iterate over a copy: basicReject mutates the tracking structures.
            List<Long> toRequeue = new ArrayList<>(pendingDeliveryTags);
            for (Long deliveryTag : toRequeue) {
                this.basicReject(deliveryTag, true);
            }
        }
    }

    /** Restarts the executor and the delivery loop if the queue is currently stopped. */
    synchronized void restartDeliveryLoop() {
        if (!this.running.get()) {
            this.running.set(true);
            this.executorService.restart();
            this.start();
        }
    }

    void notifyDeleted() {
        this.close();
    }

    /**
     * Cancels every consumer registered through {@code mockConnection}; stops the delivery loop
     * when no consumer remains afterwards.
     */
    void close(MockConnection mockConnection) {
        this.consumersByTag.entrySet().removeIf(e -> {
            boolean mustCancelConsumer = e.getValue().mockConnection == mockConnection;
            if (mustCancelConsumer) {
                this.cancel(e.getValue());
            }
            return mustCancelConsumer;
        });
        if (this.consumersByTag.isEmpty()) {
            this.running.set(false);
            this.stopDeliveryLoop();
        }
    }

    /** Stops the queue: clears the running flag, cancels all consumers and stops the loop. */
    private void close() {
        this.running.set(false);
        this.cancelConsumers();
        this.stopDeliveryLoop();
    }

    /** Shuts the executor down and waits at most 90 ms for it to terminate. */
    private void stopDeliveryLoop() {
        this.executorService.shutdown();
        Exceptions.runAndEatExceptions(() -> this.executorService.awaitTermination(90L, TimeUnit.MILLISECONDS));
    }

    private void cancelConsumers() {
        for (ConsumerAndTag consumerAndTag : this.consumersByTag.values()) {
            this.cancel(consumerAndTag);
        }
        this.consumersByTag.clear();
    }

    /** Notifies a consumer of its cancellation; an {@link IOException} it throws is only logged. */
    private void cancel(ConsumerAndTag consumerAndTag) {
        try {
            consumerAndTag.consumer.handleCancel(consumerAndTag.tag);
        }
        catch (IOException e) {
            LOGGER.warn("Consumer threw an exception when notified about cancellation", e);
        }
    }

    /**
     * Puts every unacked message back in the queue, flagged as redelivered, then notifies all
     * consumers through {@code handleRecoverOk}.
     *
     * @param requeue not consulted by this implementation
     */
    public void basicRecover(boolean requeue) {
        // Iterate over a copy: internal_removeFromUnacked mutates the map.
        List<Long> unackedDeliveryTags = new ArrayList<>(this.unackedMessagesByDeliveryTag.keySet());
        for (Long unackedDeliveryTag : unackedDeliveryTags) {
            Message recovered = this.internal_removeFromUnacked(unackedDeliveryTag);
            // The backing queue rejects null elements; skip tags that are already gone.
            if (recovered != null) {
                this.messages.offer(recovered.asRedelivered());
            }
        }
        this.consumersByTag.values().forEach(consumerAndTag -> consumerAndTag.consumer.handleRecoverOk(consumerAndTag.tag));
    }

    /** @return number of messages ready for delivery (unacked messages are not counted) */
    public int messageCount() {
        return this.messages.size();
    }

    public int consumerCount() {
        return this.consumersByTag.size();
    }

    /**
     * Removes every ready message.
     *
     * @return the message count read just before clearing
     */
    public int purge() {
        int messageCount = this.messageCount();
        this.messages.clear();
        return messageCount;
    }

    /**
     * Applies an action to every unacked delivery tag, in hand-out order, up to and including
     * {@code maxDeliveryTag}. Does nothing when {@code maxDeliveryTag} is not currently unacked.
     */
    private void doWithUnackedUntil(long maxDeliveryTag, Consumer<Long> doWithRelevantDeliveryTag) {
        if (!this.unackedMessagesByDeliveryTag.containsKey(maxDeliveryTag)) {
            return;
        }
        // Collect first, act afterwards: the action mutates the map being iterated.
        Set<Long> storedDeliveryTagsToRemove = new LinkedHashSet<>();
        for (Long storedDeliveryTag : this.unackedMessagesByDeliveryTag.keySet()) {
            storedDeliveryTagsToRemove.add(storedDeliveryTag);
            if (storedDeliveryTag.longValue() == maxDeliveryTag) {
                break;
            }
        }
        storedDeliveryTagsToRemove.forEach(doWithRelevantDeliveryTag);
    }

    private boolean queueLengthLimitReached() {
        return this.arguments.queueLengthLimit().map(limit -> limit <= this.messages.size()).orElse(false);
    }

    private boolean queueLengthBytesLimitReached() {
        int messageBytesReady = this.messages.stream().mapToInt(m -> m.body.length).sum();
        return this.arguments.queueLengthBytesLimit().map(limit -> limit <= messageBytesReady).orElse(false);
    }

    /**
     * Computes the absolute expiry time of a message: the per-message expiration wins over the
     * queue TTL; {@code -1} means neither is set.
     */
    private long computeExpiryTime(AMQP.BasicProperties props) {
        long messageExpiryTimeOfQueue = this.arguments.getMessageTtlOfQueue().map(this::computeExpiry).orElse(-1L);
        return this.getMessageExpiryTime(props).orElse(messageExpiryTimeOfQueue);
    }

    private Optional<Long> getMessageExpiryTime(AMQP.BasicProperties props) {
        return Optional.ofNullable(props.getExpiration()).flatMap(this::toLong).map(this::computeExpiry);
    }

    /** Converts a TTL in milliseconds into an absolute epoch-millisecond deadline. */
    private Long computeExpiry(long ttl) {
        return System.currentTimeMillis() + ttl;
    }

    /** Parses a decimal long, yielding an empty result instead of failing on malformed input. */
    private Optional<Long> toLong(String s) {
        try {
            return Optional.of(Long.parseLong(s));
        }
        catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    @Override
    public String toString() {
        return "Queue " + this.name;
    }

    /**
     * Republishes a message to the configured dead-letter exchange, if one is configured and can
     * be resolved; otherwise the message is simply discarded.
     */
    private void deadLetterWithReason(Message message, DeadLettering.ReasonType reason) {
        this.arguments.getDeadLetterExchange().flatMap(this.receiverRegistry::getReceiver).ifPresent(deadLetterExchange -> {
            LOGGER.debug(this.localized("dead-lettered to " + deadLetterExchange + ": " + message));
            DeadLettering.Event event = new DeadLettering.Event(this.name, reason, message, 1);
            AMQP.BasicProperties props = event.prependOn(message.props);
            deadLetterExchange.publish(message.exchangeName, this.arguments.getDeadLetterRoutingKey().orElse(message.routingKey), props, message.body);
        });
    }

    /** @return a copy of the messages currently ready for delivery */
    public List<Message> getAvailableMessages() {
        return new ArrayList<Message>(this.messages);
    }

    /** @return a copy of the messages currently handed out and not yet acknowledged */
    public List<Message> getUnackedMessages() {
        return new ArrayList<Message>(this.unackedMessagesByDeliveryTag.values());
    }

    /** Everything the queue needs to know about one registered consumer. */
    static class ConsumerAndTag {
        private final String tag;
        private final com.rabbitmq.client.Consumer consumer;
        private final boolean autoAck;
        private final Supplier<Long> deliveryTagSupplier;
        private final MockConnection mockConnection;
        private final MockChannel mockChannel;

        ConsumerAndTag(String tag, com.rabbitmq.client.Consumer consumer, boolean autoAck, Supplier<Long> deliveryTagSupplier, MockConnection mockConnection, MockChannel mockChannel) {
            this.tag = tag;
            this.consumer = consumer;
            this.autoAck = autoAck;
            this.deliveryTagSupplier = deliveryTagSupplier;
            this.mockConnection = mockConnection;
            this.mockChannel = mockChannel;
        }
    }
}

