/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.jdbc;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.WorkerInstanceRepositoryInterface;
import io.kestra.core.runners.WorkerInstance;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.runners.WorkerJobRunning;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.runners.WorkerTaskRunning;
import io.kestra.core.runners.WorkerTrigger;
import io.kestra.core.runners.WorkerTriggerRunning;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.kestra.jdbc.runner.JdbcHeartbeat;
import io.kestra.jdbc.runner.JdbcQueue;
import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.inject.Singleton;
import java.util.function.Consumer;
import lombok.Generated;
import org.jooq.DSLContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Connects the JDBC-backed worker job queue to a job consumer.
 *
 * <p>For every batch handed over by {@link JdbcQueue#receiveTransaction}, each successfully
 * deserialized {@link WorkerJob} is first saved as a {@link WorkerJobRunning} bound to the
 * {@link WorkerInstance} currently returned by the {@link JdbcHeartbeat}, using the
 * {@link DSLContext} supplied with the batch. The whole batch is then passed to the consumer.
 *
 * <p>The stop handle of the active subscription is kept in {@link #queueStop} and is only
 * read or written while holding this instance's monitor.
 */
@Singleton
public class JdbcWorkerJobQueueService {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(JdbcWorkerJobQueueService.class);

    private final JdbcQueue<WorkerJob> workerTaskQueue;
    private final JdbcHeartbeat jdbcHeartbeat;
    private final AbstractJdbcWorkerJobRunningRepository jdbcWorkerJobRunningRepository;
    private final WorkerInstanceRepositoryInterface workerInstanceRepository;

    /** Stop handle of the current subscription, or {@code null} when none is active. Guarded by {@code this}. */
    private Runnable queueStop;

    /**
     * Resolves all collaborators from the application context.
     *
     * @param applicationContext context used to look up the {@code workerJobQueue} queue bean,
     *                           the heartbeat and the two repositories
     */
    @SuppressWarnings("unchecked") // the bean is looked up by the raw QueueInterface type; the field fixes the element type
    public JdbcWorkerJobQueueService(ApplicationContext applicationContext) {
        this.workerTaskQueue = (JdbcQueue<WorkerJob>) applicationContext.getBean(QueueInterface.class, Qualifiers.byName("workerJobQueue"));
        this.jdbcHeartbeat = applicationContext.getBean(JdbcHeartbeat.class);
        this.jdbcWorkerJobRunningRepository = applicationContext.getBean(AbstractJdbcWorkerJobRunningRepository.class);
        this.workerInstanceRepository = applicationContext.getBean(WorkerInstanceRepositoryInterface.class);
    }

    /**
     * Subscribes to the worker job queue.
     *
     * <p>Entries that failed to deserialize are logged and not saved as running jobs; they are
     * nevertheless still forwarded to {@code consumer} together with the rest of the batch.
     *
     * @param consumerGroup consumer group forwarded to the queue
     * @param queueType     queue type forwarded to the queue
     * @param consumer      callback invoked with every entry of each received batch, after the
     *                      running-job records of that batch have been saved
     * @return the handle that stops this subscription (the same one {@link #pause()} and
     *         {@link #close()} run)
     * @throws IllegalArgumentException raised from the batch callback when a job is neither a
     *                                  {@link WorkerTask} nor a {@link WorkerTrigger}
     */
    public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
        Runnable stop = this.workerTaskQueue.receiveTransaction(consumerGroup, queueType, (dslContext, eithers) -> {
            WorkerInstance workerInstance = this.jdbcHeartbeat.get();
            eithers.forEach(either -> {
                if (either.isRight()) {
                    log.error("Unable to deserialize a worker job: {}", either.getRight().getMessage());
                    return;
                }

                WorkerJob workerJob = either.getLeft();
                // Common supertype: both the task and the trigger variant are stored the same way.
                WorkerJobRunning workerJobRunning;
                if (workerJob instanceof WorkerTask) {
                    workerJobRunning = WorkerTaskRunning.of((WorkerTask) workerJob, workerInstance, 0);
                } else if (workerJob instanceof WorkerTrigger) {
                    workerJobRunning = WorkerTriggerRunning.of((WorkerTrigger) workerJob, workerInstance, 0);
                } else {
                    throw new IllegalArgumentException("Message is of type " + workerJob.getClass() + " which should never occurs");
                }

                this.jdbcWorkerJobRunningRepository.save(workerJobRunning, dslContext);
                if (log.isTraceEnabled()) {
                    log.trace("Sending a workerJobRunning: {}", workerJobRunning);
                }
            });
            // Hand the batch over only once every running-job record has been saved.
            eithers.forEach(consumer);
        });

        // Publish the handle under the same monitor stopQueue() uses to read and clear it.
        synchronized (this) {
            this.queueStop = stop;
        }
        return stop;
    }

    /**
     * Stops the current subscription, if any.
     */
    public void pause() {
        this.stopQueue();
    }

    /**
     * Runs and clears the stop handle; does nothing when no subscription is active.
     */
    private void stopQueue() {
        synchronized (this) {
            if (this.queueStop != null) {
                this.queueStop.run();
                this.queueStop = null;
            }
        }
    }

    /**
     * Deletes the worker instance reported by the heartbeat from the worker instance
     * repository; does nothing when the heartbeat has no instance.
     */
    public void cleanup() {
        // Read the heartbeat once so the instance that was null-checked is the one deleted.
        WorkerInstance workerInstance = this.jdbcHeartbeat.get();
        if (workerInstance != null) {
            synchronized (this) {
                this.workerInstanceRepository.delete(workerInstance);
            }
        }
    }

    /**
     * Stops the current subscription, if any.
     */
    public void close() {
        this.stopQueue();
    }
}

