/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.jdbc;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.runners.WorkerTriggerResult;
import io.kestra.core.utils.Either;
import io.kestra.executor.WorkerJobRunningStateStore;
import io.kestra.jdbc.JdbcMapper;
import io.kestra.jdbc.runner.JdbcQueue;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class JdbcWorkerTriggerResultQueueService
implements Closeable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(JdbcWorkerTriggerResultQueueService.class);
    private static final ObjectMapper MAPPER = JdbcMapper.of();
    @Inject
    private WorkerJobRunningStateStore workerJobRunningStateStore;
    private final AtomicReference<Runnable> disposable = new AtomicReference();
    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    public Runnable receive(JdbcQueue<WorkerTriggerResult> workerTriggerResultQueue, String consumerGroup, Class<?> queueType, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer) {
        this.disposable.set(workerTriggerResultQueue.receiveTransaction(consumerGroup, queueType, (dslContext, eithers) -> eithers.forEach(either -> {
            if (either.isRight()) {
                log.error("Unable to deserialize a worker job: {}", (Object)((DeserializationException)((Object)((Object)((Object)either.getRight())))).getMessage());
                try {
                    JsonNode json = MAPPER.readTree(((DeserializationException)((Object)((Object)((Object)either.getRight())))).getRecord());
                    TriggerContext triggerContext = (TriggerContext)MAPPER.treeToValue((TreeNode)json.get("triggerContext"), TriggerContext.class);
                    this.workerJobRunningStateStore.deleteByKey(triggerContext.uid());
                }
                catch (JsonProcessingException | DeserializationException e) {
                    log.error("Unexpected exception when trying to handle a deserialization error", e);
                }
            } else {
                WorkerTriggerResult workerTriggerResult = (WorkerTriggerResult)either.getLeft();
                this.workerJobRunningStateStore.deleteByKey(workerTriggerResult.getTriggerContext().uid());
            }
            consumer.accept((Either<WorkerTriggerResult, DeserializationException>)either);
        })));
        return this.disposable.get();
    }

    @Override
    public void close() {
        if (!this.isClosed.compareAndSet(false, true)) {
            return;
        }
        Runnable runnable = this.disposable.get();
        if (runnable != null) {
            runnable.run();
            this.disposable.set(null);
        }
    }
}

