/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axondb.client.axon;

import io.axoniq.axondb.client.ClientConnectionException;
import io.axoniq.axondb.client.axon.GrpcBackedDomainEventData;
import io.axoniq.axondb.client.axon.GrpcMetaDataAwareSerializer;
import io.axoniq.axondb.grpc.EventWithToken;
import java.util.Iterator;
import java.util.Optional;
import java.util.Spliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
import org.axonframework.common.ObjectUtils;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventsourcing.eventstore.DomainEventData;
import org.axonframework.eventsourcing.eventstore.EventUtils;
import org.axonframework.eventsourcing.eventstore.GlobalSequenceTrackingToken;
import org.axonframework.eventsourcing.eventstore.TrackedDomainEventData;
import org.axonframework.eventsourcing.eventstore.TrackedEventData;
import org.axonframework.eventsourcing.eventstore.TrackingEventStream;
import org.axonframework.eventsourcing.eventstore.TrackingToken;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.axonframework.serialization.upcasting.event.NoOpEventUpcaster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventBuffer
implements TrackingEventStream {
    final Logger logger = LoggerFactory.getLogger(EventBuffer.class);
    private final BlockingQueue<TrackedEventData<byte[]>> events;
    private final Iterator<TrackedEventMessage<?>> eventStream;
    private final int heartbeatInterval;
    private final AtomicLong lastServerInteraction = new AtomicLong(System.currentTimeMillis());
    private TrackedEventData<byte[]> peekData;
    private TrackedEventMessage<?> peekEvent;
    private Consumer<EventBuffer> closeCallback;
    private volatile RuntimeException exception;
    private volatile boolean closed;
    private Consumer<Integer> consumeListener = i -> {};

    public EventBuffer(EventUpcaster upcasterChain, Serializer serializer, int heartbeatInterval) {
        this(upcasterChain, serializer, heartbeatInterval, new LinkedBlockingQueue<TrackedEventData<byte[]>>());
    }

    protected EventBuffer(EventUpcaster upcasterChain, Serializer serializer, int heartbeatInterval, BlockingQueue<TrackedEventData<byte[]>> events) {
        this.heartbeatInterval = heartbeatInterval;
        this.events = events;
        this.eventStream = EventUtils.upcastAndDeserializeTrackedEvents(StreamSupport.stream(new SimpleSpliterator<TrackedEventData>(this::poll), false), (Serializer)new GrpcMetaDataAwareSerializer(serializer), (EventUpcaster)((EventUpcaster)ObjectUtils.getOrDefault((Object)upcasterChain, (Object)NoOpEventUpcaster.INSTANCE)), (boolean)true).iterator();
    }

    private TrackedEventData<byte[]> poll() {
        if (this.peekData != null) {
            TrackedEventData<byte[]> nextItem = this.peekData;
            this.peekData = null;
            return nextItem;
        }
        TrackedEventData nextItem = (TrackedEventData)this.events.poll();
        if (nextItem != null) {
            this.consumeListener.accept(1);
        }
        return nextItem;
    }

    private void waitForData(long deadline) throws InterruptedException {
        long now = System.currentTimeMillis();
        long timeLeft = deadline - now;
        if (this.peekData == null && timeLeft > 0L) {
            this.peekData = this.events.poll(this.heartbeatInterval > 0 ? Math.min((long)this.heartbeatInterval, timeLeft) : timeLeft, TimeUnit.MILLISECONDS);
            if (this.peekData != null) {
                this.consumeListener.accept(1);
            } else if (this.heartbeatInterval > 0 && this.lastServerInteraction.get() < System.currentTimeMillis() - (long)(2 * this.heartbeatInterval)) {
                this.fail(new ClientConnectionException("Connection timed out"));
                this.close();
            }
        }
    }

    public void registerCloseListener(Consumer<EventBuffer> closeCallback) {
        this.closeCallback = closeCallback;
    }

    public void registerConsumeListener(Consumer<Integer> consumeListener) {
        this.consumeListener = consumeListener;
    }

    public Optional<TrackedEventMessage<?>> peek() {
        if (this.peekEvent == null && this.eventStream.hasNext()) {
            this.peekEvent = this.eventStream.next();
        }
        return Optional.ofNullable(this.peekEvent);
    }

    public boolean hasNextAvailable(int timeout, TimeUnit timeUnit) {
        long deadline = System.currentTimeMillis() + timeUnit.toMillis(timeout);
        try {
            while (this.exception == null && this.peekEvent == null && !this.eventStream.hasNext() && System.currentTimeMillis() < deadline) {
                this.waitForData(deadline);
            }
            if (this.exception != null) {
                RuntimeException runtimeException = this.exception;
                this.exception = null;
                throw runtimeException;
            }
            return this.peekEvent != null || this.eventStream.hasNext();
        }
        catch (InterruptedException e) {
            this.logger.warn("Consumer thread was interrupted. Returning thread to event processor.", (Throwable)e);
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public TrackedEventMessage<?> nextAvailable() {
        try {
            this.hasNextAvailable(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            TrackedEventMessage<?> trackedEventMessage = this.peekEvent == null ? this.eventStream.next() : this.peekEvent;
            return trackedEventMessage;
        }
        finally {
            this.peekEvent = null;
        }
    }

    public void close() {
        this.closed = true;
        if (this.closeCallback != null) {
            this.closeCallback.accept(this);
        }
        this.events.clear();
    }

    public boolean push(EventWithToken event) {
        if (this.closed) {
            this.logger.debug("Received event while closed: {}", (Object)event.getToken());
            return false;
        }
        try {
            this.touch();
            GlobalSequenceTrackingToken trackingToken = new GlobalSequenceTrackingToken(event.getToken());
            this.events.put((TrackedEventData<byte[]>)new TrackedDomainEventData((TrackingToken)trackingToken, (DomainEventData)new GrpcBackedDomainEventData(event.getEvent())));
        }
        catch (InterruptedException e) {
            this.closeCallback.accept(this);
            return false;
        }
        return true;
    }

    public void fail(RuntimeException e) {
        this.exception = e;
    }

    public void touch() {
        this.lastServerInteraction.set(System.currentTimeMillis());
    }

    private static class SimpleSpliterator<T>
    implements Spliterator<T> {
        private final Supplier<T> supplier;

        protected SimpleSpliterator(Supplier<T> supplier) {
            this.supplier = supplier;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            T nextValue = this.supplier.get();
            if (nextValue != null) {
                action.accept(nextValue);
            }
            return nextValue != null;
        }

        @Override
        public Spliterator<T> trySplit() {
            return null;
        }

        @Override
        public long estimateSize() {
            return Long.MAX_VALUE;
        }

        @Override
        public int characteristics() {
            return 1296;
        }
    }
}

