/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.event.axon;

import io.axoniq.axonserver.connector.event.PersistentStreamProperties;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import org.axonframework.axonserver.connector.event.axon.PersistentStreamConnection;
import org.axonframework.common.Registration;
import org.axonframework.config.Configuration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.messaging.SubscribableMessageSource;

public class PersistentStreamMessageSource
implements SubscribableMessageSource<EventMessage<?>> {
    private final PersistentStreamConnection persistentStreamConnection;
    private final String name;
    private Consumer<List<? extends EventMessage<?>>> consumer = NO_OP_CONSUMER;
    private static final Consumer<List<? extends EventMessage<?>>> NO_OP_CONSUMER = events -> {};

    public PersistentStreamMessageSource(String name, Configuration configuration, PersistentStreamProperties persistentStreamProperties, ScheduledExecutorService scheduler, int batchSize) {
        this(name, configuration, persistentStreamProperties, scheduler, batchSize, null);
    }

    public PersistentStreamMessageSource(String name, Configuration configuration, PersistentStreamProperties persistentStreamProperties, ScheduledExecutorService scheduler, int batchSize, String context) {
        this.name = name;
        this.persistentStreamConnection = new PersistentStreamConnection(name, configuration, persistentStreamProperties, scheduler, batchSize, context);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Registration subscribe(@Nonnull Consumer<List<? extends EventMessage<?>>> consumer) {
        PersistentStreamMessageSource persistentStreamMessageSource = this;
        synchronized (persistentStreamMessageSource) {
            boolean noConsumer = this.consumer.equals(NO_OP_CONSUMER);
            if (noConsumer) {
                this.persistentStreamConnection.open(consumer);
                this.consumer = consumer;
            } else {
                boolean sameConsumer = this.consumer.equals(consumer);
                if (!sameConsumer) {
                    throw new IllegalStateException(String.format("%s: Cannot subscribe to PersistentStreamMessageSource with another consumer: there is already an active subscription.", this.name));
                }
            }
        }
        return () -> {
            PersistentStreamMessageSource persistentStreamMessageSource = this;
            synchronized (persistentStreamMessageSource) {
                this.persistentStreamConnection.close();
                this.consumer = NO_OP_CONSUMER;
                return true;
            }
        };
    }
}

