/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.jdbc;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.runners.Worker;
import io.kestra.core.runners.WorkerInstance;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.runners.WorkerJobRunning;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.runners.WorkerTaskRunning;
import io.kestra.core.runners.WorkerTrigger;
import io.kestra.core.runners.WorkerTriggerRunning;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.kestra.jdbc.runner.JdbcQueue;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import lombok.Generated;
import org.jooq.DSLContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class JdbcWorkerJobQueueService
implements Closeable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(JdbcWorkerJobQueueService.class);
    private final AbstractJdbcWorkerJobRunningRepository jdbcWorkerJobRunningRepository;
    private final AtomicReference<Runnable> disposable = new AtomicReference();
    private final AtomicBoolean isStopped = new AtomicBoolean(false);

    @Inject
    public JdbcWorkerJobQueueService(ApplicationContext applicationContext) {
        this.jdbcWorkerJobRunningRepository = (AbstractJdbcWorkerJobRunningRepository)applicationContext.getBean(AbstractJdbcWorkerJobRunningRepository.class);
    }

    public Runnable subscribe(JdbcQueue<WorkerJob> workerJobQueue, String workerId, String workerGroup, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
        this.disposable.set(workerJobQueue.receiveTransaction(workerGroup, Worker.class, (dslContext, eithers) -> {
            WorkerInstance workerInstance = new WorkerInstance(workerId, workerGroup);
            eithers.forEach(either -> {
                WorkerTaskRunning workerJobRunning;
                if (either.isRight()) {
                    log.error("Unable to deserialize a worker job: {}", (Object)((DeserializationException)((Object)((Object)((Object)either.getRight())))).getMessage());
                    return;
                }
                WorkerJob workerJob = (WorkerJob)either.getLeft();
                if (workerJob instanceof WorkerTask) {
                    WorkerTask workerTask = (WorkerTask)workerJob;
                    workerJobRunning = WorkerTaskRunning.of((WorkerTask)workerTask, (WorkerInstance)workerInstance, (int)0);
                } else if (workerJob instanceof WorkerTrigger) {
                    WorkerTrigger workerTrigger = (WorkerTrigger)workerJob;
                    workerJobRunning = WorkerTriggerRunning.of((WorkerTrigger)workerTrigger, (WorkerInstance)workerInstance, (int)0);
                } else {
                    throw new IllegalArgumentException("Message is of type " + String.valueOf(workerJob.getClass()) + " which should never occurs");
                }
                this.jdbcWorkerJobRunningRepository.save((WorkerJobRunning)workerJobRunning, (DSLContext)dslContext);
                if (log.isTraceEnabled()) {
                    log.trace("Sending a workerJobRunning: {}", (Object)workerJobRunning);
                }
            });
            eithers.forEach(consumer);
        }));
        return this.disposable.get();
    }

    @Override
    public void close() {
        if (!this.isStopped.compareAndSet(true, false)) {
            return;
        }
        Runnable runnable = this.disposable.get();
        if (runnable != null) {
            runnable.run();
            this.disposable.set(null);
        }
    }
}

