/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.rule.engine.defaults;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.context.Context;
import java.util.function.BiConsumer;
import org.jetlinks.core.trace.FluxTracer;
import org.jetlinks.core.trace.MonoTracer;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.task.ExecutableTaskExecutor;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;

/**
 * Base class for task executors that run inside an {@link ExecutionContext}.
 *
 * <p>It keeps the current {@link Task.State}, the {@link Disposable} returned by
 * {@link #doStart()}, lazily created tracers, and a chain of state-change listeners.
 * Subclasses supply {@link #getName()} and {@link #doStart()}.
 *
 * <p>{@link #start()} and {@link #shutdown()} are synchronized on this instance;
 * {@link #pause()}, {@link #changeState(Task.State)} and
 * {@link #onStateChanged(BiConsumer)} are not.
 */
public abstract class AbstractTaskExecutor implements ExecutableTaskExecutor {
    private static final Logger log = LoggerFactory.getLogger(AbstractTaskExecutor.class);

    /** Span attribute key ("name") set to {@link #getName()} when a trace is subscribed. */
    protected static final AttributeKey<String> executor_name = AttributeKey.stringKey("name");

    /** Execution context supplied at construction time. */
    protected ExecutionContext context;

    /** Current task state; starts as {@code shutdown}. */
    protected volatile Task.State state = Task.State.shutdown;

    /** Handle returned by the most recent {@link #doStart()} call, or {@code null} if never started. */
    protected volatile Disposable disposable;

    /** Lazily created by {@link #tracer()}. */
    private MonoTracer<Object> tracer;

    /** Lazily created by {@link #traceFlux()}. */
    private FluxTracer<Object> fluxTracer;

    /**
     * State-change listener chain, invoked as {@code (from, to)}. The initial listener
     * only logs the transition at debug level.
     */
    private BiConsumer<Task.State, Task.State> stateListener =
        (from, to) -> log.debug("task [{}] state changed from {} to {}.", this.context.getJob(), from, to);

    /**
     * @param context the execution context this executor works with
     */
    public AbstractTaskExecutor(ExecutionContext context) {
        this.context = context;
    }

    /**
     * Builds the span name used by the tracers:
     * {@code /rule-runtime/<executor>/<instanceId>/<nodeId>}.
     *
     * @return the span name derived from the context's job and instance id
     */
    protected String createSpanName() {
        return "/rule-runtime/"
            + this.context.getJob().getExecutor()
            + "/" + this.context.getInstanceId()
            + "/" + this.context.getJob().getNodeId();
    }

    /**
     * Creates a new {@link MonoTracer} named by {@link #createSpanName()} that tags the
     * span with {@link #executor_name} on subscription and defaults to the root context.
     *
     * @param <T> element type of the traced Mono
     * @return a freshly built tracer (not cached; see {@link #tracer()})
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected <T> MonoTracer<T> createMonoTracer() {
        return (MonoTracer) MonoTracer
            .builder()
            .spanName(this.createSpanName())
            .onSubscription(builder -> builder.setAttribute(executor_name, this.getName()))
            .defaultContext(Context::root)
            .build();
    }

    /**
     * Creates a new {@link FluxTracer} named by {@link #createSpanName()} that tags the
     * span with {@link #executor_name} on subscription and defaults to the root context.
     *
     * @param <T> element type of the traced Flux
     * @return a freshly built tracer (not cached; see {@link #traceFlux()})
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected <T> FluxTracer<T> createFluxTracer() {
        return (FluxTracer) FluxTracer
            .builder()
            .spanName(this.createSpanName())
            .onSubscription(builder -> builder.setAttribute(executor_name, this.getName()))
            .defaultContext(Context::root)
            .build();
    }

    /**
     * Returns the cached Mono tracer, creating it on first use.
     *
     * <p>The field is not synchronized, so concurrent first calls may each build a tracer;
     * the last write is the one that stays cached.
     *
     * @param <T> element type the caller wants to trace
     * @return the shared tracer instance, viewed as {@code MonoTracer<T>}
     */
    @SuppressWarnings("unchecked")
    protected <T> MonoTracer<T> tracer() {
        MonoTracer<Object> cached = this.tracer;
        if (cached == null) {
            cached = this.createMonoTracer();
            this.tracer = cached;
        }
        // One instance is shared for every element type; the cast is unchecked.
        return (MonoTracer<T>) (MonoTracer<?>) cached;
    }

    /**
     * Returns the cached Flux tracer, creating it on first use.
     *
     * <p>The field is not synchronized, so concurrent first calls may each build a tracer;
     * the last write is the one that stays cached.
     *
     * @param <T> element type the caller wants to trace
     * @return the shared tracer instance, viewed as {@code FluxTracer<T>}
     */
    @SuppressWarnings("unchecked")
    protected <T> FluxTracer<T> traceFlux() {
        FluxTracer<Object> cached = this.fluxTracer;
        if (cached == null) {
            cached = this.createFluxTracer();
            this.fluxTracer = cached;
        }
        // One instance is shared for every element type; the cast is unchecked.
        return (FluxTracer<T>) (FluxTracer<?>) cached;
    }

    /**
     * @return the executor name; also used as the {@link #executor_name} span attribute
     */
    @Override
    public abstract String getName();

    /**
     * Starts the actual work of this executor.
     *
     * @return a handle that {@link #shutdown()} disposes
     */
    protected abstract Disposable doStart();

    /**
     * Moves to {@code state} and notifies the listener chain with
     * {@code (previousState, newState)}. Does nothing when the state is unchanged.
     *
     * @param state the new state
     */
    protected void changeState(Task.State state) {
        Task.State previous = this.state;
        if (previous == state) {
            return;
        }
        this.state = state;
        // Pass the state captured before the assignment so listeners see the real transition.
        this.stateListener.accept(previous, state);
    }

    /**
     * Calls {@link #doStart()} unless a previous, not yet disposed handle exists,
     * then switches to the {@code running} state.
     */
    @Override
    public synchronized void start() {
        if (this.disposable == null || this.disposable.isDisposed()) {
            this.disposable = this.doStart();
        }
        this.changeState(Task.State.running);
    }

    /** No-op in this base class. */
    @Override
    public void reload() {
    }

    /** Switches to the {@code paused} state; the running handle is left untouched. */
    @Override
    public void pause() {
        this.changeState(Task.State.paused);
    }

    /** Switches to the {@code shutdown} state and disposes the running handle, if any. */
    @Override
    public synchronized void shutdown() {
        this.changeState(Task.State.shutdown);
        if (this.disposable != null) {
            this.disposable.dispose();
        }
    }

    /**
     * Appends a listener to the state-change chain; it runs after the listeners
     * registered earlier and receives {@code (from, to)}.
     *
     * @param listener the listener to append
     */
    @Override
    public void onStateChanged(BiConsumer<Task.State, Task.State> listener) {
        this.stateListener = this.stateListener.andThen(listener);
    }

    /** No-op in this base class. */
    @Override
    public void validate() {
    }

    /**
     * Writes {@code ruleData} to the context's output, traced with {@link #tracer()}.
     *
     * @param ruleData the data to write
     * @return a Mono that completes when the write completes, discarding its result
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Mono<Void> execute(RuleData ruleData) {
        return ((Mono) this.context.getOutput().write(ruleData).as(this.tracer())).then();
    }

    /**
     * @return the execution context supplied at construction time
     */
    public ExecutionContext getContext() {
        return this.context;
    }

    /**
     * @return the current task state
     */
    @Override
    public Task.State getState() {
        return this.state;
    }
}

