/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.services.event.impl;

import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import org.apache.commons.lang3.StringUtils;
import org.kie.kogito.Model;
import org.kie.kogito.correlation.CompositeCorrelation;
import org.kie.kogito.correlation.CorrelationInstance;
import org.kie.kogito.correlation.CorrelationResolver;
import org.kie.kogito.correlation.CorrelationService;
import org.kie.kogito.event.EventDispatcher;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessService;
import org.kie.kogito.services.event.correlation.CompositeAttributeCorrelationResolver;
import org.kie.kogito.services.event.correlation.SimpleAttributeCorrelationResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessEventDispatcher<M extends Model>
implements EventDispatcher<M> {
    private final CorrelationResolver kogitoReferenceCorrelationResolver = new SimpleAttributeCorrelationResolver("kogitoprocrefid");
    private final CorrelationResolver eventTypeResolver = new SimpleAttributeCorrelationResolver("type");
    private final CorrelationResolver eventSourceResolver = new SimpleAttributeCorrelationResolver("source");
    private final CorrelationResolver businessKeyResolver = new SimpleAttributeCorrelationResolver("kogitobusinesskey");
    private final CorrelationResolver nodeIdResolver = new SimpleAttributeCorrelationResolver("kogitoprocstartfrom");
    private final CorrelationResolver referenceIdResolver = new SimpleAttributeCorrelationResolver("kogitoprocinstanceid");
    private UnaryOperator<Object> dataResolver;
    private final Optional<CompositeAttributeCorrelationResolver> instanceCorrelationResolver;
    private ProcessService processService;
    private Function<Object, M> modelConverter;
    private Process<M> process;
    private static final Logger LOGGER = LoggerFactory.getLogger(ProcessEventDispatcher.class);
    private ExecutorService executor;

    public ProcessEventDispatcher(Process<M> process, Function<Object, M> modelConverter, ProcessService processService, ExecutorService executor) {
        this(process, modelConverter, processService, executor, null, null);
    }

    public ProcessEventDispatcher(Process<M> process, Function<Object, M> modelConverter, ProcessService processService, ExecutorService executor, Set<String> correlations, UnaryOperator<Object> dataResolver) {
        this.process = process;
        this.modelConverter = modelConverter;
        this.processService = processService;
        this.executor = executor;
        this.dataResolver = dataResolver;
        this.instanceCorrelationResolver = Optional.ofNullable(correlations).filter(c -> !c.isEmpty()).map(CompositeAttributeCorrelationResolver::new);
    }

    public CompletableFuture<ProcessInstance<M>> dispatch(String trigger, Object event) {
        if (this.shouldSkipMessage(trigger, event)) {
            LOGGER.info("Ignoring message for trigger {} in process {}. Skipping consumed message {}", new Object[]{trigger, this.process.id(), event});
            return CompletableFuture.completedFuture(null);
        }
        String kogitoReferenceId = this.resolveCorrelationId(event);
        if (StringUtils.isNotEmpty((CharSequence)kogitoReferenceId)) {
            return CompletableFuture.supplyAsync(() -> this.handleMessageWithReference(trigger, event, kogitoReferenceId), this.executor);
        }
        if (this.modelConverter != null) {
            return CompletableFuture.supplyAsync(() -> this.startNewInstance(trigger, event), this.executor);
        }
        LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", new Object[]{trigger, this.process.id(), event});
        return CompletableFuture.completedFuture(null);
    }

    private String resolveCorrelationId(Object event) {
        Optional<CompositeCorrelation> correlation = this.instanceCorrelationResolver.map(r -> r.resolve(event));
        Optional<String> correlationInstance = correlation.flatMap(arg_0 -> ((CorrelationService)this.process.correlations()).find(arg_0));
        return correlationInstance.map(CorrelationInstance::getCorrelatedId).orElseGet(() -> this.kogitoReferenceCorrelationResolver.resolve(event).asString());
    }

    private ProcessInstance<M> handleMessageWithReference(String trigger, Object event, String instanceId) {
        LOGGER.debug("Received message with reference id '{}' going to use it to send signal '{}'", (Object)instanceId, (Object)trigger);
        return this.process.instances().findById(instanceId).map(instance -> {
            this.signalProcessInstance(trigger, instance.id(), event);
            return instance;
        }).orElseGet(() -> {
            LOGGER.info("Process instance with id '{}' not found for triggering signal '{}'", (Object)instanceId, (Object)trigger);
            return this.startNewInstance(trigger, event);
        });
    }

    private Optional<M> signalProcessInstance(String trigger, String id, Object event) {
        return this.processService.signalProcessInstance(this.process, id, this.dataResolver.apply(event), "Message-" + trigger);
    }

    private ProcessInstance<M> startNewInstance(String trigger, Object event) {
        if (this.modelConverter == null) {
            return null;
        }
        String businessKey = this.businessKeyResolver.resolve(event).asString();
        String fromNode = this.nodeIdResolver.resolve(event).asString();
        String referenceId = this.referenceIdResolver.resolve(event).asString();
        Object data = this.dataResolver.apply(event);
        CompositeCorrelation correlation = this.instanceCorrelationResolver.map(r -> r.resolve(event)).orElse(null);
        LOGGER.info("Starting new process instance with signal '{}'", (Object)trigger);
        return this.processService.createProcessInstance(this.process, businessKey, (Model)this.modelConverter.apply(data), fromNode, trigger, referenceId, correlation);
    }

    private boolean shouldSkipMessage(String trigger, Object event) {
        String eventType = this.eventTypeResolver.resolve(event).asString();
        String source = this.eventSourceResolver.resolve(event).asString();
        boolean isEventTypeNotMatched = Objects.nonNull(eventType) && !Objects.equals(trigger, eventType);
        boolean isSourceNotMatched = Objects.nonNull(source) && !Objects.equals(event.getClass().getSimpleName(), source) && !Objects.equals(trigger, source);
        return isEventTypeNotMatched && isSourceNotMatched;
    }
}

