/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.jdbc;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.WorkerTriggerResult;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.kestra.jdbc.runner.JdbcQueue;
import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.function.Consumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class JdbcWorkerTriggerResultQueueService {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(JdbcWorkerTriggerResultQueueService.class);
    private final JdbcQueue<WorkerTriggerResult> workerTriggerResultQueue;
    @Inject
    private AbstractJdbcWorkerJobRunningRepository jdbcWorkerJobRunningRepository;
    private Runnable queueStop;

    public JdbcWorkerTriggerResultQueueService(ApplicationContext applicationContext) {
        this.workerTriggerResultQueue = (JdbcQueue)applicationContext.getBean(QueueInterface.class, Qualifiers.byName((String)"workerTriggerResultQueue"));
    }

    public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer) {
        this.queueStop = this.workerTriggerResultQueue.receiveTransaction(consumerGroup, queueType, (dslContext, eithers) -> {
            eithers.forEach(either -> {
                if (either.isRight()) {
                    log.error("Unable to deserialize a worker job: {}", (Object)((DeserializationException)either.getRight()).getMessage());
                    return;
                }
                WorkerTriggerResult workerTriggerResult = (WorkerTriggerResult)either.getLeft();
                this.jdbcWorkerJobRunningRepository.deleteByKey(workerTriggerResult.getTriggerContext().uid());
            });
            eithers.forEach(consumer);
        });
        return this.queueStop;
    }

    public void pause() {
        this.stopQueue();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void stopQueue() {
        JdbcWorkerTriggerResultQueueService jdbcWorkerTriggerResultQueueService = this;
        synchronized (jdbcWorkerTriggerResultQueueService) {
            if (this.queueStop != null) {
                this.queueStop.run();
                this.queueStop = null;
            }
        }
    }

    public void cleanup() {
    }

    public void close() {
        this.stopQueue();
    }
}

