/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.rule.engine.cluster.worker;

import java.util.List;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.rule.engine.api.RuleConstants;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
import org.jetlinks.rule.engine.api.task.ConditionEvaluator;
import org.jetlinks.rule.engine.api.task.Output;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link Output} implementation that forwards rule data to queues obtained from a
 * {@link ClusterManager}.
 *
 * <p>The queue for a node is looked up by the name that {@code RuleConstants.Topics.input}
 * builds from this output's rule instance id and the target node id.
 */
public class QueueOutput implements Output {
    private static final Logger log = LoggerFactory.getLogger(QueueOutput.class);

    private final String instanceId;
    private final ClusterManager clusterManager;
    private final List<ScheduleJob.Output> outputs;
    private final ConditionEvaluator evaluator;

    /**
     * Creates an output bound to one rule instance.
     *
     * @param instanceId     id of the rule instance, used when building queue names
     * @param clusterManager source of the queues that data is written to
     * @param outputs        candidate outputs, each carrying a condition and a target node id
     * @param evaluator      evaluates an output's condition against a piece of rule data
     */
    public QueueOutput(String instanceId, ClusterManager clusterManager, List<ScheduleJob.Output> outputs, ConditionEvaluator evaluator) {
        this.instanceId = instanceId;
        this.clusterManager = clusterManager;
        this.outputs = outputs;
        this.evaluator = evaluator;
    }

    /**
     * Sends every element of {@code dataStream} to the queue of each configured output whose
     * condition accepts that element.
     *
     * @param dataStream the rule data to dispatch
     * @return a {@code Mono} that emits {@code true} once the whole stream has been processed
     */
    public Mono<Boolean> write(Publisher<RuleData> dataStream) {
        return Flux.from(dataStream)
            .flatMap(data -> Flux.fromIterable(this.outputs)
                .filterWhen(output -> this.matches(output, data))
                // Explicit type witness: without it the queue's element type is inferred as
                // Object in a call chain and the typed publisher could not be passed to add().
                .flatMap(output -> this.clusterManager
                    .<RuleData>getQueue(this.createTopic(output.getOutput()))
                    .add(Mono.just(data))))
            .then(Mono.just(true));
    }

    /**
     * Writes {@code data} to the queue of the given node, without evaluating any condition.
     *
     * @param nodeId id of the node whose queue receives the data
     * @param data   the rule data to enqueue
     * @return a {@code Mono} that completes when the add operation completes
     */
    public Mono<Void> write(String nodeId, Publisher<RuleData> data) {
        return this.clusterManager
            .<RuleData>getQueue(this.createTopic(nodeId))
            .add(data)
            .then();
    }

    /**
     * Evaluates the condition of {@code output} against {@code data}.
     *
     * <p>A failure during evaluation is logged at WARN level and reported as {@code false}, so
     * the data is simply not sent to that output instead of failing the whole stream.
     */
    private Mono<Boolean> matches(ScheduleJob.Output output, RuleData data) {
        return Mono.fromCallable(() -> this.evaluator.evaluate(output.getCondition(), data))
            .onErrorResume(error -> {
                log.warn(error.getMessage(), error);
                return Mono.just(false);
            });
    }

    /** Builds the queue name for {@code node} within this rule instance. */
    private String createTopic(String node) {
        return RuleConstants.Topics.input(this.instanceId, node);
    }
}

