/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.jdbc.runner;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.flows.Concurrency;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.DefaultFlowExecutor;
import io.kestra.core.runners.ExecutableUtils;
import io.kestra.core.runners.ExecutionDelay;
import io.kestra.core.runners.Executor;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.runners.ExecutorService;
import io.kestra.core.runners.ExecutorState;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.runners.WorkerInstance;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.runners.WorkerTaskExecution;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.runners.WorkerTaskRunning;
import io.kestra.core.runners.WorkerTrigger;
import io.kestra.core.runners.WorkerTriggerRunning;
import io.kestra.core.services.AbstractFlowTriggerService;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.TaskDefaultService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.tasks.flows.ForEachItem;
import io.kestra.core.tasks.flows.Template;
import io.kestra.core.topologies.FlowTopologyService;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository;
import io.kestra.jdbc.repository.AbstractJdbcWorkerInstanceRepository;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.kestra.jdbc.runner.AbstractJdbcExecutionDelayStorage;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.runner.AbstractJdbcWorkerTaskExecutionStorage;
import io.kestra.jdbc.runner.JdbcQueue;
import io.kestra.jdbc.runner.JdbcRunnerEnabled;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
import io.micronaut.transaction.exceptions.CannotCreateTransactionException;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.commons.lang3.tuple.Pair;
import org.jooq.DSLContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

@Singleton
@JdbcRunnerEnabled
public class JdbcExecutor
implements ExecutorInterface {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(JdbcExecutor.class);
    // Single-thread scheduler; run() uses it to call executionDelaySend() every second.
    private final ScheduledExecutorService schedulerDelay = Executors.newSingleThreadScheduledExecutor();
    // Single-thread scheduler; run() uses it to call workersUpdate() every `frequency`.
    private final ScheduledExecutorService schedulerHeartbeat = Executors.newSingleThreadScheduledExecutor();
    // Set to true by close(); executionDelaySend() returns immediately once it is set.
    // NOTE(review): presumably written and read from different threads without volatile/synchronization — confirm visibility is acceptable.
    private Boolean isShutdown = false;
    @Inject
    private ApplicationContext applicationContext;
    @Inject
    private FlowRepositoryInterface flowRepository;
    @Inject
    private AbstractJdbcExecutionRepository executionRepository;
    @Inject
    @Named(value="executionQueue")
    private QueueInterface<Execution> executionQueue;
    // Bound to the bean named "workerJobQueue"; both WorkerTask and WorkerTrigger jobs are emitted on it.
    @Inject
    @Named(value="workerJobQueue")
    private QueueInterface<WorkerJob> workerTaskQueue;
    @Inject
    @Named(value="workerTaskResultQueue")
    private QueueInterface<WorkerTaskResult> workerTaskResultQueue;
    // Bound to the bean named "workerTaskLogQueue".
    @Inject
    @Named(value="workerTaskLogQueue")
    private QueueInterface<LogEntry> logQueue;
    @Inject
    private RunContextFactory runContextFactory;
    @Inject
    private TaskDefaultService taskDefaultService;
    // Optional template lookup; transform() only injects templates when it is present.
    @Inject
    private Optional<Template.TemplateExecutorInterface> templateExecutorInterface;
    @Inject
    private ExecutorService executorService;
    @Inject
    private ConditionService conditionService;
    @Inject
    private MultipleConditionStorageInterface multipleConditionStorage;
    @Inject
    private AbstractFlowTriggerService flowTriggerService;
    @Inject
    private MetricRegistry metricRegistry;
    @Inject
    protected FlowListenersInterface flowListeners;
    // Holds the WorkerTaskExecution entries saved by executionQueue() and looked up again by execution id on termination.
    @Inject
    private AbstractJdbcWorkerTaskExecutionStorage workerTaskExecutionStorage;
    @Inject
    private ExecutionService executionService;
    // Holds ExecutionDelay entries saved by executionQueue() and consumed by executionDelaySend().
    @Inject
    private AbstractJdbcExecutionDelayStorage executionDelayStorage;
    // Holds executions queued by the concurrency check; one is popped when an execution of a QUEUE-behavior flow terminates.
    @Inject
    private AbstractJdbcExecutionQueuedStorage executionQueuedStorage;
    // Executor state is deleted from here by toExecution() once executorService.canBePurged(...) returns true.
    @Inject
    private AbstractJdbcExecutorStateStorage executorStateStorage;
    @Inject
    private FlowTopologyService flowTopologyService;
    @Inject
    private AbstractJdbcFlowTopologyRepository flowTopologyRepository;
    @Inject
    private AbstractJdbcWorkerInstanceRepository workerInstanceRepository;
    // Latest flow list delivered by the flowListeners callback registered in run(); null until the first delivery.
    protected List<Flow> allFlows;
    @Inject
    @Named(value="flowQueue")
    private QueueInterface<Flow> flowQueue;
    @Inject
    private WorkerGroupService workerGroupService;
    @Inject
    private SkipExecutionService skipExecutionService;
    @Inject
    private AbstractJdbcWorkerJobRunningRepository workerJobRunningRepository;
    // Read from the "kestra.heartbeat.frequency" property; used as both the initial delay and the period of workersUpdate().
    @Value(value="${kestra.heartbeat.frequency}")
    private Duration frequency;

    /**
     * Starts the executor.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>starts the flow listeners and waits until a first flow list has been delivered into {@code allFlows};</li>
     *   <li>registers a {@link DefaultFlowExecutor} singleton in the application context;</li>
     *   <li>subscribes to the execution and worker-task-result queues;</li>
     *   <li>schedules {@code executionDelaySend} every second and {@code workersUpdate} every {@code frequency};</li>
     *   <li>starts the {@code jdbc-delay} watchdog thread, which waits for the delay schedule to end and stops the
     *       JVM when it ended on anything other than a {@link CannotCreateTransactionException};</li>
     *   <li>subscribes to the flow queue to persist the topology of every received flow.</li>
     * </ol>
     */
    public void run() {
        this.flowListeners.run();
        this.flowListeners.listen(flows -> {
            this.allFlows = flows;
        });
        // Wait for the first flow list; the two durations are presumably the polling interval and the timeout — confirm against Await.until.
        Await.until(() -> this.allFlows != null, Duration.ofMillis(100L), Duration.ofMinutes(5L));
        this.applicationContext.registerSingleton(new DefaultFlowExecutor(this.flowListeners, this.flowRepository));
        this.executionQueue.receive(Executor.class, this::executionQueue);
        this.workerTaskResultQueue.receive(Executor.class, this::workerTaskResultQueue);

        ScheduledFuture<?> handle = this.schedulerDelay.scheduleAtFixedRate(this::executionDelaySend, 0L, 1L, TimeUnit.SECONDS);
        this.schedulerHeartbeat.scheduleAtFixedRate(this::workersUpdate, this.frequency.toSeconds(), this.frequency.toSeconds(), TimeUnit.SECONDS);

        // A fixed-rate task only becomes "done" when it is cancelled or when one run throws.
        Thread schedulerDelayThread = new Thread(() -> {
            Await.until(handle::isDone);
            try {
                handle.get();
            } catch (java.util.concurrent.CancellationException ignored) {
                // The periodic task was cancelled (e.g. its scheduler was shut down): a normal stop, nothing to report.
            } catch (InterruptedException e) {
                // An InterruptedException carries no cause; restore the flag instead of dereferencing getCause().
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause != null && cause.getClass() == CannotCreateTransactionException.class) {
                    return;
                }
                log.error("Executor fatal exception", e);
                this.applicationContext.close();
                Runtime.getRuntime().exit(1);
            }
        }, "jdbc-delay");
        schedulerDelayThread.start();

        this.flowQueue.receive(FlowTopology.class, either -> {
            if (either == null || either.isRight() || either.getLeft() == null || either.getLeft() instanceof FlowWithException) {
                return;
            }
            // Resolve the flow first so that it is assigned before being passed to save(...).
            Flow flow = (Flow) either.getLeft();
            // A deleted flow is saved with an empty topology.
            this.flowTopologyRepository.save(
                flow,
                (flow.isDeleted() ? Stream.empty() : this.flowTopologyService.topology(flow, this.allFlows.stream()))
                    .distinct()
                    .collect(Collectors.toList())
            );
        });
    }

    /**
     * Heartbeat maintenance, run inside {@code workerInstanceRepository.lockedWorkersUpdate}.
     *
     * <p>Looks up the worker instances returned by {@code findAllToDelete}, re-emits on the worker job queue every
     * running job that {@code getWorkerJobWithWorkerDead} reports for those workers, then deletes the worker instances.
     *
     * @throws IllegalArgumentException if a running job is neither a {@link WorkerTaskRunning} nor a
     *                                  {@link WorkerTriggerRunning}
     */
    protected void workersUpdate() {
        this.workerInstanceRepository.lockedWorkersUpdate(context -> {
            List<WorkerInstance> deadWorkers = this.workerInstanceRepository.findAllToDelete((DSLContext) context);
            List<String> deadWorkerUuids = deadWorkers.stream()
                .map(worker -> worker.getWorkerUuid().toString())
                .collect(Collectors.toList());

            this.workerJobRunningRepository.getWorkerJobWithWorkerDead((DSLContext) context, deadWorkerUuids).forEach(job -> {
                if (job instanceof WorkerTaskRunning taskRunning) {
                    // Rebuild the task job from the running record and send it again.
                    this.workerTaskQueue.emit(
                        WorkerTask.builder()
                            .taskRun(taskRunning.getTaskRun())
                            .task(taskRunning.getTask())
                            .runContext(taskRunning.getRunContext())
                            .build()
                    );
                    log.warn(
                        "[namespace: {}] [flow: {}] [execution: {}] [taskrun: {}] WorkerTask is being resend",
                        taskRunning.getTaskRun().getNamespace(),
                        taskRunning.getTaskRun().getFlowId(),
                        taskRunning.getTaskRun().getExecutionId(),
                        taskRunning.getTaskRun().getId()
                    );
                    return;
                }
                if (job instanceof WorkerTriggerRunning triggerRunning) {
                    // Rebuild the trigger job from the running record and send it again.
                    this.workerTaskQueue.emit(
                        WorkerTrigger.builder()
                            .trigger(triggerRunning.getTrigger())
                            .conditionContext(triggerRunning.getConditionContext())
                            .triggerContext(triggerRunning.getTriggerContext())
                            .build()
                    );
                    log.warn(
                        "[namespace: {}] [flow: {}] [trigger: {}] WorkerTrigger is being resend",
                        triggerRunning.getTriggerContext().getNamespace(),
                        triggerRunning.getTriggerContext().getFlowId(),
                        triggerRunning.getTriggerContext().getTriggerId()
                    );
                    return;
                }
                throw new IllegalArgumentException("Object is of type " + job.getClass() + " which should never occurs");
            });

            // Worker instances are removed only after their jobs have been re-emitted.
            for (WorkerInstance deadWorker : deadWorkers) {
                this.workerInstanceRepository.delete((DSLContext) context, deadWorker);
            }
            return null;
        });
    }

    /**
     * Handles one message received from the execution queue.
     *
     * <p>Messages that failed to deserialize, or whose execution id is flagged by {@code skipExecutionService}, are
     * logged and dropped. Otherwise the execution is processed inside {@code executionRepository.lock(...)}: the
     * message is merged into the locked execution, the concurrency limit is checked for {@code CREATED} executions,
     * the executor service computes the next state, and the resulting worker tasks, worker task results, delays,
     * sub-flow executions and flow-trigger executions are emitted after de-duplication against the executor state.
     * A non-null lock result is finally handed to {@link #toExecution(Executor)}.
     *
     * @param either the received execution on the left, or the deserialization failure on the right
     */
    private void executionQueue(Either<Execution, DeserializationException> either) {
        if (either.isRight()) {
            log.error("Unable to deserialize an execution: {}", either.getRight().getMessage());
            return;
        }
        Execution message = either.getLeft();
        if (this.skipExecutionService.skipExecution(message.getId())) {
            log.warn("Skipping execution {}", message.getId());
            return;
        }
        Executor result = this.executionRepository.lock(message.getId(), pair -> {
            Execution execution = this.mergeExecution(pair.getLeft(), message);
            ExecutorState executorState = pair.getRight();
            Flow flow = this.transform(this.flowRepository.findByExecution(execution), execution);
            Executor executor = new Executor(execution, null).withFlow(flow);

            // Concurrency limit: only evaluated for a CREATED execution of a flow that declares a concurrency setting.
            if (execution.getState().getCurrent() == State.Type.CREATED && flow.getConcurrency() != null) {
                // Counts the RUNNING and PAUSED executions of this flow.
                ExecutionCount count = this.executionRepository.executionCounts(
                    flow.getTenantId(),
                    List.of(new io.kestra.core.models.executions.statistics.Flow(flow.getNamespace(), flow.getId())),
                    List.of(State.Type.RUNNING, State.Type.PAUSED),
                    null,
                    null
                ).get(0);
                executor = this.executorService.checkConcurrencyLimit(executor, flow, execution, count.getCount().longValue());
                if (executor.getExecutionQueued() != null) {
                    // The execution was queued: persist it and stop processing.
                    this.executionQueuedStorage.save(executor.getExecutionQueued());
                    return Pair.of(executor, executorState);
                }
                if (executor.getExecution().getState().isTerminated()) {
                    // The check terminated the execution: nothing more to process.
                    return Pair.of(executor, executorState);
                }
            }

            if (log.isDebugEnabled()) {
                this.executorService.log(log, Boolean.TRUE, executor);
            }

            executor = this.executorService.process(executor);

            // Next task runs are applied only when at least one of them has not been seen before.
            if (!executor.getNexts().isEmpty() && this.deduplicateNexts(execution, executorState, executor.getNexts())) {
                // NOTE(review): the returned Executor is ignored here; presumably withExecution mutates in place — confirm.
                executor.withExecution(this.executorService.onNexts(executor.getFlow(), executor.getExecution(), executor.getNexts()), "onNexts");
            }

            if (!executor.getWorkerTasks().isEmpty()) {
                List<WorkerTask> workerTasksDedup = executor.getWorkerTasks().stream()
                    .filter(workerTask -> this.deduplicateWorkerTask(execution, executorState, workerTask.getTaskRun()))
                    .toList();
                // Tasks flagged isSendToWorkerTask() go to the worker job queue, routed by their resolved worker group.
                workerTasksDedup.stream()
                    .filter(workerTask -> workerTask.getTask().isSendToWorkerTask())
                    .forEach(workerTask -> this.workerTaskQueue.emit(this.workerGroupService.resolveGroupFromJob((WorkerJob) workerTask), workerTask));
                // Flowable tasks are not sent to a worker: a RUNNING result is emitted for them directly.
                workerTasksDedup.stream()
                    .filter(workerTask -> workerTask.getTask().isFlowable())
                    .map(workerTask -> new WorkerTaskResult(workerTask.withTaskRun(workerTask.getTaskRun().withState(State.Type.RUNNING))))
                    .forEach(workerTaskResult -> this.workerTaskResultQueue.emit(workerTaskResult));
            }

            if (!executor.getWorkerTaskResults().isEmpty()) {
                executor.getWorkerTaskResults().forEach(workerTaskResult -> this.workerTaskResultQueue.emit(workerTaskResult));
            }

            if (!executor.getExecutionDelays().isEmpty()) {
                executor.getExecutionDelays().forEach(executionDelay -> this.executionDelayStorage.save((ExecutionDelay) executionDelay));
            }

            if (!executor.getWorkerTaskExecutions().isEmpty()) {
                this.workerTaskExecutionStorage.save(executor.getWorkerTaskExecutions());
                List<WorkerTaskExecution> workerTasksExecutionDedup = executor.getWorkerTaskExecutions().stream()
                    .filter(workerTaskExecution -> this.deduplicateWorkerTaskExecution(execution, executorState, workerTaskExecution.getTaskRun(), workerTaskExecution.getIteration()))
                    .toList();
                workerTasksExecutionDedup.forEach(workerTaskExecution -> {
                    // Named logMessage so it does not shadow the class logger `log`.
                    String logMessage = "Create new execution for flow '" + workerTaskExecution.getExecution().getNamespace() + "'.'" + workerTaskExecution.getExecution().getFlowId() + "' with id '" + workerTaskExecution.getExecution().getId() + "'";
                    log.info(logMessage);
                    this.logQueue.emit(
                        LogEntry.of((TaskRun) workerTaskExecution.getTaskRun()).toBuilder()
                            .level(Level.INFO)
                            .message(logMessage)
                            .timestamp(workerTaskExecution.getTaskRun().getState().getStartDate())
                            .thread(Thread.currentThread().getName())
                            .build()
                    );
                    this.executionQueue.emit(workerTaskExecution.getExecution());
                    // When the parent task waits for the sub-execution, report its task run as RUNNING.
                    if (((ExecutableTask) workerTaskExecution.getTask()).waitForExecution()) {
                        this.sendWorkerTaskResultForWorkerTaskExecution(execution, (WorkerTaskExecution<?>) workerTaskExecution, workerTaskExecution.getTaskRun().withState(State.Type.RUNNING));
                    }
                });
            }

            // Flow triggers fire once per execution, when it is terminated according to conditionService.
            if (this.conditionService.isTerminatedWithListeners(flow, execution) && this.deduplicateFlowTrigger(execution, executorState)) {
                this.flowTriggerService.computeExecutionsFromFlowTriggers(execution, this.allFlows, Optional.of(this.multipleConditionStorage))
                    .forEach(triggered -> this.executionQueue.emit(triggered));
            }

            if (this.conditionService.isTerminatedWithListeners(flow, execution)) {
                // If a WorkerTaskExecution is stored for this execution id, report the final state on its task run and drop the entry.
                this.workerTaskExecutionStorage.get(execution.getId()).ifPresent(workerTaskExecution -> {
                    if (((ExecutableTask) workerTaskExecution.getTask()).waitForExecution()) {
                        this.sendWorkerTaskResultForWorkerTaskExecution(execution, (WorkerTaskExecution<?>) workerTaskExecution, workerTaskExecution.getTaskRun().withState(State.Type.RUNNING).withState(execution.getState().getCurrent()));
                    }
                    this.workerTaskExecutionStorage.delete((WorkerTaskExecution<?>) workerTaskExecution);
                });
                // With QUEUE concurrency behavior, pop a queued execution of this flow and emit it as RUNNING.
                if (flow.getConcurrency() != null && flow.getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {
                    this.executionQueuedStorage.pop(flow.getTenantId(), flow.getNamespace(), flow.getId(), queued -> this.executionQueue.emit(queued.withState(State.Type.RUNNING)));
                }
            }
            return Pair.of(executor, executorState);
        });
        if (result != null) {
            this.toExecution(result);
        }
    }

    /**
     * Asks the executable task of a {@link WorkerTaskExecution} to build a {@link WorkerTaskResult} for the given
     * task run and, when one is produced, emits it on the worker task result queue.
     *
     * <p>Any exception thrown while creating or emitting the result is logged and swallowed. The flow lookup and
     * the run-context creation happen before the {@code try} block, so their failures propagate to the caller.
     *
     * @param execution           the execution used to resolve the flow and to build the run context
     * @param workerTaskExecution the worker task execution whose task creates the result
     * @param taskRun             the task run (with the state to report) passed to the task
     */
    private void sendWorkerTaskResultForWorkerTaskExecution(Execution execution, WorkerTaskExecution<?> workerTaskExecution, TaskRun taskRun) {
        Flow workerTaskFlow = this.flowRepository.findByExecution(execution);
        ExecutableTask executableTask = (ExecutableTask) workerTaskExecution.getTask();
        RunContext runContext = this.runContextFactory.of(workerTaskFlow, workerTaskExecution.getTask(), execution, workerTaskExecution.getTaskRun());
        try {
            // Typed Optional: with a raw Optional the lambda parameter is an Object, which the typed queue cannot accept.
            Optional<WorkerTaskResult> maybeWorkerTaskResult = executableTask.createWorkerTaskResult(runContext, taskRun, workerTaskFlow, execution);
            maybeWorkerTaskResult.ifPresent(workerTaskResult -> this.workerTaskResultQueue.emit(workerTaskResult));
        } catch (Exception e) {
            log.error("Unable to create the Worker Task Result", e);
        }
    }

    /**
     * Merges the task runs carried by a queue message into the locked execution.
     *
     * <p>A task run from the message replaces the one in the locked execution only when the locked execution
     * already has a task run with the same id and the message's task run has a strictly later
     * {@code getState().maxDate()}.
     *
     * @param locked  the execution obtained under lock; returned unchanged when nothing is newer
     * @param message the execution received from the queue
     * @return the locked execution with the newer task runs applied
     * @throws RuntimeException wrapping any {@link InternalException} raised during the task run lookup
     */
    private Execution mergeExecution(Execution locked, Execution message) {
        if (message.getTaskRunList() == null) {
            return locked;
        }

        Execution merged = locked;
        for (TaskRun incoming : message.getTaskRunList()) {
            try {
                TaskRun known = merged.findTaskRunByTaskRunId(incoming.getId());
                boolean isNewer = known != null && incoming.getState().maxDate().isAfter(known.getState().maxDate());
                if (isNewer) {
                    merged = merged.withTaskRun(incoming);
                }
            } catch (InternalException e) {
                throw new RuntimeException(e);
            }
        }
        return merged;
    }

    /**
     * Handles one message received from the worker task result queue.
     *
     * <p>Messages that failed to deserialize, or whose execution id is flagged by {@code skipExecutionService}, are
     * logged and dropped. Otherwise the result is joined into its execution inside
     * {@code executionRepository.lock(...)}; when the lock returns an executor it is handed to
     * {@link #toExecution(Executor)}.
     *
     * @param either the received worker task result on the left, or the deserialization failure on the right
     * @throws IllegalStateException (from inside the lock callback) when no execution exists for the result
     */
    private void workerTaskResultQueue(Either<WorkerTaskResult, DeserializationException> either) {
        if (either.isRight()) {
            log.error("Unable to deserialize a worker task result: {}", either.getRight().getMessage());
            return;
        }
        WorkerTaskResult message = either.getLeft();
        if (this.skipExecutionService.skipExecution(message.getTaskRun().getExecutionId())) {
            log.warn("Skipping execution {}", message.getTaskRun().getExecutionId());
            return;
        }
        if (log.isDebugEnabled()) {
            this.executorService.log(log, Boolean.TRUE, message);
        }

        Executor executor = this.executionRepository.lock(message.getTaskRun().getExecutionId(), pair -> {
            Execution execution = pair.getLeft();
            // Fail with the descriptive error before the missing execution is used for anything else.
            if (execution == null) {
                throw new IllegalStateException("Execution state don't exist for " + message.getTaskRun().getExecutionId() + ", receive " + message);
            }
            Executor current = new Executor(execution, null);

            // Results that the execution cannot join are ignored: the lock callback yields no update.
            if (!execution.hasTaskRunJoinable(message.getTaskRun())) {
                return null;
            }

            try {
                Flow flow = this.flowRepository.findByExecution(current.getExecution());

                // A non-null result from addDynamicTaskRun replaces the current execution before the join.
                Execution newExecution = this.executorService.addDynamicTaskRun(current.getExecution(), flow, message);
                if (newExecution != null) {
                    current = current.withExecution(newExecution, "addDynamicTaskRun");
                }

                // ForEachItem results go through ExecutableUtils.manageIterations; other tasks use the task run as received.
                Task task = flow.findTaskByTaskId(message.getTaskRun().getTaskId());
                TaskRun taskRun;
                if (task instanceof ForEachItem forEachItem) {
                    taskRun = ExecutableUtils.manageIterations(message.getTaskRun(), current.getExecution(), forEachItem.getTransmitFailed());
                } else {
                    taskRun = message.getTaskRun();
                }

                newExecution = current.getExecution().withTaskRun(taskRun);
                current = current.withExecution(newExecution, "joinWorkerResult");

                if (taskRun.getState().isTerminated()) {
                    // Record end-of-task metrics and drop the running-job record keyed by the task run id.
                    this.metricRegistry.counter("executor.taskrun.ended.count", this.metricRegistry.tags(message, new String[0])).increment();
                    this.metricRegistry.timer("executor.taskrun.ended.duration", this.metricRegistry.tags(message, new String[0])).record(taskRun.getState().getDuration());
                    log.trace("TaskRun terminated: {}", taskRun);
                    this.workerJobRunningRepository.deleteByKey(taskRun.getId());
                }
                return Pair.of(current, pair.getRight());
            } catch (InternalException e) {
                // Turn the failure into a failed execution instead of losing the result.
                return Pair.of(this.handleFailedExecutionFromExecutor(current, e), pair.getRight());
            }
        });

        if (executor != null) {
            this.toExecution(executor);
        }
    }

    /**
     * Publishes the execution held by an executor, if there is anything to publish.
     *
     * <p>An executor carrying an exception is first converted through
     * {@link #handleFailedExecutionFromExecutor(Executor, Exception)} and its execution is sent with
     * {@code emit}. An executor without exception is sent with {@code JdbcQueue.emitOnly(null, ...)}, and only
     * when {@code isExecutionUpdated()} is true. After sending, the executor state is deleted when
     * {@code executorService.canBePurged(...)} allows it.
     *
     * @param executor the executor produced by a lock callback
     */
    private void toExecution(Executor executor) {
        Executor outcome = executor;
        boolean failed = outcome.getException() != null;

        if (failed) {
            outcome = this.handleFailedExecutionFromExecutor(outcome, outcome.getException());
        } else if (!outcome.isExecutionUpdated()) {
            // Neither failed nor updated: nothing to send.
            return;
        }

        if (log.isDebugEnabled()) {
            this.executorService.log(log, Boolean.valueOf(false), outcome);
        }

        if (failed) {
            this.executionQueue.emit(outcome.getExecution());
        } else {
            ((JdbcQueue) this.executionQueue).emitOnly(null, outcome.getExecution());
        }

        if (this.executorService.canBePurged(outcome)) {
            this.executorStateStorage.delete(outcome.getExecution());
        }
    }

    /**
     * Prepares a flow for execution: injects templates when a template executor is available, then applies
     * task defaults.
     *
     * <p>If template injection throws an {@link InternalException}, a warning is logged and the flow is used
     * as received.
     *
     * @param flow      the flow resolved for the execution
     * @param execution the execution being processed
     * @return the result of {@code taskDefaultService.injectDefaults} on the (possibly template-injected) flow
     */
    private Flow transform(Flow flow, Execution execution) {
        Flow resolved = flow;
        if (this.templateExecutorInterface.isPresent()) {
            try {
                resolved = Template.injectTemplate(
                    flow,
                    execution,
                    (tenantId, namespace, id) -> this.templateExecutorInterface.get().findById(tenantId, namespace, id).orElse(null)
                );
            } catch (InternalException e) {
                log.warn("Failed to inject template", e);
            }
        }
        return this.taskDefaultService.injectDefaults(resolved, execution);
    }

    /**
     * Processes the execution delays handed out by {@code executionDelayStorage.get(...)}.
     *
     * <p>Does nothing once the executor is shut down. For each delay, the execution is locked; if the delayed
     * task run is currently {@code PAUSED}, it is marked with the state carried by the delay. Any exception
     * raised while doing so is converted through
     * {@link #handleFailedExecutionFromExecutor(Executor, Exception)}. A non-null lock result is then passed to
     * {@link #toExecution(Executor)}.
     */
    private void executionDelaySend() {
        if (this.isShutdown) {
            return;
        }

        this.executionDelayStorage.get(executionDelay -> {
            Executor locked = this.executionRepository.lock(executionDelay.getExecutionId(), pair -> {
                Executor executor = new Executor(pair.getLeft(), null);
                try {
                    TaskRun delayedTaskRun = executor.getExecution().findTaskRunByTaskRunId(executionDelay.getTaskRunId());
                    boolean stillPaused = delayedTaskRun.getState().getCurrent() == State.Type.PAUSED;
                    if (stillPaused) {
                        Execution resumed = this.executionService.markAs(pair.getKey(), executionDelay.getTaskRunId(), executionDelay.getState());
                        executor = executor.withExecution(resumed, "pausedRestart");
                    }
                } catch (Exception e) {
                    executor = this.handleFailedExecutionFromExecutor(executor, e);
                }
                return Pair.of(executor, pair.getRight());
            });

            if (locked != null) {
                this.toExecution(locked);
            }
        });
    }

    /**
     * Tells whether the given next task runs contain at least one that has not been recorded yet.
     *
     * <p>Task runs are visited in order. Each one is identified by
     * {@code parentTaskRunId-taskId-value}; an already recorded key is traced as a duplicate and skipped, while
     * the first unrecorded key is stored in {@code executorState.getChildDeduplication()} and ends the scan.
     *
     * <p>NOTE(review): because the scan stops at the first new key, task runs after it are not recorded by this
     * call — confirm this is the intended behavior.
     *
     * @param execution     the execution, used for trace logging only
     * @param executorState the state holding the child de-duplication map (mutated)
     * @param taskRuns      the candidate next task runs
     * @return {@code true} if a new task run was found, {@code false} if all were duplicates (or the list is empty)
     */
    private boolean deduplicateNexts(Execution execution, ExecutorState executorState, List<TaskRun> taskRuns) {
        for (TaskRun taskRun : taskRuns) {
            String deduplicationKey = taskRun.getParentTaskRunId() + "-" + taskRun.getTaskId() + "-" + taskRun.getValue();
            if (!executorState.getChildDeduplication().containsKey(deduplicationKey)) {
                executorState.getChildDeduplication().put(deduplicationKey, taskRun.getId());
                return true;
            }
            log.trace("Duplicate Nexts on execution '{}' with key '{}'", execution.getId(), deduplicationKey);
        }
        return false;
    }

    /**
     * Tells whether a worker task still has to be sent for the task run's current state.
     *
     * <p>The last state recorded for the task run id in {@code executorState.getWorkerTaskDeduplication()} is
     * compared with the task run's current state: when they are the same the task is a duplicate; otherwise the
     * current state is recorded.
     *
     * @param execution     the execution, used for trace logging only
     * @param executorState the state holding the worker task de-duplication map (mutated)
     * @param taskRun       the task run of the worker task
     * @return {@code false} for a duplicate, {@code true} otherwise
     */
    private boolean deduplicateWorkerTask(Execution execution, ExecutorState executorState, TaskRun taskRun) {
        String deduplicationKey = taskRun.getId();
        State.Type recorded = executorState.getWorkerTaskDeduplication().get(deduplicationKey);
        boolean alreadySent = recorded == taskRun.getState().getCurrent();

        if (!alreadySent) {
            executorState.getWorkerTaskDeduplication().put(deduplicationKey, taskRun.getState().getCurrent());
            return true;
        }

        log.trace(
            "Duplicate WorkerTask on execution '{}' for taskRun '{}', value '{}, taskId '{}'",
            execution.getId(), taskRun.getId(), taskRun.getValue(), taskRun.getTaskId()
        );
        return false;
    }

    /**
     * Tells whether a worker task execution still has to be created for the task run's current state.
     *
     * <p>The key is the task run id, suffixed with {@code -iteration} when an iteration is given. The last state
     * recorded for that key in {@code executorState.getWorkerTaskExecutionDeduplication()} is compared with the
     * task run's current state: when they are the same it is a duplicate; otherwise the current state is recorded.
     *
     * @param execution     the execution, used for trace logging only
     * @param executorState the state holding the worker task execution de-duplication map (mutated)
     * @param taskRun       the task run of the worker task execution
     * @param iteration     the iteration number, or {@code null} when there is none
     * @return {@code false} for a duplicate, {@code true} otherwise
     */
    private boolean deduplicateWorkerTaskExecution(Execution execution, ExecutorState executorState, TaskRun taskRun, Integer iteration) {
        String iterationSuffix = iteration == null ? "" : "-" + iteration;
        String deduplicationKey = taskRun.getId() + iterationSuffix;
        State.Type recorded = executorState.getWorkerTaskExecutionDeduplication().get(deduplicationKey);
        boolean alreadyCreated = recorded == taskRun.getState().getCurrent();

        if (!alreadyCreated) {
            executorState.getWorkerTaskExecutionDeduplication().put(deduplicationKey, taskRun.getState().getCurrent());
            return true;
        }

        log.trace(
            "Duplicate WorkerTaskExecution on execution '{}' for taskRun '{}', value '{}, taskId '{}'",
            execution.getId(), taskRun.getId(), taskRun.getValue(), taskRun.getTaskId()
        );
        return false;
    }

    /**
     * Tells whether flow triggers still have to be evaluated for this execution.
     *
     * <p>The first call sets the flow-trigger flag on the executor state and returns {@code true}; later calls
     * find the flag set, trace a duplicate and return {@code false}.
     *
     * @param execution     the execution, used for trace logging only
     * @param executorState the state holding the flow-trigger flag (mutated)
     * @return {@code true} the first time, {@code false} afterwards
     */
    private boolean deduplicateFlowTrigger(Execution execution, ExecutorState executorState) {
        boolean alreadyTriggered = executorState.getFlowTriggerDeduplication();
        if (!alreadyTriggered) {
            executorState.setFlowTriggerDeduplication(Boolean.TRUE);
            return true;
        }
        log.trace("Duplicate Flow Trigger on execution '{}'", execution.getId());
        return false;
    }

    /**
     * Converts an exception raised while processing an executor into a failed execution.
     *
     * <p>The failed execution and its log entries come from {@code Execution.failedExecutionFromExecutor(e)}.
     * The log entries are emitted asynchronously on the log queue and the executor is returned with the failed
     * execution applied. If emitting or applying throws, the error is logged and the executor is returned as
     * received.
     *
     * @param executor the executor whose execution failed
     * @param e        the exception that caused the failure
     * @return the executor carrying the failed execution, or the original executor if that could not be produced
     */
    private Executor handleFailedExecutionFromExecutor(Executor executor, Exception e) {
        Execution.FailedExecutionWithLog failure = executor.getExecution().failedExecutionFromExecutor(e);
        try {
            failure.getLogs().forEach(logEntry -> this.logQueue.emitAsync(logEntry));
            Executor failedExecutor = executor.withExecution(failure.getExecution(), "exception");
            return failedExecutor;
        } catch (Exception ex) {
            // The original message goes in the text; the secondary failure is passed as the logged exception.
            log.error("Failed to produce {}", e.getMessage(), ex);
            return executor;
        }
    }

    /**
     * Stops the executor: raises the shutdown flag, shuts down both schedulers, then closes the execution,
     * worker job, worker task result and log queues, in that order.
     *
     * <p>An exception thrown by one {@code close()} call propagates immediately, so the queues after it are
     * not closed.
     *
     * @throws IOException declared by this method's signature
     */
    public void close() throws IOException {
        // From now on executionDelaySend() returns immediately when it is invoked.
        this.isShutdown = true;
        this.schedulerDelay.shutdown();
        this.schedulerHeartbeat.shutdown();
        // NOTE(review): flowQueue is subscribed to in run() but is not closed here — confirm this is intentional.
        this.executionQueue.close();
        this.workerTaskQueue.close();
        this.workerTaskResultQueue.close();
        this.logQueue.close();
    }
}

