/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.event.impl;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.kie.kogito.Application;
import org.kie.kogito.Model;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessService;
import org.kie.kogito.services.event.EventConsumer;
import org.kie.kogito.services.event.ProcessDataEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CloudEventConsumer<D, M extends Model, T extends ProcessDataEvent<D>>
implements EventConsumer<M> {
    private static final Logger logger = LoggerFactory.getLogger(CloudEventConsumer.class);
    private Optional<Function<D, M>> function;
    private ProcessService processService;
    private ExecutorService executor;

    public CloudEventConsumer(ProcessService processService, ExecutorService executor, Optional<Function<D, M>> function) {
        this.processService = processService;
        this.executor = executor;
        this.function = function;
    }

    public CompletionStage<?> consume(Application application, Process<M> process, Object object, String trigger) {
        ProcessDataEvent cloudEvent = (ProcessDataEvent)object;
        String simpleName = cloudEvent.getClass().getSimpleName();
        if (this.ignoredMessageType(cloudEvent, simpleName) && this.ignoredMessageType(cloudEvent, trigger)) {
            logger.warn("Consumer for CloudEvent type '{}', trigger '{}': ignoring message with type '{}',  source '{}'", new Object[]{simpleName, trigger, cloudEvent.getType(), cloudEvent.getSource()});
            return CompletableFuture.completedFuture(null);
        }
        if (cloudEvent.getKogitoReferenceId() != null && !cloudEvent.getKogitoReferenceId().isEmpty()) {
            logger.debug("Received message with reference id '{}' going to use it to send signal '{}'", (Object)cloudEvent.getKogitoReferenceId(), (Object)trigger);
            return CompletableFuture.supplyAsync(() -> {
                Optional<ProcessInstance<M>> instance = this.findProcessInstance(process, cloudEvent);
                if (instance.isPresent()) {
                    return this.signalProcessInstance(process, trigger, cloudEvent);
                }
                if (this.function.isPresent()) {
                    logger.info("Process instance with id '{}' not found for triggering signal '{}', starting a new one", (Object)cloudEvent.getKogitoReferenceId(), (Object)trigger);
                    return this.startNewInstance(process, (Model)this.function.get().apply(cloudEvent.getData()), cloudEvent, trigger);
                }
                logger.info("Process instance with id {} not found for triggering signal {}", (Object)cloudEvent.getKogitoReferenceId(), (Object)trigger);
                return null;
            }, this.executor);
        }
        if (this.function.isPresent()) {
            logger.debug("Received message without reference id, starting new process instance with trigger '{}'", (Object)trigger);
            return CompletableFuture.supplyAsync(() -> this.startNewInstance(process, (Model)this.function.get().apply(cloudEvent.getData()), cloudEvent, trigger), this.executor);
        }
        logger.warn("Received not start event without kogito referecence id for trigger {}", (Object)trigger);
        return CompletableFuture.completedFuture(null);
    }

    private Optional<M> signalProcessInstance(Process process, String trigger, T cloudEvent) {
        return this.processService.signalProcessInstance(process, cloudEvent.getKogitoReferenceId(), cloudEvent.getData(), "Message-" + trigger);
    }

    private Optional<ProcessInstance<M>> findProcessInstance(Process<M> process, T cloudEvent) {
        return process.instances().findById(cloudEvent.getKogitoReferenceId());
    }

    private ProcessInstance<M> startNewInstance(Process<M> process, M model, T cloudEvent, String trigger) {
        return this.processService.createProcessInstance(process, cloudEvent.getKogitoBusinessKey(), model, cloudEvent.getKogitoStartFromNode(), trigger, cloudEvent.getKogitoProcessinstanceId());
    }

    private boolean ignoredMessageType(T cloudEvent, String type) {
        return !type.equals(cloudEvent.getType()) && !type.equals(String.valueOf(cloudEvent.getSource()));
    }
}

