/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.rule.engine.defaults;

import lombok.Generated;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.Task;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public abstract class FunctionTaskExecutor
extends AbstractTaskExecutor
implements TaskExecutor {
    private final String name;

    public FunctionTaskExecutor(String name, ExecutionContext context) {
        super(context);
        this.name = name;
    }

    protected abstract Publisher<RuleData> apply(RuleData var1);

    private Mono<Void> doApply(RuleData input) {
        return ((Mono)Flux.from(this.apply(input)).concatMap(data -> this.context.fireEvent("result", (RuleData)data).then(this.context.getOutput().write((RuleData)data)), 0).then(this.context.fireEvent("complete", input)).contextWrite(this.contextWriter()).as(this.tracer())).onErrorResume(error -> {
            this.context.logger().warn(error.getLocalizedMessage(), error);
            return this.context.onError((Throwable)error, input);
        }).then();
    }

    @Override
    public final Mono<Void> execute(RuleData ruleData) {
        return this.doApply(this.context.newRuleData(ruleData));
    }

    @Override
    protected Disposable doStart() {
        return this.context.getInput().accept(data -> {
            if (this.state != Task.State.running) {
                return Mono.empty();
            }
            return this.doApply((RuleData)data).then(Reactors.ALWAYS_TRUE);
        });
    }

    @Override
    @Generated
    public String getName() {
        return this.name;
    }
}

