/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.services.event.impl;

import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import org.kie.kogito.Application;
import org.kie.kogito.Model;
import org.kie.kogito.event.EventDispatcher;
import org.kie.kogito.event.EventReceiver;
import org.kie.kogito.event.EventUnmarshaller;
import org.kie.kogito.event.SubscriptionInfo;
import org.kie.kogito.event.process.ProcessDataEvent;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessService;
import org.kie.kogito.services.event.impl.ProcessEventDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractMessageConsumer<M extends Model, D> {
    protected static final Logger logger = LoggerFactory.getLogger(AbstractMessageConsumer.class);
    private String trigger;
    private EventDispatcher<M> eventDispatcher;

    public AbstractMessageConsumer() {
    }

    public AbstractMessageConsumer(Application application, Process<M> process, String trigger, EventReceiver eventReceiver, Class<D> dataClass, boolean useCloudEvents, ProcessService processService, ExecutorService executorService, EventUnmarshaller<Object> eventUnmarshaller) {
        this.init(application, process, trigger, eventReceiver, dataClass, useCloudEvents, processService, executorService, eventUnmarshaller, null);
    }

    public void init(Application application, Process<M> process, String trigger, EventReceiver eventReceiver, Class<D> dataClass, boolean useCloudEvents, ProcessService processService, ExecutorService executorService, EventUnmarshaller<Object> eventUnmarshaller, Set<String> correlations) {
        this.trigger = trigger;
        this.eventDispatcher = new ProcessEventDispatcher<M>(process, this.getModelConverter().orElse(null), processService, executorService, correlations, this.getDataResolver(useCloudEvents));
        SubscriptionInfo.SubscriptionInfoBuilder builder = SubscriptionInfo.builder().converter(eventUnmarshaller).type(trigger);
        if (useCloudEvents) {
            builder.outputClass(ProcessDataEvent.class).parametrizedClasses(new Class[]{dataClass});
        } else {
            builder.outputClass(dataClass);
        }
        eventReceiver.subscribe(this::consume, builder.createSubscriptionInfo());
        logger.info("Consumer for {} started", (Object)trigger);
    }

    protected UnaryOperator<Object> getDataResolver(boolean useCloudEvents) {
        return useCloudEvents ? AbstractMessageConsumer::cloudEventResolver : AbstractMessageConsumer::eventResolver;
    }

    protected static Object cloudEventResolver(Object object) {
        return ((ProcessDataEvent)object).getData();
    }

    protected static Object eventResolver(Object object) {
        return object;
    }

    private CompletionStage<?> consume(Object payload) {
        logger.trace("Received {} for trigger {}", payload, (Object)this.trigger);
        return this.eventDispatcher.dispatch(this.trigger, payload).thenAccept(v -> logger.trace("Consume completed {} for trigger {}", payload, (Object)this.trigger));
    }

    protected Optional<Function<Object, M>> getModelConverter() {
        return Optional.empty();
    }
}

