/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.rule.engine.defaults;

import org.jetlinks.core.trace.TraceHolder;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.Task;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

public abstract class FunctionTaskExecutor
extends AbstractTaskExecutor
implements TaskExecutor {
    private final String name;

    public FunctionTaskExecutor(String name, ExecutionContext context) {
        super(context);
        this.name = name;
    }

    protected abstract Publisher<RuleData> apply(RuleData var1);

    private Mono<Void> doApply(RuleData input) {
        return ((Mono)this.context.getOutput().write((Publisher<RuleData>)Flux.from(this.apply(input)).flatMap(output -> this.context.fireEvent("result", (RuleData)output).thenReturn(output))).then(this.context.fireEvent("complete", input)).as(this.tracer())).onErrorResume(error -> this.context.onError((Throwable)error, input)).contextWrite((ContextView)TraceHolder.readToContext((ContextView)Context.empty(), input.getHeaders())).then();
    }

    @Override
    public final Mono<Void> execute(RuleData ruleData) {
        return this.doApply(ruleData);
    }

    @Override
    protected Disposable doStart() {
        return this.context.getInput().accept(data -> {
            if (this.state != Task.State.running) {
                return Mono.empty();
            }
            return this.doApply((RuleData)data).then(Reactors.ALWAYS_TRUE);
        });
    }

    @Override
    public String getName() {
        return this.name;
    }
}

