/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.tracking.client;

import io.fluxcapacitor.common.ConsistentHashing;
import io.fluxcapacitor.common.Guarantee;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.tracking.ClaimSegmentResult;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.common.api.tracking.Position;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.IndexUtils;
import io.fluxcapacitor.javaclient.tracking.client.DefaultTracker;
import io.fluxcapacitor.javaclient.tracking.client.TrackingClient;
import io.fluxcapacitor.javaclient.tracking.client.WebsocketTrackingClient;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TrackingClient} decorator that keeps an index-ordered, size-bounded in-memory cache of messages and
 * answers {@link #read} calls from that cache when it can, falling back to the wrapped
 * {@link WebsocketTrackingClient} otherwise.
 *
 * <p>The cache is filled by a separate tracker that is started lazily on the first {@link #read} call and feeds
 * {@link #cacheNewMessages(List)}. All other {@link TrackingClient} operations are forwarded to the delegate
 * unchanged.
 */
public class CachingTrackingClient implements TrackingClient {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(CachingTrackingClient.class);

    private final WebsocketTrackingClient delegate;
    /** Maximum number of messages retained in {@link #cache}; older (lower index) entries are evicted first. */
    private final int maxCacheSize;
    /** Runs the timeout tasks that complete pending reads with an empty batch. */
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);
    /** Flipped to {@code true} by the first {@link #read} call, which then starts the cache-filling tracker. */
    private final AtomicBoolean started = new AtomicBoolean();
    /** Handle of the cache-filling tracker; {@code null} until the first {@link #read} call. */
    private volatile Registration registration;
    /** Cached messages keyed by message index. */
    private final ConcurrentSkipListMap<Long, SerializedMessage> cache = new ConcurrentSkipListMap<>();
    /** Fetch tasks of trackers that are waiting for new messages, keyed by tracker id. */
    private final Map<String, Runnable> waitingTrackers = new ConcurrentHashMap<>();

    /**
     * Creates a caching client around the given delegate with a maximum cache size of 1024 messages.
     *
     * @param delegate the client that is used to fill the cache and to serve everything the cache cannot
     */
    public CachingTrackingClient(WebsocketTrackingClient delegate) {
        this(delegate, 1024);
    }

    /**
     * Reads the next batch for a tracker.
     *
     * <p>On the very first invocation a cache-filling tracker is started. If {@code lastIndex} is present in
     * the cache, a segment is claimed via the delegate and the batch is assembled from the cache; if the cache
     * currently holds nothing for the tracker, the returned future is completed later, either when matching
     * messages are cached or when {@code config.getMaxWaitDuration()} has elapsed. In all other cases the read
     * is forwarded to the delegate.
     *
     * @param consumer  name of the consumer the tracker belongs to
     * @param trackerId id of the tracker performing the read
     * @param lastIndex index of the last message seen by the tracker, may be {@code null}
     * @param config    consumer configuration (filtering, fetch size, wait duration)
     * @return a future that completes with the next message batch
     */
    @Override
    public CompletableFuture<MessageBatch> read(String consumer, String trackerId, Long lastIndex, ConsumerConfiguration config) {
        if (this.started.compareAndSet(false, true)) {
            ConsumerConfiguration cacheFillerConfig = ConsumerConfiguration.builder()
                    .ignoreSegment(true)
                    .clientControlledIndex(true)
                    .minIndex(IndexUtils.indexForCurrentTime())
                    .name(CachingTrackingClient.class.getSimpleName())
                    .build();
            this.registration = FluxCapacitor.getOptionally()
                    .map(fc -> DefaultTracker.start(this::cacheNewMessages, this.delegate.getMessageType(), cacheFillerConfig, fc))
                    .orElseGet(() -> DefaultTracker.start(this::cacheNewMessages, cacheFillerConfig, this.delegate));
        }
        if (lastIndex == null || !this.cache.containsKey(lastIndex)) {
            // The cache cannot vouch for what follows lastIndex, so let the delegate handle the read.
            return this.delegate.read(consumer, trackerId, lastIndex, config);
        }
        Instant deadline = Instant.now().plus(config.getMaxWaitDuration());
        return this.delegate.claimSegment(consumer, trackerId, lastIndex, config).thenCompose(r -> {
            Long minIndex = r.getPosition().lowestIndexForSegment(r.getSegment()).orElse(null);
            if (minIndex == null) {
                return this.delegate.read(consumer, trackerId, lastIndex, config);
            }
            CompletableFuture<MessageBatch> result = new CompletableFuture<>();
            MessageBatch messageBatch = this.getMessageBatch(config, minIndex, r);
            if (!messageBatch.isEmpty()) {
                result.complete(messageBatch);
            } else {
                // Continue from the last cached index that was inspected (if any) rather than from minIndex.
                long waitFromIndex = Optional.ofNullable(messageBatch.getLastIndex()).orElse(minIndex);
                this.waitForMessages(consumer, trackerId, waitFromIndex, config, r, deadline, result);
            }
            return result;
        });
    }

    /**
     * Completes {@code future} as soon as the cache contains messages for the tracker, or with an empty batch
     * once {@code deadline} has passed.
     *
     * <p>While waiting, the index up to which cached messages were inspected is tracked. When the timeout task
     * runs and that index has moved past {@code minIndex}, the new position is stored for the consumer.
     *
     * @param consumer    name of the consumer the tracker belongs to
     * @param trackerId   id of the waiting tracker, used as key in {@link #waitingTrackers}
     * @param minIndex    index after which cached messages are considered
     * @param config      consumer configuration used for filtering and batch size
     * @param claimResult the claimed segment and position the batch is built for
     * @param deadline    moment at which the wait is given up
     * @param future      the future to complete with the resulting batch
     */
    private void waitForMessages(String consumer, String trackerId, long minIndex, ConsumerConfiguration config,
                                 ClaimSegmentResult claimResult, Instant deadline,
                                 CompletableFuture<MessageBatch> future) {
        AtomicLong atomicIndex = new AtomicLong(minIndex);
        long timeout = Duration.between(Instant.now(), deadline).toMillis();
        if (timeout <= 0L) {
            future.complete(emptyBatch(claimResult, atomicIndex.get()));
            return;
        }
        ScheduledFuture<?> timeoutSchedule = this.scheduler.schedule(() -> {
            try {
                if (future.complete(emptyBatch(claimResult, atomicIndex.get()))) {
                    this.waitingTrackers.remove(trackerId);
                }
            } finally {
                if (atomicIndex.get() > minIndex) {
                    try {
                        this.storePosition(consumer, claimResult.getSegment(), atomicIndex.get()).get();
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the executor can observe the interruption.
                        Thread.currentThread().interrupt();
                        log.error("Failed to update position of {}", consumer, e);
                    } catch (Exception e) {
                        log.error("Failed to update position of {}", consumer, e);
                    }
                }
            }
        }, timeout, TimeUnit.MILLISECONDS);
        // Must stay an anonymous class: the task removes itself from waitingTrackers by identity (this).
        Runnable fetchTask = new Runnable() {
            @Override
            public void run() {
                MessageBatch batch = CachingTrackingClient.this.getMessageBatch(config, atomicIndex.get(), claimResult);
                if (!batch.isEmpty() && future.complete(batch)
                        && CachingTrackingClient.this.waitingTrackers.remove(trackerId, this)) {
                    timeoutSchedule.cancel(false);
                } else {
                    atomicIndex.updateAndGet(c -> Optional.ofNullable(batch.getLastIndex()).orElse(c));
                }
            }
        };
        this.waitingTrackers.put(trackerId, fetchTask);
        // Messages may have been cached between the caller's (empty) cache lookup and the registration above;
        // that notification would have been missed, so check the cache once more now that the task is registered.
        fetchTask.run();
    }

    /** Builds a batch without messages for the claimed segment, reporting {@code lastIndex} as last index. */
    private static MessageBatch emptyBatch(ClaimSegmentResult claimResult, long lastIndex) {
        return new MessageBatch(claimResult.getSegment(), List.of(), lastIndex, claimResult.getPosition());
    }

    /**
     * Assembles a batch from the cache.
     *
     * <p>Takes at most {@code config.getMaxFetchSize()} cached messages with an index strictly greater than
     * {@code minIndex} and filters them via {@link #filterMessages}. The batch's last index is the index of the
     * last message taken from the cache before filtering, or {@code null} if the cache held nothing newer.
     *
     * @param config   consumer configuration used for batch size and filtering
     * @param minIndex exclusive lower bound for message indices
     * @param claim    the claimed segment and position the batch is built for
     * @return the (possibly empty) batch
     */
    protected MessageBatch getMessageBatch(ConsumerConfiguration config, long minIndex, ClaimSegmentResult claim) {
        List<SerializedMessage> unfiltered = this.cache.tailMap(minIndex, false).values().stream()
                .limit(config.getMaxFetchSize())
                .collect(Collectors.toList());
        Long lastIndex = unfiltered.isEmpty() ? null : unfiltered.get(unfiltered.size() - 1).getIndex();
        List<SerializedMessage> filtered =
                this.filterMessages(unfiltered, claim.getSegment(), claim.getPosition(), config);
        return new MessageBatch(claim.getSegment(), filtered, lastIndex, claim.getPosition());
    }

    /**
     * Keeps only the messages that are relevant for a tracker.
     *
     * <p>A message is kept when it passes the configured type filter (messages without a type, or a
     * configuration without a type filter, always pass), is new according to {@code position}, and, unless
     * {@code config.ignoreSegment()} is set, has a segment within {@code [segmentRange[0], segmentRange[1])}.
     * A segment range whose upper bound is 0 matches no message.
     *
     * @param messages     the candidate messages
     * @param segmentRange two-element array holding the inclusive lower and exclusive upper segment bound
     * @param position     the consumer position used to decide whether a message is new
     * @param config       consumer configuration supplying the type filter and segment handling
     * @return the matching messages; the input list itself if it is empty
     */
    protected List<SerializedMessage> filterMessages(List<SerializedMessage> messages, int[] segmentRange, Position position, ConsumerConfiguration config) {
        if (messages.isEmpty()) {
            return messages;
        }
        Predicate<SerializedMessage> predicate = m -> (config.getTypeFilter() == null
                || m.getData().getType() == null
                || config.getTypeFilter().matches(m.getData().getType()))
                && position.isNewMessage(m);
        if (!config.ignoreSegment()) {
            predicate = predicate.and(m -> segmentRange[1] != 0
                    && m.getSegment() >= segmentRange[0]
                    && m.getSegment() < segmentRange[1]);
        }
        return messages.stream().filter(predicate).collect(Collectors.toList());
    }

    /**
     * Adds newly received messages to the cache, notifies waiting trackers and trims the cache.
     *
     * <p>Before caching, each message's segment is normalised: a missing segment is computed from the message
     * id, an existing one is reduced modulo {@code Position.MAX_SEGMENT}. If several messages share an index,
     * the last one in the list wins.
     *
     * @param messages the messages delivered by the cache-filling tracker
     */
    protected void cacheNewMessages(List<SerializedMessage> messages) {
        if (messages.isEmpty()) {
            return;
        }
        for (SerializedMessage message : messages) {
            Integer segment = message.getSegment();
            message.setSegment(segment == null
                    ? ConsistentHashing.computeSegment(message.getMessageId(), Position.MAX_SEGMENT)
                    : segment % Position.MAX_SEGMENT);
            this.cache.put(message.getIndex(), message);
        }
        this.waitingTrackers.values().forEach(Runnable::run);
        this.removeOldMessages();
    }

    /** Evicts the entries with the lowest indices until the cache holds at most {@link #maxCacheSize} messages. */
    protected synchronized void removeOldMessages() {
        int removeCount = this.cache.size() - this.maxCacheSize;
        for (int i = 0; i < removeCount; ++i) {
            this.cache.pollFirstEntry();
        }
    }

    /** Forwards to the delegate; the cache is not consulted. */
    @Override
    public List<SerializedMessage> readFromIndex(long minIndex, int maxSize) {
        return this.delegate.readFromIndex(minIndex, maxSize);
    }

    /** Forwards to the delegate. */
    @Override
    public CompletableFuture<Void> storePosition(String consumer, int[] segment, long lastIndex, Guarantee guarantee) {
        return this.delegate.storePosition(consumer, segment, lastIndex, guarantee);
    }

    /** Forwards to the delegate. */
    @Override
    public CompletableFuture<Void> resetPosition(String consumer, long lastIndex, Guarantee guarantee) {
        return this.delegate.resetPosition(consumer, lastIndex, guarantee);
    }

    /** Forwards to the delegate. */
    @Override
    public Position getPosition(String consumer) {
        return this.delegate.getPosition(consumer);
    }

    /** Forwards to the delegate. */
    @Override
    public CompletableFuture<Void> disconnectTracker(String consumer, String trackerId, boolean sendFinalEmptyBatch, Guarantee guarantee) {
        return this.delegate.disconnectTracker(consumer, trackerId, sendFinalEmptyBatch, guarantee);
    }

    /** Returns the message type of the delegate. */
    @Override
    public MessageType getMessageType() {
        return this.delegate.getMessageType();
    }

    /**
     * Cancels the cache-filling tracker (if it was started), shuts down the timeout scheduler and closes the
     * delegate.
     */
    @Override
    public void close() {
        Optional.ofNullable(this.registration).ifPresent(Registration::cancel);
        this.scheduler.shutdown();
        this.delegate.close();
    }

    /**
     * Creates a caching client around the given delegate.
     *
     * @param delegate     the client that is used to fill the cache and to serve everything the cache cannot
     * @param maxCacheSize maximum number of messages kept in the cache
     */
    @ConstructorProperties(value={"delegate", "maxCacheSize"})
    @Generated
    public CachingTrackingClient(WebsocketTrackingClient delegate, int maxCacheSize) {
        this.delegate = delegate;
        this.maxCacheSize = maxCacheSize;
    }

    /** Returns the wrapped client. */
    @Generated
    public WebsocketTrackingClient getDelegate() {
        return this.delegate;
    }
}

