/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.rule.engine.cluster;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Generated;
import org.hswebframework.web.exception.I18nSupportException;
import org.jetlinks.core.trace.FluxTracer;
import org.jetlinks.rule.engine.api.RuleConstants;
import org.jetlinks.rule.engine.api.RuleEngine;
import org.jetlinks.rule.engine.api.model.RuleModel;
import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.api.scheduler.SchedulerSelector;
import org.jetlinks.rule.engine.api.task.Task;
import org.jetlinks.rule.engine.api.task.TaskSnapshot;
import org.jetlinks.rule.engine.api.worker.Worker;
import org.jetlinks.rule.engine.cluster.SchedulerRegistry;
import org.jetlinks.rule.engine.cluster.TaskSnapshotRepository;
import org.jetlinks.rule.engine.defaults.ScheduleJobCompiler;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ClusterRuleEngine
implements RuleEngine {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ClusterRuleEngine.class);
    private final SchedulerRegistry schedulerRegistry;
    private final TaskSnapshotRepository repository;
    private final SchedulerSelector schedulerSelector;

    public ClusterRuleEngine(SchedulerRegistry schedulerRegistry, TaskSnapshotRepository repository) {
        this(schedulerRegistry, repository, SchedulerSelector.selectAll);
    }

    public Mono<Void> shutdown(String instanceId) {
        return this.schedulerRegistry.getSchedulers().flatMap(scheduler -> scheduler.shutdown(instanceId)).then(this.repository.removeTaskByInstanceId(instanceId)).then();
    }

    private Mono<Void> shutdown(TaskSnapshot snapshot) {
        return this.schedulerRegistry.getSchedulers().filter(scheduler -> Objects.equals(snapshot.getSchedulerId(), scheduler.getId())).flatMap(scheduler -> scheduler.shutdownTask(snapshot.getId())).then(this.repository.removeTaskById(snapshot.getId()));
    }

    public Flux<Task> startRule(String instanceId, RuleModel model) {
        log.debug("starting rule {}\n{}", (Object)instanceId, (Object)model.toString());
        Map jobs = new ScheduleJobCompiler(instanceId, model).compile().stream().collect(Collectors.toMap(ScheduleJob::getNodeId, Function.identity()));
        ArrayList startedTask = new ArrayList(jobs.size());
        ConcurrentHashMap readyToStart = new ConcurrentHashMap(jobs);
        return (Flux)this.repository.findByInstanceId(instanceId).doOnNext(snapshot -> readyToStart.remove(snapshot.getJob().getNodeId())).flatMap(snapshot -> {
            ScheduleJob job = (ScheduleJob)jobs.get(snapshot.getJob().getNodeId());
            ScheduleJob old = snapshot.getJob();
            if (job == null || !Objects.equals(job.getExecutor(), old.getExecutor())) {
                if (job != null && !Objects.equals(job.getExecutor(), old.getExecutor())) {
                    readyToStart.put(job.getNodeId(), job);
                    log.debug("change job [{}] executor:{} -> {}", new Object[]{snapshot.getJob().getNodeId(), snapshot.getJob().getExecutor(), job.getExecutor()});
                } else {
                    log.debug("shutdown removed job:{}", (Object)snapshot.getJob().getNodeId());
                }
                return this.shutdown((TaskSnapshot)snapshot).then(Mono.empty());
            }
            return this.getTaskBySnapshot((TaskSnapshot)snapshot).flatMap(task -> {
                startedTask.add(task);
                return task.setJob(job).then(task.reload()).thenReturn(task);
            }).switchIfEmpty((Publisher)this.repository.removeTaskById(snapshot.getId()).then(Mono.fromRunnable(() -> readyToStart.put(job.getNodeId(), job))));
        }).concatWith((Publisher)Flux.defer(() -> this.doStart(readyToStart.values())).doOnNext(startedTask::add)).collectList().map(Flux::fromIterable).flatMapMany(tasks -> this.repository.saveTaskSnapshots((Publisher<TaskSnapshot>)tasks.flatMap(Task::dump)).thenMany((Publisher)tasks)).onErrorResume(err -> Flux.fromIterable((Iterable)startedTask).flatMap(Task::shutdown).then(Mono.error((Throwable)err))).as((Function)FluxTracer.create((String)RuleConstants.Trace.spanName((String)instanceId, (String)"start"), builder -> {
            builder.setAttribute(RuleConstants.Trace.model, (Object)model.toString());
            builder.setAttribute(RuleConstants.Trace.instanceId, (Object)instanceId);
        }));
    }

    protected Flux<Task> doStart(Collection<ScheduleJob> jobs) {
        return Flux.defer(() -> Flux.fromIterable((Iterable)jobs).flatMap(this::scheduleTask).collectList().flatMapIterable(Function.identity()).flatMap(task -> task.start().thenReturn(task), 16, Integer.MAX_VALUE));
    }

    private Flux<Task> getTaskBySnapshot(TaskSnapshot snapshot) {
        return this.schedulerRegistry.getSchedulers().flatMap(scheduler -> scheduler.getTask(snapshot.getId()));
    }

    private Flux<Scheduler> selectScheduler(ScheduleJob job) {
        return (Flux)this.schedulerRegistry.getSchedulers().filterWhen(scheduler -> scheduler.canSchedule(job)).as(supports -> this.schedulerSelector.select(supports, job));
    }

    private Flux<Task> scheduleTask(ScheduleJob job) {
        return this.selectScheduler(job).switchIfEmpty((Publisher)Mono.error(() -> new I18nSupportException.NoStackTrace("error.rule.engine.unsupported_executor", new Object[]{job.getExecutor()}))).flatMap(scheduler -> scheduler.schedule(job));
    }

    public Flux<Task> getTasks(String instance) {
        return this.schedulerRegistry.getSchedulers().flatMap(scheduler -> scheduler.getSchedulingTask(instance));
    }

    public Flux<Worker> getWorkers() {
        return this.schedulerRegistry.getSchedulers().flatMap(Scheduler::getWorkers);
    }

    @Generated
    public ClusterRuleEngine(SchedulerRegistry schedulerRegistry, TaskSnapshotRepository repository, SchedulerSelector schedulerSelector) {
        this.schedulerRegistry = schedulerRegistry;
        this.repository = repository;
        this.schedulerSelector = schedulerSelector;
    }
}

