/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.rule.engine.cluster;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jetlinks.rule.engine.api.RuleEngine;
import org.jetlinks.rule.engine.api.model.RuleModel;
import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.api.scheduler.SchedulerSelector;
import org.jetlinks.rule.engine.api.task.Task;
import org.jetlinks.rule.engine.api.task.TaskSnapshot;
import org.jetlinks.rule.engine.api.worker.Worker;
import org.jetlinks.rule.engine.cluster.SchedulerRegistry;
import org.jetlinks.rule.engine.cluster.TaskSnapshotRepository;
import org.jetlinks.rule.engine.defaults.ScheduleJobCompiler;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ClusterRuleEngine
implements RuleEngine {
    private static final Logger log = LoggerFactory.getLogger(ClusterRuleEngine.class);
    private final SchedulerRegistry schedulerRegistry;
    private final TaskSnapshotRepository repository;
    private final SchedulerSelector schedulerSelector;

    public ClusterRuleEngine(SchedulerRegistry schedulerRegistry, TaskSnapshotRepository repository) {
        this(schedulerRegistry, repository, SchedulerSelector.selectAll);
    }

    public Mono<Void> shutdown(String instanceId) {
        return this.schedulerRegistry.getSchedulers().flatMap(scheduler -> scheduler.shutdown(instanceId)).then(this.repository.removeTaskByInstanceId(instanceId)).then();
    }

    public Flux<Task> startRule(String instanceId, RuleModel model) {
        Map jobs = new ScheduleJobCompiler(instanceId, model).compile().stream().collect(Collectors.toMap(ScheduleJob::getNodeId, Function.identity()));
        ArrayList startedTask = new ArrayList(jobs.size());
        return this.repository.findByInstanceId(instanceId).flatMap(snapshot -> {
            ScheduleJob job = (ScheduleJob)jobs.get(snapshot.getJob().getNodeId());
            if (job == null) {
                return this.getTaskBySnapshot((TaskSnapshot)snapshot).flatMap(Task::shutdown).then(this.repository.removeTaskById(snapshot.getId())).then(Mono.empty());
            }
            return this.getTaskBySnapshot((TaskSnapshot)snapshot).flatMap(task -> task.setJob(job).then(task.reload()).thenReturn(task)).switchIfEmpty((Publisher)Flux.defer(() -> this.doStart(Collections.singleton(job)).flatMap(task -> this.repository.saveTaskSnapshots((Publisher<TaskSnapshot>)task.dump()).thenReturn(task))));
        }).switchIfEmpty(this.doStart(jobs.values())).doOnNext(startedTask::add).onErrorResume(err -> Flux.fromIterable((Iterable)startedTask).flatMap(Task::shutdown).then(Mono.error((Throwable)err)));
    }

    protected Flux<Task> doStart(Collection<ScheduleJob> jobs) {
        return Flux.defer(() -> Flux.fromIterable((Iterable)jobs).flatMap(this::scheduleTask).collectList().flatMapIterable(Function.identity()).flatMap(task -> task.start().thenReturn(task))).collectList().map(Flux::fromIterable).flatMapMany(tasks -> this.repository.saveTaskSnapshots((Publisher<TaskSnapshot>)tasks.flatMap(Task::dump)).thenMany((Publisher)tasks));
    }

    private Flux<Task> getTaskBySnapshot(TaskSnapshot snapshot) {
        return this.schedulerRegistry.getSchedulers().flatMap(scheduler -> scheduler.getSchedulingTask(snapshot.getInstanceId())).filter(task -> task.isSameTask(snapshot));
    }

    private Flux<Task> scheduleTask(ScheduleJob job) {
        return ((Flux)this.schedulerRegistry.getSchedulers().filterWhen(scheduler -> scheduler.canSchedule(job)).as(supports -> this.schedulerSelector.select(supports, job))).switchIfEmpty((Publisher)Mono.error(() -> new UnsupportedOperationException("no scheduler for " + job.getExecutor()))).flatMap(scheduler -> scheduler.schedule(job));
    }

    public Flux<Task> getTasks(String instance) {
        return this.schedulerRegistry.getSchedulers().flatMap(scheduler -> scheduler.getSchedulingTask(instance));
    }

    public Flux<Worker> getWorkers() {
        return this.schedulerRegistry.getSchedulers().flatMap(Scheduler::getWorkers);
    }

    public ClusterRuleEngine(SchedulerRegistry schedulerRegistry, TaskSnapshotRepository repository, SchedulerSelector schedulerSelector) {
        this.schedulerRegistry = schedulerRegistry;
        this.repository = repository;
        this.schedulerSelector = schedulerSelector;
    }
}

