/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.jdbc.runner;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.ExecutionKilledExecution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.executions.Variables;
import io.kestra.core.models.flows.Concurrency;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.sla.ExecutionMonitoringSLA;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.flows.sla.SLAMonitor;
import io.kestra.core.models.flows.sla.SLAMonitorStorage;
import io.kestra.core.models.flows.sla.Violation;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.models.triggers.multipleflows.MultipleCondition;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.runners.ConcurrencyLimit;
import io.kestra.core.runners.ExecutableUtils;
import io.kestra.core.runners.ExecutionDelay;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.core.runners.Executor;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.runners.ExecutorState;
import io.kestra.core.runners.FlowMetaStoreInterface;
import io.kestra.core.runners.MultipleConditionEvent;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.runners.SchedulerTriggerStateInterface;
import io.kestra.core.runners.SubflowExecution;
import io.kestra.core.runners.SubflowExecutionEnd;
import io.kestra.core.runners.SubflowExecutionResult;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.runners.WorkerTaskRunning;
import io.kestra.core.runners.WorkerTrigger;
import io.kestra.core.runners.WorkerTriggerRunning;
import io.kestra.core.server.ClusterEvent;
import io.kestra.core.server.Metric;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.LogService;
import io.kestra.core.services.MaintenanceService;
import io.kestra.core.services.PluginDefaultService;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.VariablesService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.storages.Storage;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.topologies.FlowTopologyService;
import io.kestra.core.trace.Tracer;
import io.kestra.core.trace.TracerFactory;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.MapUtils;
import io.kestra.core.utils.Rethrow;
import io.kestra.core.utils.TruthUtils;
import io.kestra.executor.ExecutorService;
import io.kestra.executor.FlowTriggerService;
import io.kestra.executor.SLAService;
import io.kestra.jdbc.JdbcMapper;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.kestra.jdbc.runner.AbstractJdbcConcurrencyLimitStorage;
import io.kestra.jdbc.runner.AbstractJdbcExecutionDelayStorage;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.runner.JdbcQueue;
import io.kestra.jdbc.runner.JdbcRunnerEnabled;
import io.kestra.jdbc.runner.JdbcServiceLivenessCoordinator;
import io.kestra.plugin.core.flow.ForEachItem;
import io.kestra.plugin.core.flow.Template;
import io.kestra.plugin.core.flow.WorkingDirectory;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.transaction.exceptions.CannotCreateTransactionException;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.commons.lang3.tuple.Pair;
import org.jooq.Configuration;
import org.jooq.DSLContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

@Singleton
@JdbcRunnerEnabled
public class JdbcExecutor
implements ExecutorInterface {
    // Class logger.
    @Generated
    private static final Logger log = LoggerFactory.getLogger(JdbcExecutor.class);
    // JSON mapper obtained from JdbcMapper; run() uses it to re-read the raw record of a flow that failed to deserialize.
    private static final ObjectMapper MAPPER = JdbcMapper.of();
    // Single-thread scheduler on which run() schedules executionDelaySend and executionSLAMonitor.
    private final ScheduledExecutorService scheduledDelay = Executors.newSingleThreadScheduledExecutor();
    // Handles of the two scheduled tasks above; each is observed by a dedicated watcher thread started in run().
    private ScheduledFuture<?> executionDelayFuture;
    private ScheduledFuture<?> monitorSLAFuture;

    // ---- Injected repositories and queues ----
    @Inject
    private AbstractJdbcExecutionRepository executionRepository;
    @Inject
    @Named(value="executionQueue")
    private QueueInterface<Execution> executionQueue;
    @Inject
    @Named(value="workerJobQueue")
    private QueueInterface<WorkerJob> workerJobQueue;
    @Inject
    @Named(value="workerTaskResultQueue")
    private QueueInterface<WorkerTaskResult> workerTaskResultQueue;
    @Inject
    @Named(value="workerTaskLogQueue")
    private QueueInterface<LogEntry> logQueue;
    @Inject
    @Named(value="flowQueue")
    private QueueInterface<FlowInterface> flowQueue;
    @Inject
    @Named(value="executionKilledQueue")
    protected QueueInterface<ExecutionKilled> killQueue;
    @Inject
    @Named(value="subflowExecutionResultQueue")
    private QueueInterface<SubflowExecutionResult> subflowExecutionResultQueue;
    @Inject
    @Named(value="subflowExecutionEndQueue")
    private QueueInterface<SubflowExecutionEnd> subflowExecutionEndQueue;
    // Optional: run() only subscribes to cluster events when this queue is present.
    @Inject
    @Named(value="clusterEventQueue")
    private Optional<QueueInterface<ClusterEvent>> clusterEventQueue;
    @Inject
    @Named(value="multipleConditionEventQueue")
    private QueueInterface<MultipleConditionEvent> multipleConditionEventQueue;

    // ---- Injected services and storages ----
    @Inject
    private RunContextFactory runContextFactory;
    @Inject
    private PluginDefaultService pluginDefaultService;
    @Inject
    private Optional<Template.TemplateExecutorInterface> templateExecutorInterface;
    @Inject
    private ExecutorService executorService;
    @Inject
    private MultipleConditionStorageInterface multipleConditionStorage;
    @Inject
    private FlowTriggerService flowTriggerService;
    @Inject
    private MetricRegistry metricRegistry;
    @Inject
    protected FlowListenersInterface flowListeners;
    @Inject
    private ExecutionService executionService;
    @Inject
    private AbstractJdbcExecutionDelayStorage executionDelayStorage;
    @Inject
    private AbstractJdbcExecutionQueuedStorage executionQueuedStorage;
    @Inject
    private AbstractJdbcConcurrencyLimitStorage concurrencyLimitStorage;
    @Inject
    private AbstractJdbcExecutorStateStorage executorStateStorage;
    @Inject
    private FlowTopologyService flowTopologyService;
    // Latest list of flows pushed by the flowListeners callback registered in run(); run() waits until it is non-null.
    // NOTE(review): the field is not volatile; if the listener callback runs on another thread, confirm visibility.
    protected List<FlowWithSource> allFlows;
    @Inject
    private WorkerGroupService workerGroupService;
    @Inject
    private SkipExecutionService skipExecutionService;
    @Inject
    private AbstractJdbcWorkerJobRunningRepository workerJobRunningRepository;
    @Inject
    private LogService logService;
    @Inject
    private SLAMonitorStorage slaMonitorStorage;
    @Inject
    private SLAService slaService;
    @Inject
    private TriggerRepositoryInterface triggerRepository;
    @Inject
    private SchedulerTriggerStateInterface triggerState;
    @Inject
    private VariablesService variablesService;

    // ---- Configuration flags (both default to true when the property is absent) ----
    @Value(value="${kestra.jdbc.executor.clean.execution-queue:true}")
    private boolean cleanExecutionQueue;
    @Value(value="${kestra.jdbc.executor.clean.worker-queue:true}")
    private boolean cleanWorkerJobQueue;

    // ---- Collaborators assigned by the constructor ----
    private final Tracer tracer;
    private final FlowMetaStoreInterface flowMetaStore;
    // May be null (the constructor parameter is @Nullable); run() checks before registering this executor with it.
    private final JdbcServiceLivenessCoordinator serviceLivenessCoordinator;
    private final ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher;
    private final AbstractJdbcFlowTopologyRepository flowTopologyRepository;
    private final MaintenanceService maintenanceService;

    // ---- Runtime state ----
    // Identifier generated once per instance.
    private final String id = IdUtils.create();
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    // Set to true by enterMaintenance() and back to false by exitMaintenance().
    private final AtomicBoolean isPaused = new AtomicBoolean(false);
    private final AtomicReference<Service.ServiceState> state = new AtomicReference();
    // Handles returned by the queue subscriptions made in run(); each new handle is inserted at the head of the list.
    private final List<Runnable> receiveCancellations = new ArrayList<Runnable>();
    // Pools used by run() to process batches of worker task results and executions in parallel.
    private final java.util.concurrent.ExecutorService workerTaskResultExecutorService;
    private final java.util.concurrent.ExecutorService executionExecutorService;
    // Configured thread count, or max(4, available processors) when the configured value is 0.
    private final int numberOfThreads;

    @Inject
    public JdbcExecutor(@Nullable JdbcServiceLivenessCoordinator serviceLivenessCoordinator, FlowMetaStoreInterface flowMetaStore, AbstractJdbcFlowTopologyRepository flowTopologyRepository, ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher, TracerFactory tracerFactory, ExecutorsUtils executorsUtils, MaintenanceService maintenanceService, @Value(value="${kestra.jdbc.executor.thread-count:0}") int threadCount) {
        this.serviceLivenessCoordinator = serviceLivenessCoordinator;
        this.flowMetaStore = flowMetaStore;
        this.flowTopologyRepository = flowTopologyRepository;
        this.eventPublisher = eventPublisher;
        this.tracer = tracerFactory.getTracer(JdbcExecutor.class, "EXECUTOR");
        this.maintenanceService = maintenanceService;
        this.numberOfThreads = threadCount != 0 ? threadCount : Math.max(4, Runtime.getRuntime().availableProcessors());
        this.workerTaskResultExecutorService = executorsUtils.maxCachedThreadPool(this.numberOfThreads, "jdbc-worker-task-result-executor");
        this.executionExecutorService = executorsUtils.maxCachedThreadPool(this.numberOfThreads, "jdbc-execution-executor");
    }

    /**
     * Registers the gauge exposing the number of executor threads once injection is complete.
     */
    @PostConstruct
    void initMetrics() {
        final Number threadCount = this.numberOfThreads;
        final String[] noTags = new String[0];
        this.metricRegistry.gauge("executor.thread.count", "The number of executor threads", threadCount, noTags);
    }

    /**
     * Returns the metrics published by this executor.
     *
     * @return the gauges found in the registry converted to {@link Metric}; an empty set when no
     *         registry is available or when the gauge is not registered
     */
    public Set<Metric> getMetrics() {
        if (this.metricRegistry == null) {
            return Collections.emptySet();
        }
        return Stream.of("executor.thread.count")
            .map(gaugeName -> this.metricRegistry.findGauge(gaugeName))
            // a gauge that is not registered is simply left out
            .filter(Objects::nonNull)
            .map(Metric::of)
            .collect(Collectors.toSet());
    }

    /**
     * Starts the executor.
     *
     * <p>In order: moves the service to {@code CREATED}, registers with the liveness coordinator when one
     * is available, starts the flow listeners and waits for the first list of flows, subscribes to every
     * queue this executor consumes, schedules the execution-delay and SLA-monitor tasks, starts one
     * watcher thread per scheduled task, subscribes to flow changes to maintain the flow topology, and
     * finally moves to {@code MAINTENANCE} or {@code RUNNING} depending on the maintenance service.
     */
    public void run() {
        this.setState(Service.ServiceState.CREATED);
        if (this.serviceLivenessCoordinator != null) {
            this.serviceLivenessCoordinator.setExecutor(this);
        }
        this.flowListeners.run();
        this.flowListeners.listen(flows -> {
            this.allFlows = flows;
        });
        // Block until the listener delivered a first list of flows.
        // NOTE(review): the two durations are presumably the polling interval and the timeout — confirm against Await.until.
        Await.until(() -> this.allFlows != null, (Duration)Duration.ofMillis(100L), (Duration)Duration.ofMinutes(5L));
        // Executions are received in batches; each element is handled on the execution pool and the
        // batch callback only returns once every element of the batch is done.
        this.receiveCancellations.addFirst(((JdbcQueue)this.executionQueue).receiveBatch(Executor.class, executions -> {
            List<CompletableFuture> futures = executions.stream().map(execution -> CompletableFuture.runAsync(() -> this.executionQueue((Either<Execution, DeserializationException>)execution), this.executionExecutorService)).toList();
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        }));
        // Same batching scheme for worker task results, on their own pool.
        this.receiveCancellations.addFirst(((JdbcQueue)this.workerTaskResultQueue).receiveBatch(Executor.class, workerTaskResults -> {
            List<CompletableFuture> futures = workerTaskResults.stream().map(workerTaskResult -> CompletableFuture.runAsync(() -> this.workerTaskResultQueue((Either<WorkerTaskResult, DeserializationException>)workerTaskResult), this.workerTaskResultExecutorService)).toList();
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        }));
        // Remaining queues are consumed one message at a time.
        this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));
        this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
        this.receiveCancellations.addFirst(this.subflowExecutionEndQueue.receive(Executor.class, this::subflowExecutionEndQueue));
        this.receiveCancellations.addFirst(this.multipleConditionEventQueue.receive(Executor.class, this::multipleConditionEventQueue));
        this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(clusterEventQueueInterface.receive(this::clusterEventQueue)));
        // Both periodic tasks run every second on the single scheduledDelay thread.
        this.executionDelayFuture = this.scheduledDelay.scheduleAtFixedRate(this::executionDelaySend, 0L, 1L, TimeUnit.SECONDS);
        this.monitorSLAFuture = this.scheduledDelay.scheduleAtFixedRate(this::executionSLAMonitor, 0L, 1L, TimeUnit.SECONDS);
        // Watcher: waits for the execution-delay task to finish. A cancellation is ignored, and so is a
        // failure that has no cause or whose cause is exactly CannotCreateTransactionException; any other
        // failure is logged, then the executor is closed and the Kestra context shut down.
        Thread.ofVirtual().name("jdbc-delay-exception-watcher").start(() -> {
            block3: {
                Await.until(this.executionDelayFuture::isDone);
                try {
                    this.executionDelayFuture.get();
                }
                catch (CancellationException cancellationException) {
                }
                catch (InterruptedException | ExecutionException e) {
                    if (e.getCause() == null || e.getCause().getClass() == CannotCreateTransactionException.class) break block3;
                    log.error("Executor fatal exception in the scheduledDelay thread", (Throwable)e);
                    this.close();
                    KestraContext.getContext().shutdown();
                }
            }
        });
        // Watcher: same policy as above, applied to the SLA-monitor task.
        Thread.ofVirtual().name("jdbc-sla-monitor-exception-watcher").start(() -> {
            block3: {
                Await.until(this.monitorSLAFuture::isDone);
                try {
                    this.monitorSLAFuture.get();
                }
                catch (CancellationException cancellationException) {
                }
                catch (InterruptedException | ExecutionException e) {
                    if (e.getCause() == null || e.getCause().getClass() == CannotCreateTransactionException.class) break block3;
                    log.error("Executor fatal exception in the scheduledSLAMonitor thread", (Throwable)e);
                    this.close();
                    KestraContext.getContext().shutdown();
                }
            }
        });
        // Flow changes: keep the stored topology of each flow up to date.
        this.receiveCancellations.addFirst(this.flowQueue.receive(FlowTopology.class, either -> {
            FlowInterface flow;
            if (either.isRight()) {
                // The flow could not be deserialized: rebuild a FlowWithException from the raw record so
                // that its topology can still be saved; give up when even that is not possible.
                log.error("Unable to deserialize a flow: {}", (Object)((DeserializationException)((Object)((Object)either.getRight()))).getMessage());
                try {
                    JsonNode jsonNode = MAPPER.readTree(((DeserializationException)((Object)((Object)either.getRight()))).getRecord());
                    flow = (FlowInterface)FlowWithException.from((JsonNode)jsonNode, (Exception)((Exception)either.getRight())).orElseThrow(IOException::new);
                }
                catch (IOException e) {
                    log.error("Unexpected exception when trying to handle a deserialization error", (Throwable)e);
                    return;
                }
            } else {
                flow = (FlowInterface)either.getLeft();
            }
            try {
                // A deleted flow is saved with an empty topology; otherwise the topology is computed
                // against the flows of the same tenant and de-duplicated.
                this.flowTopologyRepository.save(flow, (flow.isDeleted() ? Stream.empty() : this.flowTopologyService.topology(this.pluginDefaultService.injectVersionDefaults(flow, true), this.allFlows.stream().filter(f -> Objects.equals(f.getTenantId(), flow.getTenantId())).toList())).distinct().toList());
            }
            catch (Exception e) {
                log.error("Unable to save flow topology for flow " + flow.uid(), (Throwable)e);
            }
        }));
        // Start paused when the cluster is already in maintenance mode.
        if (this.maintenanceService.isInMaintenanceMode()) {
            this.enterMaintenance();
        } else {
            this.setState(Service.ServiceState.RUNNING);
        }
        log.info("Executor started with {} thread(s)", (Object)this.numberOfThreads);
    }

    /**
     * Handles one message of the multiple-condition event queue: computes the executions triggered by
     * the flow trigger preconditions and emits each of them on the execution queue.
     *
     * @param either the event, or the deserialization error raised while reading it
     */
    private void multipleConditionEventQueue(Either<MultipleConditionEvent, DeserializationException> either) {
        if (either.isRight()) {
            String reason = either.getRight().getMessage();
            log.error("Unable to deserialize a multiple condition event: {}", reason);
            return;
        }

        MultipleConditionEvent event = either.getLeft();
        this.flowTriggerService
            .computeExecutionsFromFlowTriggerPreconditions(event.execution(), event.flow(), this.multipleConditionStorage)
            .forEach(triggered -> {
                // A failed emission is logged and does not stop the remaining executions.
                try {
                    this.executionQueue.emit(triggered);
                }
                catch (QueueException emitFailure) {
                    log.error("Unable to emit the execution {}", triggered.getId(), emitFailure);
                }
            });
    }

    /**
     * Handles one cluster event: enters or exits maintenance mode; other event types are ignored.
     *
     * @param either the event, or the deserialization error raised while reading it
     */
    private void clusterEventQueue(Either<ClusterEvent, DeserializationException> either) {
        if (either.isRight()) {
            String reason = either.getRight().getMessage();
            log.error("Unable to deserialize a cluster event: {}", reason);
            return;
        }

        ClusterEvent event = either.getLeft();
        log.info("Cluster event received: {}", event);
        switch (event.eventType()) {
            case MAINTENANCE_ENTER -> this.enterMaintenance();
            case MAINTENANCE_EXIT -> this.exitMaintenance();
            default -> {
                // nothing to do for any other event type
            }
        }
    }

    /**
     * Pauses the execution, worker task result, kill, subflow execution result and flow queues (in that
     * order), then flags the executor as paused and moves the service to {@code MAINTENANCE}.
     */
    private void enterMaintenance() {
        Stream.<QueueInterface<?>>of(
            this.executionQueue,
            this.workerTaskResultQueue,
            this.killQueue,
            this.subflowExecutionResultQueue,
            this.flowQueue
        ).forEachOrdered(QueueInterface::pause);

        this.isPaused.set(true);
        this.setState(Service.ServiceState.MAINTENANCE);
    }

    /**
     * Resumes the execution, worker task result, kill, subflow execution result and flow queues (in that
     * order), then clears the paused flag and moves the service back to {@code RUNNING}.
     */
    private void exitMaintenance() {
        Stream.<QueueInterface<?>>of(
            this.executionQueue,
            this.workerTaskResultQueue,
            this.killQueue,
            this.subflowExecutionResultQueue,
            this.flowQueue
        ).forEachOrdered(QueueInterface::resume);

        this.isPaused.set(false);
        this.setState(Service.ServiceState.RUNNING);
    }

    /**
     * Re-emits on the worker job queue the jobs that were running on the given workers.
     *
     * <p>The resubmit counter is incremented by the number of ids. A running task whose execution is
     * flagged to be skipped is removed from the running repository instead of being re-emitted.
     *
     * @param configuration jOOQ configuration whose DSL context is used to load the running jobs
     * @param ids identifiers passed to the running-job repository lookup
     */
    void reEmitWorkerJobsForWorkers(Configuration configuration, List<String> ids) {
        this.metricRegistry
            .counter("executor.worker.job.resubmit.count", "The total number of worker jobs resubmitted to the Worker by the Executor", new String[0])
            .increment((double)ids.size());

        this.workerJobRunningRepository.getWorkerJobWithWorkerDead(configuration.dsl(), ids).forEach(runningJob -> {
            if (runningJob instanceof WorkerTaskRunning taskRunning) {
                TaskRun taskRun = taskRunning.getTaskRun();
                if (!this.skipExecutionService.skipExecution(taskRun)) {
                    try {
                        this.workerJobQueue.emit(
                            taskRunning.getWorkerInstance().workerGroup(),
                            WorkerTask.builder()
                                .taskRun(taskRun.onRunningResend())
                                .task(taskRunning.getTask())
                                .runContext(taskRunning.getRunContext())
                                .build()
                        );
                        this.logService.logTaskRun(taskRun, Level.WARN, "Re-resubmitting WorkerTask.");
                    }
                    catch (QueueException emitFailure) {
                        this.logService.logTaskRun(taskRun, Level.ERROR, "Unable to re-resubmit WorkerTask.", emitFailure);
                    }
                } else {
                    log.warn("Skipping execution {}", taskRun.getExecutionId());
                    this.workerJobRunningRepository.deleteByKey(taskRunning.uid());
                }
            }

            if (runningJob instanceof WorkerTriggerRunning triggerRunning) {
                TriggerContext triggerContext = triggerRunning.getTriggerContext();
                try {
                    this.workerJobQueue.emit(
                        triggerRunning.getWorkerInstance().workerGroup(),
                        WorkerTrigger.builder()
                            .trigger(triggerRunning.getTrigger())
                            .conditionContext(triggerRunning.getConditionContext())
                            .triggerContext(triggerRunning.getTriggerContext())
                            .build()
                    );
                    this.logService.logTrigger(triggerContext, Level.WARN, "Re-emitting WorkerTrigger.");
                }
                catch (QueueException emitFailure) {
                    this.logService.logTrigger(triggerContext, Level.ERROR, "Unable to re-emit WorkerTrigger.", emitFailure);
                }
            }
        });
    }

    /**
     * Handles one message of the execution queue.
     *
     * <p>Messages that failed to deserialize, and executions flagged by the skip service, are dropped.
     * Otherwise the execution is processed under the lock taken through the execution repository:
     * scheduling, SLA monitor registration, concurrency handling, flow processing, then emission of the
     * resulting worker tasks, subflow results, delays and subflow executions. When the locked section
     * yields a result it is passed to {@code toExecution}.
     *
     * @param either the execution, or the deserialization error raised while reading it
     */
    private void executionQueue(Either<Execution, DeserializationException> either) {
        if (either.isRight()) {
            log.error("Unable to deserialize an execution: {}", (Object)((DeserializationException)((Object)either.getRight())).getMessage());
            return;
        }
        Execution message = (Execution)either.getLeft();
        if (this.skipExecutionService.skipExecution(message)) {
            log.warn("Skipping execution {}", (Object)message.getId());
            return;
        }
        Executor result = this.executionRepository.lock(message.getId(), pair -> {
            // The lock callback receives the stored execution together with its executor state.
            Execution execution = (Execution)pair.getLeft();
            ExecutorState executorState = (ExecutorState)pair.getRight();
            return (Pair)this.tracer.inCurrentContext(execution, FlowId.uidWithoutRevision((Execution)execution), () -> {
                try {
                    ExecutionRunning executionRunning;
                    ExecutionRunning processed;
                    FlowWithSource flow = this.findFlow(execution);
                    Executor executor = new Executor(execution, null).withFlow(flow);
                    // A CREATED execution scheduled in the future is not processed now: a delay is stored
                    // to move it to RUNNING at its schedule date.
                    if (execution.getState().getCurrent() == State.Type.CREATED && execution.getScheduleDate() != null && execution.getScheduleDate().isAfter(Instant.now())) {
                        ExecutionDelay executionDelay2 = ExecutionDelay.builder().executionId(executor.getExecution().getId()).date(execution.getScheduleDate()).state(State.Type.RUNNING).delayType(ExecutionDelay.DelayType.RESUME_FLOW).build();
                        this.executionDelayStorage.save(executionDelay2);
                        return Pair.of((Object)executor, (Object)executorState);
                    }
                    // New or restarted execution of a flow with SLAs: register one monitor per
                    // execution-monitoring SLA, with a deadline relative to the execution start date.
                    if ((execution.getState().getCurrent() == State.Type.CREATED || execution.getState().failedThenRestarted()) && !ListUtils.isEmpty((List)flow.getSla())) {
                        List<SLAMonitor> monitors = flow.getSla().stream().filter(ExecutionMonitoringSLA.class::isInstance).map(ExecutionMonitoringSLA.class::cast).map(sla -> SLAMonitor.builder().executionId(execution.getId()).slaId(((SLA)sla).getId()).deadline(execution.getState().getStartDate().plus(sla.getDuration())).build()).toList();
                        monitors.forEach(monitor -> this.slaMonitorStorage.save(monitor));
                    }
                    // New or restarted execution of a flow with a concurrency setting: stop here when the
                    // concurrency handling terminated or queued the execution.
                    // NOTE(review): lambda$executionQueue$18 is a synthetic name left by the decompiler; its
                    // body is not visible here, so this statement is kept exactly as decompiled.
                    if ((execution.getState().getCurrent() == State.Type.CREATED || execution.getState().failedThenRestarted()) && flow.getConcurrency() != null && ((processed = this.concurrencyLimitStorage.countThenProcess((FlowInterface)flow, (arg_0, arg_1) -> this.lambda$executionQueue$18(flow, executionRunning = ExecutionRunning.builder().tenantId(executor.getFlow().getTenantId()).namespace(executor.getFlow().getNamespace()).flowId(executor.getFlow().getId()).execution(executor.getExecution()).concurrencyState(ExecutionRunning.ConcurrencyState.CREATED).build(), execution, arg_0, arg_1))).getExecution().getState().isTerminated() || processed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED)) {
                        return Pair.of((Object)executor.withExecution(processed.getExecution(), "handleConcurrencyLimit"), (Object)executorState);
                    }
                    executor = this.executorService.handleExecutionChangedSLA(executor);
                    if (log.isDebugEnabled()) {
                        this.executorService.log(log, Boolean.valueOf(true), executor);
                    }
                    // Process the flow; next task runs are only applied when the de-duplication check passes.
                    if (!(executor = this.executorService.process(executor)).getNexts().isEmpty() && this.deduplicateNexts(execution, executorState, executor.getNexts())) {
                        executor.withExecution(this.executorService.onNexts(executor.getExecution(), executor.getNexts()), "onNexts");
                    }
                    if (!executor.getWorkerTasks().isEmpty()) {
                        ArrayList workerTaskResults = new ArrayList();
                        executor.getWorkerTasks().stream().filter(workerTask -> this.deduplicateWorkerTask(execution, executorState, workerTask.getTaskRun())).forEach(Rethrow.throwConsumer(workerTask -> {
                            try {
                                // A falsy runIf marks the task run as SKIPPED without sending it anywhere.
                                if (!TruthUtils.isTruthy((String)workerTask.getRunContext().render(workerTask.getTask().getRunIf()))) {
                                    workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.SKIPPED)));
                                } else {
                                    if (workerTask.getTask().isSendToWorkerTask()) {
                                        Optional maybeWorkerGroup = this.workerGroupService.resolveGroupFromJob((FlowInterface)flow, (WorkerJob)workerTask);
                                        String workerGroupKey = maybeWorkerGroup.map(Rethrow.throwFunction(workerGroup -> workerTask.getRunContext().render(workerGroup.getKey()))).orElse(null);
                                        // WorkingDirectory tasks are emitted unchanged; every other task is
                                        // emitted as SUBMITTED and that state is recorded as a result.
                                        if (workerTask.getTask() instanceof WorkingDirectory) {
                                            this.workerJobQueue.emit(workerGroupKey, workerTask);
                                        } else {
                                            TaskRun taskRun = workerTask.getTaskRun().withState(State.Type.SUBMITTED);
                                            this.workerJobQueue.emit(workerGroupKey, (Object)workerTask.withTaskRun(taskRun));
                                            workerTaskResults.add(new WorkerTaskResult(taskRun));
                                        }
                                    }
                                    // Flowable tasks get a new RUNNING attempt and a RUNNING task run.
                                    if (workerTask.getTask().isFlowable()) {
                                        List attempts = Optional.ofNullable(workerTask.getTaskRun().getAttempts()).map(ArrayList::new).orElseGet(ArrayList::new);
                                        attempts.add(TaskRunAttempt.builder().state(new State().withState(State.Type.RUNNING)).build());
                                        TaskRun updatedTaskRun = workerTask.getTaskRun().withAttempts(attempts).withState(State.Type.RUNNING);
                                        workerTaskResults.add(new WorkerTaskResult(updatedTaskRun));
                                    }
                                }
                            }
                            catch (Exception e) {
                                // Any failure in the block above marks the task run as FAILED.
                                workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.FAILED)));
                                workerTask.getRunContext().logger().error("Failed to evaluate the runIf condition for task {}. Cause: {}", new Object[]{workerTask.getTask().getId(), e.getMessage(), e});
                            }
                        }));
                        try {
                            this.executorService.addWorkerTaskResults(executor, workerTaskResults);
                        }
                        catch (InternalException e) {
                            log.error("Unable to add a worker task result to the execution", (Throwable)e);
                        }
                    }
                    if (!executor.getSubflowExecutionResults().isEmpty()) {
                        executor.getSubflowExecutionResults().forEach(Rethrow.throwConsumer(subflowExecutionResult -> this.subflowExecutionResultQueue.emit(subflowExecutionResult)));
                    }
                    if (!executor.getExecutionDelays().isEmpty()) {
                        executor.getExecutionDelays().forEach(executionDelay -> this.executionDelayStorage.save((ExecutionDelay)executionDelay));
                    }
                    if (!executor.getSubflowExecutions().isEmpty()) {
                        List<SubflowExecution> subflowExecutionDedup = executor.getSubflowExecutions().stream().filter(subflowExecution -> this.deduplicateSubflowExecution(execution, executorState, subflowExecution.getParentTaskRun())).toList();
                        subflowExecutionDedup.forEach(Rethrow.throwConsumer(subflowExecution -> {
                            Execution subExecution = subflowExecution.getExecution();
                            // The local must not be named "log": that would shadow the class logger and make
                            // the info call below resolve against String instead of the logger.
                            String creationMessage = String.format("Created new execution [[link execution=\"%s\" flowId=\"%s\" namespace=\"%s\"]]", subExecution.getId(), subExecution.getFlowId(), subExecution.getNamespace());
                            log.info(creationMessage);
                            this.logQueue.emit((Object)LogEntry.of((TaskRun)subflowExecution.getParentTaskRun(), (ExecutionKind)subflowExecution.getExecution().getKind()).toBuilder().level(Level.INFO).message(creationMessage).timestamp(subflowExecution.getParentTaskRun().getState().getStartDate()).thread(Thread.currentThread().getName()).build());
                            this.executionQueue.emit((Object)subflowExecution.getExecution());
                        }));
                    }
                    return Pair.of((Object)executor, (Object)executorState);
                }
                catch (QueueException e) {
                    // A queue failure fails the received execution, which is re-emitted in its failed
                    // state; the error is recorded on the current span and no pair is returned.
                    try {
                        Execution failedExecution = this.fail(message, (Exception)((Object)e));
                        this.executionQueue.emit((Object)failedExecution);
                    }
                    catch (QueueException ex) {
                        log.error("Unable to emit the execution {}", (Object)message.getId(), (Object)ex);
                    }
                    Span.current().recordException((Throwable)e).setStatus(StatusCode.ERROR);
                    return null;
                }
            });
        });
        if (result != null) {
            this.toExecution(result);
        }
    }

    /**
     * Builds the failed version of an execution after an error interrupted its processing.
     *
     * <p>The logs attached to the failure are pushed to the log queue on a best-effort basis: a queue
     * error at that point is reported in the application log and otherwise ignored, so it can never
     * prevent the failed execution from being returned.
     *
     * @param message the execution that was being processed
     * @param e the error that interrupted the processing
     * @return the execution produced by {@code failedExecutionFromExecutor}, forced to
     *         {@code State.Type.FAILED} when its state is not already reported as failed
     */
    private Execution fail(Execution message, Exception e) {
        Execution.FailedExecutionWithLog failedExecution = message.failedExecutionFromExecutor(e);
        try {
            this.logQueue.emitAsync(failedExecution.getLogs());
        }
        catch (QueueException queueException) {
            // Deliberately not rethrown: losing the failure logs must not hide the failure itself.
            log.warn("Unable to emit the failure logs of the execution {}", (Object)message.getId(), (Object)queueException);
        }
        Execution execution = failedExecution.getExecution();
        return execution.getState().isFailed() ? execution : execution.withState(State.Type.FAILED);
    }

    /**
     * Consumes a worker task result and joins it to the execution it belongs to.
     *
     * <p>Messages that cannot be deserialized, or that belong to a skipped execution, are logged and
     * dropped. Otherwise the execution is processed under the repository lock and, when the lock
     * callback produced an executor, that executor is handed to {@code toExecution}.
     *
     * @param either the worker task result on the left, or the deserialization error on the right
     * @throws IllegalStateException (from the lock callback) when no execution exists for the task run
     */
    private void workerTaskResultQueue(Either<WorkerTaskResult, DeserializationException> either) {
        if (either.isRight()) {
            log.error("Unable to deserialize a worker task result: {}", (Object)((DeserializationException)((Object)either.getRight())).getMessage(), either.getRight());
            return;
        }
        WorkerTaskResult message = (WorkerTaskResult)either.getLeft();
        if (this.skipExecutionService.skipExecution(message.getTaskRun())) {
            log.warn("Skipping execution {}", (Object)message.getTaskRun().getExecutionId());
            return;
        }
        if (log.isDebugEnabled()) {
            this.executorService.log(log, Boolean.valueOf(true), message);
        }
        Executor executor = this.executionRepository.lock(message.getTaskRun().getExecutionId(), pair -> {
            Execution execution = (Execution)pair.getLeft();
            // Validate first: the executor must only ever be built from an existing execution,
            // so that a missing one is reported with the explicit message below.
            if (execution == null) {
                throw new IllegalStateException("Execution state don't exist for " + message.getTaskRun().getExecutionId() + ", receive " + String.valueOf(message));
            }
            Executor current = new Executor(execution, null);
            // A task run that the execution cannot join is ignored (the callback yields nothing).
            if (!execution.hasTaskRunJoinable(message.getTaskRun())) {
                return null;
            }
            try {
                this.executorService.addWorkerTaskResult(current, () -> this.findFlow(execution), message);
                return Pair.of((Object)current, (Object)((ExecutorState)pair.getRight()));
            }
            catch (InternalException e) {
                // The join failed: continue with the failed version of the execution instead.
                return Pair.of((Object)this.handleFailedExecutionFromExecutor(current, (Exception)((Object)e)), (Object)((ExecutorState)pair.getRight()));
            }
        });
        if (executor != null) {
            this.toExecution(executor);
        }
    }

    /**
     * Consumes a subflow execution result and joins the parent task run it carries to the parent execution.
     *
     * <p>Messages that cannot be deserialized, or whose execution or parent task run is skipped, are
     * logged and dropped. Otherwise the parent execution is processed under the repository lock and,
     * when the lock callback produced an executor, that executor is handed to {@code toExecution}.
     *
     * @param either the subflow execution result on the left, or the deserialization error on the right
     * @throws IllegalStateException (from the lock callback) when the parent execution does not exist
     */
    private void subflowExecutionResultQueue(Either<SubflowExecutionResult, DeserializationException> either) {
        if (either.isRight()) {
            // The exception is passed as last argument so that its stack trace is logged as well.
            log.error("Unable to deserialize a subflow execution result: {}", (Object)((DeserializationException)((Object)either.getRight())).getMessage(), either.getRight());
            return;
        }
        SubflowExecutionResult message = (SubflowExecutionResult)either.getLeft();
        if (this.skipExecutionService.skipExecution(message.getExecutionId())) {
            log.warn("Skipping execution {}", (Object)message.getExecutionId());
            return;
        }
        if (this.skipExecutionService.skipExecution(message.getParentTaskRun())) {
            log.warn("Skipping execution {}", (Object)message.getParentTaskRun().getExecutionId());
            return;
        }
        if (log.isDebugEnabled()) {
            this.executorService.log(log, Boolean.valueOf(true), message);
        }
        Executor executor = this.executionRepository.lock(message.getParentTaskRun().getExecutionId(), pair -> {
            Execution execution = (Execution)pair.getLeft();
            // Validate first: the executor must only ever be built from an existing execution,
            // so that a missing one is reported with the explicit message below.
            if (execution == null) {
                throw new IllegalStateException("Execution state don't exist for " + message.getParentTaskRun().getExecutionId() + ", receive " + String.valueOf(message));
            }
            Executor current = new Executor(execution, null);
            // A parent task run that the execution cannot join is ignored (the callback yields nothing).
            if (!execution.hasTaskRunJoinable(message.getParentTaskRun())) {
                return null;
            }
            try {
                TaskRun taskRun;
                FlowWithSource flow = this.findFlow(execution);
                Task task = flow.findTaskByTaskId(message.getParentTaskRun().getTaskId());
                if (task instanceof ForEachItem.ForEachItemExecutable) {
                    // ForEachItem: start from the task run stored in the execution, align its state with
                    // the message, merge the outputs of both task runs, then let manageIterations
                    // compute the resulting task run.
                    ForEachItem.ForEachItemExecutable forEachItem = (ForEachItem.ForEachItemExecutable)task;
                    RunContext runContext = this.runContextFactory.of((FlowInterface)flow, task, current.getExecution(), message.getParentTaskRun());
                    taskRun = execution.findTaskRunByTaskRunId(message.getParentTaskRun().getId());
                    if (taskRun.getState().getCurrent() != message.getState()) {
                        taskRun = taskRun.withState(message.getState());
                    }
                    Map outputs = MapUtils.deepMerge((Map)taskRun.getOutputs(), (Map)message.getParentTaskRun().getOutputs());
                    Variables variables = this.variablesService.of(StorageContext.forTask((TaskRun)taskRun), outputs);
                    taskRun = taskRun.withOutputs(variables);
                    taskRun = ExecutableUtils.manageIterations((Storage)runContext.storage(), (TaskRun)taskRun, (Execution)current.getExecution(), (boolean)forEachItem.getTransmitFailed(), (boolean)forEachItem.isAllowFailure(), (boolean)forEachItem.isAllowWarning());
                } else {
                    // Any other task: the task run carried by the message is used as is.
                    taskRun = message.getParentTaskRun();
                }
                Execution newExecution = current.getExecution().withTaskRun(taskRun);
                // A killed child task run also kills its parent task runs.
                if (taskRun.getState().getCurrent() == State.Type.KILLED && taskRun.getParentTaskRunId() != null) {
                    newExecution = this.executionService.killParentTaskruns(taskRun, newExecution);
                }
                current = current.withExecution(newExecution, "joinSubflowExecutionResult");
                if (taskRun.getState().isTerminated()) {
                    this.metricRegistry.counter("executor.taskrun.ended.count", "The total number of tasks ended by the Executor", this.metricRegistry.tags(message, new String[0])).increment();
                    this.metricRegistry.timer("executor.taskrun.ended.duration", "Task duration inside the Executor", this.metricRegistry.tags(message, new String[0])).record(taskRun.getState().getDuration());
                    log.trace("TaskRun terminated: {}", (Object)taskRun);
                }
                return Pair.of((Object)current, (Object)((ExecutorState)pair.getRight()));
            }
            catch (InternalException e) {
                // The join failed: continue with the failed version of the execution instead.
                return Pair.of((Object)this.handleFailedExecutionFromExecutor(current, (Exception)((Object)e)), (Object)((ExecutorState)pair.getRight()));
            }
        });
        if (executor != null) {
            this.toExecution(executor);
        }
    }

    /**
     * Consumes the end of a subflow execution and, when the parent task waits for its child, emits the
     * matching subflow execution result.
     *
     * <p>Messages that cannot be deserialized, or whose parent or child execution is skipped, are logged
     * and dropped. The work is done under the repository lock of the parent execution; the lock callback
     * always returns {@code null}, its only output being the message emitted on the subflow execution
     * result queue.
     *
     * @param either the subflow execution end on the left, or the deserialization error on the right
     * @throws IllegalStateException (from the lock callback) when the parent execution does not exist
     */
    private void subflowExecutionEndQueue(Either<SubflowExecutionEnd, DeserializationException> either) {
        if (either.isRight()) {
            // The exception is passed as last argument so that its stack trace is logged as well.
            log.error("Unable to deserialize a subflow execution end: {}", (Object)((DeserializationException)((Object)either.getRight())).getMessage(), either.getRight());
            return;
        }
        SubflowExecutionEnd message = (SubflowExecutionEnd)either.getLeft();
        if (this.skipExecutionService.skipExecution(message.getParentExecutionId())) {
            log.warn("Skipping execution {}", (Object)message.getParentExecutionId());
            return;
        }
        if (this.skipExecutionService.skipExecution(message.getChildExecution())) {
            log.warn("Skipping execution {}", (Object)message.getChildExecution().getId());
            return;
        }
        if (log.isDebugEnabled()) {
            this.executorService.log(log, Boolean.valueOf(true), message);
        }
        this.executionRepository.lock(message.getParentExecutionId(), pair -> {
            Execution execution = (Execution)pair.getLeft();
            if (execution == null) {
                throw new IllegalStateException("Execution state don't exist for " + message.getParentExecutionId() + ", receive " + String.valueOf(message));
            }
            FlowWithSource flow = this.findFlow(execution);
            try {
                ExecutableTask executableTask = (ExecutableTask)flow.findTaskByTaskId(message.getTaskId());
                // Nothing to report when the parent task does not wait for its child execution.
                if (!executableTask.waitForExecution()) {
                    return null;
                }
                // The parent task run takes the final state and outputs carried by the message.
                TaskRun taskRun = execution.findTaskRunByTaskRunId(message.getTaskRunId()).withState(message.getState()).withOutputs(message.getOutputs());
                FlowInterface childFlow = (FlowInterface)this.flowMetaStore.findByExecution(message.getChildExecution()).orElseThrow();
                RunContext runContext = this.runContextFactory.of(childFlow, (Task)executableTask, message.getChildExecution(), taskRun);
                SubflowExecutionResult subflowExecutionResult = ExecutableUtils.subflowExecutionResultFromChildExecution((RunContext)runContext, (FlowInterface)childFlow, (Execution)message.getChildExecution(), (ExecutableTask)executableTask, (TaskRun)taskRun);
                if (subflowExecutionResult != null) {
                    try {
                        this.subflowExecutionResultQueue.emit((Object)subflowExecutionResult);
                    }
                    catch (QueueException ex) {
                        log.error("Unable to emit the subflow execution result", (Throwable)ex);
                    }
                }
            }
            catch (InternalException e) {
                log.error("Unable to process the subflow execution end", (Throwable)e);
            }
            return null;
        });
    }

    /**
     * Consumes a kill request and applies it to the targeted execution.
     *
     * <p>Ignored messages: those that cannot be deserialized (logged), events already in the
     * {@code EXECUTED} state, events that are not an {@link ExecutionKilledExecution}, and events
     * targeting a skipped execution (logged).
     *
     * <p>For the others, in this order: an {@code EXECUTED} copy of the event (with
     * {@code isOnKillCascade} set to false) is emitted on the kill queue, the execution is moved to its
     * kill state, the kill events of its subflow executions are emitted when cascading is enabled
     * (enabled when the flag is absent), and the resulting executor is handed to {@code toExecution}
     * with failures ignored.
     *
     * @param either the kill event on the left, or the deserialization error on the right
     */
    private void killQueue(Either<ExecutionKilled, DeserializationException> either) {
        if (either.isRight()) {
            // The exception is passed as last argument so that its stack trace is logged as well.
            log.error("Unable to deserialize a killed execution: {}", (Object)((DeserializationException)((Object)either.getRight())).getMessage(), either.getRight());
            return;
        }
        ExecutionKilled event = (ExecutionKilled)either.getLeft();
        if (event.getState() == ExecutionKilled.State.EXECUTED) {
            return;
        }
        if (!(event instanceof ExecutionKilledExecution)) {
            return;
        }
        ExecutionKilledExecution killedExecution = (ExecutionKilledExecution)event;
        if (this.skipExecutionService.skipExecution(killedExecution.getExecutionId())) {
            log.warn("Skipping execution {}", (Object)killedExecution.getExecutionId());
            return;
        }
        this.metricRegistry.counter("executor.killed.count", "The total number of executions killed events received the Executor", this.metricRegistry.tags((ExecutionKilled)killedExecution)).increment();
        if (log.isDebugEnabled()) {
            this.executorService.log(log, Boolean.valueOf(true), killedExecution);
        }
        try {
            // Re-emit the event in the EXECUTED state; such events are dropped by the check at the top of this method.
            this.killQueue.emit((Object)((ExecutionKilledExecution.ExecutionKilledExecutionBuilder)((ExecutionKilledExecution.ExecutionKilledExecutionBuilder)ExecutionKilledExecution.builder().executionId(killedExecution.getExecutionId()).isOnKillCascade(Boolean.valueOf(false)).state(ExecutionKilled.State.EXECUTED)).tenantId(killedExecution.getTenantId())).build());
        }
        catch (QueueException e) {
            log.error("Unable to kill the execution {}", (Object)killedExecution.getExecutionId(), (Object)e);
        }
        Executor executor = this.killingOrAfterKillState(killedExecution.getExecutionId(), Optional.ofNullable(killedExecution.getExecutionState()));
        // Cascading to subflow executions is the default when the event does not say otherwise.
        Boolean isOnKillCascade = Optional.ofNullable(killedExecution.getIsOnKillCascade()).orElse(true);
        if (isOnKillCascade.booleanValue()) {
            this.executionService.killSubflowExecutions(event.getTenantId(), killedExecution.getExecutionId()).doOnNext(executionKilled -> {
                try {
                    this.killQueue.emit(executionKilled);
                }
                catch (QueueException e) {
                    log.error("Unable to kill the execution {}", (Object)executionKilled.getExecutionId(), (Object)e);
                }
            }).blockLast();
        }
        if (executor != null) {
            this.toExecution(executor, true);
        }
    }

    /**
     * Applies a kill to an execution while holding its repository lock.
     *
     * <p>An execution that is still queued is first removed from the queued-execution storage; the
     * execution service then computes the killed execution.
     *
     * @param executionId the identifier of the execution to kill
     * @param afterKillState the state forwarded to the execution service's {@code kill}, if any
     * @return the value returned by the repository lock for the executor carrying the killed execution
     */
    private Executor killingOrAfterKillState(String executionId, Optional<State.Type> afterKillState) {
        return this.executionRepository.lock(executionId, pair -> {
            Execution original = (Execution)pair.getLeft();
            FlowInterface executionFlow = (FlowInterface)this.flowMetaStore.findByExecution(original).orElseThrow();

            boolean stillQueued = original.getState().isQueued();
            if (stillQueued) {
                this.executionQueuedStorage.remove(original);
            }

            Execution killed = this.executionService.kill(original, executionFlow, afterKillState);
            Executor base = new Executor(original, null);
            Executor updated = base.withExecution(killed, "joinKillingExecution");
            return Pair.of((Object)updated, (Object)((ExecutorState)pair.getRight()));
        });
    }

    /**
     * Publishes the outcome of an executor pass, treating queue failures as execution failures.
     *
     * <p>Shorthand for {@code toExecution(executor, false)}.
     *
     * @param executor the executor to publish
     */
    private void toExecution(Executor executor) {
        final boolean ignoreFailure = false;
        toExecution(executor, ignoreFailure);
    }

    /**
     * Publishes the outcome of an executor pass and, when the execution is terminated, runs the
     * end-of-execution housekeeping.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>a pending exception on the executor is converted into a failed execution;</li>
     *   <li>when there is neither a failure nor an updated execution, nothing is emitted: the executor
     *       state may be deleted and the trigger may be reset, then the method returns;</li>
     *   <li>otherwise the execution is sent to the execution queue and flow triggers are processed
     *       when the execution state changed;</li>
     *   <li>when the execution is terminated: subflow end event, SLA monitor purge, concurrency slot
     *       release (and pop of a queued execution), trigger reset and queue cleanup.</li>
     * </ol>
     *
     * @param executor the executor holding the execution to publish
     * @param ignoreFailure when {@code true}, a {@link QueueException} raised along the way is dropped;
     *        when {@code false}, the execution is failed and that failed execution is emitted
     */
    private void toExecution(Executor executor, boolean ignoreFailure) {
        // Labeled block (decompiler output): "break block23" in the catch below jumps to the end of the method.
        block23: {
            try {
                boolean isTerminated;
                boolean shouldSend = false;
                boolean hasFailure = false;
                // A pending exception is turned into a failed execution, which must always be sent.
                if (executor.getException() != null) {
                    executor = this.handleFailedExecutionFromExecutor(executor, executor.getException());
                    shouldSend = true;
                    hasFailure = true;
                } else if (executor.isExecutionUpdated()) {
                    shouldSend = true;
                }
                if (!shouldSend) {
                    // Nothing to publish: only state cleanup and trigger reset may happen.
                    Execution execution = executor.getExecution();
                    if (this.executorService.canBePurged(executor)) {
                        this.executorStateStorage.delete(execution);
                    }
                    // A triggered execution that failed without any task run: update the trigger state from it.
                    if (execution.getTrigger() != null && execution.getState().isFailed() && ListUtils.isEmpty((List)execution.getTaskRunList())) {
                        FlowWithSource flow = executor.getFlow();
                        this.triggerRepository.findByExecution(execution).ifPresent(trigger -> this.triggerState.update(this.executionService.resetExecution(flow, execution, trigger)));
                    }
                    return;
                }
                if (log.isDebugEnabled()) {
                    this.executorService.log(log, Boolean.valueOf(false), executor);
                }
                // The flow is needed for the termination handling below; load it when the executor has none.
                if (executor.getFlow() == null && executor.getExecution().getState().isTerminated()) {
                    executor = executor.withFlow(this.findFlow(executor.getExecution()));
                }
                boolean bl = isTerminated = executor.getFlow() != null && this.executionService.isTerminated((Flow)executor.getFlow(), executor.getExecution());
                // Optional cleanup of the execution queue, done before the final emission below.
                if (this.cleanExecutionQueue && isTerminated) {
                    ((JdbcQueue)this.executionQueue).deleteByKey(executor.getExecution().getId());
                }
                // Failures go through emit(), regular updates through emitOnly().
                // NOTE(review): the difference between the two is defined by JdbcQueue, not visible here.
                if (hasFailure) {
                    this.executionQueue.emit((Object)executor.getExecution());
                } else {
                    ((JdbcQueue)this.executionQueue).emitOnly(null, executor.getExecution());
                }
                Execution execution = executor.getExecution();
                // Flow triggers are only evaluated when the current state differs from the executor's original state.
                if (!execution.getState().getCurrent().equals((Object)executor.getOriginalState())) {
                    this.processFlowTriggers(execution);
                }
                if (isTerminated) {
                    if (ExecutableUtils.isSubflow((Execution)execution)) {
                        // The parent coordinates are read from the trigger variables of the subflow execution.
                        String parentExecutionId = (String)execution.getTrigger().getVariables().get("executionId");
                        String taskRunId = (String)execution.getTrigger().getVariables().get("taskRunId");
                        String taskId = (String)execution.getTrigger().getVariables().get("taskId");
                        Map outputs = (Map)execution.getTrigger().getVariables().get("taskRunOutputs");
                        Variables variables = this.variablesService.of(StorageContext.forExecution((Execution)executor.getExecution()), outputs);
                        SubflowExecutionEnd subflowExecutionEnd = new SubflowExecutionEnd(executor.getExecution(), parentExecutionId, taskRunId, taskId, execution.getState().getCurrent(), variables);
                        this.subflowExecutionEndQueue.emit((Object)subflowExecutionEnd);
                    }
                    // Purge the SLA monitors of the execution when the flow declares an execution-monitoring SLA.
                    if (!ListUtils.isEmpty((List)executor.getFlow().getSla())) {
                        if (executor.getFlow().getSla().stream().anyMatch(ExecutionMonitoringSLA.class::isInstance)) {
                            this.slaMonitorStorage.purge(executor.getExecution().getId());
                        }
                    }
                    if (executor.getFlow().getConcurrency() != null) {
                        boolean queuedThenKilled;
                        // KILLED with a queued state in its history and no running state in its history:
                        // in that case the concurrency limit is left untouched.
                        boolean bl2 = queuedThenKilled = execution.getState().getCurrent() == State.Type.KILLED && execution.getState().getHistories().stream().anyMatch(h -> h.getState().isQueued()) && execution.getState().getHistories().stream().noneMatch(h -> h.getState().isRunning());
                        if (!queuedThenKilled) {
                            this.concurrencyLimitStorage.decrement((FlowInterface)executor.getFlow());
                            if (executor.getFlow().getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {
                                FlowWithSource finalFlow = executor.getFlow();
                                // Pop a queued execution of the same flow: it is moved to RUNNING, counted in the
                                // concurrency limit, emitted, and its flow triggers are processed.
                                this.executionQueuedStorage.pop(executor.getFlow().getTenantId(), executor.getFlow().getNamespace(), executor.getFlow().getId(), Rethrow.throwBiConsumer((dslContext, queued) -> {
                                    Execution newExecution = queued.withState(State.Type.RUNNING);
                                    this.concurrencyLimitStorage.increment((DSLContext)dslContext, (FlowInterface)finalFlow);
                                    this.executionQueue.emit((Object)newExecution);
                                    this.metricRegistry.counter("executor.execution.popped.count", "The total number of executions popped by the Executor", this.metricRegistry.tags(newExecution)).increment();
                                    this.processFlowTriggers(newExecution);
                                }));
                            }
                        }
                    }
                    // Triggered execution: update the state of the trigger found for it, if any.
                    if (execution.getTrigger() != null) {
                        FlowWithSource flow = executor.getFlow();
                        this.triggerRepository.findByExecution(execution).ifPresent(trigger -> this.triggerState.update(this.executionService.resetExecution(flow, execution, trigger)));
                    }
                    // Optional cleanup of the worker queues, keyed by the task run identifiers of the execution.
                    if (this.cleanWorkerJobQueue && !ListUtils.isEmpty((List)executor.getExecution().getTaskRunList())) {
                        List<String> taskRunKeys = executor.getExecution().getTaskRunList().stream().map(taskRun -> taskRun.getId()).toList();
                        ((JdbcQueue)this.workerTaskResultQueue).deleteByKeys(taskRunKeys);
                        ((JdbcQueue)this.workerJobQueue).deleteByKeys(taskRunKeys);
                    }
                }
            }
            catch (QueueException e) {
                // Callers that asked to ignore failures get a silent exit.
                if (ignoreFailure) break block23;
                // Otherwise the execution is failed under its lock and the failed execution is emitted.
                this.executionRepository.lock(executor.getExecution().getId(), pair -> {
                    Execution execution = (Execution)pair.getLeft();
                    Execution failedExecution = this.fail(execution, (Exception)((Object)e));
                    try {
                        this.executionQueue.emit((Object)failedExecution);
                    }
                    catch (QueueException ex) {
                        log.error("Unable to emit the execution {}", (Object)execution.getId(), (Object)ex);
                    }
                    return null;
                });
            }
        }
    }

    /**
     * Evaluates the flow triggers of all known flows against an execution.
     *
     * <p>Two passes are made over the flows that have flow triggers:
     * <ul>
     *   <li>triggers without any {@link MultipleCondition} and without preconditions: the executions
     *       computed from their conditions are emitted on the execution queue;</li>
     *   <li>triggers with a {@link MultipleCondition} or with preconditions: one event per trigger is
     *       emitted on the multiple-condition event queue.</li>
     * </ul>
     *
     * @param execution the execution the triggers are evaluated against
     * @throws QueueException when one of the emissions fails
     */
    private void processFlowTriggers(Execution execution) throws QueueException {
        // Pass 1: simple triggers, resolved immediately into new executions.
        this.flowTriggerService.withFlowTriggersOnly(this.allFlows.stream())
            .filter(candidate -> ListUtils.emptyOnNull((List)candidate.getTrigger().getConditions()).stream().noneMatch(condition -> condition instanceof MultipleCondition) && candidate.getTrigger().getPreconditions() == null)
            .map(candidate -> candidate.getFlow())
            .distinct()
            .flatMap(triggeredFlow -> this.flowTriggerService.computeExecutionsFromFlowTriggerConditions(execution, triggeredFlow).stream())
            .forEach(Rethrow.throwConsumer(triggered -> this.executionQueue.emit(triggered)));

        // Pass 2: triggers relying on multiple conditions or preconditions, handled through a dedicated event.
        this.flowTriggerService.withFlowTriggersOnly(this.allFlows.stream())
            .filter(candidate -> ListUtils.emptyOnNull((List)candidate.getTrigger().getConditions()).stream().anyMatch(condition -> condition instanceof MultipleCondition) || candidate.getTrigger().getPreconditions() != null)
            .map(candidate -> new MultipleConditionEvent(candidate.getFlow(), execution))
            .distinct()
            .forEach(Rethrow.throwConsumer(event -> this.multipleConditionEventQueue.emit(event)));
    }

    /**
     * Resolves the flow of an execution, with plugin defaults and (when available) templates injected.
     *
     * <p>A template injection failure is logged as a warning; the flow with plugin defaults only is
     * returned in that case.
     *
     * @param execution the execution whose flow is looked up in the flow meta store
     * @return the resolved flow
     * @throws java.util.NoSuchElementException when the flow meta store has no flow for the execution
     */
    private FlowWithSource findFlow(Execution execution) {
        FlowInterface stored = (FlowInterface)this.flowMetaStore.findByExecution(execution).orElseThrow();
        FlowWithSource withDefaults = this.pluginDefaultService.injectDefaults(stored, execution);

        // No template executor registered: plugin defaults are all there is to inject.
        if (!this.templateExecutorInterface.isPresent()) {
            return withDefaults;
        }

        try {
            return Template.injectTemplate((Flow)withDefaults, (Execution)execution, (tenantId, namespace, id) -> this.templateExecutorInterface.get().findById(tenantId, namespace, id).orElse(null));
        }
        catch (InternalException e) {
            log.warn("Failed to inject template", (Throwable)e);
            return withDefaults;
        }
    }

    /**
     * Processes the execution delays handed over by the delay storage.
     *
     * <p>Does nothing while the executor is shut down or paused. Each delay is applied under the
     * repository lock of its execution, according to its delay type:
     * <ul>
     *   <li>{@code RESUME_FLOW} (execution not terminated): the execution, or the given task run, is
     *       moved to the state carried by the delay;</li>
     *   <li>{@code RESTART_FAILED_TASK}: the task run is retried;</li>
     *   <li>{@code RESTART_FAILED_FLOW}: the execution is replayed;</li>
     *   <li>{@code CONTINUE_FLOWABLE}: {@code retryWaitFor} is invoked for the task run.</li>
     * </ul>
     * Any exception raised while applying a delay turns the execution into a failed one. The resulting
     * executor, when there is one, is handed to {@code toExecution}.
     */
    private void executionDelaySend() {
        if (this.shutdown.get() || this.isPaused.get()) {
            return;
        }
        this.executionDelayStorage.get(delay -> {
            Executor result = this.executionRepository.lock(delay.getExecutionId(), pair -> {
                // Left side (a.k.a. key) of the pair: the execution currently stored.
                Execution locked = (Execution)pair.getLeft();
                Executor executor = new Executor(locked, null);
                this.metricRegistry.counter("executor.execution.delay.ended.count", "The total number of execution delays ended (resumed) by the Executor", this.metricRegistry.tags(executor.getExecution())).increment();
                try {
                    if (delay.getDelayType().equals((Object)ExecutionDelay.DelayType.RESUME_FLOW) && !locked.getState().isTerminated()) {
                        Execution resumed;
                        if (delay.getTaskRunId() != null) {
                            // Resume a single task run: the flow is required to mark it.
                            FlowInterface flow = (FlowInterface)this.flowMetaStore.findByExecution(locked).orElseThrow();
                            resumed = this.executionService.markAs(locked, flow, delay.getTaskRunId(), delay.getState());
                        } else {
                            // No task run targeted: the whole execution changes state.
                            resumed = locked.withState(delay.getState());
                        }
                        executor = executor.withExecution(resumed, "pausedRestart");
                    } else if (delay.getDelayType().equals((Object)ExecutionDelay.DelayType.RESTART_FAILED_TASK)) {
                        Execution retried = this.executionService.retryTask(locked, (Flow)this.findFlow(locked), delay.getTaskRunId());
                        executor = executor.withExecution(retried, "retryFailedTask");
                    } else if (delay.getDelayType().equals((Object)ExecutionDelay.DelayType.RESTART_FAILED_FLOW)) {
                        Execution replayed = this.executionService.replay(executor.getExecution(), null, null);
                        executor = executor.withExecution(replayed, "retryFailedFlow");
                    } else if (delay.getDelayType().equals((Object)ExecutionDelay.DelayType.CONTINUE_FLOWABLE)) {
                        Execution continued = this.executionService.retryWaitFor(executor.getExecution(), delay.getTaskRunId());
                        executor = executor.withExecution(continued, "continueLoop");
                    }
                }
                catch (Exception e) {
                    executor = this.handleFailedExecutionFromExecutor(executor, e);
                }
                return Pair.of((Object)executor, (Object)((ExecutorState)pair.getRight()));
            });
            if (result == null) {
                return;
            }
            this.toExecution(result);
        });
    }

    /**
     * Processes the SLA monitors that have expired at the time of the call.
     *
     * <p>Does nothing while the executor is shut down or paused. Each expired monitor is handled under
     * the repository lock of its execution: the SLA it refers to is looked up in the flow (a flow that
     * declares no SLA at all is treated like a flow that no longer has that SLA: the monitor is ignored),
     * the SLA is evaluated, and a violation, if any, is processed by the executor service. Any exception
     * raised during the evaluation turns the execution into a failed one. The resulting executor, when
     * there is one, is handed to {@code toExecution}.
     */
    private void executionSLAMonitor() {
        if (this.shutdown.get() || this.isPaused.get()) {
            return;
        }
        this.slaMonitorStorage.processExpired(Instant.now(), slaMonitor -> {
            Executor result = this.executionRepository.lock(slaMonitor.getExecutionId(), pair -> {
                FlowWithSource flow = this.findFlow((Execution)pair.getLeft());
                Executor executor = new Executor((Execution)pair.getLeft(), null).withFlow(flow);
                // The SLA list is null-guarded here, as it is wherever else this class reads it.
                Optional<SLA> sla = ListUtils.emptyOnNull(flow.getSla()).stream().filter(s -> s.getId().equals(slaMonitor.getSlaId())).findFirst();
                if (sla.isEmpty()) {
                    log.debug("Cannot find the SLA '{}' in the flow for execution '{}', ignoring it.", (Object)slaMonitor.getSlaId(), (Object)slaMonitor.getExecutionId());
                    return null;
                }
                this.metricRegistry.counter("executor.sla.expired.count", "The total number of expired SLA (i.e. executions with SLA of type MAX_DURATION that took longer than the SLA) evaluated by the Executor", this.metricRegistry.tags(executor.getExecution())).increment();
                try {
                    RunContext runContext = this.runContextFactory.of((FlowInterface)executor.getFlow(), executor.getExecution());
                    Optional violation = this.slaService.evaluateExecutionMonitoringSLA(runContext, executor.getExecution(), sla.get());
                    if (violation.isPresent()) {
                        log.info("Processing expired SLA monitor '{}' for execution '{}'.", (Object)slaMonitor.getSlaId(), (Object)slaMonitor.getExecutionId());
                        executor = this.executorService.processViolation(runContext, executor, (Violation)violation.get());
                        this.metricRegistry.counter("executor.sla.violation.count", "The total number of expired SLA (i.e. executions with SLA of type MAX_DURATION that took longer than the SLA) evaluated by the Executor", this.metricRegistry.tags(executor.getExecution())).increment();
                    }
                }
                catch (Exception e) {
                    executor = this.handleFailedExecutionFromExecutor(executor, e);
                }
                return Pair.of((Object)executor, (Object)((ExecutorState)pair.getRight()));
            });
            if (result != null) {
                this.toExecution(result);
            }
        });
    }

    /**
     * Tells whether at least one of the given next task runs has not been seen yet for this execution.
     *
     * <p>Task runs are examined in order. A task run whose key is already recorded is logged as a
     * duplicate and skipped. The first task run with an unknown key is recorded (key mapped to the task
     * run id) and ends the scan: the task runs after it are neither checked nor recorded.
     *
     * @param execution the execution the task runs belong to (used for logging only)
     * @param executorState the state holding the child deduplication map, updated in place
     * @param taskRuns the candidate next task runs
     * @return {@code true} as soon as a task run with an unknown key is found, {@code false} when every
     *         task run is a duplicate or the list is empty
     */
    private boolean deduplicateNexts(Execution execution, ExecutorState executorState, List<TaskRun> taskRuns) {
        for (TaskRun candidate : taskRuns) {
            // The key combines parent task run, task id, value, number of attempts and iteration.
            String deduplicationKey = candidate.getParentTaskRunId() + "-"
                + candidate.getTaskId() + "-"
                + candidate.getValue() + "-"
                + (candidate.getAttempts() != null ? candidate.getAttempts().size() : 0)
                + candidate.getIteration();

            if (executorState.getChildDeduplication().containsKey(deduplicationKey)) {
                log.warn("Duplicate Nexts on execution '{}' with key '{}'", (Object)execution.getId(), (Object)deduplicationKey);
                continue;
            }

            executorState.getChildDeduplication().put(deduplicationKey, candidate.getId());
            return true;
        }
        return false;
    }

    /**
     * Tells whether a worker task must be processed, i.e. whether its task run was not already seen in
     * its current state.
     *
     * <p>The key is made of the task run id, its number of attempts and its iteration. When the state
     * recorded for the key differs from the current state of the task run, the current state is
     * recorded and the task is accepted.
     *
     * @param execution the execution the task run belongs to (used for logging only)
     * @param executorState the state holding the worker task deduplication map, updated in place
     * @param taskRun the task run to check
     * @return {@code false} for a duplicate (logged as a warning), {@code true} otherwise
     */
    private boolean deduplicateWorkerTask(Execution execution, ExecutorState executorState, TaskRun taskRun) {
        int attemptCount = taskRun.getAttempts() != null ? taskRun.getAttempts().size() : 0;
        String deduplicationKey = taskRun.getId() + attemptCount + taskRun.getIteration();

        State.Type recorded = (State.Type)executorState.getWorkerTaskDeduplication().get(deduplicationKey);
        if (recorded != taskRun.getState().getCurrent()) {
            executorState.getWorkerTaskDeduplication().put(deduplicationKey, taskRun.getState().getCurrent());
            return true;
        }

        log.warn("Duplicate WorkerTask on execution '{}' for taskRun '{}', value '{}', taskId '{}' on state {}", new Object[]{execution.getId(), taskRun.getId(), taskRun.getValue(), taskRun.getTaskId(), recorded});
        return false;
    }

    /**
     * Tells whether a subflow execution must be created, i.e. whether its parent task run was not
     * already seen in its current state.
     *
     * <p>The key comes from {@code deduplicationKey(TaskRun)}. When the state recorded for the key
     * differs from the current state of the task run, the current state is recorded and the subflow
     * execution is accepted.
     *
     * @param execution the parent execution (used for logging only)
     * @param executorState the state holding the subflow execution deduplication map, updated in place
     * @param taskRun the parent task run of the subflow execution
     * @return {@code false} for a duplicate (logged as a warning), {@code true} otherwise
     */
    private boolean deduplicateSubflowExecution(Execution execution, ExecutorState executorState, TaskRun taskRun) {
        String key = this.deduplicationKey(taskRun);
        State.Type recorded = (State.Type)executorState.getSubflowExecutionDeduplication().get(key);

        if (recorded != taskRun.getState().getCurrent()) {
            executorState.getSubflowExecutionDeduplication().put(key, taskRun.getState().getCurrent());
            return true;
        }

        log.warn("Duplicate SubflowExecution on execution '{}' for taskRun '{}', value '{}', taskId '{}', attempt '{}' on state {}", new Object[]{execution.getId(), taskRun.getId(), taskRun.getValue(), taskRun.getTaskId(), taskRun.getAttempts() == null ? null : Integer.valueOf(taskRun.getAttempts().size() + 1), recorded});
        return false;
    }

    /**
     * Builds the deduplication key of a task run: its id, followed by {@code -<number of attempts>}
     * when the attempts list is set, followed by {@code -<iteration>} when the iteration is set.
     *
     * @param taskRun the task run to build the key for
     * @return the deduplication key
     */
    private String deduplicationKey(TaskRun taskRun) {
        StringBuilder key = new StringBuilder().append(taskRun.getId());
        if (taskRun.getAttempts() != null) {
            key.append("-").append(taskRun.getAttempts().size());
        }
        if (taskRun.getIteration() != null) {
            key.append("-").append(taskRun.getIteration());
        }
        return key.toString();
    }

    /**
     * Replaces the execution of an executor by its failed version after an error.
     *
     * <p>The logs attached to the failure are pushed to the log queue on a best-effort basis: a queue
     * error at that point is reported in the application log and otherwise ignored, so it can never
     * prevent the failed executor from being returned.
     *
     * @param executor the executor whose execution was being processed
     * @param e the error that interrupted the processing
     * @return the executor updated with the execution produced by {@code failedExecutionFromExecutor}
     */
    private Executor handleFailedExecutionFromExecutor(Executor executor, Exception e) {
        Execution.FailedExecutionWithLog failedExecutionWithLog = executor.getExecution().failedExecutionFromExecutor(e);
        try {
            this.logQueue.emitAsync(failedExecutionWithLog.getLogs());
        }
        catch (QueueException queueException) {
            // Deliberately not rethrown: losing the failure logs must not hide the failure itself.
            log.warn("Unable to emit the failure logs of the execution {}", (Object)executor.getExecution().getId(), (Object)queueException);
        }
        return executor.withExecution(failedExecutionWithLog.getExecution(), "exception");
    }

    /**
     * Stops the executor.
     *
     * <p>Only the first call has an effect. It moves the service to {@code TERMINATING}, runs every
     * registered receive cancellation, closes the scheduled thread pool together with the delay and SLA
     * monitoring futures, then moves the service to {@code TERMINATED_GRACEFULLY}.
     */
    @PreDestroy
    public void close() {
        // The flag flips exactly once: later calls return immediately.
        if (!this.shutdown.compareAndSet(false, true)) {
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("Terminating");
        }
        this.setState(Service.ServiceState.TERMINATING);

        this.receiveCancellations.forEach(cancellation -> cancellation.run());
        ExecutorsUtils.closeScheduledThreadPool((ScheduledExecutorService)this.scheduledDelay, (Duration)Duration.ofSeconds(5L), List.of(this.executionDelayFuture, this.monitorSLAFuture));

        this.setState(Service.ServiceState.TERMINATED_GRACEFULLY);
        if (log.isDebugEnabled()) {
            log.debug("Closed ({})", (Object)this.state.get().name());
        }
    }

    /**
     * Records the new state of this service, then publishes a {@code ServiceStateChangeEvent} for it.
     *
     * @param state the state to record
     */
    private void setState(Service.ServiceState state) {
        this.state.set(state);
        ServiceStateChangeEvent stateChange = new ServiceStateChangeEvent((Service)this);
        this.eventPublisher.publishEvent((Object)stateChange);
    }

    /**
     * Returns the identifier of this service.
     *
     * @return the value of the {@code id} field
     */
    public String getId() {
        return id;
    }

    /**
     * Returns the type of this service.
     *
     * @return always {@code ServiceType.EXECUTOR}
     */
    public ServiceType getType() {
        final ServiceType type = ServiceType.EXECUTOR;
        return type;
    }

    /**
     * Returns the current state of this service.
     *
     * @return the state most recently recorded by {@code setState}
     */
    public Service.ServiceState getState() {
        return state.get();
    }

    /**
     * Compiler-generated lambda body (kept under its synthetic name) that decides how an execution
     * counts against the concurrency limit of its flow.
     * NOTE(review): presumably the callback passed to the concurrency limit storage by executionQueue;
     * that call site is outside this excerpt.
     *
     * <p>The executor service computes the concurrency state from the current running count. An
     * execution computed as {@code RUNNING} and not terminated takes one slot (running count + 1). An
     * execution computed as {@code QUEUED} is saved in the queued-execution storage. In every case but
     * the first, the concurrency limit is returned unchanged.
     *
     * @return a pair of the computed {@code ExecutionRunning} and the concurrency limit to keep
     */
    private /* synthetic */ Pair lambda$executionQueue$18(FlowWithSource flow, ExecutionRunning executionRunning, Execution execution, DSLContext dslContext, ConcurrencyLimit concurrencyLimit) {
        ExecutionRunning computed = this.executorService.processExecutionRunning((FlowInterface)flow, concurrencyLimit.getRunning().intValue(), executionRunning.withExecution(execution));

        boolean takesSlot = computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.RUNNING
            && !computed.getExecution().getState().isTerminated();
        if (takesSlot) {
            ConcurrencyLimit incremented = concurrencyLimit.withRunning(Integer.valueOf(concurrencyLimit.getRunning() + 1));
            return Pair.of((Object)computed, (Object)incremented);
        }

        boolean mustQueue = computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED;
        if (mustQueue) {
            this.executionQueuedStorage.save(dslContext, ExecutionQueued.fromExecutionRunning((ExecutionRunning)computed));
        }
        return Pair.of((Object)computed, (Object)concurrencyLimit);
    }
}

