/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.rule.engine.defaults;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.api.task.Task;
import org.jetlinks.rule.engine.api.worker.Worker;
import org.jetlinks.rule.engine.api.worker.WorkerSelector;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * In-process {@link Scheduler} that keeps its workers and the tasks created for
 * each rule instance in local maps.
 *
 * <p>Workers are registered through {@link #addWorker(Worker)} and indexed by
 * {@link Worker#getId()}. Tasks are indexed by instance id and then by node id.
 * The registries are backed by {@link ConcurrentHashMap} and
 * {@link CopyOnWriteArrayList}.
 */
public class LocalScheduler
implements Scheduler {
    /** Default selection strategy: keeps only the first candidate worker. */
    static final WorkerSelector defaultSelector = (workers1, rule) -> workers1.take(1L);

    /** Identifier of this scheduler, also passed to workers when creating tasks. */
    private final String id;

    /** Strategy used to choose among the workers that support a job's executor. */
    private WorkerSelector workerSelector = defaultSelector;

    /** Registered workers keyed by worker id. */
    private final Map<String, Worker> workers = new ConcurrentHashMap<>();

    /** Created tasks keyed by instance id, then by node id. */
    private final Map<String, Map<String, List<Task>>> executors = new ConcurrentHashMap<>();

    /**
     * Creates a scheduler with the given identifier.
     *
     * @param id the scheduler id returned by {@link #getId()}
     */
    public LocalScheduler(String id) {
        this.id = id;
    }

    /**
     * @return all workers currently registered with this scheduler
     */
    @Override
    public Flux<Worker> getWorkers() {
        return Flux.fromIterable(this.workers.values());
    }

    /**
     * Looks up a registered worker.
     *
     * @param workerId the worker id
     * @return the worker, or an empty {@code Mono} when no worker has that id
     */
    @Override
    public Mono<Worker> getWorker(String workerId) {
        return Mono.justOrEmpty(this.workers.get(workerId));
    }

    /**
     * Tells whether at least one worker would be selected for the job.
     *
     * @param job the job to check
     * @return {@code true} when {@link #findWorker(String, ScheduleJob)} yields any worker
     */
    @Override
    public Mono<Boolean> canSchedule(ScheduleJob job) {
        return this.findWorker(job.getExecutor(), job).hasElements();
    }

    /**
     * Finds the workers that should run the given job.
     *
     * <p>Registered workers are first filtered to those whose supported executor
     * list contains {@code executor}; a worker that reports no list is treated as
     * not supporting it. The remaining candidates are handed to the configured
     * {@link WorkerSelector}.
     *
     * @param executor       the executor name the worker must support
     * @param schedulingRule the job passed to the selector
     * @return the workers chosen by the selector
     */
    protected Flux<Worker> findWorker(String executor, ScheduleJob schedulingRule) {
        Flux<Worker> candidates = Flux
                .fromIterable(this.workers.values())
                .filterWhen(worker -> worker
                        .getSupportExecutors()
                        .map(supported -> supported.contains(executor))
                        .defaultIfEmpty(false));
        return this.workerSelector.select(candidates, schedulingRule);
    }

    /**
     * Schedules a job.
     *
     * <p>When tasks are already recorded for the job's instance and node, each of
     * them receives the new job and is reloaded. Otherwise new tasks are created
     * on the selected workers.
     *
     * @param job the job to schedule
     * @return the tasks that now carry the job
     */
    @Override
    public Flux<Task> schedule(ScheduleJob job) {
        List<Task> tasks = this.getExecutor(job.getInstanceId(), job.getNodeId());
        if (tasks.isEmpty()) {
            return this.createExecutor(job);
        }
        return Flux.fromIterable(tasks)
                .flatMap(task -> task.setJob(job).then(task.reload()).thenReturn(task));
    }

    /**
     * Shuts down every task recorded for the instance and then drops the
     * instance from the task registry.
     *
     * @param instanceId the rule instance id
     * @return a {@code Mono} completing once the tasks are shut down and the registry entry is removed
     */
    @Override
    public Mono<Void> shutdown(String instanceId) {
        // Remove the whole per-instance entry instead of clearing it, so that
        // shut-down instances do not leave empty maps behind in the registry.
        return this.getSchedulingTask(instanceId)
                .flatMap(Task::shutdown)
                .then(Mono.fromRunnable(() -> this.executors.remove(instanceId)));
    }

    /**
     * Creates tasks for the job on the selected workers and records them under
     * the job's instance and node.
     *
     * @param job the job to create tasks for
     * @return the created tasks; fails with {@link UnsupportedOperationException}
     *         when no worker is selected for the job's executor
     */
    private Flux<Task> createExecutor(ScheduleJob job) {
        return this.findWorker(job.getExecutor(), job)
                .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("unsupported executor:" + job.getExecutor())))
                .flatMap(worker -> worker.createTask(this.id, job))
                .doOnNext(task -> this.getExecutor(job.getInstanceId(), job.getNodeId()).add(task));
    }

    /**
     * Lists the tasks recorded for one instance.
     *
     * @param instanceId the rule instance id
     * @return the instance's tasks, or an empty {@code Flux} when none are recorded
     */
    @Override
    public Flux<Task> getSchedulingTask(String instanceId) {
        // Plain lookup: a query must not insert an entry for an unknown instance.
        return Mono.justOrEmpty(this.executors.get(instanceId))
                .flatMapIterable(Map::values)
                .flatMapIterable(Function.identity());
    }

    /**
     * @return every task recorded by this scheduler, across all instances
     */
    @Override
    public Flux<Task> getSchedulingTasks() {
        return Flux.fromIterable(this.executors.values())
                .flatMapIterable(Map::values)
                .flatMapIterable(Function.identity());
    }

    /**
     * @return the number of tasks recorded by this scheduler
     */
    @Override
    public Mono<Long> totalTask() {
        return this.getSchedulingTasks().count();
    }

    /**
     * Returns the task list for an instance/node pair, creating it if absent.
     */
    private List<Task> getExecutor(String instanceId, String nodeId) {
        return this.getExecutor(instanceId).computeIfAbsent(nodeId, ignore -> new CopyOnWriteArrayList<>());
    }

    /**
     * Returns the node-to-tasks map for an instance, creating it if absent.
     */
    private Map<String, List<Task>> getExecutor(String instanceId) {
        return this.executors.computeIfAbsent(instanceId, ignore -> new ConcurrentHashMap<>());
    }

    /**
     * Registers a worker under its id, replacing any worker previously
     * registered with the same id.
     *
     * @param worker the worker to register
     */
    public void addWorker(Worker worker) {
        this.workers.put(worker.getId(), worker);
    }

    /**
     * @return the id given at construction
     */
    @Override
    public String getId() {
        return this.id;
    }

    /**
     * Replaces the strategy used by {@link #findWorker(String, ScheduleJob)}.
     *
     * @param workerSelector the selector to use from now on
     */
    public void setWorkerSelector(WorkerSelector workerSelector) {
        this.workerSelector = workerSelector;
    }
}

