/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.core.services;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.queues.QueueInterface;
import io.micronaut.http.sse.Event;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Generated;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.FluxSink;

/**
 * Fans out {@link LogEntry} messages received from the {@code workerTaskLogQueue}
 * to server-sent-event sinks registered per execution.
 *
 * <p>Subscribers are indexed by execution id and then by subscriber id. Each
 * subscriber supplies the list of log level names it wants to receive; entries
 * whose level name is not in that list are not forwarded to it.
 *
 * <p>Registration and un-registration are serialized on a private lock so that
 * removing the last subscriber of an execution (which also drops the per-execution
 * map) cannot interleave with a concurrent registration for the same execution.
 * The queue callback reads the maps without taking that lock and relies on
 * {@link ConcurrentHashMap} for safe concurrent reads.
 */
@Singleton
public class LogStreamingService {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(LogStreamingService.class);

    /** SSE event id attached to every forwarded log entry. */
    private static final String EVENT_ID = "progress";

    /** executionId -> (subscriberId -> (sink, accepted level names)). */
    private final Map<String, Map<String, Pair<FluxSink<Event<LogEntry>>, List<String>>>> subscribers = new ConcurrentHashMap<>();
    private final Object subscriberLock = new Object();
    @Inject
    @Named(value="workerTaskLogQueue")
    protected QueueInterface<LogEntry> logQueue;

    /**
     * Handle returned by {@code logQueue.receive(...)}; it is run once on shutdown.
     * NOTE(review): presumably running it cancels the queue subscription - confirm
     * against the {@code QueueInterface} contract.
     */
    private Runnable queueConsumer;

    /**
     * Starts consuming the log queue once the bean has been constructed.
     * Messages that failed to deserialize are logged and dropped; entries without
     * an execution id are ignored; everything else is handed to the subscribers
     * of the matching execution.
     */
    @PostConstruct
    void startQueueConsumer() {
        this.queueConsumer = this.logQueue.receive(either -> {
            if (either.isRight()) {
                log.error("Unable to deserialize log: {}", either.getRight().getMessage());
                return;
            }
            LogEntry current = either.getLeft();
            if (current.getExecutionId() == null) {
                return;
            }
            dispatch(current);
        });
    }

    /**
     * Registers (or replaces) a subscriber for the logs of one execution.
     *
     * @param executionId  execution whose logs should be streamed
     * @param subscriberId identifier of the subscriber, unique within the execution;
     *                     registering the same id again replaces the previous sink
     * @param sink         sink that receives one event per matching log entry
     * @param levels       names of the log levels the subscriber wants to receive
     */
    public void registerSubscriber(String executionId, String subscriberId, FluxSink<Event<LogEntry>> sink, List<String> levels) {
        synchronized (this.subscriberLock) {
            this.subscribers
                .computeIfAbsent(executionId, k -> new ConcurrentHashMap<>())
                .put(subscriberId, Pair.of(sink, levels));
        }
    }

    /**
     * Removes a subscriber. When it was the last subscriber of the execution the
     * per-execution map is removed as well. Unknown execution or subscriber ids
     * are ignored.
     *
     * @param executionId  execution the subscriber was registered for
     * @param subscriberId identifier used at registration time
     */
    public void unregisterSubscriber(String executionId, String subscriberId) {
        synchronized (this.subscriberLock) {
            Map<String, Pair<FluxSink<Event<LogEntry>>, List<String>>> executionSubscribers = this.subscribers.get(executionId);
            if (executionSubscribers == null) {
                return;
            }
            executionSubscribers.remove(subscriberId);
            if (executionSubscribers.isEmpty()) {
                this.subscribers.remove(executionId);
            }
        }
    }

    /**
     * Runs the handle obtained from the queue when the bean is destroyed. The
     * handle is cleared afterwards so that a repeated call does nothing.
     */
    @PreDestroy
    void shutdown() {
        Runnable stop = this.queueConsumer;
        if (stop != null) {
            this.queueConsumer = null;
            stop.run();
        }
    }

    /** Forwards one log entry to every subscriber of its execution that accepts its level. */
    private void dispatch(LogEntry entry) {
        Map<String, Pair<FluxSink<Event<LogEntry>>, List<String>>> executionSubscribers = this.subscribers.get(entry.getExecutionId());
        if (executionSubscribers == null) {
            return;
        }
        if (entry.getLevel() == null) {
            // Without a level there is nothing to match against the subscribers'
            // filters; skip instead of throwing a NullPointerException inside the
            // queue callback.
            return;
        }
        String levelName = entry.getLevel().name();
        for (Pair<FluxSink<Event<LogEntry>>, List<String>> subscriber : executionSubscribers.values()) {
            if (subscriber.getRight().contains(levelName)) {
                subscriber.getLeft().next(Event.of(entry).id(EVENT_ID));
            }
        }
    }
}

