/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.tracking.client;

import io.fluxcapacitor.common.Guarantee;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.ObjectUtils;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.common.api.tracking.Position;
import io.fluxcapacitor.common.tracking.MessageStore;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.publishing.client.GatewayClient;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.IndexUtils;
import io.fluxcapacitor.javaclient.tracking.client.SimpleTrackerRead;
import io.fluxcapacitor.javaclient.tracking.client.TrackerRead;
import io.fluxcapacitor.javaclient.tracking.client.TrackingClient;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InMemoryMessageStore
implements GatewayClient,
TrackingClient,
MessageStore {
    private static final Logger log = LoggerFactory.getLogger(InMemoryMessageStore.class);
    private final Set<Consumer<List<SerializedMessage>>> monitors = new CopyOnWriteArraySet<Consumer<List<SerializedMessage>>>();
    private final ExecutorService executor = Executors.newCachedThreadPool(ObjectUtils.newThreadFactory("InMemoryMessageStore"));
    private final AtomicLong nextIndex = new AtomicLong();
    private final Map<String, TrackerRead> trackers = new ConcurrentHashMap<String, TrackerRead>();
    private final ConcurrentSkipListMap<Long, SerializedMessage> messageLog = new ConcurrentSkipListMap();
    private final Map<String, Long> consumerTokens = new ConcurrentHashMap<String, Long>();
    private final MessageType messageType;
    private final Duration messageExpiration;

    public InMemoryMessageStore(MessageType messageType) {
        this(messageType, Duration.ofMinutes(2L));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized CompletableFuture<Void> append(Guarantee guarantee, SerializedMessage ... messages) {
        try {
            Arrays.stream(messages).forEach(m -> {
                if (m.getIndex() == null) {
                    m.setIndex(this.nextIndex.updateAndGet(IndexUtils::nextIndex));
                }
                this.messageLog.put(m.getIndex(), (SerializedMessage)m);
            });
            if (this.messageExpiration != null) {
                this.purgeExpiredMessages(this.messageExpiration);
            }
            CompletableFuture<Object> completableFuture = CompletableFuture.completedFuture(null);
            return completableFuture;
        }
        finally {
            this.notifyMonitors(messages);
        }
    }

    public synchronized void notifyMonitors(SerializedMessage ... messages) {
        this.notifyAll();
        if (!this.monitors.isEmpty()) {
            this.monitors.forEach(m -> m.accept(Arrays.asList(messages)));
        }
    }

    protected void purgeExpiredMessages(Duration messageExpiration) {
        long threshold = FluxCapacitor.currentTime().minus(messageExpiration).toEpochMilli();
        this.messageLog.headMap((Object)IndexUtils.maxIndexFromMillis(threshold), true).clear();
    }

    @Override
    public CompletableFuture<MessageBatch> read(String consumer, String trackerId, Long lastIndex, ConsumerConfiguration configuration) {
        return this.read(new SimpleTrackerRead(consumer, trackerId, lastIndex, configuration, this.messageType));
    }

    public CompletableFuture<int[]> claimSegment(TrackerRead trackerRead) {
        if (trackerRead.getMessageType() != MessageType.RESULT && !Objects.equals(trackerRead.getTrackerId(), this.trackers.computeIfAbsent(trackerRead.getConsumer(), c -> trackerRead).getTrackerId())) {
            return CompletableFuture.supplyAsync(() -> new int[]{0, 0}, CompletableFuture.delayedExecutor(trackerRead.getDeadline() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
        }
        return CompletableFuture.completedFuture(new int[]{0, Position.MAX_SEGMENT});
    }

    public CompletableFuture<MessageBatch> read(TrackerRead trackerRead) {
        if (trackerRead.getMessageType() != MessageType.RESULT && !Objects.equals(trackerRead.getTrackerId(), this.trackers.computeIfAbsent(trackerRead.getConsumer(), c -> trackerRead).getTrackerId())) {
            log.debug("Delaying read by secondary tracker {} (message type {})", (Object)trackerRead.getConsumer(), (Object)this.messageType);
            return CompletableFuture.supplyAsync(() -> new MessageBatch(new int[]{0, 0}, Collections.emptyList(), null), CompletableFuture.delayedExecutor(trackerRead.getDeadline() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
        }
        CompletableFuture<MessageBatch> result = new CompletableFuture<MessageBatch>();
        this.executor.execute(() -> {
            NavigableMap tailMap = Collections.emptyMap();
            InMemoryMessageStore inMemoryMessageStore = this;
            synchronized (inMemoryMessageStore) {
                while (System.currentTimeMillis() < trackerRead.getDeadline() && this.shouldWait(tailMap = this.messageLog.tailMap((Object)Optional.ofNullable(trackerRead.getLastIndex()).orElseGet(() -> this.getLastIndex(trackerRead.getConsumer())), false))) {
                    long duration = trackerRead.getDeadline() - System.currentTimeMillis();
                    if (duration <= 0L) continue;
                    try {
                        this.wait(duration);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
            List<SerializedMessage> messages = new ArrayList<SerializedMessage>(this.filterMessages(tailMap.values()));
            messages = messages.subList(0, Math.min(messages.size(), trackerRead.getMaxSize()));
            Long lastIndex = messages.isEmpty() ? null : messages.get(messages.size() - 1).getIndex();
            messages = messages.stream().filter(trackerRead::canHandle).collect(Collectors.toList());
            result.complete(new MessageBatch(new int[]{0, 128}, messages, lastIndex));
        });
        return result;
    }

    protected boolean shouldWait(Map<Long, SerializedMessage> tailMap) {
        return this.filterMessages(tailMap.values()).isEmpty();
    }

    protected Collection<SerializedMessage> filterMessages(Collection<SerializedMessage> messages) {
        return messages;
    }

    @Override
    public List<SerializedMessage> getBatch(Long minIndex, int maxSize, boolean inclusive) {
        ArrayList<SerializedMessage> list = new ArrayList<SerializedMessage>(this.filterMessages(this.messageLog.tailMap((Object)Optional.ofNullable(minIndex).map(i -> inclusive ? i : i + 1L).orElse(-1L)).values()));
        return list.subList(0, Math.min(maxSize, list.size()));
    }

    @Override
    public List<SerializedMessage> readFromIndex(long minIndex, int maxSize) {
        ArrayList list = new ArrayList(this.messageLog.tailMap((Object)minIndex).values());
        return list.subList(0, Math.min(maxSize, list.size()));
    }

    private long getLastIndex(String consumer) {
        return this.consumerTokens.computeIfAbsent(consumer, k -> -1L);
    }

    @Override
    public CompletableFuture<Void> storePosition(String consumer, int[] segment, long lastIndex, Guarantee guarantee) {
        return this.resetPosition(consumer, lastIndex, guarantee);
    }

    @Override
    public CompletableFuture<Void> resetPosition(String consumer, long lastIndex, Guarantee guarantee) {
        this.consumerTokens.put(consumer, lastIndex);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public Position getPosition(String consumer) {
        return Optional.ofNullable(this.consumerTokens.get(consumer)).map(Position::new).orElse(null);
    }

    @Override
    public CompletableFuture<Void> disconnectTracker(String consumer, String trackerId, boolean sendFinalEmptyBatch, Guarantee guarantee) {
        this.disconnectTrackersMatching(t -> Objects.equals(trackerId, t.getTrackerId()));
        return CompletableFuture.completedFuture(null);
    }

    public <T extends TrackerRead> void disconnectTrackersMatching(Predicate<T> predicate) {
        this.trackers.values().removeIf(t -> predicate.test(t));
    }

    @Override
    public void close() {
        this.executor.shutdownNow();
    }

    protected SerializedMessage getMessage(long index) {
        return this.messageLog.get(index);
    }

    @Override
    public Registration registerMonitor(Consumer<List<SerializedMessage>> monitor) {
        this.monitors.add(monitor);
        return () -> this.monitors.remove(monitor);
    }

    @ConstructorProperties(value={"messageType", "messageExpiration"})
    public InMemoryMessageStore(MessageType messageType, Duration messageExpiration) {
        this.messageType = messageType;
        this.messageExpiration = messageExpiration;
    }

    @Override
    public MessageType getMessageType() {
        return this.messageType;
    }

    public Duration getMessageExpiration() {
        return this.messageExpiration;
    }
}

