/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.common.tracking;

import io.fluxcapacitor.common.ConsistentHashing;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.common.api.tracking.Position;
import io.fluxcapacitor.common.application.AutoClosing;
import io.fluxcapacitor.common.tracking.InMemoryTaskScheduler;
import io.fluxcapacitor.common.tracking.MessageStore;
import io.fluxcapacitor.common.tracking.PositionStore;
import io.fluxcapacitor.common.tracking.TaskScheduler;
import io.fluxcapacitor.common.tracking.Tracker;
import io.fluxcapacitor.common.tracking.TrackerCluster;
import io.fluxcapacitor.common.tracking.TrackingStrategy;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultTrackingStrategy
extends AutoClosing
implements TrackingStrategy {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DefaultTrackingStrategy.class);
    private final MessageStore source;
    private final TaskScheduler scheduler;
    private final int segments;
    private final Map<Tracker, WaitingTracker> waitingTrackers = Collections.synchronizedMap(new HashMap());
    private final ConcurrentHashMap<String, TrackerCluster> clusters = new ConcurrentHashMap();
    private volatile long lastSeenIndex = -1L;

    public DefaultTrackingStrategy(MessageStore source) {
        this(source, new InMemoryTaskScheduler());
    }

    public DefaultTrackingStrategy(MessageStore source, TaskScheduler scheduler) {
        this(source, scheduler, Position.MAX_SEGMENT);
    }

    protected DefaultTrackingStrategy(MessageStore source, TaskScheduler scheduler, int segments) {
        this.source = source;
        this.scheduler = scheduler;
        this.segments = segments;
        source.registerMonitor(this::onUpdate);
        this.purgeCeasedTrackers(Duration.ofSeconds(2L));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void getBatch(Tracker tracker, PositionStore positionStore) {
        TrackerCluster oldCluster = this.clusters.get(tracker.getConsumerName());
        int[] newSegment = this.claimSegment(tracker);
        try {
            Position position;
            List<SerializedMessage> filtered;
            List<SerializedMessage> unfiltered;
            if (newSegment[0] == newSegment[1]) {
                this.waitForMessages(tracker, new MessageBatch(newSegment, Collections.emptyList(), null, Position.newPosition()), positionStore);
                return;
            }
            int batchSize = this.adjustMaxSize(tracker, tracker.getMaxSize());
            long lastIndex = this.lastSeenIndex;
            do {
                position = this.position(tracker, positionStore, newSegment);
                unfiltered = this.getBatch(newSegment, position, batchSize);
                filtered = this.filter(unfiltered, newSegment, position, tracker);
                if (unfiltered.isEmpty() || !filtered.isEmpty()) continue;
                long batchIndex = unfiltered.get(unfiltered.size() - 1).getIndex();
                if (batchIndex < DefaultTrackingStrategy.indexFromMillis(System.currentTimeMillis() - tracker.maxTimeout())) {
                    MessageBatch emptyBatch = new MessageBatch(newSegment, filtered, batchIndex, position);
                    tracker.send(emptyBatch);
                    return;
                }
                positionStore.storePosition(tracker.getConsumerName(), newSegment, batchIndex);
                tracker = tracker.withLastTrackerIndex(batchIndex);
            } while (!unfiltered.isEmpty() && filtered.isEmpty() && !tracker.hasMissedDeadline());
            MessageBatch messageBatch = new MessageBatch(newSegment, filtered, this.getLastIndex(unfiltered), position);
            if (messageBatch.isEmpty()) {
                WaitingTracker task;
                this.waitForMessages(tracker, messageBatch, positionStore);
                if (lastIndex < this.lastSeenIndex && (messageBatch.getLastIndex() == null || messageBatch.getLastIndex() < this.lastSeenIndex) && (task = this.waitingTrackers.get(tracker)) != null && task.tracker == tracker) {
                    task.run();
                }
            } else {
                tracker.send(messageBatch);
            }
        }
        catch (Throwable e) {
            log.error("Failed to get a batch for tracker {}", (Object)tracker, (Object)e);
            this.waitForMessages(tracker, new MessageBatch(newSegment, Collections.emptyList(), null, Position.newPosition()), positionStore);
        }
        finally {
            if (oldCluster != null && !Objects.deepEquals(oldCluster.getSegment(tracker), newSegment)) {
                this.onClusterUpdate(oldCluster);
            }
        }
    }

    @Override
    public void claimSegment(Tracker tracker, PositionStore positionStore) {
        int[] newSegment = this.claimSegment(tracker);
        if (newSegment[0] == newSegment[1]) {
            this.waitForUpdate(tracker, new MessageBatch(newSegment, Collections.emptyList(), null, Position.newPosition()), () -> this.claimSegment(tracker, positionStore));
        } else {
            tracker.send(new MessageBatch(newSegment, Collections.emptyList(), null, this.position(tracker, positionStore, newSegment)));
        }
    }

    protected List<SerializedMessage> getBatch(int[] segment, Position position, int batchSize) {
        return this.source.getBatch(position.lowestIndexForSegment(segment).orElse(null), batchSize);
    }

    protected void waitForMessages(Tracker tracker, MessageBatch emptyBatch, PositionStore positionStore) {
        this.waitForUpdate(tracker, emptyBatch, () -> this.getBatch(tracker, positionStore));
    }

    protected void waitForUpdate(Tracker tracker, MessageBatch emptyBatch, Runnable followUp) {
        if (tracker.hasMissedDeadline()) {
            tracker.send(emptyBatch);
            return;
        }
        this.clusters.compute(tracker.getConsumerName(), (p, c) -> Optional.ofNullable(c).orElseGet(() -> new TrackerCluster(this.segments)).withWaitingTracker(tracker));
        Registration scheduleToken = this.scheduler.schedule(tracker.getDeadline(), () -> {
            if (this.waitingTrackers.keySet().removeIf(t -> t == tracker)) {
                this.clusters.compute(tracker.getConsumerName(), (p, cluster) -> cluster != null && cluster.contains(tracker) ? cluster.withActiveTracker(tracker) : cluster);
                tracker.send(emptyBatch);
            }
        });
        WaitingTracker existing = this.waitingTrackers.remove(tracker);
        this.waitingTrackers.put(tracker, new WaitingTracker(tracker, scheduleToken, followUp));
        if (existing != null) {
            log.warn("Tracker replaced another waiting tracker. This should normally not happen. New tracker: {}", (Object)tracker);
            existing.tracker.send(emptyBatch);
        }
    }

    protected Position position(Tracker tracker, PositionStore positionStore, int[] segment) {
        if (tracker.clientControlledIndex()) {
            return new Position(segment, Optional.ofNullable(tracker.getLastTrackerIndex()).orElseGet(() -> DefaultTrackingStrategy.indexFromMillis(System.currentTimeMillis() - 1000L)));
        }
        Position position = positionStore.position(tracker.getConsumerName());
        if (position.isNew(segment)) {
            return new Position(segment, tracker.getLastTrackerIndex() == null ? DefaultTrackingStrategy.indexFromMillis(System.currentTimeMillis() - 1000L) : tracker.getLastTrackerIndex());
        }
        return position;
    }

    protected List<SerializedMessage> filter(List<SerializedMessage> messages, int[] segmentRange, Position position, Tracker tracker) {
        return messages.stream().filter(m -> tracker.canHandle(this.ensureMessageSegment((SerializedMessage)m), segmentRange) && (tracker.ignoreSegment() || position.isNewMessage((SerializedMessage)m))).toList();
    }

    protected SerializedMessage ensureMessageSegment(SerializedMessage message) {
        message.setSegment(message.getSegment() == null ? ConsistentHashing.computeSegment(message.getMessageId(), this.segments) : message.getSegment() % this.segments);
        return message;
    }

    protected int adjustMaxSize(Tracker tracker, int maxSize) {
        return Optional.ofNullable(this.clusters.get(tracker.getConsumerName())).map(cluster -> cluster.getTrackers().size() * maxSize).orElse(maxSize);
    }

    protected int[] claimSegment(Tracker tracker) {
        TrackerCluster cluster = this.clusters.compute(tracker.getConsumerName(), (p, c) -> Optional.ofNullable(c).orElseGet(() -> new TrackerCluster(this.segments)).withActiveTracker(tracker));
        return cluster.getSegment(tracker);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void onUpdate(List<SerializedMessage> messages) {
        if (!this.isStopped()) {
            Map<Tracker, WaitingTracker> map = this.waitingTrackers;
            synchronized (map) {
                this.lastSeenIndex = Optional.ofNullable(this.getLastIndex(messages)).orElse(this.lastSeenIndex);
                new ArrayList<WaitingTracker>(this.waitingTrackers.values()).forEach(WaitingTracker::run);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void onClusterUpdate(TrackerCluster cluster) {
        if (!this.isStopped()) {
            Map<Tracker, WaitingTracker> map = this.waitingTrackers;
            synchronized (map) {
                this.waitingTrackers.entrySet().stream().filter(e -> cluster.contains((Tracker)e.getKey())).map(Map.Entry::getValue).toList().forEach(WaitingTracker::run);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void disconnectTrackers(Predicate<Tracker> predicate, boolean sendFinalEmptyBatch) {
        HashSet removedAndWaiting = new HashSet();
        HashSet updatedClusters = new HashSet();
        try {
            Map<Tracker, WaitingTracker> map = this.waitingTrackers;
            synchronized (map) {
                this.waitingTrackers.keySet().removeIf(tracker -> {
                    boolean match = predicate.test((Tracker)tracker);
                    if (match) {
                        removedAndWaiting.add(tracker);
                    }
                    return match;
                });
                this.clusters.replaceAll((key, cluster) -> {
                    TrackerCluster updatedCluster = cluster.purgeTrackers(predicate);
                    if (!Objects.equals(updatedCluster, cluster) && !updatedCluster.isEmpty()) {
                        updatedClusters.add(updatedCluster);
                    }
                    return updatedCluster;
                });
                this.clusters.values().removeIf(TrackerCluster::isEmpty);
            }
            updatedClusters.forEach(this::onClusterUpdate);
        }
        finally {
            if (sendFinalEmptyBatch) {
                removedAndWaiting.forEach(tracker -> {
                    try {
                        tracker.send(new MessageBatch(new int[]{0, 0}, Collections.emptyList(), null, Position.newPosition()));
                    }
                    catch (Exception e) {
                        log.error("Failed to send final empty batch to disconnecting tracker: {}", (Object)predicate, (Object)e);
                    }
                });
            }
        }
    }

    protected void purgeCeasedTrackers(Duration delay) {
        this.scheduler.schedule(System.currentTimeMillis() + delay.toMillis(), () -> {
            this.clusters.replaceAll((key, cluster) -> {
                TrackerCluster after = cluster.purgeTrackers(t -> t.getPurgeDelay() != null && cluster.getProcessingDuration((Tracker)t).filter(d -> d.toMillis() > t.getPurgeDelay()).isPresent());
                if (after != cluster) {
                    HashSet<Tracker> removed = new HashSet<Tracker>(cluster.getTrackers());
                    removed.removeAll(after.getTrackers());
                    if (!removed.isEmpty()) {
                        log.warn("Purged trackers from consumer {} because they have ceased processing: {}", key, (Object)removed);
                        return after;
                    }
                }
                return cluster;
            });
            this.purgeCeasedTrackers(delay);
        });
    }

    private Long getLastIndex(List<SerializedMessage> messages) {
        return messages.isEmpty() ? null : messages.get(messages.size() - 1).getIndex();
    }

    private static long indexFromMillis(long millisSinceEpoch) {
        return millisSinceEpoch << 16;
    }

    @Override
    protected void onShutdown() {
        this.scheduler.shutdown();
    }

    @Override
    public void close() {
        this.onShutdown();
    }

    protected class WaitingTracker
    implements Runnable {
        private final Tracker tracker;
        private final Registration scheduleToken;
        private final Runnable followUp;

        @Override
        public void run() {
            try {
                this.scheduleToken.cancel();
                if (DefaultTrackingStrategy.this.waitingTrackers.remove(this.tracker, this)) {
                    this.followUp.run();
                }
            }
            catch (Throwable e) {
                log.error("Failed to execute tracker fetch / follow up", e);
            }
        }

        @ConstructorProperties(value={"tracker", "scheduleToken", "followUp"})
        @Generated
        public WaitingTracker(Tracker tracker, Registration scheduleToken, Runnable followUp) {
            this.tracker = tracker;
            this.scheduleToken = scheduleToken;
            this.followUp = followUp;
        }
    }
}

