/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.jobs.service.messaging;

import io.cloudevents.CloudEvent;
import io.smallrye.mutiny.Uni;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.kie.kogito.jobs.service.exception.JobServiceException;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.scheduler.impl.TimerDelegateJobScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ReactiveMessagingEventConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveMessagingEventConsumer.class);
    private final TimerDelegateJobScheduler scheduler;
    private final ReactiveJobRepository jobRepository;
    private final String createJobEventType;
    private final String cancelJobEventType;

    protected ReactiveMessagingEventConsumer() {
        this(null, null, null, null);
    }

    protected ReactiveMessagingEventConsumer(TimerDelegateJobScheduler scheduler, ReactiveJobRepository jobRepository, String createJobEventType, String cancelJobEventType) {
        this.scheduler = scheduler;
        this.jobRepository = jobRepository;
        this.createJobEventType = createJobEventType;
        this.cancelJobEventType = cancelJobEventType;
    }

    public Uni<Void> onKogitoServiceRequest(Message<CloudEvent> message) {
        CloudEvent cloudEvent = (CloudEvent)message.getPayload();
        String eventType = cloudEvent.getType();
        if (Objects.equals(this.createJobEventType, eventType)) {
            return this.handleCreateEvent(message, this.getJobDetails(cloudEvent));
        }
        if (Objects.equals(this.cancelJobEventType, eventType)) {
            return this.handleCancelEvent(message, this.getJobId(cloudEvent));
        }
        LOGGER.error("Unexpected job request type: {}, for the cloud event: {}", (Object)eventType, (Object)cloudEvent);
        return Uni.createFrom().completionStage(message.nack((Throwable)new JobServiceException("Unexpected job request type: " + eventType)));
    }

    public abstract JobDetails getJobDetails(CloudEvent var1);

    public abstract String getJobId(CloudEvent var1);

    protected Uni<Void> handleCreateEvent(Message<?> message, JobDetails job) {
        return Uni.createFrom().completionStage(this.jobRepository.get(job.getId())).flatMap(existingJob -> {
            if (existingJob == null || existingJob.getStatus() == JobStatus.SCHEDULED) {
                return Uni.createFrom().publisher(this.scheduler.schedule(job));
            }
            LOGGER.info("A Job in status: {} already exists for the job id: {}, no processing will be done fot the event: {}.", new Object[]{existingJob.getStatus(), existingJob.getId(), message.getPayload()});
            return Uni.createFrom().item(existingJob);
        }).onItem().transformToUni(createdJob -> {
            if (createdJob == null) {
                return Uni.createFrom().failure((Throwable)new JobServiceException("An internal scheduler error was produced during Job scheduling"));
            }
            return Uni.createFrom().completionStage(message.ack());
        }).onFailure().recoverWithUni(throwable -> {
            String msg = String.format("An error was produced during Job scheduling for the event: %s", message.getPayload());
            LOGGER.error(msg, throwable);
            return Uni.createFrom().completionStage(message.nack((Throwable)new JobServiceException("An error was produced during Job scheduling: " + throwable.getMessage(), throwable)));
        });
    }

    protected Uni<Void> handleCancelEvent(Message<?> message, String id) {
        return Uni.createFrom().completionStage((CompletionStage)this.scheduler.cancel(id)).onItemOrFailure().transformToUni((cancelledJob, throwable) -> {
            if (throwable != null) {
                String msg = String.format("An error was produced during Job cancelling for the event: %s", message.getPayload());
                LOGGER.error(msg, throwable);
                return Uni.createFrom().completionStage(message.nack((Throwable)new JobServiceException("An error was produced during Job cancelling: " + throwable.getMessage(), throwable)));
            }
            if (cancelledJob == null) {
                LOGGER.info("No Job exists for the job id: {} or it was already cancelled", (Object)id);
            }
            return Uni.createFrom().completionStage(message.ack());
        });
    }

    public String getCreateJobEventType() {
        return this.createJobEventType;
    }

    public String getCancelJobEventType() {
        return this.cancelJobEventType;
    }
}

