/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventsourcing.eventstore.inmemory;

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.TerminalEventMessage;
import org.axonframework.eventhandling.processors.streaming.token.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.processors.streaming.token.TrackingToken;
import org.axonframework.eventsourcing.eventstore.AppendCondition;
import org.axonframework.eventsourcing.eventstore.AppendEventsTransactionRejectedException;
import org.axonframework.eventsourcing.eventstore.ConsistencyMarker;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.eventstore.GlobalIndexConsistencyMarker;
import org.axonframework.eventsourcing.eventstore.SourcingCondition;
import org.axonframework.eventsourcing.eventstore.TaggedEventMessage;
import org.axonframework.eventstreaming.EventsCondition;
import org.axonframework.eventstreaming.StreamingCondition;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.QualifiedName;
import org.axonframework.messaging.SimpleEntry;
import org.axonframework.messaging.unitofwork.ProcessingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InMemoryEventStorageEngine
implements EventStorageEngine {
    // Logger named after this class (resolved through the MethodHandles lookup).
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    // NOTE(review): WITH_MARKER and WITHOUT_MARKER are not referenced anywhere in this file -
    // confirm whether they are leftovers that can be removed.
    private static final boolean WITH_MARKER = true;
    private static final boolean WITHOUT_MARKER = false;
    // Stored events keyed by the index handed out by nextIndex(): 0 for the first event, then lastKey + 1.
    private final NavigableMap<Long, TaggedEventMessage<? extends EventMessage>> eventStorage = new ConcurrentSkipListMap<Long, TaggedEventMessage<? extends EventMessage>>();
    // Only read by firstToken()/latestToken() (as "offset - 1" while the storage is empty) and by describeTo().
    // NOTE(review): nextIndex() does not take the offset into account - confirm this is intended.
    private final long offset;
    // Held by an append transaction's commit while it re-checks for conflicts and writes its events.
    private final ReentrantLock appendLock = new ReentrantLock();
    // Streams handed out by source() and stream(); a commit runs the callback of every stream registered here.
    private final Set<MapBackedMessageStream> openStreams = new CopyOnWriteArraySet<MapBackedMessageStream>();

    /**
     * Creates an engine with an offset of {@code 0}, delegating to the {@code long}-argument constructor.
     */
    public InMemoryEventStorageEngine() {
        this(0L);
    }

    /**
     * Creates an engine that remembers the given {@code offset}.
     * <p>
     * Within this class the offset is only read when reporting tokens for an empty storage (as
     * {@code offset - 1}) and when describing the component.
     *
     * @param offset the offset value to keep for this engine
     */
    public InMemoryEventStorageEngine(long offset) {
        this.offset = offset;
    }

    /**
     * Prepares a transaction that appends the given {@code events} when it is committed.
     * <p>
     * The {@code condition} is checked twice: once immediately, so a conflict that already exists fails the
     * returned future, and once more during commit while holding the append lock.
     *
     * @param condition         the condition the existing events are checked against for conflicts
     * @param processingContext not used by this implementation
     * @param events            the events to store on commit, in iteration order
     * @return a future carrying the transaction, or a future failed with an
     * {@link AppendEventsTransactionRejectedException} when a conflict is detected up front
     */
    @Override
    public CompletableFuture<EventStorageEngine.AppendTransaction<?>> appendEvents(@Nonnull AppendCondition condition, @Nullable ProcessingContext processingContext, @Nonnull List<TaggedEventMessage<?>> events) {
        if (containsConflicts(condition)) {
            // Fail fast: the conflict is already present, so there is no point in handing out a transaction.
            return CompletableFuture.failedFuture(
                    AppendEventsTransactionRejectedException.conflictingEventsDetected(condition.consistencyMarker())
            );
        }
        return CompletableFuture.completedFuture(new EventStorageEngine.AppendTransaction<ConsistencyMarker>() {
            // Flipped by the first commit or rollback; any later commit is rejected.
            private final AtomicBoolean finished = new AtomicBoolean(false);

            /**
             * Stores the events under consecutive indices and notifies all registered streams.
             * <p>
             * NOTE(review): the marker reported here is {@code index + 1} of the stored event, whereas
             * containsConflicts() scans from {@code position + 1} and a sourcing stream reports the last
             * key as its marker - confirm these conventions are meant to differ.
             *
             * @return a future with the upper bound of the markers of the stored events,
             * {@code ConsistencyMarker.ORIGIN} when no events were given, or a failed future when the
             * transaction was already finished or a conflict appeared in the meantime
             */
            @Override
            public CompletableFuture<ConsistencyMarker> commit(@Nullable ProcessingContext context) {
                if (finished.getAndSet(true)) {
                    return CompletableFuture.failedFuture(new IllegalStateException("Already committed or rolled back"));
                }
                appendLock.lock();
                try {
                    // Re-check under the lock: another transaction may have committed since appendEvents().
                    if (containsConflicts(condition)) {
                        return CompletableFuture.failedFuture(
                                AppendEventsTransactionRejectedException.conflictingEventsDetected(condition.consistencyMarker())
                        );
                    }
                    ConsistencyMarker newLatest = null;
                    for (TaggedEventMessage<?> event : events) {
                        long next = nextIndex();
                        eventStorage.put(next, event);
                        if (logger.isDebugEnabled()) {
                            logger.debug("Appended event [{}] with position [{}] and timestamp [{}].",
                                         event.event().identifier(), next, event.event().timestamp());
                        }
                        ConsistencyMarker marker = new GlobalIndexConsistencyMarker(next + 1L);
                        newLatest = newLatest == null ? marker : newLatest.upperBound(marker);
                    }
                    notifyOpenStreams();
                    return CompletableFuture.completedFuture(newLatest == null ? ConsistencyMarker.ORIGIN : newLatest);
                } finally {
                    appendLock.unlock();
                }
            }

            /**
             * Completes immediately with the given {@code marker}; nothing else happens after a commit.
             */
            @Override
            public CompletableFuture<ConsistencyMarker> afterCommit(@Nonnull ConsistencyMarker marker, @Nullable ProcessingContext context) {
                return CompletableFuture.completedFuture(marker);
            }

            /**
             * Marks the transaction as finished so that a later commit is rejected. Nothing was stored yet,
             * so there is nothing to undo.
             */
            @Override
            public void rollback(@Nullable ProcessingContext context) {
                finished.set(true);
            }
        });
    }

    /**
     * Runs the callback of every registered stream. A callback that throws is logged and skipped so that it
     * can neither fail a commit whose events are already stored nor keep the other streams from being notified.
     */
    private void notifyOpenStreams() {
        for (MapBackedMessageStream openStream : openStreams) {
            try {
                openStream.callback().run();
            } catch (RuntimeException e) {
                logger.warn("A stream callback failed after events were appended; continuing with the remaining streams.", e);
            }
        }
    }

    /**
     * Determines the index for the next event to store.
     *
     * @return {@code 0} while nothing is stored, otherwise the highest stored index plus one
     */
    private long nextIndex() {
        if (eventStorage.isEmpty()) {
            return 0L;
        }
        return eventStorage.lastKey() + 1L;
    }

    /**
     * Checks whether any stored event beyond the condition's consistency marker matches the condition.
     * <p>
     * A condition carrying {@code ConsistencyMarker.INFINITY} never conflicts. Otherwise every event stored
     * at an index strictly greater than the marker's position is tested.
     *
     * @param condition the append condition to validate
     * @return {@code true} when at least one such event matches the condition
     */
    private boolean containsConflicts(AppendCondition condition) {
        ConsistencyMarker marker = condition.consistencyMarker();
        if (Objects.equals(marker, ConsistencyMarker.INFINITY)) {
            return false;
        }
        long firstUnseen = GlobalIndexConsistencyMarker.position(marker) + 1L;
        for (TaggedEventMessage<? extends EventMessage> candidate : eventStorage.tailMap(firstUnseen).values()) {
            if (condition.matches(candidate.event().type().qualifiedName(), candidate.tags())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Opens a finite stream over the events that are stored at the moment of this call.
     * <p>
     * The stream starts at {@code condition.start()} and ends at the highest index currently stored
     * ({@code -1} when the storage is empty). It is registered with the engine so that commits run its callback.
     *
     * @param condition         defines the start position and which events match
     * @param processingContext not used by this implementation
     * @return the registered sourcing stream
     */
    @Override
    public MessageStream<EventMessage> source(@Nonnull SourcingCondition condition, @Nullable ProcessingContext processingContext) {
        if (logger.isDebugEnabled()) {
            logger.debug("Start sourcing events with condition [{}].", condition);
        }
        long start = condition.start();
        long end = eventStorage.isEmpty() ? -1L : eventStorage.lastKey();
        MapBackedSourcingEventMessageStream sourcingStream =
                new MapBackedSourcingEventMessageStream(this, start, end, condition);
        openStreams.add(sourcingStream);
        return sourcingStream;
    }

    /**
     * Opens an open-ended stream starting at the position carried by the condition's token.
     * <p>
     * When the token exposes no position, {@code -1} is used as the start. The stream is registered with the
     * engine so that commits run its callback.
     *
     * @param condition         defines the start position and which events match
     * @param processingContext not used by this implementation
     * @return the registered streaming stream
     */
    @Override
    public MessageStream<EventMessage> stream(@Nonnull StreamingCondition condition, @Nullable ProcessingContext processingContext) {
        if (logger.isDebugEnabled()) {
            logger.debug("Start streaming events with condition [{}].", condition);
        }
        long start = condition.position().position().orElse(-1L);
        MapBackedStreamingEventMessageStream streamingStream =
                new MapBackedStreamingEventMessageStream(this, start, (EventsCondition) condition);
        openStreams.add(streamingStream);
        return streamingStream;
    }

    /**
     * Tests a stored event against a condition using the event's qualified type name and its tags.
     *
     * @param taggedEvent the stored event together with its tags
     * @param condition   the condition to test against
     * @return {@code true} when the condition matches the event
     */
    private static boolean match(TaggedEventMessage<?> taggedEvent, EventsCondition condition) {
        return condition.matches(taggedEvent.event().type().qualifiedName(), taggedEvent.tags());
    }

    /**
     * Reports the token at the start of the storage.
     *
     * @param processingContext not used by this implementation
     * @return a completed future holding a token at {@code offset - 1} while the storage is empty, otherwise
     * a token at the lowest stored index
     */
    @Override
    public CompletableFuture<TrackingToken> firstToken(@Nullable ProcessingContext processingContext) {
        if (logger.isDebugEnabled()) {
            logger.debug("Operation firstToken() is invoked.");
        }
        if (eventStorage.isEmpty()) {
            return CompletableFuture.completedFuture(new GlobalSequenceTrackingToken(offset - 1L));
        }
        long firstIndex = eventStorage.firstKey();
        return CompletableFuture.completedFuture(new GlobalSequenceTrackingToken(firstIndex));
    }

    /**
     * Reports the token at the end of the storage.
     *
     * @param processingContext not used by this implementation
     * @return a completed future holding a token at {@code offset - 1} while the storage is empty, otherwise
     * a token at the highest stored index plus one
     */
    @Override
    public CompletableFuture<TrackingToken> latestToken(@Nullable ProcessingContext processingContext) {
        if (logger.isDebugEnabled()) {
            logger.debug("Operation latestToken() is invoked.");
        }
        if (eventStorage.isEmpty()) {
            return CompletableFuture.completedFuture(new GlobalSequenceTrackingToken(offset - 1L));
        }
        long lastIndex = eventStorage.lastKey();
        return CompletableFuture.completedFuture(new GlobalSequenceTrackingToken(lastIndex + 1L));
    }

    /**
     * Finds the token from which streaming yields the first event whose timestamp is at or after {@code at}.
     * <p>
     * Streams in this engine resume at the index equal to the token's position, and the event stored at
     * index {@code k} is published with token {@code k + 1}. The token that makes a stream begin at index
     * {@code k} is therefore {@code k} itself; using {@code k - 1} would replay one event that is older
     * than {@code at}.
     *
     * @param at                the instant to search for
     * @param processingContext passed on to {@code latestToken} when no event qualifies
     * @return a completed future with the token positioned at the first qualifying event, or the latest token
     * when every stored event is older than {@code at}
     */
    @Override
    public CompletableFuture<TrackingToken> tokenAt(@Nonnull Instant at, @Nullable ProcessingContext processingContext) {
        if (logger.isDebugEnabled()) {
            logger.debug("Operation tokenAt() is invoked with Instant [{}].", at);
        }
        // The map iterates in ascending index order, so the first hit is the lowest qualifying index.
        for (Map.Entry<Long, TaggedEventMessage<? extends EventMessage>> stored : eventStorage.entrySet()) {
            Instant eventTimestamp = stored.getValue().event().timestamp();
            if (!eventTimestamp.isBefore(at)) {
                long index = stored.getKey();
                return CompletableFuture.completedFuture(new GlobalSequenceTrackingToken(index));
            }
        }
        return latestToken(processingContext);
    }

    /**
     * Describes this engine by reporting its {@code offset} under the property name {@code "offset"}.
     *
     * @param descriptor the descriptor receiving the property
     */
    public void describeTo(@Nonnull ComponentDescriptor descriptor) {
        Long configuredOffset = Long.valueOf(offset);
        descriptor.describeProperty("offset", configuredOffset);
    }

    /**
     * Bounded stream used for sourcing. Once its events are exhausted it hands out exactly one extra entry: a
     * {@code TerminalEventMessage} whose context carries a {@code GlobalIndexConsistencyMarker} at the
     * stream's end index.
     * <p>
     * NOTE(review): the parent's {@code peek()} also calls {@link #lastEntry()}, so peeking at the end of the
     * stream flips the one-shot flag and a following {@code next()} no longer returns the terminal entry -
     * confirm whether that is intended.
     */
    private class MapBackedSourcingEventMessageStream
    extends MapBackedMessageStream {
        // Becomes true the first time the terminal entry is handed out.
        private final AtomicBoolean sharedLastEntry = new AtomicBoolean(false);

        /**
         * @param inMemoryEventStorageEngine not used; callers pass the enclosing engine
         * @param start                      the position to start reading from
         * @param end                        the last index (inclusive) this stream may read
         * @param condition                  decides which stored events are returned
         */
        private MapBackedSourcingEventMessageStream(InMemoryEventStorageEngine inMemoryEventStorageEngine, long start, long end, EventsCondition condition) {
            super(start, end, condition);
        }

        /**
         * Returns the terminal entry on the first invocation and an empty result on every later one.
         */
        @Override
        Optional<MessageStream.Entry<EventMessage>> lastEntry() {
            if (!sharedLastEntry.compareAndSet(false, true)) {
                return Optional.empty();
            }
            Context markerContext = Context.with(ConsistencyMarker.RESOURCE_KEY, new GlobalIndexConsistencyMarker(end));
            return Optional.of(new SimpleEntry<>(TerminalEventMessage.INSTANCE, markerContext));
        }

        /**
         * Completed only when the parent reports completion and the terminal entry was handed out.
         */
        @Override
        public boolean isCompleted() {
            if (!super.isCompleted()) {
                return false;
            }
            return sharedLastEntry.get();
        }

        /**
         * Something is available while the parent has an event ready or the terminal entry is still pending.
         */
        @Override
        public boolean hasNextAvailable() {
            if (super.hasNextAvailable()) {
                return true;
            }
            return !sharedLastEntry.get();
        }
    }

    /**
     * Stream used for streaming: its end index is {@code Long.MAX_VALUE} and it never produces a terminal
     * entry.
     */
    private class MapBackedStreamingEventMessageStream
    extends MapBackedMessageStream {
        /**
         * @param inMemoryEventStorageEngine not used; callers pass the enclosing engine
         * @param start                      the position to start reading from
         * @param condition                  decides which stored events are returned
         */
        private MapBackedStreamingEventMessageStream(InMemoryEventStorageEngine inMemoryEventStorageEngine, long start, EventsCondition condition) {
            super(start, Long.MAX_VALUE, condition);
        }

        /**
         * Always empty: there is no terminal entry for this kind of stream.
         */
        @Override
        Optional<MessageStream.Entry<EventMessage>> lastEntry() {
            final Optional<MessageStream.Entry<EventMessage>> noTerminalEntry = Optional.empty();
            return noTerminalEntry;
        }
    }

    /**
     * Base for the streams handed out by this engine. It walks the engine's event storage by index, from a
     * start position up to and including {@code end}, returning only events that match the condition.
     * <p>
     * Closing is tracked with an explicit flag rather than by moving the position to {@code end + 1}: for a
     * stream whose end is {@code Long.MAX_VALUE} that addition overflows to a negative position, which makes
     * the stream start over at index 0 instead of stopping. A closed or completed stream also removes itself
     * from the engine's set of open streams so it is no longer retained and called back on every commit.
     */
    private abstract class MapBackedMessageStream
    implements MessageStream<EventMessage> {
        // Next index to read; values below zero are treated as zero when looking up events.
        private final AtomicLong position;
        // Last index (inclusive) this stream is allowed to read.
        protected final long end;
        private final EventsCondition condition;
        // Invoked by the engine after a commit; a no-op until onAvailable() installs one.
        private final AtomicReference<Runnable> callback;
        // Set by close(); once set, no further events are handed out.
        private final AtomicBoolean closed = new AtomicBoolean(false);

        private MapBackedMessageStream(long start, long end, EventsCondition condition) {
            this.position = new AtomicLong(start);
            this.end = end;
            this.condition = condition;
            this.callback = new AtomicReference<Runnable>(() -> {});
        }

        /**
         * Advances past stored events until one matches the condition and returns it together with a tracking
         * token at the event's index plus one. When no further event can be read, the result of
         * {@link #lastEntry()} is returned instead.
         */
        public Optional<MessageStream.Entry<EventMessage>> next() {
            if (!closed.get()) {
                long currentPosition = position.get();
                long lookupPosition = Math.max(0L, currentPosition);
                while (lookupPosition <= end) {
                    TaggedEventMessage<? extends EventMessage> candidate =
                            InMemoryEventStorageEngine.this.eventStorage.get(lookupPosition);
                    // Stop when nothing is stored at this index yet, or when the position was moved concurrently.
                    if (candidate == null || !position.compareAndSet(currentPosition, lookupPosition + 1L)) {
                        break;
                    }
                    if (InMemoryEventStorageEngine.match(candidate, condition)) {
                        return Optional.of(entryFor(lookupPosition, candidate));
                    }
                    currentPosition = position.get();
                    lookupPosition = Math.max(0L, currentPosition);
                }
            }
            Optional<MessageStream.Entry<EventMessage>> terminal = lastEntry();
            if (isCompleted()) {
                // Nothing more will ever be returned, so the engine no longer needs to notify this stream.
                InMemoryEventStorageEngine.this.openStreams.remove(this);
            }
            return terminal;
        }

        /**
         * Looks ahead for the next matching event without moving the position. When none is found, the result
         * of {@link #lastEntry()} is returned.
         */
        public Optional<MessageStream.Entry<EventMessage>> peek() {
            if (!closed.get()) {
                for (long candidateIndex = Math.max(0L, position.get()); candidateIndex <= end; ++candidateIndex) {
                    TaggedEventMessage<? extends EventMessage> candidate =
                            InMemoryEventStorageEngine.this.eventStorage.get(candidateIndex);
                    if (candidate == null) {
                        break;
                    }
                    if (InMemoryEventStorageEngine.match(candidate, condition)) {
                        return Optional.of(entryFor(candidateIndex, candidate));
                    }
                }
            }
            return lastEntry();
        }

        /**
         * Supplies the entry to hand out once no further stored event can be returned.
         */
        abstract Optional<MessageStream.Entry<EventMessage>> lastEntry();

        /**
         * Installs the callback the engine runs after commits. The callback is also run right away when the
         * storage is empty or an event is already stored at the current position.
         */
        public void onAvailable(@Nonnull Runnable callback) {
            this.callback.set(callback);
            NavigableMap<Long, TaggedEventMessage<? extends EventMessage>> storage =
                    InMemoryEventStorageEngine.this.eventStorage;
            if (storage.isEmpty() || storage.containsKey(Math.max(0L, position.get()))) {
                callback.run();
            }
        }

        /**
         * This stream never fails, so there is never an error to report.
         */
        public Optional<Throwable> error() {
            return Optional.empty();
        }

        /**
         * A stream is completed once it is closed or its position has moved past {@code end}.
         */
        public boolean isCompleted() {
            return closed.get() || position.get() > end;
        }

        /**
         * Reports whether an event is stored at the current position; always {@code false} once closed.
         */
        public boolean hasNextAvailable() {
            if (closed.get()) {
                return false;
            }
            long currentPosition = Math.max(0L, position.get());
            return currentPosition <= end && InMemoryEventStorageEngine.this.eventStorage.containsKey(currentPosition);
        }

        /**
         * Stops this stream from returning further events and deregisters it from the engine.
         */
        public void close() {
            closed.set(true);
            InMemoryEventStorageEngine.this.openStreams.remove(this);
        }

        Runnable callback() {
            return this.callback.get();
        }

        /**
         * Wraps a stored event in an entry whose context carries a tracking token at {@code index + 1}.
         */
        private MessageStream.Entry<EventMessage> entryFor(long index, TaggedEventMessage<? extends EventMessage> stored) {
            Context context = TrackingToken.addToContext(Context.empty(), new GlobalSequenceTrackingToken(index + 1L));
            return new SimpleEntry<>(stored.event(), context);
        }
    }
}

