/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.mq.restclient.internal;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.mulesoft.mq.restclient.api.AnypointMQMessage;
import com.mulesoft.mq.restclient.api.CourierObserver;
import com.mulesoft.mq.restclient.api.Destination;
import com.mulesoft.mq.restclient.api.LockedMessage;
import com.mulesoft.mq.restclient.api.MessageIdResult;
import com.mulesoft.mq.restclient.internal.MessagePreserver;
import com.mulesoft.mq.restclient.internal.TimeSupplier;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultMessagePreserver
implements MessagePreserver {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMessagePreserver.class);
    private static final int UPDATE_BATCH_SIZE = 10;
    private static final int PRESERVE_INTERVAL = 500;
    private final Map<String, InternalPreservedMessage> preservedMessages = new ConcurrentHashMap<String, InternalPreservedMessage>();
    private final Collection<String> expiredMessages = new ConcurrentLinkedQueue<String>();
    private final ScheduledExecutorService preserver;
    private final TimeSupplier timeSupplier;
    private final Destination destination;
    private final long refreshFrequency;

    public DefaultMessagePreserver(Destination destination, TimeSupplier timeSupplier) {
        this(destination, timeSupplier, 500L);
    }

    public DefaultMessagePreserver(Destination destination, TimeSupplier timeSupplier, long refreshFrequency) {
        this.timeSupplier = timeSupplier;
        this.destination = destination;
        this.refreshFrequency = refreshFrequency;
        this.preserver = this.createExecutorService();
        this.start();
    }

    private void start() {
        this.preserver.scheduleAtFixedRate(() -> {
            long now = this.now();
            ArrayList<LockedMessage> messagesToUpdate = new ArrayList<LockedMessage>();
            for (InternalPreservedMessage preservedMessage : this.preservedMessages.values()) {
                if (preservedMessage.isExpired(now)) {
                    this.markExpired(preservedMessage.message.getMessageId());
                    continue;
                }
                if (!preservedMessage.isAboutToExpire(now)) continue;
                messagesToUpdate.add(new LockedMessage(preservedMessage.message, preservedMessage.ttl));
            }
            if (!messagesToUpdate.isEmpty()) {
                Lists.partition(messagesToUpdate, (int)10).forEach(messages -> this.modifyLockInterval((List<LockedMessage>)messages, this.destination));
            }
        }, 0L, this.refreshFrequency, TimeUnit.MILLISECONDS);
    }

    private void modifyLockInterval(final List<LockedMessage> messagesToUpdate, Destination destination) {
        destination.modifyLockInterval(messagesToUpdate).subscribe(new CourierObserver<List<MessageIdResult>>(){

            @Override
            public void onSuccess(List<MessageIdResult> messageIdResults) {
                long now = DefaultMessagePreserver.this.now();
                for (MessageIdResult messageIdResult : messageIdResults) {
                    if (messageIdResult.isSuccess()) {
                        DefaultMessagePreserver.this.update(messageIdResult.getMessageId(), now);
                        continue;
                    }
                    LOGGER.debug("Failed to update lock TTL for message {}: {}", (Object)messageIdResult.getMessageId(), (Object)messageIdResult.getStatusMessage());
                    DefaultMessagePreserver.this.remove(messageIdResult.getMessageId());
                }
            }

            @Override
            public void onError(Throwable e) {
                LOGGER.debug("An error occurred while updating the lockTTL for a message batch: " + e.getMessage(), e);
                messagesToUpdate.forEach(m -> DefaultMessagePreserver.this.remove(m.getMessage().getMessageId()));
            }
        });
    }

    protected ScheduledExecutorService createExecutorService() {
        return Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("default-message-preserver-%d").build());
    }

    @Override
    public void add(List<AnypointMQMessage> messages, long ttl) {
        for (AnypointMQMessage message : messages) {
            this.add(message, ttl);
        }
    }

    @Override
    public void add(AnypointMQMessage message, long ttl) {
        LOGGER.debug("Adding preserved message - {}", (Object)message.getMessageId());
        this.expiredMessages.remove(message.getMessageId());
        this.preservedMessages.put(message.getMessageId(), new InternalPreservedMessage(message, ttl, this.now()));
    }

    @Override
    public boolean remove(String messageId) {
        LOGGER.debug("Removing preserved message - {}", (Object)messageId);
        if (this.preservedMessages.remove(messageId) == null) {
            return this.expiredMessages.remove(messageId);
        }
        return true;
    }

    @Override
    public boolean isExpired(String messageId) {
        if (this.expiredMessages.contains(messageId)) {
            return true;
        }
        InternalPreservedMessage preservedMessage = this.preservedMessages.get(messageId);
        if (preservedMessage != null && preservedMessage.isExpired(this.now())) {
            this.markExpired(preservedMessage.message.getMessageId());
            return true;
        }
        return false;
    }

    private void markExpired(String messageId) {
        this.preservedMessages.remove(messageId);
        this.expiredMessages.add(messageId);
        LOGGER.debug("Message expired - {}", (Object)messageId);
    }

    private long now() {
        return this.timeSupplier.get();
    }

    private void update(String messageId, long now) {
        InternalPreservedMessage preservedMessage = this.preservedMessages.get(messageId);
        if (preservedMessage != null) {
            preservedMessage.update(now);
        }
    }

    private static class InternalPreservedMessage {
        private static final double MIN_TOLERANCE = 0.8;
        private final AnypointMQMessage message;
        private final long ttl;
        private long lastUpdate;

        public InternalPreservedMessage(AnypointMQMessage message, long ttl, long now) {
            this.message = message;
            this.ttl = ttl;
            this.update(now);
        }

        public void update(long now) {
            this.lastUpdate = now;
        }

        public boolean isAboutToExpire(long now) {
            return (double)(now - this.lastUpdate) > (double)this.ttl * 0.8;
        }

        public boolean isExpired(long now) {
            return now - this.lastUpdate > this.ttl;
        }
    }
}

