/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.core.runners;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.PluginDefaultService;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class FlowListeners
implements FlowListenersInterface {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(FlowListeners.class);
    private final AtomicBoolean isStarted = new AtomicBoolean(false);
    private final QueueInterface<FlowInterface> flowQueue;
    private final List<FlowWithSource> flows;
    private final List<Consumer<List<FlowWithSource>>> consumers = new ArrayList<Consumer<List<FlowWithSource>>>();
    private final List<BiConsumer<FlowWithSource, FlowWithSource>> consumersEach = new ArrayList<BiConsumer<FlowWithSource, FlowWithSource>>();
    private final PluginDefaultService pluginDefaultService;

    @Inject
    public FlowListeners(FlowRepositoryInterface flowRepository, @Named(value="flowQueue") QueueInterface<FlowInterface> flowQueue, PluginDefaultService pluginDefaultService) {
        this.flowQueue = flowQueue;
        this.flows = new ArrayList<FlowWithSource>(flowRepository.findAllWithSourceForAllTenants());
        this.pluginDefaultService = pluginDefaultService;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        FlowListeners flowListeners = this;
        synchronized (flowListeners) {
            if (this.isStarted.compareAndSet(false, true)) {
                this.flowQueue.receive(either -> {
                    FlowWithSource flow;
                    if (either.isRight()) {
                        flow = FlowWithException.from(((DeserializationException)either.getRight()).getRecord(), (Exception)either.getRight(), log).orElse(null);
                    } else {
                        try {
                            flow = this.pluginDefaultService.injectVersionDefaults((FlowInterface)either.getLeft(), true);
                        }
                        catch (FlowProcessingException ignore) {
                            flow = null;
                        }
                    }
                    if (flow == null) {
                        return;
                    }
                    FlowWithSource previous = this.previous(flow).orElse(null);
                    if (flow.isDeleted()) {
                        this.remove(flow);
                    } else {
                        this.upsert(flow);
                    }
                    if (log.isTraceEnabled()) {
                        log.trace("Received {} flow '{}.{}'", new Object[]{flow.isDeleted() ? "deletion" : "update", flow.getNamespace(), flow.getId()});
                    }
                    this.notifyConsumersEach(flow, previous);
                    this.notifyConsumers();
                });
                if (log.isTraceEnabled()) {
                    log.trace("FlowListenersService started with {} flows", (Object)this.flows.size());
                }
            }
            this.notifyConsumers();
        }
    }

    private Optional<FlowWithSource> previous(FlowWithSource flow) {
        ArrayList<FlowWithSource> copy = new ArrayList<FlowWithSource>(this.flows);
        return copy.stream().filter(r -> r.isSameId(flow)).findFirst();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean remove(FlowInterface flow) {
        FlowListeners flowListeners = this;
        synchronized (flowListeners) {
            boolean remove = this.flows.removeIf(r -> r.isSameId(flow));
            if (!remove && flow.isDeleted()) {
                log.warn("Can't remove flow {}.{}", (Object)flow.getNamespace(), (Object)flow.getId());
            }
            return remove;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void upsert(FlowWithSource flow) {
        FlowListeners flowListeners = this;
        synchronized (flowListeners) {
            this.remove(flow);
            this.flows.add(flow);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyConsumers() {
        FlowListeners flowListeners = this;
        synchronized (flowListeners) {
            this.consumers.forEach(consumer -> consumer.accept(new ArrayList<FlowWithSource>(this.flows)));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyConsumersEach(FlowWithSource flow, FlowWithSource previous) {
        FlowListeners flowListeners = this;
        synchronized (flowListeners) {
            this.consumersEach.forEach(consumer -> consumer.accept(flow, previous));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void listen(Consumer<List<FlowWithSource>> consumer) {
        FlowListeners flowListeners = this;
        synchronized (flowListeners) {
            this.consumers.add(consumer);
            consumer.accept(new ArrayList<FlowWithSource>(this.flows()));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void listen(BiConsumer<FlowWithSource, FlowWithSource> consumer) {
        FlowListeners flowListeners = this;
        synchronized (flowListeners) {
            this.consumersEach.add(consumer);
        }
    }

    @Override
    public List<FlowWithSource> flows() {
        return new ArrayList<FlowWithSource>(this.flows);
    }
}

