/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.jdbc.runner;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.repositories.SaveRepositoryInterface;
import io.kestra.core.runners.Indexer;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.jdbc.runner.JdbcQueue;
import io.kestra.jdbc.runner.JdbcRunnerEnabled;
import io.micronaut.context.event.ApplicationEventPublisher;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Indexer for the JDBC runner: subscribes to the worker-task log and metric queues and
 * persists every received batch through the matching repository.
 *
 * <p>Each state transition is stored and announced as a {@link ServiceStateChangeEvent}.
 * {@link #close()} is guarded by an atomic flag, so only the first call performs the shutdown.
 */
@Singleton
@JdbcRunnerEnabled
public class JdbcIndexer implements Indexer {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(JdbcIndexer.class);

    private final LogRepositoryInterface logRepository;
    private final JdbcQueue<LogEntry> logQueue;
    private final MetricRepositoryInterface metricRepository;
    private final JdbcQueue<MetricEntry> metricQueue;
    private final MetricRegistry metricRegistry;
    /** Handles returned by {@code receiveBatch}; each one is run on {@link #close()}. */
    private final List<Runnable> receiveCancellations = new ArrayList<>();
    private final String id = IdUtils.create();
    private final AtomicReference<Service.ServiceState> state = new AtomicReference<>();
    private final ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final SkipExecutionService skipExecutionService;
    private final QueueService queueService;

    /**
     * Creates the indexer and publishes its initial {@code CREATED} state.
     *
     * @param logRepository        repository receiving the log entries
     * @param logQueue             queue named {@code workerTaskLogQueue}; must be a {@link JdbcQueue}
     * @param metricRepositor      repository receiving the metric entries
     * @param metricQueue          queue named {@code workerTaskMetricQueue}; must be a {@link JdbcQueue}
     * @param metricRegistry       registry used for the indexer counters and timer
     * @param eventPublisher       publisher used to announce state changes
     * @param skipExecutionService decides which records must not be indexed
     * @param queueService         computes the key of a queued record
     * @throws ClassCastException if one of the queues is not a {@link JdbcQueue}
     */
    @Inject
    public JdbcIndexer(LogRepositoryInterface logRepository, @Named(value="workerTaskLogQueue") QueueInterface<LogEntry> logQueue, MetricRepositoryInterface metricRepositor, @Named(value="workerTaskMetricQueue") QueueInterface<MetricEntry> metricQueue, MetricRegistry metricRegistry, ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher, SkipExecutionService skipExecutionService, QueueService queueService) {
        this.logRepository = logRepository;
        this.logQueue = (JdbcQueue<LogEntry>) logQueue;
        this.metricRepository = metricRepositor;
        this.metricQueue = (JdbcQueue<MetricEntry>) metricQueue;
        this.metricRegistry = metricRegistry;
        this.eventPublisher = eventPublisher;
        this.skipExecutionService = skipExecutionService;
        this.queueService = queueService;
        this.setState(Service.ServiceState.CREATED);
    }

    /**
     * Starts the queue subscriptions and then moves the service to {@code RUNNING}.
     */
    public void run() {
        log.debug("Starting the indexer");
        this.startQueues();
        this.setState(Service.ServiceState.RUNNING);
        log.info("Indexer started");
    }

    /**
     * Subscribes to the log queue and the metric queue, each paired with its repository.
     */
    protected void startQueues() {
        this.sendBatch(this.logQueue, (SaveRepositoryInterface<LogEntry>) this.logRepository);
        this.sendBatch(this.metricQueue, (SaveRepositoryInterface<MetricEntry>) this.metricRepository);
    }

    /**
     * Registers a batch consumer on {@code queueInterface} that saves the received items with
     * {@code saveRepositoryInterface}, and remembers the returned cancellation handle.
     *
     * <p>For every batch: items that failed to deserialize are logged and dropped, items
     * rejected by the {@link SkipExecutionService} are logged and dropped, and the remaining
     * items (if any) are saved in one call while the indexer metrics are updated.
     *
     * @param queueInterface          queue to consume from
     * @param saveRepositoryInterface repository the surviving items are saved to
     * @param <T>                     type of the queued items
     */
    protected <T> void sendBatch(JdbcQueue<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
        // addFirst: the most recently registered subscription is the first one cancelled on close().
        this.receiveCancellations.addFirst(queueInterface.receiveBatch(Indexer.class, eithers -> {
            eithers.stream()
                .filter(either -> either.isRight())
                .forEach(either -> {
                    DeserializationException failure = either.getRight();
                    log.error("unable to deserialize an item: {}", failure.getMessage());
                });

            List<T> items = eithers.stream()
                .filter(either -> either.isLeft())
                .map(either -> either.getLeft())
                .filter(it -> {
                    // Compute the key once; it is needed for both the check and the log line.
                    var key = this.queueService.key(it);
                    if (this.skipExecutionService.skipIndexerRecord(key)) {
                        log.warn("Skipping indexer record for key: {}", key);
                        return false;
                    }
                    return true;
                })
                .toList();

            if (ListUtils.isEmpty(items)) {
                return;
            }

            // All metrics of a batch are tagged with the runtime class of its first item.
            String itemClassName = items.getFirst().getClass().getName();
            this.metricRegistry.counter("indexer.request.count", "Total number of batches of records received by the Indexer", new String[]{"type", itemClassName}).increment();
            this.metricRegistry.counter("indexer.message.in.count", "Total number of records received by the Indexer", new String[]{"type", itemClassName}).increment(items.size());
            this.metricRegistry.timer("indexer.request.duration", "Batch of records duration inside the Indexer", new String[]{"type", itemClassName}).record(() -> {
                int saved = saveRepositoryInterface.saveBatch(items);
                this.metricRegistry.counter("indexer.message.out.count", "Total number of records indexed by the Indexer", new String[]{"type", itemClassName}).increment(saved);
            });
        }));
    }

    /**
     * Stores the new state and publishes a {@link ServiceStateChangeEvent} for this service.
     *
     * @param state state to switch to
     */
    private void setState(Service.ServiceState state) {
        this.state.set(state);
        this.eventPublisher.publishEvent(new ServiceStateChangeEvent((Service) this));
    }

    /**
     * @return the identifier generated for this instance at construction time
     */
    public String getId() {
        return this.id;
    }

    /**
     * @return always {@link ServiceType#INDEXER}
     */
    public ServiceType getType() {
        return ServiceType.INDEXER;
    }

    /**
     * @return the most recently set state
     */
    public Service.ServiceState getState() {
        return this.state.get();
    }

    /**
     * Shuts the indexer down; only the first invocation has any effect.
     *
     * <p>The service moves to {@code TERMINATING}, every queue subscription is cancelled, and
     * the queues are closed. A failure in one step is logged and does not prevent the remaining
     * steps from running. The final state is {@code TERMINATED_GRACEFULLY} when every step
     * succeeded and {@code TERMINATED_FORCED} otherwise.
     */
    @PreDestroy
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        this.setState(Service.ServiceState.TERMINATING);
        log.debug("Terminating");

        boolean graceful = this.cancelReceivers();
        try {
            this.stopQueue();
        } catch (IOException e) {
            log.error("Failed to close the queue", e);
            graceful = false;
        }
        this.setState(graceful ? Service.ServiceState.TERMINATED_GRACEFULLY : Service.ServiceState.TERMINATED_FORCED);
    }

    /**
     * Runs every registered cancellation handle, isolating failures so that one faulty handle
     * cannot prevent the others (or the queue shutdown that follows) from running.
     *
     * @return {@code true} when every handle completed without throwing
     */
    private boolean cancelReceivers() {
        boolean allCancelled = true;
        for (Runnable cancellation : this.receiveCancellations) {
            try {
                cancellation.run();
            } catch (RuntimeException e) {
                allCancelled = false;
                log.error("Failed to cancel a queue subscription", e);
            }
        }
        return allCancelled;
    }

    /**
     * Closes the log queue and the metric queue.
     *
     * <p>The metric queue is closed even when closing the log queue fails. If both calls
     * throw, the exception raised by the metric queue is the one that propagates.
     *
     * @throws IOException if closing a queue fails
     */
    protected void stopQueue() throws IOException {
        try {
            this.logQueue.close();
        } finally {
            this.metricQueue.close();
        }
    }
}

