/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.runner.memory;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.runner.memory.MemoryQueueEnabled;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@MemoryQueueEnabled
public class MemoryFlowListeners
implements FlowListenersInterface {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(MemoryFlowListeners.class);
    private static final ObjectMapper MAPPER = JacksonMapper.ofJson();
    private static final TypeReference<List<Flow>> TYPE_REFERENCE = new TypeReference<List<Flow>>(){};
    private final QueueInterface<Flow> flowQueue;
    private final List<Flow> flows;
    private final List<Consumer<List<Flow>>> consumers = new ArrayList<Consumer<List<Flow>>>();

    @Inject
    public MemoryFlowListeners(FlowRepositoryInterface flowRepository, @Named(value="flowQueue") QueueInterface<Flow> flowQueue) {
        this.flowQueue = flowQueue;
        this.flows = flowRepository.findAll();
    }

    public void run() {
        this.flowQueue.receive(flow -> {
            if (flow.isDeleted()) {
                this.remove((Flow)flow);
            } else {
                this.upsert((Flow)flow);
            }
            if (log.isTraceEnabled()) {
                log.trace("Received {} flow '{}.{}'", new Object[]{flow.isDeleted() ? "deletion" : "update", flow.getNamespace(), flow.getId()});
            }
            this.notifyConsumers();
        });
        this.notifyConsumers();
        if (log.isTraceEnabled()) {
            log.trace("FlowListenersService started with {} flows", (Object)this.flows.size());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean remove(Flow flow) {
        MemoryFlowListeners memoryFlowListeners = this;
        synchronized (memoryFlowListeners) {
            boolean remove = this.flows.removeIf(r -> r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()));
            if (!remove && flow.isDeleted()) {
                log.warn("Can't remove flow {}.{}", (Object)flow.getNamespace(), (Object)flow.getId());
            }
            return remove;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private synchronized void upsert(Flow flow) {
        MemoryFlowListeners memoryFlowListeners = this;
        synchronized (memoryFlowListeners) {
            this.remove(flow);
            this.flows.add(flow);
        }
    }

    private void notifyConsumers() {
        this.consumers.forEach(consumer -> consumer.accept(new ArrayList<Flow>(this.flows)));
    }

    public void listen(Consumer<List<Flow>> consumer) {
        this.consumers.add(consumer);
        consumer.accept(new ArrayList<Flow>(this.flows()));
    }

    public List<Flow> flows() {
        return (List)MAPPER.readValue(MAPPER.writeValueAsString(this.flows), TYPE_REFERENCE);
    }
}

