/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.core.runners;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.repositories.SaveRepositoryInterface;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.runners.IndexerInterface;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.io.IOException;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@Requires(beans={ExecutionRepositoryInterface.class, LogRepositoryInterface.class, TriggerRepositoryInterface.class})
public class Indexer
implements IndexerInterface {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(Indexer.class);
    private final ExecutionRepositoryInterface executionRepository;
    private final QueueInterface<Execution> executionQueue;
    private final LogRepositoryInterface logRepository;
    private final QueueInterface<LogEntry> logQueue;
    private final MetricRepositoryInterface metricRepository;
    private final QueueInterface<MetricEntry> metricQueue;
    private final MetricRegistry metricRegistry;

    @Inject
    public Indexer(ExecutionRepositoryInterface executionRepository, @Named(value="executionQueue") QueueInterface<Execution> executionQueue, LogRepositoryInterface logRepository, @Named(value="workerTaskLogQueue") QueueInterface<LogEntry> logQueue, MetricRepositoryInterface metricRepositor, @Named(value="workerTaskMetricQueue") QueueInterface<MetricEntry> metricQueue, MetricRegistry metricRegistry) {
        this.executionRepository = executionRepository;
        this.executionQueue = executionQueue;
        this.logRepository = logRepository;
        this.logQueue = logQueue;
        this.metricRepository = metricRepositor;
        this.metricQueue = metricQueue;
        this.metricRegistry = metricRegistry;
    }

    @Override
    public void run() {
        this.send(this.executionQueue, this.executionRepository);
        this.send(this.logQueue, this.logRepository);
        this.send(this.metricQueue, this.metricRepository);
    }

    protected <T> void send(QueueInterface<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
        queueInterface.receive(Indexer.class, either -> {
            if (either.isRight()) {
                log.error("unable to deserialize an item: {}", (Object)((DeserializationException)either.getRight()).getMessage());
                return;
            }
            Object item = either.getLeft();
            this.metricRegistry.counter("indexer.request.count", "type", item.getClass().getName()).increment();
            this.metricRegistry.counter("indexer.message.in.count", "type", item.getClass().getName()).increment();
            this.metricRegistry.timer("indexer.request.duration", "type", item.getClass().getName()).record(() -> {
                saveRepositoryInterface.save(item);
                this.metricRegistry.counter("indexer.message.out.count", "type", item.getClass().getName()).increment();
            });
        });
    }

    @Override
    public void close() throws IOException {
        this.executionQueue.close();
        this.logQueue.close();
        this.metricQueue.close();
    }
}

