/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.rule.engine.defaults;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.jetlinks.core.Lazy;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.utils.ExceptionUtils;
import org.jetlinks.rule.engine.api.CompositeLogger;
import org.jetlinks.rule.engine.api.RuleConstants;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleEngineHooks;
import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
import org.jetlinks.rule.engine.api.scope.GlobalScope;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.Input;
import org.jetlinks.rule.engine.api.task.Output;
import org.jetlinks.rule.engine.defaults.EventLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public abstract class AbstractExecutionContext
implements ExecutionContext {
    private static final Logger log = LoggerFactory.getLogger(AbstractExecutionContext.class);
    public static final String RECORD_DATA_TO_HEADER = "record_data_to_header";
    public static final String RECORD_DATA_TO_HEADER_KEY = "record_data_to_header_key";
    public static final String RECORD_DATA_TO_HEADER_KEY_PREFIX = "rd:";
    private final org.jetlinks.rule.engine.api.Logger logger;
    private ScheduleJob job;
    private final EventBus eventBus;
    private Input input;
    private Output output;
    private Map<String, Output> eventOutputs;
    private final Function<ScheduleJob, Input> inputFactory;
    private final Function<ScheduleJob, Output> outputFactory;
    private final Function<ScheduleJob, Map<String, Output>> eventOutputsFactory;
    private volatile List<Runnable> shutdownListener;
    private final GlobalScope globalScope;
    private boolean recordDataToHeader;
    private String recordDataToHeaderKey;
    private boolean debug;

    public AbstractExecutionContext(String workerId, ScheduleJob job, EventBus eventBus, org.jetlinks.rule.engine.api.Logger logger, Function<ScheduleJob, Input> inputFactory, Function<ScheduleJob, Output> outputFactory, Function<ScheduleJob, Map<String, Output>> eventOutputsFactory, GlobalScope globalScope) {
        this.job = job;
        this.eventBus = eventBus;
        this.inputFactory = inputFactory;
        this.outputFactory = outputFactory;
        this.eventOutputsFactory = eventOutputsFactory;
        this.logger = CompositeLogger.of(logger, new EventLogger(eventBus, job.getInstanceId(), job.getNodeId(), workerId));
        this.globalScope = globalScope;
        this.init();
    }

    @Override
    public String getInstanceId() {
        return this.job.getInstanceId();
    }

    @Override
    public <T> Mono<T> fireEvent(@Nonnull String event, @Nonnull Supplier<RuleData> supplier) {
        Lazy builder = Lazy.of(() -> {
            RuleData data = (RuleData)supplier.get();
            data.setHeader("ruleConf", this.getJob().getRuleConfiguration());
            data.setHeader("jobExecutor", this.getJob().getExecutor());
            data.setHeader("modelType", this.getJob().getModelType());
            return data;
        });
        Mono then = this.eventBus.publish(RuleConstants.Topics.event0(this.job.getInstanceId(), this.job.getNodeId(), event), (Supplier)builder).then(Mono.empty());
        Output output = this.eventOutputs.get(event);
        if (output != null) {
            return output.write((RuleData)builder.get()).then(then);
        }
        return then;
    }

    @Override
    public <T> Mono<T> fireEvent(@Nonnull String event, @Nonnull RuleData data) {
        data.setHeader("ruleConf", this.getJob().getRuleConfiguration());
        data.setHeader("jobExecutor", this.getJob().getExecutor());
        data.setHeader("modelType", this.getJob().getModelType());
        log.trace("fire job task [{}] event [{}] ", (Object)this.job, (Object)event);
        Mono then = this.eventBus.publish(RuleConstants.Topics.event0(this.job.getInstanceId(), this.job.getNodeId(), event), (Object)data).then(Mono.empty());
        Output output = this.eventOutputs.get(event);
        if (output != null) {
            return output.write(data).then(then);
        }
        return then;
    }

    @Override
    public <T> Mono<T> onError(@Nullable Throwable e, @Nullable RuleData sourceData) {
        return this.fireEvent("error", () -> this.createErrorData(e, sourceData));
    }

    private RuleData createErrorData(Throwable e, RuleData source) {
        HashMap<String, Object> error = new HashMap<String, Object>();
        if (e != null) {
            error.put("type", e.getClass().getSimpleName());
            error.put("message", e.getMessage());
            error.put("stack", ExceptionUtils.getStackTrace((Throwable)e));
        }
        HashMap<String, String> sourceInfo = new HashMap<String, String>();
        sourceInfo.put("id", this.getJob().getNodeId());
        sourceInfo.put("type", this.getJob().getExecutor());
        sourceInfo.put("name", this.getJob().getName());
        error.put("source", sourceInfo);
        HashMap<String, HashMap<String, Object>> value = new HashMap<String, HashMap<String, Object>>();
        if (source != null) {
            source.acceptMap(value::putAll);
        }
        value.put(value.containsKey("error") ? "_error" : "error", error);
        return this.newRuleData(source == null ? value : source.newData(value));
    }

    @Override
    public RuleData newRuleData(Object data) {
        RuleData ruleData = RuleData.create(data);
        if (this.recordDataToHeader) {
            ruleData.setHeader(this.recordDataToHeaderKey, ruleData.getData());
        }
        ruleData.setHeader("sourceNode", this.getJob().getNodeId());
        return ruleData;
    }

    @Override
    public RuleData newRuleData(RuleData source, Object data) {
        RuleData ruleData = source.newData(data);
        if (this.recordDataToHeader) {
            ruleData.setHeader(this.recordDataToHeaderKey, ruleData.getData());
        }
        ruleData.setHeader("sourceNode", this.getJob().getNodeId());
        return ruleData;
    }

    @Override
    public Mono<Void> shutdown(String code, String message) {
        HashMap<String, String> data = new HashMap<String, String>();
        data.put("code", code);
        data.put("message", message);
        return this.eventBus.publish(RuleConstants.Topics.shutdown(this.job.getInstanceId(), this.job.getNodeId()), data).then();
    }

    public void doShutdown() {
        List<Runnable> shutdownListener = this.shutdownListener;
        if (shutdownListener == null) {
            return;
        }
        for (Runnable runnable : shutdownListener) {
            try {
                runnable.run();
            }
            catch (Exception e) {
                this.logger.warn(e.getMessage(), e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onShutdown(Runnable runnable) {
        if (this.shutdownListener == null) {
            AbstractExecutionContext abstractExecutionContext = this;
            synchronized (abstractExecutionContext) {
                if (this.shutdownListener == null) {
                    this.shutdownListener = new CopyOnWriteArrayList<Runnable>();
                }
            }
        }
        this.shutdownListener.add(runnable);
    }

    @Override
    public GlobalScope global() {
        return this.globalScope;
    }

    private void init() {
        this.input = RuleEngineHooks.wrapInput(this.inputFactory.apply(this.job));
        this.output = RuleEngineHooks.wrapOutput(this.outputFactory.apply(this.job));
        this.eventOutputs = this.eventOutputsFactory.apply(this.job).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> RuleEngineHooks.wrapOutput((Output)e.getValue())));
        if (this.eventOutputs.isEmpty()) {
            this.eventOutputs = Collections.emptyMap();
        }
        this.recordDataToHeader = this.job.getConfiguration(RECORD_DATA_TO_HEADER).map(v -> "true".equals(String.valueOf(v))).orElse(Boolean.getBoolean("rule.engine.record_data_to_header"));
        this.recordDataToHeaderKey = this.recordDataToHeader ? RECORD_DATA_TO_HEADER_KEY_PREFIX + this.job.getConfiguration(RECORD_DATA_TO_HEADER_KEY).map(String::valueOf).orElse(this.job.getNodeId()) : null;
    }

    public void reload() {
        this.init();
    }

    @Override
    public org.jetlinks.rule.engine.api.Logger getLogger() {
        return this.logger;
    }

    public void setJob(ScheduleJob job) {
        this.job = job;
    }

    @Override
    public ScheduleJob getJob() {
        return this.job;
    }

    public EventBus getEventBus() {
        return this.eventBus;
    }

    @Override
    public Input getInput() {
        return this.input;
    }

    @Override
    public Output getOutput() {
        return this.output;
    }

    public void setDebug(boolean debug) {
        this.debug = debug;
    }

    @Override
    public boolean isDebug() {
        return this.debug;
    }
}

