/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.rule.engine.defaults;

import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.Task;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public abstract class FunctionTaskExecutor
extends AbstractTaskExecutor
implements TaskExecutor {
    private final String name;

    public FunctionTaskExecutor(String name, ExecutionContext context) {
        super(context);
        this.name = name;
    }

    protected abstract Publisher<RuleData> apply(RuleData var1);

    private Mono<Void> doApply(RuleData input) {
        return this.context.getOutput().write((Publisher<RuleData>)Flux.from(this.apply(input)).flatMap(output -> this.context.fireEvent("result", (RuleData)output).thenReturn(output))).then(this.context.fireEvent("complete", input)).onErrorResume(error -> this.context.onError((Throwable)error, input)).then();
    }

    @Override
    public final Mono<Void> execute(RuleData ruleData) {
        return this.doApply(ruleData);
    }

    @Override
    protected Disposable doStart() {
        return this.context.getInput().accept().filter(data -> this.state == Task.State.running).flatMap(this::doApply, Integer.MAX_VALUE).onErrorResume(error -> this.context.onError((Throwable)error, null)).subscribe();
    }

    @Override
    public String getName() {
        return this.name;
    }
}

