/*
 * Decompiled with CFR 0.152.
 */
package org.occurrent.subscription.inmemory;

import io.cloudevents.CloudEvent;
import jakarta.annotation.PreDestroy;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.occurrent.filter.Filter;
import org.occurrent.retry.RetryStrategy;
import org.occurrent.subscription.OccurrentSubscriptionFilter;
import org.occurrent.subscription.StartAt;
import org.occurrent.subscription.SubscriptionFilter;
import org.occurrent.subscription.api.blocking.Subscription;
import org.occurrent.subscription.api.blocking.SubscriptionModel;
import org.occurrent.subscription.inmemory.InMemorySubscription;
import org.occurrent.subscription.internal.ExecutorShutdown;

public class InMemorySubscriptionModel
implements SubscriptionModel,
Consumer<Stream<CloudEvent>> {
    private final ConcurrentMap<String, InMemorySubscription> subscriptions;
    private final ConcurrentMap<String, Boolean> pausedSubscriptions;
    private final ExecutorService cloudEventDispatcher;
    private final RetryStrategy retryStrategy;
    private final Supplier<BlockingQueue<CloudEvent>> queueSupplier;
    private volatile boolean shutdown = false;
    private volatile boolean running = true;

    public InMemorySubscriptionModel() {
        this((RetryStrategy)RetryStrategy.fixed((long)200L));
    }

    public InMemorySubscriptionModel(RetryStrategy retryStrategy) {
        this(Executors.newCachedThreadPool(), retryStrategy);
    }

    public InMemorySubscriptionModel(ExecutorService cloudEventDispatcher, RetryStrategy retryStrategy) {
        this(cloudEventDispatcher, retryStrategy, LinkedBlockingQueue::new);
    }

    public InMemorySubscriptionModel(ExecutorService cloudEventDispatcher, RetryStrategy retryStrategy, Supplier<BlockingQueue<CloudEvent>> queue) {
        if (cloudEventDispatcher == null) {
            throw new IllegalArgumentException("cloudEventDispatcher cannot be null");
        }
        if (retryStrategy == null) {
            throw new IllegalArgumentException(RetryStrategy.class.getSimpleName() + " cannot be null");
        }
        if (queue == null) {
            throw new IllegalArgumentException(BlockingQueue.class.getSimpleName() + " cannot be null");
        }
        this.queueSupplier = queue;
        this.cloudEventDispatcher = cloudEventDispatcher;
        this.retryStrategy = retryStrategy;
        this.subscriptions = new ConcurrentHashMap<String, InMemorySubscription>();
        this.pausedSubscriptions = new ConcurrentHashMap<String, Boolean>();
    }

    public synchronized Subscription subscribe(String subscriptionId, SubscriptionFilter filter, StartAt startAt, Consumer<CloudEvent> action) {
        if (this.shutdown) {
            throw new IllegalStateException("Cannot subscribe when shutdown");
        }
        if (subscriptionId == null) {
            throw new IllegalArgumentException("subscriptionId cannot be null");
        }
        if (action == null) {
            throw new IllegalArgumentException("action cannot be null");
        }
        if (this.subscriptions.containsKey(subscriptionId) || this.pausedSubscriptions.containsKey(subscriptionId)) {
            throw new IllegalArgumentException("Subscription " + subscriptionId + " is already defined.");
        }
        if (startAt == null) {
            throw new IllegalArgumentException(StartAt.class.getSimpleName() + " cannot be null");
        }
        StartAt startAtToUse = startAt.get(new StartAt.SubscriptionModelContext(InMemorySubscriptionModel.class));
        if (!startAtToUse.isNow() && !startAtToUse.isDefault()) {
            throw new IllegalArgumentException(InMemorySubscriptionModel.class.getSimpleName() + " only supports starting from 'now' and 'default' (StartAt.now() or StartAt.subscriptionModelDefault())");
        }
        Filter f = InMemorySubscriptionModel.getFilter(filter);
        InMemorySubscription subscription = new InMemorySubscription(subscriptionId, this.queueSupplier.get(), action, f, this.retryStrategy);
        this.subscriptions.put(subscriptionId, subscription);
        if (!this.running) {
            this.pausedSubscriptions.put(subscriptionId, true);
        }
        this.cloudEventDispatcher.execute(subscription);
        return subscription;
    }

    public void cancelSubscription(String subscriptionId) {
        this.subscriptions.remove(subscriptionId);
        this.pausedSubscriptions.remove(subscriptionId);
    }

    @Override
    public void accept(Stream<CloudEvent> cloudEventStream) {
        if (!this.running) {
            return;
        }
        List<CloudEvent> cloudEvents = cloudEventStream.toList();
        this.subscriptions.values().forEach(subscription -> {
            if (this.isRunning(subscription.id())) {
                cloudEvents.stream().filter(subscription::matches).forEach(subscription::eventAvailable);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @PreDestroy
    public void shutdown() {
        ConcurrentMap<String, InMemorySubscription> concurrentMap = this.subscriptions;
        synchronized (concurrentMap) {
            this.shutdown = true;
            this.subscriptions.values().forEach(InMemorySubscription::shutdown);
            this.subscriptions.clear();
        }
        this.pausedSubscriptions.clear();
        ExecutorShutdown.shutdownSafely((ExecutorService)this.cloudEventDispatcher, (long)5L, (TimeUnit)TimeUnit.SECONDS);
    }

    private static Filter getFilter(SubscriptionFilter filter) {
        Filter f;
        if (filter == null) {
            f = Filter.all();
        } else if (filter instanceof OccurrentSubscriptionFilter) {
            f = ((OccurrentSubscriptionFilter)filter).filter;
        } else {
            throw new IllegalArgumentException(InMemorySubscriptionModel.class.getSimpleName() + " only support filters of type " + OccurrentSubscriptionFilter.class.getName());
        }
        return f;
    }

    public void stop() {
        this.running = false;
        this.subscriptions.values().forEach(subscription -> this.pausedSubscriptions.put(subscription.id(), true));
    }

    public void start(boolean resumeSubscriptionsAutomatically) {
        this.running = true;
        if (resumeSubscriptionsAutomatically) {
            this.pausedSubscriptions.clear();
        }
    }

    public boolean isRunning() {
        return this.running;
    }

    public boolean isRunning(String subscriptionId) {
        return this.running && this.subscriptions.containsKey(subscriptionId) && !this.pausedSubscriptions.containsKey(subscriptionId);
    }

    public boolean isPaused(String subscriptionId) {
        return this.pausedSubscriptions.containsKey(subscriptionId);
    }

    public Subscription resumeSubscription(String subscriptionId) {
        if (!this.isPaused(subscriptionId)) {
            throw new IllegalArgumentException("Subscription " + subscriptionId + " is not paused");
        }
        this.running = true;
        this.pausedSubscriptions.remove(subscriptionId);
        return (Subscription)this.subscriptions.get(subscriptionId);
    }

    public void pauseSubscription(String subscriptionId) {
        if (!this.isRunning(subscriptionId)) {
            throw new IllegalArgumentException("Subscription " + subscriptionId + " is not running");
        }
        this.pausedSubscriptions.put(subscriptionId, true);
    }
}

