/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.event.axon;

import com.google.common.base.Strings;
import io.axoniq.axonserver.connector.event.EventChannel;
import io.axoniq.axonserver.connector.event.PersistentStream;
import io.axoniq.axonserver.connector.event.PersistentStreamCallbacks;
import io.axoniq.axonserver.connector.event.PersistentStreamProperties;
import io.axoniq.axonserver.connector.event.PersistentStreamSegment;
import io.axoniq.axonserver.connector.impl.StreamClosedException;
import io.axoniq.axonserver.grpc.streams.PersistentStreamEvent;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.axonserver.connector.event.axon.GrpcBackedDomainEventData;
import org.axonframework.axonserver.connector.event.axon.GrpcMetaDataAwareSerializer;
import org.axonframework.config.Configuration;
import org.axonframework.eventhandling.DomainEventData;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventUtils;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.ReplayToken;
import org.axonframework.eventhandling.TrackedDomainEventData;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Receives events from a persistent stream and hands them, in batches, to a registered consumer.
 * <p>
 * The stream is opened through the {@link EventChannel} of the connection obtained from the
 * {@link AxonServerConnectionManager}. Every segment reported as opened gets its own
 * {@link SegmentConnection}, which reads, upcasts and deserializes the events of that segment on the
 * given scheduler and acknowledges the token of the last event of each batch once the consumer has
 * returned. When the stream is closed with an error, a restart is scheduled with a growing delay.
 */
public class PersistentStreamConnection {

    private static final Logger logger = LoggerFactory.getLogger(PersistentStreamConnection.class);

    /**
     * Budget of events a single scheduled run processes before it stops; the run re-submits itself when
     * more events are waiting. The effective budget is the larger of this value and the batch size.
     */
    private static final int MAX_MESSAGES_PER_RUN = 10000;
    /** Delay, in seconds, before the first restart attempt; also the value restored when a segment opens. */
    private static final int INITIAL_RETRY_SECONDS = 1;
    /** Upper bound, in seconds, for the delay between restart attempts. */
    private static final int MAX_RETRY_SECONDS = 60;
    /**
     * Value acknowledged once on a closed segment after a processing run has finished.
     * NOTE(review): presumably mirrors PersistentStreamSegment.PENDING_WORK_DONE_MARKER - confirm and
     * reference that constant directly if it is available.
     */
    private static final long PENDING_WORK_DONE_MARKER = -45L;

    private final String streamId;
    private final Configuration configuration;
    private final PersistentStreamProperties persistentStreamProperties;
    private final AtomicReference<PersistentStream> persistentStreamHolder = new AtomicReference<>();
    // Starts out as a no-op so that events are never handed to a null consumer.
    private final AtomicReference<Consumer<List<? extends EventMessage<?>>>> consumer =
            new AtomicReference<>(events -> {
            });
    private final ScheduledExecutorService scheduler;
    private final int batchSize;
    private final Map<Integer, SegmentConnection> segments = new ConcurrentHashMap<>();
    private final AtomicInteger retrySeconds = new AtomicInteger(INITIAL_RETRY_SECONDS);
    private final String defaultContext;

    /**
     * Creates a connection that uses the context configured in the {@link AxonServerConfiguration}.
     *
     * @param streamId                   the identifier passed to the event channel when opening the stream
     * @param configuration              used to look up the connection manager, the Axon Server
     *                                   configuration, the event serializer and the upcaster chain
     * @param persistentStreamProperties the properties passed to the event channel when opening the stream
     * @param scheduler                  runs the segment reading tasks and the delayed restarts
     * @param batchSize                  the maximum number of events handed to the consumer at once; a
     *                                   batch always contains at least one event
     */
    public PersistentStreamConnection(String streamId, Configuration configuration, PersistentStreamProperties persistentStreamProperties, ScheduledExecutorService scheduler, int batchSize) {
        this(streamId, configuration, persistentStreamProperties, scheduler, batchSize, null);
    }

    /**
     * Creates a connection for an explicitly chosen context.
     *
     * @param streamId                   the identifier passed to the event channel when opening the stream
     * @param configuration              used to look up the connection manager, the Axon Server
     *                                   configuration, the event serializer and the upcaster chain
     * @param persistentStreamProperties the properties passed to the event channel when opening the stream
     * @param scheduler                  runs the segment reading tasks and the delayed restarts
     * @param batchSize                  the maximum number of events handed to the consumer at once; a
     *                                   batch always contains at least one event
     * @param defaultContext             the context to connect to; when {@code null} or empty the context
     *                                   of the {@link AxonServerConfiguration} is used
     */
    public PersistentStreamConnection(String streamId, Configuration configuration, PersistentStreamProperties persistentStreamProperties, ScheduledExecutorService scheduler, int batchSize, String defaultContext) {
        this.streamId = streamId;
        this.configuration = configuration;
        this.persistentStreamProperties = persistentStreamProperties;
        this.scheduler = scheduler;
        this.batchSize = batchSize;
        this.defaultContext = defaultContext;
    }

    /**
     * Registers the consumer that receives the event batches and opens the persistent stream.
     *
     * @param consumer the callback invoked with each batch of deserialized events
     */
    public void open(Consumer<List<? extends EventMessage<?>>> consumer) {
        this.consumer.set(consumer);
        start();
    }

    /**
     * Opens the persistent stream on the event channel of the selected context and remembers it so that
     * {@link #close()} can close it.
     */
    private void start() {
        AxonServerConnectionManager connectionManager = configuration.getComponent(AxonServerConnectionManager.class);
        AxonServerConfiguration serverConfiguration = configuration.getComponent(AxonServerConfiguration.class);
        String context = Strings.isNullOrEmpty(defaultContext) ? serverConfiguration.getContext() : defaultContext;

        PersistentStreamCallbacks callbacks = new PersistentStreamCallbacks(this::segmentOpened,
                                                                            this::segmentClosed,
                                                                            this::messageAvailable,
                                                                            this::streamClosed);
        EventChannel eventChannel = connectionManager.getConnection(context).eventChannel();
        PersistentStream persistentStream = eventChannel.openPersistentStream(
                streamId,
                serverConfiguration.getEventFlowControl().getPermits().intValue(),
                serverConfiguration.getEventFlowControl().getNrOfNewPermits().intValue(),
                callbacks,
                persistentStreamProperties);
        persistentStreamHolder.set(persistentStream);
    }

    /**
     * Callback for an opened segment: restores the initial retry delay and registers a new
     * {@link SegmentConnection} under the segment's number.
     */
    private void segmentOpened(PersistentStreamSegment persistentStreamSegment) {
        logger.info("Segment opened: {}", persistentStreamSegment);
        retrySeconds.set(INITIAL_RETRY_SECONDS);
        segments.put(persistentStreamSegment.segment(), new SegmentConnection(persistentStreamSegment));
    }

    /**
     * Callback for a closed segment: unregisters the matching {@link SegmentConnection}, if any, and marks
     * it closed.
     */
    private void segmentClosed(PersistentStreamSegment persistentStreamSegment) {
        SegmentConnection segmentConnection = segments.remove(persistentStreamSegment.segment());
        if (segmentConnection != null) {
            segmentConnection.close();
        }
        logger.info("Segment closed: {}", persistentStreamSegment);
    }

    /**
     * Callback for newly available events: notifies the {@link SegmentConnection} registered for the
     * segment. Notifications for segments that are not registered are ignored.
     */
    private void messageAvailable(PersistentStreamSegment persistentStreamSegment) {
        SegmentConnection segmentConnection = segments.get(persistentStreamSegment.segment());
        if (segmentConnection != null) {
            segmentConnection.messageAvailable();
        }
    }

    /**
     * Callback for a closed stream. When a cause is given, a restart is scheduled after the current retry
     * delay and the delay for the next attempt is doubled, capped at {@value #MAX_RETRY_SECONDS} seconds.
     * Without a cause no restart is scheduled.
     *
     * @param throwable the reason the stream was closed, or {@code null}
     */
    private void streamClosed(Throwable throwable) {
        persistentStreamHolder.set(null);
        if (throwable == null) {
            return;
        }
        // The throwable is the trailing argument without a placeholder, so it is logged as the exception.
        logger.info("{}: Rescheduling persistent stream", streamId, throwable);
        int delaySeconds = retrySeconds.getAndUpdate(current -> Math.min(MAX_RETRY_SECONDS, current * 2));
        // NOTE(review): if start() throws on the scheduler thread, the failure ends up in the discarded
        // future returned by schedule(..) and no further restart is scheduled.
        scheduler.schedule(this::start, delaySeconds, TimeUnit.SECONDS);
    }

    /**
     * Closes the persistent stream if one is currently open. Calling this method again has no effect until
     * a stream has been opened again.
     */
    public void close() {
        PersistentStream persistentStream = persistentStreamHolder.getAndSet(null);
        if (persistentStream != null) {
            persistentStream.close();
        }
    }

    /**
     * Reads the events of a single segment and passes them to the consumer of the enclosing connection.
     */
    private class SegmentConnection {

        /** Held while a processing run is active, so that at most one run works on the segment. */
        private final AtomicBoolean processGate = new AtomicBoolean();
        /** Ensures the done marker is acknowledged at most once. */
        private final AtomicBoolean doneConfirmed = new AtomicBoolean();
        /** Set by {@link #close()}; stops further reading and processing. */
        private final AtomicBoolean closed = new AtomicBoolean();
        private final PersistentStreamSegment persistentStreamSegment;

        public SegmentConnection(PersistentStreamSegment persistentStreamSegment) {
            this.persistentStreamSegment = persistentStreamSegment;
        }

        /**
         * Submits a processing run to the scheduler unless one is currently holding the process gate.
         */
        public void messageAvailable() {
            // Cheap pre-check only; readMessagesFromSegment performs the authoritative compare-and-set.
            if (!processGate.get()) {
                scheduler.submit(() -> readMessagesFromSegment(persistentStreamSegment));
            }
        }

        /**
         * Processes batches from the segment until the per-run budget is used up, the segment connection
         * is closed, or no event is available. Each batch is acknowledged with the token of its last
         * event after the consumer has returned. On any exception the segment connection is marked
         * closed; exceptions other than {@link StreamClosedException} are also reported to the segment.
         */
        private void readMessagesFromSegment(PersistentStreamSegment persistentStreamSegment) {
            if (!processGate.compareAndSet(false, true)) {
                // Another run is already processing this segment.
                return;
            }
            if (logger.isTraceEnabled()) {
                logger.trace("{}[{}] readMessagesFromSegment - closed: {}",
                             streamId, persistentStreamSegment.segment(), persistentStreamSegment.isClosed());
            }
            try {
                int remaining = Math.max(MAX_MESSAGES_PER_RUN, batchSize);
                GrpcMetaDataAwareSerializer serializer =
                        new GrpcMetaDataAwareSerializer(configuration.eventSerializer());
                while (remaining > 0 && !closed.get()) {
                    List<PersistentStreamEvent> batch = readBatch(persistentStreamSegment);
                    if (batch.isEmpty()) {
                        break;
                    }
                    List<TrackedEventMessage<?>> eventMessages = upcastAndDeserialize(batch, serializer);
                    if (closed.get()) {
                        // Closed while preparing the batch: do not deliver or acknowledge it.
                        break;
                    }
                    consumer.get().accept(eventMessages);
                    if (logger.isTraceEnabled()) {
                        logger.trace("{}/{} processed {} entries",
                                     streamId, persistentStreamSegment.segment(), eventMessages.size());
                    }
                    long token = batch.get(batch.size() - 1).getEvent().getToken();
                    persistentStreamSegment.acknowledge(token);
                    remaining -= batch.size();
                }
                acknowledgeDoneWhenClosed(persistentStreamSegment);
            } catch (StreamClosedException e) {
                logger.debug("{}: Stream closed for segment {}", streamId, persistentStreamSegment.segment());
                close();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                // Exceptions may carry no message; report the exception type instead of passing null.
                String description = e.getMessage() != null ? e.getMessage() : e.getClass().getName();
                persistentStreamSegment.error(description);
                logger.warn("{}: Exception while processing events for segment {}",
                            streamId, persistentStreamSegment.segment(), e);
                close();
            } finally {
                processGate.set(false);
                // Events may have arrived while the gate was held (their notifications were skipped), or
                // the budget may have run out: start another run if something is waiting.
                if (!closed.get() && persistentStreamSegment.peek() != null) {
                    scheduler.submit(() -> readMessagesFromSegment(persistentStreamSegment));
                }
            }
        }

        /**
         * Collects up to {@code batchSize} events. Returns an empty list when no event is immediately
         * available; after the first event, each further event is awaited for at most one millisecond.
         *
         * @throws InterruptedException when interrupted while waiting for a further event
         */
        private List<PersistentStreamEvent> readBatch(PersistentStreamSegment persistentStreamSegment) throws InterruptedException {
            List<PersistentStreamEvent> batch = new LinkedList<>();
            PersistentStreamEvent event = persistentStreamSegment.nextIfAvailable();
            if (event == null) {
                return batch;
            }
            batch.add(event);
            while (batch.size() < batchSize && !closed.get()) {
                event = persistentStreamSegment.nextIfAvailable(1L, TimeUnit.MILLISECONDS);
                if (event == null) {
                    break;
                }
                batch.add(event);
            }
            return batch;
        }

        /**
         * Wraps every event of the batch with its tracking token and runs the result through the
         * configured upcaster chain and the given serializer.
         */
        private List<TrackedEventMessage<?>> upcastAndDeserialize(List<PersistentStreamEvent> batch, GrpcMetaDataAwareSerializer serializer) {
            return EventUtils.upcastAndDeserializeTrackedEvents(
                                     batch.stream()
                                          .map(event -> new TrackedDomainEventData<>(
                                                  createToken(event),
                                                  new GrpcBackedDomainEventData(event.getEvent().getEvent()))),
                                     serializer,
                                     configuration.upcasterChain())
                             .collect(Collectors.toList());
        }

        /**
         * Acknowledges the done marker on the segment, at most once, when this connection has been closed.
         */
        private void acknowledgeDoneWhenClosed(PersistentStreamSegment persistentStreamSegment) {
            if (closed.get() && doneConfirmed.compareAndSet(false, true)) {
                persistentStreamSegment.acknowledge(PENDING_WORK_DONE_MARKER);
            }
        }

        /**
         * Creates the tracking token for an event: a {@link GlobalSequenceTrackingToken} of the event's
         * token, wrapped in a replay token when the event is flagged as a replay.
         */
        private TrackingToken createToken(PersistentStreamEvent event) {
            long token = event.getEvent().getToken();
            if (!event.getReplay()) {
                return new GlobalSequenceTrackingToken(token);
            }
            return ReplayToken.createReplayToken(new GlobalSequenceTrackingToken(token + 1L),
                                                 new GlobalSequenceTrackingToken(token));
        }

        /**
         * Marks this segment connection as closed; running and future processing runs stop delivering
         * events.
         */
        public void close() {
            closed.set(true);
        }
    }
}

