/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.scheduling.client;

import io.fluxcapacitor.common.Guarantee;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.scheduling.SerializedSchedule;
import io.fluxcapacitor.javaclient.common.serialization.Serializer;
import io.fluxcapacitor.javaclient.scheduling.Schedule;
import io.fluxcapacitor.javaclient.scheduling.client.SchedulingClient;
import io.fluxcapacitor.javaclient.tracking.IndexUtils;
import io.fluxcapacitor.javaclient.tracking.client.InMemoryMessageStore;
import java.time.Clock;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In-memory {@link SchedulingClient} layered on top of {@link InMemoryMessageStore}.
 *
 * <p>Every schedule is stored as a message of type {@link MessageType#SCHEDULE}. In addition, this class keeps
 * an index-ordered map from message index to schedule id. A stored message is only passed on by
 * {@link #filterMessages(Collection)} while its index is still present in that map and is not later than the
 * maximum index for the current time of the configured {@link Clock}.
 *
 * <p>Messages cannot be appended directly; both {@code append} overloads throw
 * {@link UnsupportedOperationException}. Use {@link #schedule(Guarantee, SerializedSchedule...)} instead.
 */
public class InMemoryScheduleStore extends InMemoryMessageStore implements SchedulingClient {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(InMemoryScheduleStore.class);

    /** Schedule ids keyed by the index of the message they were stored under, ordered by index. */
    private final ConcurrentSkipListMap<Long, String> scheduleIdsByIndex = new ConcurrentSkipListMap<>();

    /** Most recent index handed out to a schedule whose timestamp was not after "now". */
    private final AtomicLong minScheduleIndex = new AtomicLong();

    /** Source of the current time; replaceable via {@link #setClock(Clock)}. */
    private volatile Clock clock = Clock.systemUTC();

    /** Creates a store for {@link MessageType#SCHEDULE} messages using the superclass defaults. */
    public InMemoryScheduleStore() {
        super(MessageType.SCHEDULE);
    }

    /**
     * Creates a store for {@link MessageType#SCHEDULE} messages.
     *
     * @param messageExpiration expiration duration handed to the superclass constructor
     */
    public InMemoryScheduleStore(Duration messageExpiration) {
        super(MessageType.SCHEDULE, messageExpiration);
    }

    /**
     * Narrows the result of the superclass filter to messages that are due and still scheduled.
     *
     * @param messages candidate messages
     * @return the messages accepted by the superclass whose index is at most the maximum index for the current
     *         clock time and whose index is still registered in this store
     */
    @Override
    protected Collection<SerializedMessage> filterMessages(Collection<SerializedMessage> messages) {
        long maximumIndex = IndexUtils.maxIndexFromMillis(this.clock.millis());
        return super.filterMessages(messages).stream()
                .filter(m -> m.getIndex() <= maximumIndex && this.scheduleIdsByIndex.containsKey(m.getIndex()))
                .collect(Collectors.toList());
    }

    /**
     * Not supported by this store.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public CompletableFuture<Void> append(SerializedMessage... messages) {
        throw new UnsupportedOperationException();
    }

    /**
     * Stores the given schedules, replacing any existing schedule that has the same id.
     *
     * <p>Schedules flagged {@code ifAbsent} are dropped when their id is already registered. That check is
     * evaluated for the whole batch before any schedule of this call is applied.
     *
     * <p>A schedule whose timestamp lies after the current clock time gets the index derived from that
     * timestamp. Any other schedule gets the larger of the index for "now" and the previously issued
     * immediate index plus one. If the chosen index is already taken, it is incremented until a free one is
     * found. The final index is written onto the schedule's message.
     *
     * @param guarantee not used by this implementation
     * @param schedules schedules to store
     * @return the future returned by the superclass when appending the accepted messages
     */
    @Override
    public synchronized CompletableFuture<Void> schedule(Guarantee guarantee, SerializedSchedule... schedules) {
        List<SerializedSchedule> filtered = Arrays.stream(schedules)
                .filter(s -> !s.isIfAbsent() || !this.scheduleIdsByIndex.containsValue(s.getScheduleId()))
                .toList();
        long now = this.clock.millis();
        for (SerializedSchedule schedule : filtered) {
            // Drop any earlier registration of this id so that at most one index maps to it.
            this.cancelSchedule(schedule.getScheduleId());
            long index = schedule.getTimestamp() > now
                    ? IndexUtils.indexFromMillis(schedule.getTimestamp())
                    : this.minScheduleIndex.updateAndGet(
                            i -> Math.max(IndexUtils.indexFromMillis(now), i + 1L));
            // Resolve index collisions by probing upwards until a free slot is claimed.
            while (this.scheduleIdsByIndex.putIfAbsent(index, schedule.getScheduleId()) != null) {
                ++index;
            }
            schedule.getMessage().setIndex(index);
        }
        // Call the superclass directly: this class's own append overloads always throw.
        return super.append(filtered.stream().map(SerializedSchedule::getMessage).toList());
    }

    /**
     * Removes every index registration that maps to the given schedule id.
     *
     * @param scheduleId id of the schedule to cancel
     * @param guarantee  not used by this implementation
     * @return an already completed future
     */
    @Override
    public synchronized CompletableFuture<Void> cancelSchedule(String scheduleId, Guarantee guarantee) {
        this.scheduleIdsByIndex.values().removeIf(s -> s.equals(scheduleId));
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Looks up the schedule registered under the given id.
     *
     * @param scheduleId id to look for
     * @return a schedule built from the first (lowest-index) registration for that id, with its timestamp
     *         derived from the index, or {@code null} if the id is not registered
     */
    @Override
    public synchronized SerializedSchedule getSchedule(String scheduleId) {
        return this.scheduleIdsByIndex.entrySet().stream()
                .filter(e -> scheduleId.equals(e.getValue()))
                .findFirst()
                .map(e -> {
                    SerializedMessage message = this.getMessage(e.getKey());
                    return new SerializedSchedule(
                            scheduleId, IndexUtils.millisFromIndex(e.getKey()), message, false);
                })
                .orElse(null);
    }

    /**
     * Not supported by this store.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public CompletableFuture<Void> append(List<SerializedMessage> messages) {
        throw new UnsupportedOperationException("Use method #schedule instead");
    }

    /**
     * Replaces the clock used to decide which schedules are due and then calls {@code notifyMonitors()}.
     *
     * @param clock the new clock
     * @throws NullPointerException if {@code clock} is {@code null}
     */
    public synchronized void setClock(@NonNull Clock clock) {
        if (clock == null) {
            throw new NullPointerException("clock is marked non-null but is null");
        }
        this.clock = clock;
        this.notifyMonitors();
    }

    /**
     * Returns all currently registered schedules in index order.
     *
     * @param serializer serializer used to deserialize the stored messages
     * @return the registered schedules
     */
    public synchronized List<Schedule> getSchedules(Serializer serializer) {
        return this.asList(this.scheduleIdsByIndex, serializer);
    }

    /**
     * Removes and returns the schedules whose index is at most the maximum index for the current clock time.
     *
     * @param serializer serializer used to deserialize the stored messages
     * @return the schedules that were removed
     */
    public synchronized List<Schedule> removeExpiredSchedules(Serializer serializer) {
        // headMap is a live view: clearing it removes the entries from scheduleIdsByIndex itself.
        NavigableMap<Long, String> expiredEntries =
                this.scheduleIdsByIndex.headMap(IndexUtils.maxIndexFromMillis(this.clock.millis()), true);
        List<Schedule> result = this.asList(expiredEntries, serializer);
        expiredEntries.clear();
        return result;
    }

    /**
     * Converts index-to-id registrations into {@link Schedule} objects.
     *
     * @param scheduleIdsByIndex registrations to convert
     * @param serializer         serializer used to deserialize the message stored under each index
     * @return one schedule per entry, in the iteration order of the given map
     * @throws java.util.NoSuchElementException if deserializing a message yields no result
     */
    protected List<Schedule> asList(Map<Long, String> scheduleIdsByIndex, Serializer serializer) {
        return scheduleIdsByIndex.entrySet().stream().map(e -> {
            SerializedMessage m = this.getMessage(e.getKey());
            Object payload = serializer.deserializeMessages(Stream.of(m), MessageType.SCHEDULE)
                    .findFirst().orElseThrow().getPayload();
            return new Schedule(payload, m.getMetadata(), e.getValue(), IndexUtils.timestampFromIndex(e.getKey()));
        }).toList();
    }

    /**
     * Drops registrations whose index is at most the maximum index for "now minus {@code messageExpiration}"
     * and then lets the superclass purge its messages.
     *
     * @param messageExpiration age beyond which registrations are removed
     */
    @Override
    protected void purgeExpiredMessages(Duration messageExpiration) {
        long expiredUpTo = IndexUtils.maxIndexFromMillis(this.clock.millis() - messageExpiration.toMillis());
        this.scheduleIdsByIndex.headMap(expiredUpTo, true).clear();
        super.purgeExpiredMessages(messageExpiration);
    }

    @Override
    public String toString() {
        return "InMemoryScheduleStore";
    }
}

