/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.worker;

import ch.qos.logback.classic.spi.ILoggingEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.Plugin;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.ExecutionKilledExecution;
import io.kestra.core.models.executions.ExecutionKilledTrigger;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.executions.Variables;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.PollingTriggerInterface;
import io.kestra.core.models.triggers.RealtimeTriggerInterface;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.models.triggers.TriggerService;
import io.kestra.core.queues.MessageTooBigException;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.UnsupportedMessageException;
import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextInitializer;
import io.kestra.core.runners.RunContextLogger;
import io.kestra.core.runners.RunContextLoggerFactory;
import io.kestra.core.runners.Worker;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.runners.WorkerTrigger;
import io.kestra.core.runners.WorkerTriggerResult;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.server.ClusterEvent;
import io.kestra.core.server.Metric;
import io.kestra.core.server.ServerConfig;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.server.WorkerTaskRestartStrategy;
import io.kestra.core.services.LabelService;
import io.kestra.core.services.LogService;
import io.kestra.core.services.MaintenanceService;
import io.kestra.core.services.VariablesService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.trace.TraceUtils;
import io.kestra.core.trace.Tracer;
import io.kestra.core.trace.TracerFactory;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.core.utils.Hashing;
import io.kestra.core.utils.Rethrow;
import io.kestra.core.utils.TruthUtils;
import io.kestra.plugin.core.flow.WorkingDirectory;
import io.kestra.worker.AbstractWorkerCallable;
import io.kestra.worker.AbstractWorkerTriggerCallable;
import io.kestra.worker.WorkerSecurityService;
import io.kestra.worker.WorkerTaskCallable;
import io.kestra.worker.WorkerTriggerCallable;
import io.kestra.worker.WorkerTriggerRealtimeCallable;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.Nullable;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HexFormat;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
import lombok.Generated;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

@Introspected
public class DefaultWorker
implements Worker {
    // Class-level SLF4J logger.
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DefaultWorker.class);
    // JSON mapper obtained from JacksonMapper.ofJson(); used to re-read records that failed deserialization.
    private static final ObjectMapper MAPPER = JacksonMapper.ofJson();
    // Property key under which the worker group is attached to every ServiceStateChangeEvent.
    private static final String SERVICE_PROPS_WORKER_GROUP = "worker.group";

    // ---- Injected queues (resolved by bean name) ----
    // Source of jobs (tasks and triggers) this worker subscribes to in run().
    @Inject
    @Named(value="workerJobQueue")
    private WorkerJobQueueInterface workerJobQueue;
    // Destination for task results produced by this worker.
    @Inject
    @Named(value="workerTaskResultQueue")
    private QueueInterface<WorkerTaskResult> workerTaskResultQueue;
    // Destination for trigger evaluation results produced by this worker.
    @Inject
    @Named(value="workerTriggerResultQueue")
    private QueueInterface<WorkerTriggerResult> workerTriggerResultQueue;
    // Kill events; run() registers a listener that kills matching running callables.
    @Inject
    @Named(value="executionKilledQueue")
    private QueueInterface<ExecutionKilled> executionKilledQueue;
    // Queue of metric entries (bean "workerTaskMetricQueue").
    @Inject
    @Named(value="workerTaskMetricQueue")
    private QueueInterface<MetricEntry> metricEntryQueue;
    // Receives the trigger stamped with this worker's id before it is evaluated.
    @Inject
    @Named(value="triggerQueue")
    private QueueInterface<Trigger> triggerQueue;
    // Receives log entries, e.g. the error log of an execution failed by a trigger error.
    @Inject
    @Named(value="workerTaskLogQueue")
    private QueueInterface<LogEntry> logQueue;
    // Optional cluster event queue; when present, run() listens on it for maintenance enter/exit events.
    @Inject
    @Named(value="clusterEventQueue")
    private Optional<QueueInterface<ClusterEvent>> clusterEventQueue;

    // ---- Injected collaborators ----
    @Inject
    private MetricRegistry metricRegistry;
    @Inject
    private ServerConfig serverConfig;
    @Inject
    private LogService logService;
    @Inject
    private RunContextInitializer runContextInitializer;
    @Inject
    private RunContextLoggerFactory runContextLoggerFactory;
    @Inject
    private WorkerSecurityService workerSecurityService;
    @Inject
    private VariablesService variablesService;

    // ---- Runtime state ----
    // Ids of executions for which an ExecutionKilledExecution event has been received.
    private final Set<String> killedExecution = ConcurrentHashMap.newKeySet();
    private final Map<Long, AtomicInteger> metricRunningCount = new ConcurrentHashMap<Long, AtomicInteger>();
    // Per trigger uid: gauge-backed counter of evaluations currently in progress.
    @VisibleForTesting
    private final Map<String, AtomicInteger> evaluateTriggerRunningCount = new ConcurrentHashMap<String, AtomicInteger>();
    // Callables known to the worker; scanned while holding this instance's monitor when a kill event arrives.
    private final List<AbstractWorkerCallable> workerCallableReferences = new ArrayList<AbstractWorkerCallable>();
    // Publisher used by setState() to broadcast every state transition.
    private final ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher;
    private final AtomicBoolean skipGracefulTermination = new AtomicBoolean(false);
    // Group resolved from workerGroupKey by WorkerGroupService (may be null; see the null check in initMetricsAndTracer).
    private final String workerGroup;
    // Raw group key as passed to the constructor (nullable).
    private final String workerGroupKey;
    // Worker identifier; used as the subscriber id on the job queue and stamped on evaluated triggers.
    private final String id;
    // Pool (named "worker", sized from numThreads) on which received jobs are executed.
    private final ExecutorService executorService;
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    // Ensures initMetricsAndTracer() performs its registration only once.
    private final AtomicBoolean init = new AtomicBoolean(false);
    // Last state recorded by setState().
    private final AtomicReference<Service.ServiceState> state = new AtomicReference();
    // Handles returned by queue receive/subscribe calls; each new handle is inserted at the head.
    private final List<Runnable> receiveCancellations = new ArrayList<Runnable>();
    // Configured number of worker threads; also exposed as the "worker.job.thread" gauge.
    private final Integer numThreads;
    // Jobs received from the queue but not yet started on the executor ("worker.job.pending" gauge).
    private final AtomicInteger pendingJobCount = new AtomicInteger(0);
    // Jobs currently executing on the executor ("worker.job.running" gauge).
    private final AtomicInteger runningJobCount = new AtomicInteger(0);
    @Inject
    private TracerFactory tracerFactory;
    // Assigned in initMetricsAndTracer() from tracerFactory.
    private Tracer tracer;
    // Consulted in run() to decide whether the worker starts in maintenance mode.
    @Inject
    private MaintenanceService maintenanceService;

    /**
     * Builds a worker, resolves its worker group, allocates its job executor and
     * publishes the initial {@code CREATED} state.
     *
     * @param workerId           identifier of this worker
     * @param numThreads         size passed to the "worker" thread pool
     * @param workerGroupKey     optional key from which the worker group is resolved
     * @param eventPublisher     publisher used for service state change events
     * @param workerGroupService resolves the worker group from {@code workerGroupKey}
     * @param executorsUtils     factory for the job executor
     */
    @Inject
    public DefaultWorker(
        @Parameter String workerId,
        @Parameter Integer numThreads,
        @Nullable @Parameter String workerGroupKey,
        ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher,
        WorkerGroupService workerGroupService,
        ExecutorsUtils executorsUtils
    ) {
        // Plain assignments first ...
        this.eventPublisher = eventPublisher;
        this.id = workerId;
        this.workerGroupKey = workerGroupKey;
        this.numThreads = numThreads;

        // ... then the values derived through collaborators.
        this.workerGroup = workerGroupService.resolveGroupFromKey(workerGroupKey);
        this.executorService = executorsUtils.maxCachedThreadPool(numThreads.intValue(), "worker");

        // Must come last: setState() reads workerGroup and eventPublisher.
        this.setState(Service.ServiceState.CREATED);
    }

    /**
     * Registers the worker job gauges and obtains the tracer.
     * Only the first invocation has any effect; later calls return immediately.
     */
    @PostConstruct
    void initMetricsAndTracer() {
        if (!this.init.compareAndSet(false, true)) {
            return;
        }

        // Gauges are tagged with the worker group only when one is set.
        String[] tags = this.workerGroup == null
            ? new String[]{}
            : new String[]{"worker_group", this.workerGroup};

        this.metricRegistry.gauge("worker.job.thread", "The number of worker threads", this.numThreads, tags);
        this.metricRegistry.gauge("worker.job.pending", "The number of jobs (tasks or triggers) pending to be run by the Worker", this.pendingJobCount, tags);
        this.metricRegistry.gauge("worker.job.running", "The number of jobs (tasks or triggers) currently running inside the Worker", this.runningJobCount, tags);

        this.tracer = this.tracerFactory.getTracer(DefaultWorker.class, "WORKER");
    }

    /**
     * Returns the worker job gauges that are currently registered.
     *
     * @return the registered gauges wrapped as {@link Metric}; an empty set when no
     *         metric registry is available
     */
    public Set<Metric> getMetrics() {
        if (this.metricRegistry == null) {
            return Collections.emptySet();
        }
        return Stream.of("worker.job.thread", "worker.job.pending", "worker.job.running")
            .map(name -> this.metricRegistry.findGauge(name))
            // Gauges that were never registered are simply left out.
            .filter(gauge -> gauge != null)
            .map(Metric::of)
            .collect(Collectors.toSet());
    }

    /**
     * Starts the worker: registers the kill-event listener, subscribes to the job queue,
     * optionally listens for cluster events, then moves to {@code RUNNING} (or to
     * maintenance when the maintenance service reports maintenance mode).
     */
    public void run() {
        // --- Kill events: kill every running callable that matches the killed execution/trigger.
        Runnable stopKilledListener = this.executionKilledQueue.receive(message -> {
            if (message == null || !message.isLeft()) {
                return;
            }
            ExecutionKilled killed = (ExecutionKilled) message.getLeft();
            ExecutionKilled.State killedState = killed.getState();
            // Events carrying a state other than EXECUTED are ignored; a missing state is processed.
            if (killedState != null && killedState != ExecutionKilled.State.EXECUTED) {
                return;
            }
            this.metricRegistry
                .counter("worker.killed.count", "The total number of executions killed events received the Executor", this.metricRegistry.tags(killed))
                .increment();

            synchronized (this) {
                if (killed instanceof ExecutionKilledExecution killedExec) {
                    this.killedExecution.add(killedExec.getExecutionId());
                    this.workerCallableReferences.stream()
                        .filter(WorkerTaskCallable.class::isInstance)
                        .map(WorkerTaskCallable.class::cast)
                        .filter(callable -> killedExec.isEqual(callable.getWorkerTask()))
                        .forEach(AbstractWorkerCallable::kill);
                } else if (killed instanceof ExecutionKilledTrigger killedTrigger) {
                    this.workerCallableReferences.stream()
                        .filter(AbstractWorkerTriggerCallable.class::isInstance)
                        .map(AbstractWorkerTriggerCallable.class::cast)
                        .filter(callable -> killedTrigger.isEqual((TriggerContext) callable.getWorkerTrigger().getTriggerContext()))
                        .forEach(AbstractWorkerCallable::kill);
                }
            }
        });
        this.receiveCancellations.addFirst(stopKilledListener);

        // --- Jobs: each received job is handed to the executor; the counters track pending vs running.
        Runnable stopJobSubscription = this.workerJobQueue.subscribe(this.id, this.workerGroup, either -> {
            this.pendingJobCount.incrementAndGet();
            this.executorService.execute(() -> {
                this.pendingJobCount.decrementAndGet();
                this.runningJobCount.incrementAndGet();
                try {
                    if (either.isRight()) {
                        DeserializationException failure = (DeserializationException) either.getRight();
                        log.error("Unable to deserialize a worker job: {}", failure.getMessage());
                        this.handleDeserializationError(failure);
                        return;
                    }
                    WorkerJob job = (WorkerJob) either.getLeft();
                    if (job instanceof WorkerTask task) {
                        this.handleTask(task);
                    } else if (job instanceof WorkerTrigger trigger) {
                        this.handleTrigger(trigger);
                    }
                } finally {
                    this.runningJobCount.decrementAndGet();
                }
            });
        });
        this.receiveCancellations.addFirst(stopJobSubscription);

        // --- Cluster events (only when a cluster event queue is available).
        this.clusterEventQueue.ifPresent(queue ->
            this.receiveCancellations.addFirst(queue.receive(this::clusterEventQueue))
        );

        if (this.maintenanceService.isInMaintenanceMode()) {
            this.enterMaintenance();
        } else {
            this.setState(Service.ServiceState.RUNNING);
        }

        if (this.workerGroupKey == null) {
            log.info("Worker started with {} thread(s)", this.numThreads);
        } else {
            log.info("Worker started with {} thread(s) in group '{}'", this.numThreads, this.workerGroupKey);
        }
    }

    /**
     * Handles a message from the cluster event queue: toggles maintenance mode on
     * {@code MAINTENANCE_ENTER} / {@code MAINTENANCE_EXIT}; other event types are ignored.
     *
     * @param either the cluster event, or the deserialization failure for that message
     */
    private void clusterEventQueue(Either<ClusterEvent, DeserializationException> either) {
        if (either.isRight()) {
            log.error("Unable to deserialize a cluster event: {}", either.getRight().getMessage());
            return;
        }

        ClusterEvent event = either.getLeft();
        log.info("Cluster event received: {}", event);
        switch (event.eventType()) {
            case MAINTENANCE_ENTER -> this.enterMaintenance();
            case MAINTENANCE_EXIT -> this.exitMaintenance();
            default -> {
                // Nothing to do for any other event type.
            }
        }
    }

    /**
     * Pauses the kill-event queue and the job queue, then publishes the {@code MAINTENANCE} state.
     */
    private void enterMaintenance() {
        executionKilledQueue.pause();
        workerJobQueue.pause();
        setState(Service.ServiceState.MAINTENANCE);
    }

    /**
     * Resumes the kill-event queue and the job queue, then publishes the {@code RUNNING} state.
     */
    private void exitMaintenance() {
        executionKilledQueue.resume();
        workerJobQueue.resume();
        setState(Service.ServiceState.RUNNING);
    }

    /**
     * Records the new service state and publishes a {@link ServiceStateChangeEvent}
     * carrying the worker group as a property.
     *
     * @param state the state to switch to
     */
    private void setState(Service.ServiceState state) {
        this.state.set(state);

        // A HashMap is used because the worker group value may be null.
        Map<String, String> eventProperties = new HashMap<>();
        eventProperties.put(SERVICE_PROPS_WORKER_GROUP, this.workerGroup);

        ServiceStateChangeEvent event = new ServiceStateChangeEvent((Service) this, eventProperties);
        this.eventPublisher.publishEvent(event);
    }

    /**
     * Best-effort recovery for a worker job that could not be deserialized.
     * <p>
     * The raw record is re-read as a JSON tree. Depending on its {@code type} field, either a
     * failed task run ({@code "task"}) or a trigger result without execution ({@code "trigger"})
     * is emitted. Records without a raw payload, or with another type, are left alone.
     *
     * @param deserializationException the failure reported by the job queue
     */
    private void handleDeserializationError(DeserializationException deserializationException) {
        if (deserializationException.getRecord() == null) {
            return;
        }

        try {
            JsonNode json = MAPPER.readTree(deserializationException.getRecord());
            JsonNode typeNode = json.get("type");
            String type = typeNode == null ? null : typeNode.asText();

            if ("task".equals(type)) {
                TaskRun taskRun = MAPPER.treeToValue(json.get("taskRun"), TaskRun.class);
                this.workerTaskResultQueue.emit(new WorkerTaskResult(taskRun.fail()));
            } else if ("trigger".equals(type)) {
                TriggerContext triggerContext = MAPPER.treeToValue(json.get("triggerContext"), TriggerContext.class);
                WorkerTriggerResult emptyResult = WorkerTriggerResult.builder()
                    .triggerContext(triggerContext)
                    .execution(Optional.empty())
                    .build();
                this.workerTriggerResultQueue.emit(emptyResult);
            }
        } catch (QueueException | IOException e) {
            log.error("Unexpected exception when trying to handle a deserialization error", e);
        }
    }

    /**
     * Executes a worker task.
     * <ul>
     *   <li>A {@link RunnableTask} is run directly.</li>
     *   <li>A {@link WorkingDirectory} runs its pre-execute hook, then each enabled child task in
     *       order (honouring {@code runIf}), then its post-execute hook. The loop stops at the first
     *       child that produced no result or that failed without {@code allowFailure}.</li>
     *   <li>Any other task type is rejected.</li>
     * </ul>
     * For a working directory, termination logging and run-context cleanup happen exactly once,
     * in the {@code finally} block, whichever path is taken.
     *
     * @param workerTask the task to execute
     * @throws RuntimeException if the task is neither runnable nor a working directory
     */
    private void handleTask(WorkerTask workerTask) {
        if (workerTask.getTask() instanceof RunnableTask) {
            this.run(workerTask, true);
            return;
        }
        if (!(workerTask.getTask() instanceof WorkingDirectory workingDirectory)) {
            throw new RuntimeException("Unable to process the task '" + workerTask.getTask().getId() + "' as it's not a runnable task");
        }

        DefaultRunContext runContext = this.runContextInitializer.forWorkingDirectory((DefaultRunContext)workerTask.getRunContext(), workerTask);
        // Dedicated copy for the pre/post hooks: runContext itself is replaced after each child task.
        DefaultRunContext workingDirectoryRunContext = runContext.clone();
        try {
            try {
                workingDirectory.preExecuteTasks((RunContext)workingDirectoryRunContext, workerTask.getTaskRun());
            } catch (Exception e) {
                workingDirectoryRunContext.logger().error("Failed preExecuteTasks on WorkingDirectory: {}", e.getMessage(), e);
                WorkerTask failed = workerTask.withTaskRun(workerTask.fail());
                try {
                    this.workerTaskResultQueue.emit(new WorkerTaskResult(failed.getTaskRun()));
                } catch (QueueException ex) {
                    // Log the queue failure itself; the pre-execute failure was already logged above.
                    log.error("Unable to emit the worker task result for task {} taskrun {}", failed.getTask().getId(), failed.getTaskRun().getId(), ex);
                }
                this.logTerminated(failed);
                // The enclosing finally block logs the termination of workerTask and cleans up.
                return;
            }

            for (Task currentTask : workingDirectory.getTasks()) {
                if (Boolean.TRUE.equals(currentTask.getDisabled())) {
                    continue;
                }
                WorkerTask currentWorkerTask = workingDirectory.workerTask(workerTask.getTaskRun(), currentTask, (RunContext)this.runContextInitializer.forPlugin(runContext, (Plugin)currentTask));
                WorkerTaskResult workerTaskResult = null;
                try {
                    if (!TruthUtils.isTruthy((String)runContext.render(currentWorkerTask.getTask().getRunIf()))) {
                        // runIf evaluated to a falsy value: report the child as SKIPPED without running it.
                        workerTaskResult = new WorkerTaskResult(currentWorkerTask.getTaskRun().withState(State.Type.SKIPPED));
                        this.workerTaskResultQueue.emit(workerTaskResult);
                    } else {
                        workerTaskResult = this.run(currentWorkerTask, false);
                    }
                } catch (IllegalVariableEvaluationException e) {
                    RunContextLogger contextLogger = this.runContextLoggerFactory.create(currentWorkerTask);
                    contextLogger.logger().error("Failed evaluating runIf: {}", e.getMessage(), e);
                    try {
                        // The working directory task itself is reported as failed; the null result ends the loop below.
                        this.workerTaskResultQueue.emit(new WorkerTaskResult(workerTask.fail()));
                    } catch (QueueException ex) {
                        log.error("Unable to emit the worker task result for task {} taskrun {}", currentWorkerTask.getTask().getId(), currentWorkerTask.getTaskRun().getId(), ex);
                    }
                } catch (QueueException e) {
                    log.error("Unable to emit the worker task result for task {} taskrun {}", currentWorkerTask.getTask().getId(), currentWorkerTask.getTaskRun().getId(), e);
                }

                boolean blockingFailure = workerTaskResult != null
                    && workerTaskResult.getTaskRun().getState().isFailed()
                    && !currentWorkerTask.getTask().isAllowFailure();
                if (workerTaskResult == null || blockingFailure) {
                    break;
                }
                // Feed the child's result into the run context used by the next child.
                runContext = this.runContextInitializer.forWorker(runContext.clone(), workerTaskResult, workerTask.getTaskRun());
            }

            try {
                workingDirectory.postExecuteTasks((RunContext)workingDirectoryRunContext, workerTask.getTaskRun());
            } catch (Exception e) {
                workingDirectoryRunContext.logger().error("Failed postExecuteTasks on WorkingDirectory: {}", e.getMessage(), e);
                try {
                    this.workerTaskResultQueue.emit(new WorkerTaskResult(workerTask.fail()));
                } catch (QueueException ex) {
                    log.error("Unable to emit the worker task result for task {} taskrun {}", workerTask.getTask().getId(), workerTask.getTaskRun().getId(), ex);
                }
            }
        } finally {
            this.logTerminated(workerTask);
            runContext.cleanup();
        }
    }

    /**
     * Publishes the outcome of a trigger evaluation on the worker trigger result queue.
     * <p>
     * When the flow declares labels, the flow's non-system labels are appended to the labels of
     * the generated execution. The merge is done on a fresh list so the list returned by
     * {@code execution.getLabels()} is never modified (and may safely be immutable).
     * A queue failure while emitting is routed to {@link #handleTriggerError}.
     *
     * @param workerTrigger the evaluated trigger
     * @param evaluate      the execution produced by the evaluation, or empty if none was produced
     */
    private void publishTriggerExecution(WorkerTrigger workerTrigger, Optional<Execution> evaluate) {
        this.metricRegistry.counter("worker.trigger.execution.count", "The total number of triggers evaluated by the Worker", this.metricRegistry.tags(workerTrigger, this.workerGroup, new String[0])).increment();
        if (log.isDebugEnabled()) {
            this.logService.logTrigger((TriggerContext)workerTrigger.getTriggerContext(), Level.DEBUG, "[type: {}] {}", new Object[]{workerTrigger.getTrigger().getType(), evaluate.map(execution -> "New execution '" + execution.getId() + "'").orElse("Empty evaluation")});
        }

        final FlowInterface flow = workerTrigger.getConditionContext().getFlow();
        if (flow.getLabels() != null) {
            evaluate = evaluate.map(execution -> {
                // Start from the flow labels, then put the execution's own labels in front,
                // which yields the order: execution labels first, flow labels after.
                var mergedLabels = new ArrayList<>(LabelService.labelsExcludingSystem(flow));
                if (execution.getLabels() != null) {
                    mergedLabels.addAll(0, execution.getLabels());
                }
                return execution.withLabels(mergedLabels);
            });
        }

        try {
            this.workerTriggerResultQueue.emit(
                WorkerTriggerResult.builder()
                    .execution(evaluate)
                    .triggerContext((TriggerContext)workerTrigger.getTriggerContext())
                    .trigger(workerTrigger.getTrigger())
                    .build()
            );
        } catch (QueueException e) {
            this.handleTriggerError(workerTrigger, e);
        }
    }

    /**
     * Reports a failed trigger evaluation.
     * <p>
     * Increments the error counter, logs the error, and emits a trigger result. When the trigger
     * is configured with {@code failOnTriggerError}, a FAILED execution is generated, attached to
     * the result, and the error is also sent to the log queue for that execution. Sending the log
     * entry is best-effort: a failure there is logged and does not prevent the result from being
     * emitted.
     *
     * @param workerTrigger the trigger whose evaluation failed
     * @param e             the failure; callers in this class may pass {@code null} when the
     *                      evaluation ended in a non-success state without an exception
     */
    private void handleTriggerError(WorkerTrigger workerTrigger, Throwable e) {
        this.metricRegistry.counter("worker.trigger.error.count", "The total number of trigger evaluations that failed inside the Worker", this.metricRegistry.tags(workerTrigger, this.workerGroup, new String[0])).increment();
        this.logError(workerTrigger, e);
        try {
            Execution execution = workerTrigger.getTrigger().isFailOnTriggerError()
                ? TriggerService.generateExecution((AbstractTrigger)workerTrigger.getTrigger(), (ConditionContext)workerTrigger.getConditionContext(), (TriggerContext)workerTrigger.getTriggerContext(), (Output)null).withState(State.Type.FAILED)
                : null;
            if (execution != null) {
                try {
                    this.logQueue.emitAsync(RunContextLogger.logEntries((ILoggingEvent)Execution.loggingEventFromException((Throwable)e), (LogEntry)LogEntry.of((Execution)execution)));
                } catch (QueueException logFailure) {
                    // Best-effort: losing the log entry must not block the trigger result, but leave a trace.
                    log.warn("Unable to send the error log of the failed execution for trigger {}.{}.{}", workerTrigger.getTriggerContext().getNamespace(), workerTrigger.getTriggerContext().getFlowId(), workerTrigger.getTriggerContext().getTriggerId(), logFailure);
                }
            }
            this.workerTriggerResultQueue.emit(
                WorkerTriggerResult.builder()
                    .triggerContext((TriggerContext)workerTrigger.getTriggerContext())
                    .trigger(workerTrigger.getTrigger())
                    .execution(Optional.ofNullable(execution))
                    .build()
            );
        } catch (QueueException ex) {
            log.error("Unable to send the worker trigger result {}.{}.{}", workerTrigger.getTriggerContext().getNamespace(), workerTrigger.getTriggerContext().getFlowId(), workerTrigger.getTriggerContext().getTriggerId(), ex);
        }
    }

    /**
     * Reports a realtime trigger that failed inside the worker: increments the error counter,
     * generates a FAILED execution, logs the error against it and emits the trigger result.
     *
     * @param workerTrigger the realtime trigger that failed
     * @param e             the failure, possibly {@code null} (logged as "unknown")
     */
    private void handleRealtimeTriggerError(WorkerTrigger workerTrigger, Throwable e) {
        this.metricRegistry
            .counter("worker.trigger.error.count", "The total number of trigger evaluations that failed inside the Worker", this.metricRegistry.tags(workerTrigger, this.workerGroup, new String[0]))
            .increment();

        Execution failedExecution = TriggerService
            .generateRealtimeExecution((AbstractTrigger)workerTrigger.getTrigger(), (ConditionContext)workerTrigger.getConditionContext(), (TriggerContext)workerTrigger.getTriggerContext(), null)
            .withState(State.Type.FAILED);

        Logger triggerLogger = workerTrigger.getConditionContext().getRunContext().logger();
        Object[] logArguments = new Object[]{
            workerTrigger.getTriggerContext().getDate(),
            e == null ? "unknown" : e.getMessage(),
            e
        };
        this.logService.logExecution(failedExecution, triggerLogger, Level.ERROR, "[date: {}] Realtime trigger failed to be created in the worker with error: {}", logArguments);
        if (e != null && triggerLogger.isTraceEnabled()) {
            triggerLogger.trace(Throwables.getStackTraceAsString(e));
        }

        WorkerTriggerResult result = WorkerTriggerResult.builder()
            .execution(Optional.of(failedExecution))
            .triggerContext((TriggerContext)workerTrigger.getTriggerContext())
            .trigger(workerTrigger.getTrigger())
            .build();
        try {
            this.workerTriggerResultQueue.emit(result);
        } catch (QueueException ex) {
            log.error("Unable to send the worker trigger result {}.{}.{}", workerTrigger.getTriggerContext().getNamespace(), workerTrigger.getTriggerContext().getFlowId(), workerTrigger.getTriggerContext().getTriggerId(), ex);
        }
    }

    /**
     * Evaluates a trigger inside the worker.
     * <p>
     * The trigger is first stamped with this worker's id and emitted on the trigger queue. The
     * evaluation itself is timed: a polling trigger is evaluated once and its result published;
     * a realtime trigger is run with callbacks that publish each execution or report each error.
     * The "completed" log line and the run-context cleanup are executed exactly once, whether
     * the evaluation succeeded or threw.
     *
     * @param workerTrigger the trigger job to evaluate
     */
    private void handleTrigger(WorkerTrigger workerTrigger) {
        this.metricRegistry.counter("worker.trigger.started.count", "The total number of trigger evaluations started by the Worker", this.metricRegistry.tags(workerTrigger, this.workerGroup, new String[0])).increment();

        Trigger trigger = workerTrigger.getTriggerContext();
        trigger.setWorkerId(this.id);
        try {
            this.triggerQueue.emit(trigger);
        } catch (QueueException e) {
            // NOTE(review): the evaluation below still proceeds after this failure — confirm this is intended.
            this.handleTriggerError(workerTrigger, e);
        }

        this.metricRegistry.timer("worker.trigger.duration", "Trigger evaluation duration inside the Worker", this.metricRegistry.tags(workerTrigger, this.workerGroup, new String[0])).record(() -> {
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();

            // One gauge-backed counter per trigger uid, created on first evaluation.
            String triggerUid = workerTrigger.getTriggerContext().uid();
            this.evaluateTriggerRunningCount.computeIfAbsent(triggerUid, s -> this.metricRegistry.gauge("worker.trigger.running.count", "The number of triggers currently evaluating inside the Worker", new AtomicInteger(0), this.metricRegistry.tags(workerTrigger, this.workerGroup, new String[0])));
            this.evaluateTriggerRunningCount.get(triggerUid).addAndGet(1);

            DefaultRunContext runContext = (DefaultRunContext)workerTrigger.getConditionContext().getRunContext();
            this.runContextInitializer.forWorker(runContext, workerTrigger);
            try {
                this.logService.logTrigger((TriggerContext)workerTrigger.getTriggerContext(), runContext.logger(), Level.INFO, "Type {} started", new Object[]{workerTrigger.getTrigger().getType()});

                AbstractTrigger abstractTrigger = workerTrigger.getTrigger();
                if (abstractTrigger instanceof PollingTriggerInterface pollingTrigger) {
                    WorkerTriggerCallable workerCallable = new WorkerTriggerCallable((RunContext)runContext, workerTrigger, pollingTrigger);
                    State.Type finalState = this.callJob(workerCallable);
                    if (workerCallable.getException() != null || !finalState.equals(State.Type.SUCCESS)) {
                        this.handleTriggerError(workerTrigger, workerCallable.getException());
                    }
                    // Anything but FAILED still publishes the evaluation (which may be empty).
                    if (!finalState.equals(State.Type.FAILED)) {
                        this.publishTriggerExecution(workerTrigger, workerCallable.getEvaluate());
                    }
                } else if (abstractTrigger instanceof RealtimeTriggerInterface streamingTrigger) {
                    // Executions and errors are reported through the callbacks while the callable runs.
                    AbstractWorkerTriggerCallable workerCallable = new WorkerTriggerRealtimeCallable((RunContext)runContext, workerTrigger, streamingTrigger, throwable -> this.handleTriggerError(workerTrigger, (Throwable)throwable), execution -> this.publishTriggerExecution(workerTrigger, Optional.of(execution)));
                    State.Type finalState = this.callJob(workerCallable);
                    if (workerCallable.getException() != null || !finalState.equals(State.Type.SUCCESS)) {
                        this.handleRealtimeTriggerError(workerTrigger, workerCallable.getException());
                    }
                }
            } catch (Exception e) {
                this.handleTriggerError(workerTrigger, e);
            } finally {
                this.logService.logTrigger((TriggerContext)workerTrigger.getTriggerContext(), runContext.logger(), Level.INFO, "Type {} completed in {}", new Object[]{workerTrigger.getTrigger().getType(), DurationFormatUtils.formatDurationHMS(stopWatch.getTime(TimeUnit.MILLISECONDS))});
                workerTrigger.getConditionContext().getRunContext().cleanup();
            }

            this.evaluateTriggerRunningCount.get(triggerUid).addAndGet(-1);
        });

        this.metricRegistry.counter("worker.trigger.ended.count", "The total number of trigger evaluations ended by the Worker", this.metricRegistry.tags(workerTrigger, this.workerGroup, new String[0])).increment();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Executes a worker task and builds the {@link WorkerTaskResult} describing its outcome.
     *
     * <p>Processing order:
     * <ol>
     *   <li>Increment the {@code worker.started.count} counter and, when the task run is still
     *       {@code CREATED}, record how long it stayed queued.</li>
     *   <li>If the task run is not forced and its execution id is in {@code killedExecution}, emit a
     *       {@code KILLED} result, log the termination and return.</li>
     *   <li>Switch the task run to {@code RUNNING} and initialize the run context.</li>
     *   <li>If the task cache is enabled and a cache key could be computed, try to serve the outputs from
     *       the cache; any failure while reading the cache is logged and the task is executed normally.</li>
     *   <li>Run one attempt through {@link #runAttempt}, derive the final state (retry warning, allowed
     *       failure, allowed warning), emit the result and optionally upload a cache entry.</li>
     * </ol>
     *
     * <p>NOTE(review): in this decompiled form the cache-hit {@code return} sits outside the
     * {@code try/finally} below, so {@code logTerminated} and the run context cleanup are not executed on
     * that path. Confirm against the original source before relying on it.
     *
     * @param workerTask the task to run
     * @param cleanUp    whether the run context must be cleaned up once the task has been run; it is
     *                   unboxed, so it must not be {@code null}
     * @return the result built for the task run
     * @throws IllegalStateException if the task run has no attempt after {@link #runAttempt} returned
     */
    private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) {
        // Typed as Optional<String> so that it can hold the cache key returned by hashTask.
        Optional<String> hash;
        DefaultRunContext runContext;
        block43: {
            this.metricRegistry.counter("worker.started.count", "The total number of tasks started by the Worker", this.metricRegistry.tags(workerTask, this.workerGroup, new String[0])).increment();
            if (workerTask.getTaskRun().getState().getCurrent() == State.Type.CREATED) {
                this.metricRegistry.timer("worker.queued.duration", "Task queued duration inside the Worker", this.metricRegistry.tags(workerTask, this.workerGroup, new String[0])).record(Duration.between(workerTask.getTaskRun().getState().getStartDate(), Instant.now()));
            }
            // Short-circuit: the execution was killed and this task run is not forced.
            if (!Boolean.TRUE.equals(workerTask.getTaskRun().getForceExecution()) && this.killedExecution.contains(workerTask.getTaskRun().getExecutionId())) {
                WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.KILLED));
                try {
                    this.workerTaskResultQueue.emit((Object)workerTaskResult);
                }
                catch (QueueException ex) {
                    log.error("Unable to emit the worker task result for task {} taskrun {}", new Object[]{workerTask.getTask().getId(), workerTask.getTaskRun().getId(), ex});
                }
                this.logTerminated(workerTask);
                return workerTaskResult;
            }
            this.logService.logTaskRun(workerTask.getTaskRun(), Level.INFO, "Type {} started", new Object[]{workerTask.getTask().getClass().getSimpleName()});
            workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(State.Type.RUNNING));
            runContext = this.runContextInitializer.forWorker((DefaultRunContext)workerTask.getRunContext(), workerTask);
            hash = Optional.empty();
            if (workerTask.getTask().getTaskCache() != null && workerTask.getTask().getTaskCache().getEnabled().booleanValue()) {
                runContext.logger().debug("Task output caching is enabled for task '{}''", (Object)workerTask.getTask().getId());
                hash = this.hashTask((RunContext)runContext, workerTask.getTask());
                if (hash.isPresent()) {
                    try {
                        Optional cacheFile = runContext.storage().getCacheFile(hash.get(), workerTask.getTaskRun().getValue(), workerTask.getTask().getTaskCache().getTtl());
                        // No cache entry: leave the block and execute the task.
                        if (!cacheFile.isPresent()) break block43;
                        runContext.logger().info("Skipping task execution for task '{}' as there is an existing cache entry for it", (Object)workerTask.getTask().getId());
                        try (ZipInputStream archive = new ZipInputStream((InputStream)cacheFile.get());){
                            // Only the first entry of the archive is read; an empty archive falls through to a normal execution.
                            if (archive.getNextEntry() != null) {
                                byte[] cache = archive.readAllBytes();
                                Map outputMap = (Map)JacksonMapper.ofIon().readValue(cache, JacksonMapper.MAP_TYPE_REFERENCE);
                                Variables variables = this.variablesService.of(StorageContext.forTask((TaskRun)workerTask.getTaskRun()), outputMap);
                                TaskRunAttempt attempt = TaskRunAttempt.builder().state(new State().withState(State.Type.SUCCESS)).workerId(this.id).build();
                                List<TaskRunAttempt> attempts = this.addAttempt(workerTask, attempt);
                                TaskRun taskRun = workerTask.getTaskRun().withAttempts(attempts).withOutputs(variables).withState(State.Type.SUCCESS);
                                WorkerTaskResult workerTaskResult = new WorkerTaskResult(taskRun);
                                this.workerTaskResultQueue.emit((Object)workerTaskResult);
                                return workerTaskResult;
                            }
                        }
                    }
                    catch (QueueException | IOException | RuntimeException e) {
                        runContext.logger().error("Unexpected exception while loading the cache for task '{}', the task will be executed instead.", (Object)workerTask.getTask().getId(), (Object)e);
                    }
                }
            }
        }
        try {
            workerTask = this.runAttempt((RunContext)runContext, workerTask);
            TaskRunAttempt lastAttempt = workerTask.getTaskRun().lastAttempt();
            if (lastAttempt == null) {
                throw new IllegalStateException("Can find lastAttempt on taskRun '" + workerTask.getTaskRun().toString(true) + "'");
            }
            State.Type state = lastAttempt.getState().getCurrent();
            // The worker is shutting down and failed tasks may be restarted: return the result without emitting it.
            if (this.shutdown.get() && this.serverConfig.workerTaskRestartStrategy() != WorkerTaskRestartStrategy.NEVER && state.isFailed()) {
                List dynamicWorkerResults = workerTask.getRunContext().dynamicWorkerResults();
                List<TaskRun> dynamicTaskRuns = this.dynamicWorkerResults(dynamicWorkerResults);
                return new WorkerTaskResult(workerTask.getTaskRun(), dynamicTaskRuns);
            }
            // A success obtained after at least one retry is downgraded to WARNING when the retry policy asks for it.
            if (workerTask.getTask().getRetry() != null && workerTask.getTask().getRetry().getWarningOnRetry().booleanValue() && workerTask.getTaskRun().attemptNumber() > 1 && state == State.Type.SUCCESS) {
                state = State.Type.WARNING;
            }
            // A failure that is allowed and will not be retried becomes a WARNING.
            if (workerTask.getTask().isAllowFailure() && !workerTask.getTaskRun().shouldBeRetried(workerTask.getTask().getRetry()) && state.isFailed()) {
                state = State.Type.WARNING;
            }
            // An allowed WARNING becomes a SUCCESS.
            if (workerTask.getTask().isAllowWarning() && State.Type.WARNING.equals((Object)state)) {
                state = State.Type.SUCCESS;
            }
            List dynamicWorkerResults = workerTask.getRunContext().dynamicWorkerResults();
            List<TaskRun> dynamicTaskRuns = this.dynamicWorkerResults(dynamicWorkerResults);
            workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(state));
            WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun(), dynamicTaskRuns);
            this.workerTaskResultQueue.emit((Object)workerTaskResult);
            // Store the outputs in the cache only for SUCCESS / WARNING results and when a cache key exists.
            if (workerTask.getTask().getTaskCache() != null && workerTask.getTask().getTaskCache().getEnabled().booleanValue() && hash.isPresent() && (state == State.Type.SUCCESS || state == State.Type.WARNING)) {
                runContext.logger().info("Uploading a cache entry for task '{}'", (Object)workerTask.getTask().getId());
                try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                     ZipOutputStream archive = new ZipOutputStream(bos);){
                    ZipEntry zipEntry = new ZipEntry("outputs.ion");
                    archive.putNextEntry(zipEntry);
                    archive.write(JacksonMapper.ofIon().writeValueAsBytes((Object)workerTask.getTaskRun().getOutputs()));
                    archive.closeEntry();
                    archive.finish();
                    Path archiveFile = runContext.workingDir().createTempFile(".zip");
                    Files.write(archiveFile, bos.toByteArray(), new OpenOption[0]);
                    URI uri = runContext.storage().putCacheFile(archiveFile.toFile(), hash.get(), workerTask.getTaskRun().getValue());
                    runContext.logger().debug("Caching entry uploaded in URI {}", (Object)uri);
                }
                catch (IOException | RuntimeException e) {
                    // Caching is best effort: the result has already been emitted.
                    runContext.logger().error("Unexpected exception while uploading the cache entry for task '{}', the task not be cached.", (Object)workerTask.getTask().getId(), (Object)e);
                }
            }
            return workerTaskResult;
        }
        catch (QueueException e) {
            // The result could not be emitted: fail the task run and try to emit the failure instead.
            TaskRun failed = workerTask.fail();
            // For these two exception types the outputs are dropped before re-emitting.
            if (e instanceof MessageTooBigException) {
                failed = failed.withOutputs(Variables.empty());
            }
            if (e instanceof UnsupportedMessageException) {
                failed = failed.withOutputs(Variables.empty());
            }
            WorkerTaskResult workerTaskResult = new WorkerTaskResult(failed);
            RunContextLogger contextLogger = this.runContextLoggerFactory.create(workerTask);
            contextLogger.logger().error("Unable to emit the worker task result to the queue: {}", (Object)e.getMessage(), (Object)e);
            try {
                this.workerTaskResultQueue.emit((Object)workerTaskResult);
            }
            catch (QueueException ex) {
                // Log the exception of this second attempt; the first one has already been logged above.
                log.error("Unable to emit the worker task result for task {} taskrun {}", new Object[]{workerTask.getTask().getId(), workerTask.getTaskRun().getId(), ex});
            }
            return workerTaskResult;
        }
        finally {
            this.logTerminated(workerTask);
            if (cleanUp.booleanValue()) {
                workerTask.getRunContext().cleanup();
            }
        }
    }

    /**
     * Computes the cache key of a task: the hexadecimal SHA-256 digest of the JSON form of the task
     * once it has been converted to a map and rendered by the run context.
     *
     * @param runContext the run context used to render the task and to log failures
     * @param task       the task to hash
     * @return the cache key, or an empty optional when the key could not be computed (the failure is logged)
     */
    private Optional<String> hashTask(RunContext runContext, Task task) {
        try {
            Map taskAsMap = JacksonMapper.toMap((Object)task);
            // Extra variable handed to the renderer: "workingDir" is mapped to the literal "workingDir".
            Map<String, String> extraVariables = Map.of("workingDir", "workingDir");
            Map rendered = runContext.render(taskAsMap, extraVariables);
            byte[] serialized = JacksonMapper.ofJson().writeValueAsBytes((Object)rendered);
            byte[] sha256 = MessageDigest.getInstance("SHA-256").digest(serialized);
            String cacheKey = HexFormat.of().formatHex(sha256);
            return Optional.of(cacheKey);
        }
        catch (JsonProcessingException | IllegalVariableEvaluationException | RuntimeException | NoSuchAlgorithmException e) {
            runContext.logger().error("Unable to create the cache key for the task '{}'", (Object)task.getId(), (Object)e);
            return Optional.empty();
        }
    }

    /**
     * Extracts the task run of every dynamic worker result and flags each one as dynamic.
     *
     * @param dynamicWorkerResults the dynamic worker results
     * @return the flagged task runs, in the same order, as an unmodifiable list
     */
    private List<TaskRun> dynamicWorkerResults(List<WorkerTaskResult> dynamicWorkerResults) {
        return dynamicWorkerResults.stream()
            .map(result -> result.getTaskRun().withDynamic(Boolean.TRUE))
            .toList();
    }

    /**
     * Records the end of a task: increments the ended counter, records the task run duration and
     * logs the final state on the task run.
     *
     * @param workerTask the task that has ended
     */
    private void logTerminated(WorkerTask workerTask) {
        this.metricRegistry
            .counter("worker.ended.count", "The total number of tasks ended by the Worker", this.metricRegistry.tags(workerTask, this.workerGroup, new String[0]))
            .increment();
        this.metricRegistry
            .timer("worker.ended.duration", "Task run duration inside the Worker", this.metricRegistry.tags(workerTask, this.workerGroup, new String[0]))
            .record(workerTask.getTaskRun().getState().getDuration());

        Object[] logArguments = new Object[]{
            workerTask.getTask().getClass().getSimpleName(),
            workerTask.getTaskRun().getState().getCurrent(),
            workerTask.getTaskRun().getState().humanDuration()
        };
        this.logService.logTaskRun(workerTask.getTaskRun(), Level.INFO, "Type {} with state {} completed in {}", logArguments);
    }

    /**
     * Logs a trigger evaluation problem at WARN level on the trigger logger.
     *
     * <p>An interruption (the throwable itself or its direct cause is an {@link InterruptedException})
     * is reported as such; anything else is reported as a failure together with its message. When
     * trace logging is enabled, the full stack trace is written as well.
     *
     * @param workerTrigger the trigger that was being evaluated
     * @param e             the error, may be {@code null}
     */
    private void logError(WorkerTrigger workerTrigger, Throwable e) {
        Logger triggerLogger = workerTrigger.getConditionContext().getRunContext().logger();
        boolean interrupted = e instanceof InterruptedException
            || (e != null && e.getCause() instanceof InterruptedException);

        if (!interrupted) {
            String message = e != null ? e.getMessage() : "unknown";
            this.logService.logTrigger((TriggerContext)workerTrigger.getTriggerContext(), triggerLogger, Level.WARN, "[date: {}] Trigger evaluation failed in the worker with error: {}", new Object[]{workerTrigger.getTriggerContext().getDate(), message, e});
        } else {
            this.logService.logTrigger((TriggerContext)workerTrigger.getTriggerContext(), triggerLogger, Level.WARN, "[date: {}] Trigger evaluation interrupted in the worker", new Object[]{workerTrigger.getTriggerContext().getDate()});
        }

        if (triggerLogger.isTraceEnabled()) {
            if (e != null) {
                triggerLogger.trace(Throwables.getStackTraceAsString((Throwable)e));
            }
        }
    }

    /**
     * Runs a single attempt of a task and returns the worker task carrying the updated task run.
     *
     * <p>A task that is not a {@link RunnableTask} is not executed: an attempt in the state given by
     * {@code State.Type.fail(task)} is appended, an error is logged and the method returns.
     * Otherwise a {@code RUNNING} attempt is emitted first, the task is executed through
     * {@link #callJob}, the metrics collected by the run context are emitted and the final attempt
     * (resulting state and log file URI) plus the task outputs are attached to the task run.
     *
     * @param runContext the run context of the task
     * @param workerTask the task to run
     * @return a copy of {@code workerTask} whose task run contains the new attempt and, when they could be
     *         built, the outputs
     * @throws QueueException if the {@code RUNNING} attempt cannot be emitted
     */
    private WorkerTask runAttempt(RunContext runContext, WorkerTask workerTask) throws QueueException {
        Logger logger = runContext.logger();
        Task task = workerTask.getTask();
        if (!(task instanceof RunnableTask)) {
            State.Type state = State.Type.fail((Task)workerTask.getTask());
            TaskRunAttempt attempt = TaskRunAttempt.builder().state(new State().withState(state)).workerId(this.id).build();
            List<TaskRunAttempt> attempts = this.addAttempt(workerTask, attempt);
            TaskRun taskRun = workerTask.getTaskRun().withAttempts(attempts);
            logger.error("Unable to execute the task '" + workerTask.getTask().getId() + "': only runnable tasks can be executed by the worker but the task is of type " + String.valueOf(workerTask.getTask().getClass()));
            return workerTask.withTaskRun(taskRun);
        }
        RunnableTask task2 = (RunnableTask)task;
        TaskRunAttempt.TaskRunAttemptBuilder builder = TaskRunAttempt.builder().state(new State().withState(State.Type.RUNNING)).workerId(this.id);
        // Publish the RUNNING attempt before the task actually starts.
        this.workerTaskResultQueue.emit((Object)new WorkerTaskResult(workerTask.getTaskRun().withAttempts(this.addAttempt(workerTask, builder.build()))));
        AtomicInteger metricRunningCount = this.getMetricRunningCount(workerTask);
        WorkerTaskCallable workerTaskCallable = new WorkerTaskCallable(workerTask, task2, runContext, this.metricRegistry);
        State.Type state;
        metricRunningCount.incrementAndGet();
        try {
            state = this.callJob(workerTaskCallable);
        }
        finally {
            // Always restore the gauge, even when callJob ends with a throwable it does not catch itself.
            metricRunningCount.decrementAndGet();
        }
        TaskRunAttempt taskRunAttempt = builder.build().withState(state).withLogFile(runContext.logFileURI());
        runContext.metrics().forEach(metric -> {
            try {
                this.metricEntryQueue.emit((Object)MetricEntry.of((TaskRun)workerTask.getTaskRun(), (AbstractMetricEntry)metric, (ExecutionKind)workerTask.getExecutionKind()));
            }
            catch (QueueException queueException) {
                // Metrics are best effort: a failure must not fail the task, but it should not be invisible either.
                log.warn("Unable to emit a metric entry for task {} taskrun {}", new Object[]{workerTask.getTask().getId(), workerTask.getTaskRun().getId(), queueException});
            }
        });
        // The final attempt is appended to the attempts of the original task run.
        List<TaskRunAttempt> attempts = this.addAttempt(workerTask, taskRunAttempt);
        TaskRun taskRun = workerTask.getTaskRun().withAttempts(attempts);
        try {
            Variables variables = this.variablesService.of(StorageContext.forTask((TaskRun)taskRun), workerTaskCallable.getTaskOutput());
            taskRun = taskRun.withOutputs(variables);
        }
        catch (Exception e) {
            // The task run is still returned, without outputs.
            logger.warn("Unable to save output on taskRun '{}'", (Object)taskRun, (Object)e);
        }
        return workerTask.withTaskRun(taskRun);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs a worker callable inside the tracer and the worker security context, and returns the
     * state it ends with.
     *
     * <p>The callable is registered in {@code workerCallableReferences} for the duration of the call
     * and always removed afterwards. Any {@link Exception} raised by the call is stored on the
     * callable's {@code exception} field and reported as {@code FAILED}.
     *
     * @param workerJobCallable the callable to execute
     * @return the state returned by the callable, or {@code State.Type.FAILED} when it threw an exception
     */
    private State.Type callJob(AbstractWorkerCallable workerJobCallable) {
        synchronized (this) {
            this.workerCallableReferences.add(workerJobCallable);
        }
        try {
            return (State.Type)this.tracer.inCurrentContext(workerJobCallable.runContext, workerJobCallable.getType(), Attributes.of((AttributeKey)TraceUtils.ATTR_UID, (Object)workerJobCallable.getUid()), () -> this.workerSecurityService.callInSecurityContext(workerJobCallable));
        }
        catch (Exception e) {
            // Keep the cause on the callable so that callers can inspect it through the callable.
            workerJobCallable.exception = e;
            return State.Type.FAILED;
        }
        finally {
            synchronized (this) {
                this.workerCallableReferences.remove(workerJobCallable);
            }
        }
    }

    /**
     * Returns the attempts of the task run followed by the given attempt, as an immutable list.
     * The task run itself is left untouched.
     *
     * @param workerTask     the task whose task run provides the existing attempts (may have none)
     * @param taskRunAttempt the attempt to append
     * @return a new immutable list ending with {@code taskRunAttempt}
     */
    private List<TaskRunAttempt> addAttempt(WorkerTask workerTask, TaskRunAttempt taskRunAttempt) {
        // The builder is explicitly typed so that the result is a List<TaskRunAttempt> and not a list of Object.
        ImmutableList.Builder<TaskRunAttempt> attempts = ImmutableList.builder();
        List<TaskRunAttempt> previousAttempts = workerTask.getTaskRun().getAttempts();
        if (previousAttempts != null) {
            attempts.addAll(previousAttempts);
        }
        return attempts.add(taskRunAttempt).build();
    }

    /**
     * Returns the running-task gauge value associated with the metric tags of the given task,
     * registering the gauge the first time these tags are seen.
     *
     * <p>The map key is a hash of the sorted tags joined with {@code -}.
     *
     * @param workerTask the task whose tags identify the gauge
     * @return the counter backing the {@code worker.running.count} gauge for these tags
     */
    public AtomicInteger getMetricRunningCount(WorkerTask workerTask) {
        Object[] sortedTags = this.metricRegistry.tags(workerTask, this.workerGroup, new String[0]);
        Arrays.sort(sortedTags);
        String joinedTags = String.join((CharSequence)"-", (CharSequence[])sortedTags);
        long key = Hashing.hashToLong((String)joinedTags);
        // The gauge is registered with a freshly computed tag array, not with the one sorted above.
        return this.metricRunningCount.computeIfAbsent(
            key,
            unusedKey -> (AtomicInteger)this.metricRegistry.gauge("worker.running.count", "The number of tasks currently running inside the Worker", (Number)new AtomicInteger(0), this.metricRegistry.tags(workerTask, this.workerGroup, new String[0]))
        );
    }

    /**
     * Closes the worker using the termination grace period of the server configuration.
     * Only the call that flips the {@code shutdown} flag from {@code false} to {@code true} closes
     * the worker; any other call does nothing.
     */
    @PreDestroy
    public void close() {
        boolean firstCall = this.shutdown.compareAndSet(false, true);
        if (!firstCall) {
            return;
        }
        this.closeWorker(this.serverConfig.terminationGracePeriod());
    }

    /**
     * Terminates the worker: moves to {@code TERMINATING}, closes the worker job queue, then either
     * waits for the running tasks to complete or, when graceful termination is skipped, stops
     * everything immediately. The final state is {@code TERMINATED_GRACEFULLY} or
     * {@code TERMINATED_FORCED}.
     *
     * @param timeout the maximum time to wait for running tasks when terminating gracefully
     */
    @VisibleForTesting
    public void closeWorker(Duration timeout) {
        boolean terminatedGracefully;
        if (log.isDebugEnabled()) {
            log.debug("Terminating");
        }
        this.setState(Service.ServiceState.TERMINATING);
        try {
            this.workerJobQueue.close();
        }
        catch (IOException e) {
            // Keep the cause in the log; the termination continues regardless.
            log.error("Failed to close the WorkerJobQueue", (Throwable)e);
        }
        if (!this.skipGracefulTermination.get()) {
            terminatedGracefully = this.waitForTasksCompletion(timeout);
        } else {
            log.info("Terminating now and skip waiting for tasks completions.");
            this.receiveCancellations.forEach(Runnable::run);
            this.executorService.shutdownNow();
            this.closeQueue();
            terminatedGracefully = false;
        }
        Service.ServiceState state = terminatedGracefully ? Service.ServiceState.TERMINATED_GRACEFULLY : Service.ServiceState.TERMINATED_FORCED;
        this.setState(state);
        if (log.isDebugEnabled()) {
            log.debug("Closed ({}).", (Object)state.name());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Signals every running callable to stop, then waits until the worker threads have terminated
     * or the timeout has elapsed.
     *
     * <p>The shutdown sequence runs on a dedicated virtual thread: the receive cancellations are run,
     * the executor is shut down, realtime triggers are awaited and finally the executor termination
     * is awaited. Both waits share one deadline computed from {@code timeout}. The calling thread
     * polls the outcome through {@code Await.until} and closes the result queues once it is known.
     *
     * <p>NOTE(review): if the shutdown thread ends with an unchecked exception, no outcome is ever
     * published. Confirm how {@code Await.until} behaves in that case.
     *
     * @param timeout the maximum time to wait for the running tasks and triggers
     * @return {@code true} if the executor terminated before the deadline, {@code false} if the shutdown
     *         was forced or interrupted
     */
    private boolean waitForTasksCompletion(Duration timeout) {
        Instant deadline = Instant.now().plus(timeout);
        List<AbstractWorkerCallable> callables;
        // Snapshot the running callables under the lock that guards additions and removals in callJob.
        synchronized (this) {
            callables = new ArrayList<AbstractWorkerCallable>(this.workerCallableReferences);
        }
        callables.forEach(AbstractWorkerCallable::signalStop);
        AtomicReference<Service.ServiceState> shutdownState = new AtomicReference<>();
        Thread.ofVirtual().name("worker-shutdown").start(() -> {
            try {
                this.receiveCancellations.forEach(Runnable::run);
                this.executorService.shutdown();
                long remainingForTriggers = Math.max(0L, Instant.now().until(deadline, ChronoUnit.MILLIS));
                this.awaitForRealtimeTriggers(callables, Duration.ofMillis(remainingForTriggers));
                // Recompute the remaining time: waiting for the triggers may have consumed part of the
                // grace period, and the total wait must not exceed the deadline.
                long remainingForTasks = Math.max(0L, Instant.now().until(deadline, ChronoUnit.MILLIS));
                boolean gracefullyShutdown = this.executorService.awaitTermination(remainingForTasks, TimeUnit.MILLISECONDS);
                if (!gracefullyShutdown) {
                    log.warn("Worker still has some pending threads after `terminationGracePeriod`. Forcing shutdown now.");
                    this.executorService.shutdownNow();
                }
                shutdownState.set(gracefullyShutdown ? Service.ServiceState.TERMINATED_GRACEFULLY : Service.ServiceState.TERMINATED_FORCED);
            }
            catch (InterruptedException e) {
                log.error("Failed to shutdown the worker. Thread was interrupted");
                shutdownState.set(Service.ServiceState.TERMINATED_FORCED);
                // Restore the interrupt flag that was cleared when the exception was thrown.
                Thread.currentThread().interrupt();
            }
        });
        Await.until(() -> {
            Service.ServiceState serviceState = shutdownState.get();
            if (serviceState == Service.ServiceState.TERMINATED_FORCED || serviceState == Service.ServiceState.TERMINATED_GRACEFULLY) {
                log.info("All working threads are terminated.");
                this.closeQueue();
                return true;
            }
            if (this.workerCallableReferences.isEmpty()) {
                log.debug("All worker threads is terminated.");
            } else {
                log.warn("Waiting for all worker threads to terminate (remaining: {}).", (Object)this.workerCallableReferences.size());
            }
            return false;
        }, (Duration)Duration.ofSeconds(1L));
        return shutdownState.get() == Service.ServiceState.TERMINATED_GRACEFULLY;
    }

    /**
     * Waits for every realtime trigger callable to stop, sharing a single deadline between them,
     * and interrupts the ones that did not stop in time. Callables of any other type are ignored.
     *
     * @param callables the callables to inspect
     * @param timeout   the overall time allowed for all realtime triggers to stop
     */
    private void awaitForRealtimeTriggers(List<AbstractWorkerCallable> callables, Duration timeout) {
        Instant deadline = Instant.now().plus(timeout);
        for (AbstractWorkerCallable callable : callables) {
            if (!(callable instanceof WorkerTriggerRealtimeCallable realtimeCallable)) {
                continue;
            }
            // Each trigger only gets what is left of the shared deadline.
            long remainingMillis = Math.max(0L, Instant.now().until(deadline, ChronoUnit.MILLIS));
            boolean stopped = realtimeCallable.awaitStop(Duration.ofMillis(remainingMillis));
            if (stopped) {
                continue;
            }
            String type = realtimeCallable.getWorkerTrigger().getTrigger().getType();
            log.debug("Failed to stop trigger '{}' before timeout elapsed.", (Object)type);
            realtimeCallable.interrupt();
            this.logService.logTrigger((TriggerContext)realtimeCallable.getWorkerTrigger().getTriggerContext(), realtimeCallable.getWorkerTrigger().getConditionContext().getRunContext().logger(), Level.INFO, "Type {} interrupted", new Object[]{type});
        }
    }

    /**
     * Closes the worker task result queue and the worker trigger result queue.
     * Each queue is closed independently, so a failure on the first one does not leave the second
     * one open; failures are logged and never propagated.
     */
    private void closeQueue() {
        try {
            this.workerTaskResultQueue.close();
        }
        catch (IOException e) {
            log.error("Failed to close the queue", (Throwable)e);
        }
        try {
            this.workerTriggerResultQueue.close();
        }
        catch (IOException e) {
            log.error("Failed to close the queue", (Throwable)e);
        }
    }

    /**
     * Stops the worker immediately: sets the {@code shutdown} flag, runs every receive cancellation
     * and calls {@code shutdownNow()} on the executor service. Nothing is awaited.
     */
    @VisibleForTesting
    public void shutdown() {
        // The result is ignored: the flag is true afterwards whether or not this call flipped it.
        this.shutdown.compareAndSet(false, true);
        this.receiveCancellations.forEach(cancellation -> cancellation.run());
        this.executorService.shutdownNow();
    }

    /**
     * Returns the worker jobs (tasks and triggers) of the callables currently registered in
     * {@code workerCallableReferences}.
     *
     * @return the worker task or worker trigger behind each registered callable, as an unmodifiable list
     * @throws IllegalArgumentException if a registered callable is neither a task nor a trigger callable
     */
    public List<WorkerJob> getWorkerThreadTasks() {
        return this.workerCallableReferences.stream().map(Rethrow.throwFunction(callable -> {
            if (callable instanceof WorkerTaskCallable taskCallable) {
                return taskCallable.workerTask;
            }
            if (callable instanceof AbstractWorkerTriggerCallable triggerCallable) {
                return triggerCallable.workerTrigger;
            }
            throw new IllegalArgumentException("Invalid Callable type: '" + callable.getClass().getName() + "'");
        })).toList();
    }

    /**
     * Sets the flag read by {@code closeWorker} to decide whether to wait for running tasks.
     *
     * @param skipGracefulTermination {@code true} to terminate without waiting for tasks to complete
     */
    public void skipGracefulTermination(boolean skipGracefulTermination) {
        this.skipGracefulTermination.set(skipGracefulTermination);
    }

    /**
     * Returns the identifier of this worker.
     *
     * @return the value of the {@code id} field
     */
    public String getId() {
        return id;
    }

    /**
     * Returns the service type of this component.
     *
     * @return always {@code ServiceType.WORKER}
     */
    public ServiceType getType() {
        return ServiceType.WORKER;
    }

    /**
     * Returns the current service state of this worker.
     *
     * @return the value currently held by the {@code state} reference
     */
    public Service.ServiceState getState() {
        return state.get();
    }

    /**
     * Returns the map of running-task counters, keyed by the hash computed in
     * {@code getMetricRunningCount(WorkerTask)}.
     *
     * @return the internal map itself, not a copy
     */
    @Generated
    public Map<Long, AtomicInteger> getMetricRunningCount() {
        return metricRunningCount;
    }

    /**
     * Returns the map of running trigger evaluation counters.
     *
     * @return the internal map itself, not a copy
     */
    @Generated
    public Map<String, AtomicInteger> getEvaluateTriggerRunningCount() {
        return evaluateTriggerRunningCount;
    }

    /**
     * Returns the worker group of this worker.
     *
     * @return the value of the {@code workerGroup} field
     */
    @Generated
    public String getWorkerGroup() {
        return workerGroup;
    }

    /**
     * Returns the configured number of threads of this worker.
     *
     * @return the value of the {@code numThreads} field
     */
    @Generated
    public Integer getNumThreads() {
        return numThreads;
    }
}

