/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.rule.engine.defaults;

import java.util.HashMap;
import java.util.function.Function;
import lombok.Generated;
import org.apache.commons.codec.digest.DigestUtils;
import org.jetlinks.core.metadata.FunctionMetadata;
import org.jetlinks.core.trace.MonoTracer;
import org.jetlinks.core.trace.TraceHolder;
import org.jetlinks.rule.engine.api.RuleConstants;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
import org.jetlinks.rule.engine.api.task.ExecutableTaskExecutor;
import org.jetlinks.rule.engine.api.task.Task;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.defaults.AbstractExecutionContext;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class DefaultTask
implements Task {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DefaultTask.class);
    private final String workerId;
    private final String schedulerId;
    private final AbstractExecutionContext context;
    private final TaskExecutor executor;
    private long lastStateTime;
    private long startTime;
    private final String id;

    public DefaultTask(String schedulerId, String workerId, AbstractExecutionContext context, TaskExecutor executor) {
        this.schedulerId = schedulerId;
        this.workerId = workerId;
        this.context = context;
        this.executor = executor;
        this.id = DigestUtils.md5Hex((String)(workerId + ":" + context.getInstanceId() + ":" + context.getJob().getNodeId()));
        executor.onStateChanged((from, to) -> {
            this.lastStateTime = System.currentTimeMillis();
            HashMap<String, Object> data = new HashMap<String, Object>();
            data.put("from", from.name());
            data.put("to", to.name());
            data.put("taskId", this.getId());
            data.put("instanceId", context.getInstanceId());
            data.put("nodeId", context.getJob().getNodeId());
            data.put("timestamp", System.currentTimeMillis());
            Flux.merge((Publisher[])new Publisher[]{context.getEventBus().publish((CharSequence)RuleConstants.Topics.state0(context.getInstanceId(), context.getJob().getNodeId()), data), context.getEventBus().publish(RuleConstants.Topics.event0(context.getInstanceId(), context.getJob().getNodeId(), to.name()), (Object)context.newRuleData(data))}).doOnError(err -> log.error(err.getMessage(), err)).subscribe();
        });
    }

    @Override
    public String getId() {
        return this.id;
    }

    @Override
    public String getName() {
        return this.executor.getName();
    }

    @Override
    public ScheduleJob getJob() {
        return this.context.getJob();
    }

    @Override
    public Mono<Void> setJob(ScheduleJob job) {
        return Mono.fromRunnable(() -> {
            ScheduleJob old = this.context.getJob();
            this.context.setJob(job);
            try {
                this.executor.validate();
            }
            catch (Throwable e) {
                this.context.setJob(old);
                throw e;
            }
        });
    }

    @Override
    public Mono<Void> reload() {
        log.debug("reload task[{}]:[{}]", (Object)this.getId(), (Object)this.getJob());
        return ((Mono)Mono.fromRunnable(() -> {
            this.context.reload();
            this.executor.reload();
        }).as((Function)MonoTracer.create((String)RuleConstants.Trace.reloadNodeSpanName(this.getJob().getInstanceId(), this.getJob().getNodeId()), builder -> builder.setAttribute(RuleConstants.Trace.executor, (Object)this.getJob().getExecutor())))).subscribeOn(Schedulers.boundedElastic());
    }

    @Override
    public Mono<Void> start() {
        log.debug("start task[{}]:[{}]", (Object)this.getId(), (Object)this.getJob());
        return ((Mono)Mono.fromRunnable(this.executor::start).doOnSuccess(v -> {
            this.startTime = System.currentTimeMillis();
        }).as((Function)MonoTracer.create((String)RuleConstants.Trace.startNodeSpanName(this.getJob().getInstanceId(), this.getJob().getNodeId()), builder -> builder.setAttribute(RuleConstants.Trace.executor, (Object)this.getJob().getExecutor())))).subscribeOn(Schedulers.boundedElastic());
    }

    @Override
    public Mono<Void> pause() {
        log.debug("pause task[{}]:[{}]", (Object)this.getId(), (Object)this.getJob());
        return Mono.fromRunnable(this.executor::pause);
    }

    @Override
    public Mono<Void> shutdown() {
        log.debug("shutdown task[{}]:[{}]", (Object)this.getId(), (Object)this.getJob());
        return ((Mono)Mono.fromRunnable(this.executor::shutdown).then(Mono.fromRunnable(this.context::doShutdown)).as((Function)MonoTracer.create((String)RuleConstants.Trace.shutdownNodeSpanName(this.getJob().getInstanceId(), this.getJob().getNodeId()), builder -> builder.setAttribute(RuleConstants.Trace.executor, (Object)this.getJob().getExecutor())))).subscribeOn(Schedulers.boundedElastic());
    }

    @Override
    public Mono<Void> execute(RuleData data) {
        log.debug("execute task[{}]:[{}]", (Object)this.getId(), (Object)this.getJob());
        if (this.executor instanceof ExecutableTaskExecutor) {
            return TraceHolder.writeContextTo((Object)data, RuleData::setHeader).flatMap(ruleData -> ((ExecutableTaskExecutor)this.executor).execute((RuleData)ruleData));
        }
        return Mono.empty();
    }

    @Override
    public Mono<Task.State> getState() {
        return Mono.just((Object)((Object)this.executor.getState()));
    }

    @Override
    public Mono<Void> debug(boolean debug) {
        log.debug("set task debug[{}] [{}]:[{}]", new Object[]{debug, this.getId(), this.getJob()});
        return Mono.fromRunnable(() -> this.context.setDebug(debug));
    }

    @Override
    public Mono<Long> getLastStateTime() {
        return Mono.just((Object)this.lastStateTime);
    }

    @Override
    public Mono<Long> getStartTime() {
        return Mono.just((Object)this.startTime);
    }

    @Override
    public Mono<FunctionMetadata> getMetadata() {
        return this.executor.createMetadata();
    }

    @Override
    @Generated
    public String getWorkerId() {
        return this.workerId;
    }

    @Override
    @Generated
    public String getSchedulerId() {
        return this.schedulerId;
    }
}

