/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.core.runners;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.FlowListenersInterface;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps an in-memory list of flows and fans changes out to registered listeners.
 *
 * <p>The list is seeded from the flow repository at construction time (entries that are
 * {@link FlowWithException} are skipped) and is then kept up to date from the flow queue
 * once {@link #run()} has been called.
 *
 * <p>All access to the mutable state ({@code flows}, {@code consumers}, {@code consumersEach},
 * {@code isStarted}) is guarded by this instance's monitor. Listeners are invoked while that
 * monitor is held, so they should return quickly.
 */
@Singleton
public class FlowListeners
implements FlowListenersInterface {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(FlowListeners.class);
    private static final ObjectMapper MAPPER = JacksonMapper.ofJson();
    private static final TypeReference<List<Flow>> TYPE_REFERENCE = new TypeReference<List<Flow>>(){};
    /** Whether the queue subscription has already been registered; guarded by {@code this}. */
    private boolean isStarted;
    private final QueueInterface<Flow> flowQueue;
    /** Current flows; guarded by {@code this}. */
    private final List<Flow> flows;
    /** Listeners receiving the whole flow list after each change; guarded by {@code this}. */
    private final List<Consumer<List<Flow>>> consumers = new ArrayList<>();
    /** Listeners receiving {@code (current, previous)} for each change; guarded by {@code this}. */
    private final List<BiConsumer<Flow, Flow>> consumersEach = new ArrayList<>();

    /**
     * Loads the initial flow list from the repository.
     *
     * @param flowRepository source of the initial flows; {@link FlowWithException} entries are dropped
     * @param flowQueue queue that {@link #run()} subscribes to for flow updates and deletions
     */
    @Inject
    public FlowListeners(FlowRepositoryInterface flowRepository, @Named(value="flowQueue") QueueInterface<Flow> flowQueue) {
        this.flowQueue = flowQueue;
        // The list is mutated later (removeIf/add), so request an ArrayList explicitly:
        // Collectors.toList() makes no guarantee about the mutability of its result.
        this.flows = flowRepository.findAll()
            .stream()
            .filter(flow -> !(flow instanceof FlowWithException))
            .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Subscribes to the flow queue on the first call, then notifies the list listeners
     * with the current flows. Later calls only re-notify the list listeners.
     */
    @Override
    public synchronized void run() {
        if (!this.isStarted) {
            this.isStarted = true;
            this.flowQueue.receive(flow -> this.onFlowReceived(flow));
            if (log.isTraceEnabled()) {
                log.trace("FlowListenersService started with {} flows", this.flows.size());
            }
        }
        this.notifyConsumers();
    }

    /**
     * Applies one queue message to the in-memory list and notifies both kinds of listeners.
     *
     * <p>The whole step runs under the monitor so that the {@code previous} lookup and the
     * following removal/upsert see the same state of the list.
     *
     * @param flow the flow received from the queue; removed from the list when
     *             {@code flow.isDeleted()} is true, inserted or replaced otherwise
     */
    private synchronized void onFlowReceived(Flow flow) {
        Flow previous = this.previous(flow).orElse(null);
        if (flow.isDeleted()) {
            this.remove(flow);
        } else {
            this.upsert(flow);
        }
        if (log.isTraceEnabled()) {
            log.trace("Received {} flow '{}.{}'", flow.isDeleted() ? "deletion" : "update", flow.getNamespace(), flow.getId());
        }
        this.notifyConsumersEach(flow, previous);
        this.notifyConsumers();
    }

    /**
     * Tells whether two flows share the same namespace and id.
     */
    private static boolean isSameFlow(Flow candidate, Flow flow) {
        return candidate.getNamespace().equals(flow.getNamespace()) && candidate.getId().equals(flow.getId());
    }

    /**
     * Finds the currently stored flow with the same namespace and id as {@code flow}.
     *
     * @return the stored flow, or an empty {@link Optional} when none matches
     */
    private synchronized Optional<Flow> previous(Flow flow) {
        return this.flows.stream().filter(stored -> isSameFlow(stored, flow)).findFirst();
    }

    /**
     * Removes every stored flow with the same namespace and id as {@code flow}.
     * A warning is logged when nothing was removed although the flow is marked as deleted.
     *
     * @return {@code true} if at least one stored flow was removed
     */
    private synchronized boolean remove(Flow flow) {
        boolean removed = this.flows.removeIf(stored -> isSameFlow(stored, flow));
        if (!removed && flow.isDeleted()) {
            log.warn("Can't remove flow {}.{}", flow.getNamespace(), flow.getId());
        }
        return removed;
    }

    /**
     * Replaces any stored flow with the same namespace and id by {@code flow},
     * or appends {@code flow} when there is none.
     */
    private synchronized void upsert(Flow flow) {
        this.remove(flow);
        this.flows.add(flow);
    }

    /**
     * Hands each list listener its own {@link ArrayList} copy of the current flows.
     * The copy is shallow: the {@link Flow} instances are shared with the internal list.
     */
    private synchronized void notifyConsumers() {
        this.consumers.forEach(consumer -> consumer.accept(new ArrayList<>(this.flows)));
    }

    /**
     * Notifies each per-flow listener of a single change.
     *
     * @param flow the flow that was received
     * @param previous the flow stored before the change, or {@code null} when there was none
     */
    private synchronized void notifyConsumersEach(Flow flow, Flow previous) {
        this.consumersEach.forEach(consumer -> consumer.accept(flow, previous));
    }

    /**
     * Registers a listener for the whole flow list and immediately calls it once
     * with a copy of the current flows obtained from {@link #flows()}.
     */
    @Override
    public synchronized void listen(Consumer<List<Flow>> consumer) {
        this.consumers.add(consumer);
        consumer.accept(new ArrayList<>(this.flows()));
    }

    /**
     * Registers a listener that receives {@code (current, previous)} for each flow change.
     * Unlike the list variant, it is not called at registration time.
     */
    @Override
    public synchronized void listen(BiConsumer<Flow, Flow> consumer) {
        this.consumersEach.add(consumer);
    }

    /**
     * Returns a copy of the current flows produced by serializing the list to JSON
     * and reading it back, so the returned objects are not the stored instances.
     *
     * @return a new list of flows
     * @throws UncheckedIOException if the JSON round trip fails
     */
    @Override
    public synchronized List<Flow> flows() {
        try {
            return MAPPER.readValue(MAPPER.writeValueAsString(this.flows), TYPE_REFERENCE);
        } catch (IOException e) {
            // Jackson's checked exceptions cannot cross this interface method; keep the cause.
            throw new UncheckedIOException("Unable to copy the current flows through JSON", e);
        }
    }
}

