/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.addons.quarkus.jobs.service.embedded.stream;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.smallrye.reactive.messaging.annotations.Blocking;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.kie.kogito.event.AbstractDataEvent;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventPublisher;
import org.kie.kogito.jobs.JobsServiceException;
import org.kie.kogito.jobs.service.adapter.ScheduledJobAdapter;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.ScheduledJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class EventPublisherJobStreams {
    public static final String DATA_INDEX_EVENT_PUBLISHER = "org.kie.kogito.index.addon.DataIndexEventPublisher";
    private static final Logger LOGGER = LoggerFactory.getLogger(EventPublisherJobStreams.class);
    private final String url;
    private final EventPublisher eventPublisher;
    private final ObjectMapper objectMapper;

    @Inject
    public EventPublisherJobStreams(@ConfigProperty(name="kogito.service.url", defaultValue="http://localhost:8080") String url, Instance<EventPublisher> eventPublishers, ObjectMapper objectMapper) {
        this.url = url;
        this.eventPublisher = eventPublishers.stream().filter(publisher -> publisher.getClass().getName().startsWith(DATA_INDEX_EVENT_PUBLISHER)).findFirst().orElse(null);
        this.objectMapper = objectMapper;
    }

    @Incoming(value="job-status-change-events")
    @Acknowledgment(value=Acknowledgment.Strategy.PRE_PROCESSING)
    @Blocking
    public void onJobStatusChange(JobDetails jobDetails) {
        if (this.eventPublisher != null) {
            byte[] jsonContent;
            ScheduledJob scheduledJob = ScheduledJobAdapter.of((JobDetails)jobDetails);
            try {
                jsonContent = this.objectMapper.writeValueAsBytes((Object)scheduledJob);
            }
            catch (Exception e) {
                throw new JobsServiceException("It was not possible to serialize scheduledJob to json: " + scheduledJob, (Throwable)e);
            }
            EventPublisherJobDataEvent event = new EventPublisherJobDataEvent("JobEvent", this.url + "/jobs", jsonContent, scheduledJob.getProcessInstanceId(), scheduledJob.getRootProcessInstanceId(), scheduledJob.getProcessId(), scheduledJob.getRootProcessId(), null);
            try {
                this.eventPublisher.publish((DataEvent)event);
            }
            catch (Exception e) {
                LOGGER.error("Job status change propagation has failed at eventPublisher: " + this.eventPublisher.getClass() + " execution.", (Throwable)e);
            }
        }
    }

    public static class EventPublisherJobDataEvent
    extends AbstractDataEvent<byte[]> {
        public EventPublisherJobDataEvent(String type, String source, byte[] data, String kogitoProcessInstanceId, String kogitoRootProcessInstanceId, String kogitoProcessId, String kogitoRootProcessId, String kogitoIdentity) {
            super(type, source, (Object)data, kogitoProcessInstanceId, kogitoRootProcessInstanceId, kogitoProcessId, kogitoRootProcessId, null, kogitoIdentity);
        }
    }
}

