/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.testserver.scheduling;

import io.fluxcapacitor.common.Guarantee;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.scheduling.SerializedSchedule;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.common.api.tracking.Position;
import io.fluxcapacitor.common.tracking.InMemoryTaskScheduler;
import io.fluxcapacitor.common.tracking.MessageStore;
import io.fluxcapacitor.common.tracking.TaskScheduler;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.common.serialization.Serializer;
import io.fluxcapacitor.javaclient.scheduling.Schedule;
import io.fluxcapacitor.javaclient.scheduling.client.InMemoryScheduleStore;
import io.fluxcapacitor.javaclient.scheduling.client.SchedulingClient;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.IndexUtils;
import io.fluxcapacitor.javaclient.tracking.client.TrackerRead;
import java.beans.ConstructorProperties;
import java.time.Clock;
import java.time.Duration;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Predicate;

/**
 * Schedule store used by the test server. It wraps an {@link InMemoryScheduleStore} and forwards
 * nearly every call to it unchanged.
 *
 * <p>On top of the delegate it keeps at most one pending wake-up task (the "upcoming deadline").
 * When that task fires it calls {@code delegate.notifyMonitors} with an empty message array.
 * The wake-up is (re)armed whenever schedules with a future timestamp are stored, and whenever a
 * batch read finds messages that the delegate's own {@code getBatch} did not return (presumably
 * because they are not due yet -- confirm against {@link InMemoryScheduleStore}).
 *
 * <p>Methods that arm the wake-up from caller threads are {@code synchronized}; the wake-up task
 * itself runs on whatever thread the {@link TaskScheduler} uses and clears the
 * {@code volatile} deadline field without taking the lock.
 */
public class TestServerScheduleStore implements MessageStore, SchedulingClient {
    private final TaskScheduler scheduler = new InMemoryTaskScheduler();
    private final InMemoryScheduleStore delegate;
    /** The currently armed wake-up, or {@code null} when none is pending. */
    private volatile Deadline upcomingDeadline;

    /**
     * Creates a store around the given delegate and arms the first wake-up, if any.
     *
     * <p>The initial {@link #getBatch(Long, int)} call is made for the current time index; as a
     * side effect it arms the wake-up for stored messages the delegate does not return yet.
     *
     * @param delegate the in-memory store that holds the schedules
     */
    public TestServerScheduleStore(InMemoryScheduleStore delegate) {
        this.delegate = delegate;
        this.getBatch(IndexUtils.indexForCurrentTime(), 1).stream()
                .findFirst()
                .map(SerializedMessage::getIndex)
                .ifPresent(this::rescheduleNextDeadline);
    }

    /**
     * Stores the schedules in the delegate and then arms the wake-up for the earliest schedule
     * whose timestamp lies after the clock reading taken at the start of the call.
     *
     * <p>The wake-up is updated in a {@code finally} block, so it also runs when the delegate
     * throws.
     *
     * @param guarantee delivery guarantee passed through to the delegate
     * @param schedules schedules to store
     * @return the future returned by the delegate
     */
    @Override
    public synchronized CompletableFuture<Void> schedule(Guarantee guarantee, SerializedSchedule... schedules) {
        long now = FluxCapacitor.currentClock().millis();
        try {
            return this.delegate.schedule(guarantee, schedules);
        } finally {
            this.rescheduleForEarliestFutureSchedule(now, schedules);
        }
    }

    /**
     * Same as {@link #schedule(Guarantee, SerializedSchedule...)} but lets the delegate pick the
     * guarantee. The wake-up is updated here as well, so schedules stored through this overload
     * are not missed.
     *
     * @param schedules schedules to store
     * @return the future returned by the delegate
     */
    @Override
    public synchronized CompletableFuture<Void> schedule(SerializedSchedule... schedules) {
        long now = FluxCapacitor.currentClock().millis();
        try {
            return this.delegate.schedule(schedules);
        } finally {
            this.rescheduleForEarliestFutureSchedule(now, schedules);
        }
    }

    /** Arms the wake-up for the smallest timestamp among {@code schedules} that is later than {@code now}. */
    private void rescheduleForEarliestFutureSchedule(long now, SerializedSchedule... schedules) {
        Arrays.stream(schedules)
                .mapToLong(SerializedSchedule::getTimestamp)
                .filter(timestamp -> timestamp > now)
                // Use the earliest future timestamp, not merely the first one passed in.
                .min()
                .ifPresent(timestamp -> this.rescheduleNextDeadline(IndexUtils.indexFromMillis(timestamp)));
    }

    /**
     * Reads a batch starting after {@code minIndex} (exclusive).
     *
     * @param minIndex index to start after, or {@code null} to start from the beginning
     * @param maxSize  maximum number of messages to return
     * @return the messages returned by the delegate
     */
    @Override
    public List<SerializedMessage> getBatch(Long minIndex, int maxSize) {
        return this.getBatch(minIndex, maxSize, false);
    }

    /**
     * Reads a batch from the delegate and, as a side effect, arms the wake-up for the lowest
     * index among stored messages that the delegate's {@code getBatch} left out.
     *
     * @param minIndex  index to start from, or {@code null} to start from the beginning
     * @param maxSize   maximum number of messages to return
     * @param inclusive whether a message at exactly {@code minIndex} is included
     * @return the messages returned by the delegate's {@code getBatch}
     */
    @Override
    public synchronized List<SerializedMessage> getBatch(Long minIndex, int maxSize, boolean inclusive) {
        long firstIndex;
        if (minIndex == null) {
            firstIndex = -1L;
        } else {
            firstIndex = inclusive ? minIndex : minIndex + 1L;
        }
        List<SerializedMessage> stored = this.readFromIndex(firstIndex, maxSize);
        List<SerializedMessage> returned = this.delegate.getBatch(minIndex, maxSize, inclusive);
        stored.stream()
                .filter(message -> !returned.contains(message))
                .map(SerializedMessage::getIndex)
                .min(Comparator.naturalOrder())
                .ifPresent(this::rescheduleNextDeadline);
        return returned;
    }

    /**
     * Arms the wake-up for the time encoded in {@code nextIndex}, unless a wake-up at the same or
     * an earlier time is already pending. A later pending wake-up is cancelled and replaced.
     *
     * @param nextIndex message index whose timestamp is the candidate deadline
     */
    protected void rescheduleNextDeadline(long nextIndex) {
        long nextDeadline = IndexUtils.millisFromIndex(nextIndex);
        // Read the volatile field once: the wake-up task may null it concurrently, so a
        // null check followed by a second read of the field could dereference null.
        Deadline current = this.upcomingDeadline;
        if (current != null && nextDeadline >= current.getTimestamp()) {
            return;
        }
        if (current != null) {
            current.getScheduleToken().cancel();
        }
        Registration token = this.scheduler.schedule(nextDeadline, () -> {
            this.upcomingDeadline = null;
            // Empty notification: monitors are woken without being handed new messages.
            this.delegate.notifyMonitors(new SerializedMessage[0]);
        });
        this.upcomingDeadline = new Deadline(nextDeadline, token);
    }

    /** Delegates to the wrapped store. */
    @Override
    public CompletableFuture<Void> cancelSchedule(String scheduleId, Guarantee guarantee) {
        return this.delegate.cancelSchedule(scheduleId, guarantee);
    }

    /** Delegates to the wrapped store. */
    @Override
    public SerializedSchedule getSchedule(String scheduleId) {
        return this.delegate.getSchedule(scheduleId);
    }

    /** Delegates to the wrapped store. */
    @Override
    public CompletableFuture<Void> append(Guarantee guarantee, SerializedMessage... messages) {
        return this.delegate.append(guarantee, messages);
    }

    /** Delegates to the wrapped store. */
    public void setClock(Clock clock) {
        this.delegate.setClock(clock);
    }

    /** Delegates to the wrapped store. */
    public List<Schedule> getSchedules(Serializer serializer) {
        return this.delegate.getSchedules(serializer);
    }

    /** Delegates to the wrapped store. */
    public List<Schedule> removeExpiredSchedules(Serializer serializer) {
        return this.delegate.removeExpiredSchedules(serializer);
    }

    /** Delegates to the wrapped store. */
    public void notifyMonitors(SerializedMessage... messages) {
        this.delegate.notifyMonitors(messages);
    }

    /** Delegates to the wrapped store. */
    public CompletableFuture<MessageBatch> read(String consumer, String trackerId, Long lastIndex, ConsumerConfiguration configuration) {
        return this.delegate.read(consumer, trackerId, lastIndex, configuration);
    }

    /** Delegates to the wrapped store. */
    public CompletableFuture<int[]> claimSegment(TrackerRead trackerRead) {
        return this.delegate.claimSegment(trackerRead);
    }

    /** Delegates to the wrapped store. */
    public CompletableFuture<MessageBatch> read(TrackerRead trackerRead) {
        return this.delegate.read(trackerRead);
    }

    /** Delegates to the wrapped store. */
    public List<SerializedMessage> readFromIndex(long minIndex, int maxSize) {
        return this.delegate.readFromIndex(minIndex, maxSize);
    }

    /** Delegates to the wrapped store. */
    public CompletableFuture<Void> storePosition(String consumer, int[] segment, long lastIndex, Guarantee guarantee) {
        return this.delegate.storePosition(consumer, segment, lastIndex, guarantee);
    }

    /** Delegates to the wrapped store. */
    public CompletableFuture<Void> resetPosition(String consumer, long lastIndex, Guarantee guarantee) {
        return this.delegate.resetPosition(consumer, lastIndex, guarantee);
    }

    /** Delegates to the wrapped store. */
    public Position getPosition(String consumer) {
        return this.delegate.getPosition(consumer);
    }

    /** Delegates to the wrapped store. */
    public CompletableFuture<Void> disconnectTracker(String consumer, String trackerId, boolean sendFinalEmptyBatch, Guarantee guarantee) {
        return this.delegate.disconnectTracker(consumer, trackerId, sendFinalEmptyBatch, guarantee);
    }

    /** Delegates to the wrapped store. */
    public <T extends TrackerRead> void disconnectTrackersMatching(Predicate<T> predicate) {
        this.delegate.disconnectTrackersMatching(predicate);
    }

    /**
     * Cancels the pending wake-up, if any, and closes the delegate. The delegate is closed even
     * if cancelling the wake-up throws.
     */
    @Override
    public void close() {
        try {
            Deadline pending = this.upcomingDeadline;
            if (pending != null) {
                pending.getScheduleToken().cancel();
                this.upcomingDeadline = null;
            }
        } finally {
            this.delegate.close();
        }
    }

    /** Delegates to the wrapped store. */
    @Override
    public Registration registerMonitor(Consumer<List<SerializedMessage>> monitor) {
        return this.delegate.registerMonitor(monitor);
    }

    /** Delegates to the wrapped store. */
    public MessageType getMessageType() {
        return this.delegate.getMessageType();
    }

    /** Delegates to the wrapped store. */
    public Duration getMessageExpiration() {
        return this.delegate.getMessageExpiration();
    }

    /** Delegates to the wrapped store. */
    public MessageBatch readAndWait(String consumer, String trackerId, Long lastIndex, ConsumerConfiguration configuration) {
        return this.delegate.readAndWait(consumer, trackerId, lastIndex, configuration);
    }

    /** Delegates to the wrapped store. */
    public CompletableFuture<Void> storePosition(String consumer, int[] segment, long lastIndex) {
        return this.delegate.storePosition(consumer, segment, lastIndex);
    }

    /** Delegates to the wrapped store. */
    public CompletableFuture<Void> resetPosition(String consumer, long lastIndex) {
        return this.delegate.resetPosition(consumer, lastIndex);
    }

    /** Delegates to the wrapped store. */
    public CompletableFuture<Void> disconnectTracker(String consumer, String trackerId, boolean sendFinalEmptyBatch) {
        return this.delegate.disconnectTracker(consumer, trackerId, sendFinalEmptyBatch);
    }

    /** Delegates to the wrapped store. */
    @Override
    public CompletableFuture<Void> cancelSchedule(String scheduleId) {
        return this.delegate.cancelSchedule(scheduleId);
    }

    /**
     * Immutable pair of a wake-up time (epoch milliseconds, as produced by
     * {@code IndexUtils.millisFromIndex}) and the token that cancels the corresponding task.
     */
    static final class Deadline {
        private final long timestamp;
        private final Registration scheduleToken;

        @ConstructorProperties(value={"timestamp", "scheduleToken"})
        public Deadline(long timestamp, Registration scheduleToken) {
            this.timestamp = timestamp;
            this.scheduleToken = scheduleToken;
        }

        /** @return the time at which the wake-up task is due */
        public long getTimestamp() {
            return this.timestamp;
        }

        /** @return the registration used to cancel the wake-up task */
        public Registration getScheduleToken() {
            return this.scheduleToken;
        }

        /** Two deadlines are equal when both timestamp and token are equal (null-safe). */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof Deadline)) {
                return false;
            }
            Deadline other = (Deadline) o;
            if (this.timestamp != other.timestamp) {
                return false;
            }
            return this.scheduleToken == null
                    ? other.scheduleToken == null
                    : this.scheduleToken.equals(other.scheduleToken);
        }

        /** Hash over timestamp and token; yields the same values as the previous implementation. */
        @Override
        public int hashCode() {
            final int prime = 59;
            int result = 1;
            result = result * prime + Long.hashCode(this.timestamp);
            result = result * prime + (this.scheduleToken == null ? 43 : this.scheduleToken.hashCode());
            return result;
        }

        @Override
        public String toString() {
            return "TestServerScheduleStore.Deadline(timestamp=" + this.timestamp + ", scheduleToken=" + this.scheduleToken + ")";
        }
    }
}

