/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.utils;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import org.axonframework.common.stream.BlockingStream;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.GenericTrackedEventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.messaging.StreamableMessageSource;

public class InMemoryStreamableEventSource
implements StreamableMessageSource<TrackedEventMessage<?>> {
    private static final String FAIL_PAYLOAD = "FAIL";
    public static final EventMessage<String> FAIL_EVENT = GenericEventMessage.asEventMessage((Object)"FAIL");
    private List<TrackedEventMessage<?>> messages = new CopyOnWriteArrayList();
    private final boolean streamCallbackSupported;
    private final List<TrackedEventMessage<?>> ignoredEvents = new CopyOnWriteArrayList();
    private Runnable onAvailableCallback = null;

    public InMemoryStreamableEventSource() {
        this(false);
    }

    public InMemoryStreamableEventSource(boolean streamCallbackSupported) {
        this.streamCallbackSupported = streamCallbackSupported;
    }

    public BlockingStream<TrackedEventMessage<?>> openStream(TrackingToken trackingToken) {
        return new BlockingStream<TrackedEventMessage<?>>(){
            private int lastToken;

            public Optional<TrackedEventMessage<?>> peek() {
                if (InMemoryStreamableEventSource.this.messages.size() > this.lastToken) {
                    return Optional.of(InMemoryStreamableEventSource.this.messages.get(this.lastToken));
                }
                return Optional.empty();
            }

            public boolean hasNextAvailable(int timeout, TimeUnit unit) {
                return this.peek().isPresent();
            }

            public TrackedEventMessage<?> nextAvailable() {
                TrackedEventMessage<?> next = this.peek().orElseThrow(() -> new RuntimeException("The processor should never perform a blocking call"));
                this.lastToken = (int)next.trackingToken().position().orElseThrow(() -> new UnsupportedOperationException("Not supported"));
                if (next.getPayload().equals(InMemoryStreamableEventSource.FAIL_PAYLOAD)) {
                    throw new IllegalStateException("Cannot retrieve event at position [" + this.lastToken + "].");
                }
                return next;
            }

            public void close() {
                InMemoryStreamableEventSource.this.clearAllMessages();
            }

            public void skipMessagesWithPayloadTypeOf(TrackedEventMessage<?> ignoredEvent) {
                InMemoryStreamableEventSource.this.ignoredEvents.add(ignoredEvent);
            }

            public boolean setOnAvailableCallback(Runnable callback) {
                InMemoryStreamableEventSource.this.onAvailableCallback = callback;
                return InMemoryStreamableEventSource.this.streamCallbackSupported;
            }
        };
    }

    private synchronized void clearAllMessages() {
        this.messages = new CopyOnWriteArrayList();
    }

    public TrackingToken createTailToken() {
        return null;
    }

    public TrackingToken createHeadToken() {
        if (this.messages.isEmpty()) {
            return null;
        }
        return this.messages.get(this.messages.size() - 1).trackingToken();
    }

    public TrackingToken createTokenAt(Instant dateTime) {
        throw new UnsupportedOperationException("Not supported for InMemoryStreamableEventSource");
    }

    public TrackingToken createTokenSince(Duration duration) {
        throw new UnsupportedOperationException("Not supported for InMemoryStreamableEventSource");
    }

    public synchronized void publishMessage(EventMessage<?> event) {
        int nextToken = this.messages.size();
        this.messages.add((TrackedEventMessage<?>)new GenericTrackedEventMessage((TrackingToken)new GlobalSequenceTrackingToken((long)(nextToken + 1)), event));
    }

    public List<TrackedEventMessage<?>> getIgnoredEvents() {
        return Collections.unmodifiableList(this.ignoredEvents);
    }

    public void runOnAvailableCallback() {
        this.onAvailableCallback.run();
    }
}

