/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.core.runners;

import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.Rethrow;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Predicate;

@Singleton
public class RunnerUtils {
    public static final Duration DEFAULT_MAX_WAIT_DURATION = Duration.ofSeconds(15L);
    @Inject
    @Named(value="executionQueue")
    protected QueueInterface<Execution> executionQueue;
    @Inject
    private FlowRepositoryInterface flowRepository;
    @Inject
    private ExecutionService executionService;

    public Execution runOne(String tenantId, String namespace, String flowId) throws TimeoutException, QueueException {
        return this.runOne(tenantId, namespace, flowId, null, null, null, null);
    }

    public Execution runOne(String tenantId, String namespace, String flowId, Integer revision) throws TimeoutException, QueueException {
        return this.runOne(tenantId, namespace, flowId, revision, null, null, null);
    }

    public Execution runOne(String tenantId, String namespace, String flowId, Integer revision, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs) throws TimeoutException, QueueException {
        return this.runOne(tenantId, namespace, flowId, revision, inputs, null, null);
    }

    public Execution runOne(String tenantId, String namespace, String flowId, Duration duration) throws TimeoutException, QueueException {
        return this.runOne(tenantId, namespace, flowId, null, null, duration, null);
    }

    public Execution runOne(String tenantId, String namespace, String flowId, Integer revision, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration) throws TimeoutException, QueueException {
        return this.runOne(tenantId, namespace, flowId, revision, inputs, duration, null);
    }

    public Execution runOne(String tenantId, String namespace, String flowId, Integer revision, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration, List<Label> labels) throws TimeoutException, QueueException {
        return this.runOne(this.flowRepository.findById(tenantId, namespace, flowId, revision != null ? Optional.of(revision) : Optional.empty()).orElseThrow(() -> new IllegalArgumentException("Unable to find flow '" + flowId + "'")), inputs, duration, labels);
    }

    public Execution runOne(Flow flow, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs) throws TimeoutException, QueueException {
        return this.runOne(flow, inputs, null, null);
    }

    public Execution runOne(Flow flow, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration) throws TimeoutException, QueueException {
        return this.runOne(flow, inputs, duration, null);
    }

    public Execution runOne(Flow flow, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration, List<Label> labels) throws TimeoutException, QueueException {
        if (duration == null) {
            duration = Duration.ofSeconds(15L);
        }
        Execution execution = Execution.newExecution(flow, inputs, labels, Optional.empty());
        return this.runOne(execution, flow, duration);
    }

    public Execution runOne(Execution execution, Flow flow, Duration duration) throws TimeoutException, QueueException {
        return this.awaitExecution(this.isTerminatedExecution(execution, flow), Rethrow.throwRunnable(() -> this.executionQueue.emit(execution)), duration);
    }

    public Execution runOneUntilPaused(String tenantId, String namespace, String flowId) throws TimeoutException, QueueException {
        return this.runOneUntilPaused(tenantId, namespace, flowId, null, null, null);
    }

    public Execution runOneUntilPaused(String tenantId, String namespace, String flowId, Integer revision, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration) throws TimeoutException, QueueException {
        return this.runOneUntilPaused(this.flowRepository.findById(tenantId, namespace, flowId, revision != null ? Optional.of(revision) : Optional.empty()).orElseThrow(() -> new IllegalArgumentException("Unable to find flow '" + flowId + "'")), inputs, duration);
    }

    public Execution runOneUntilPaused(Flow flow, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration) throws TimeoutException, QueueException {
        if (duration == null) {
            duration = DEFAULT_MAX_WAIT_DURATION;
        }
        Execution execution = Execution.newExecution(flow, inputs, null, Optional.empty());
        return this.awaitExecution(this.isPausedExecution(execution), Rethrow.throwRunnable(() -> this.executionQueue.emit(execution)), duration);
    }

    public Execution runOneUntilRunning(String tenantId, String namespace, String flowId) throws TimeoutException, QueueException {
        return this.runOneUntilRunning(tenantId, namespace, flowId, null, null, null);
    }

    public Execution runOneUntilRunning(String tenantId, String namespace, String flowId, Integer revision, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration) throws TimeoutException, QueueException {
        return this.runOneUntilRunning(this.flowRepository.findById(tenantId, namespace, flowId, revision != null ? Optional.of(revision) : Optional.empty()).orElseThrow(() -> new IllegalArgumentException("Unable to find flow '" + flowId + "'")), inputs, duration);
    }

    public Execution runOneUntilRunning(Flow flow, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration) throws TimeoutException, QueueException {
        if (duration == null) {
            duration = DEFAULT_MAX_WAIT_DURATION;
        }
        Execution execution = Execution.newExecution(flow, inputs, null, Optional.empty());
        return this.awaitExecution(this.isRunningExecution(execution), Rethrow.throwRunnable(() -> this.executionQueue.emit(execution)), duration);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    public Execution awaitExecution(Predicate<Execution> predicate, Runnable executionEmitter, Duration duration) throws TimeoutException {
        AtomicReference receive = new AtomicReference();
        Runnable cancel = this.executionQueue.receive(null, current -> {
            if (predicate.test((Execution)current.getLeft())) {
                receive.set((Execution)current.getLeft());
            }
        }, false);
        try {
            executionEmitter.run();
            if (duration == null) {
                Await.until(() -> receive.get() != null, Duration.ofMillis(10L));
            } else {
                Await.until(() -> receive.get() != null, Duration.ofMillis(10L), duration);
            }
        }
        finally {
            cancel.run();
        }
        return (Execution)receive.get();
    }

    @VisibleForTesting
    public Execution awaitChildExecution(Flow flow, Execution parentExecution, Runnable executionEmitter, Duration duration) throws TimeoutException {
        return this.awaitExecution(this.isTerminatedChildExecution(parentExecution, flow), executionEmitter, duration);
    }

    private Predicate<Execution> isTerminatedExecution(Execution execution, Flow flow) {
        return e -> e.getId().equals(execution.getId()) && this.executionService.isTerminated(flow, (Execution)e);
    }

    private Predicate<Execution> isPausedExecution(Execution execution) {
        return e -> e.getId().equals(execution.getId()) && e.getState().isPaused() && e.getTaskRunList() != null && e.getTaskRunList().stream().anyMatch(t -> t.getState().isPaused());
    }

    private Predicate<Execution> isRunningExecution(Execution execution) {
        return e -> e.getId().equals(execution.getId()) && e.getState().isRunning() && e.getTaskRunList() != null && e.getTaskRunList().stream().anyMatch(t -> t.getState().isRunning());
    }

    private Predicate<Execution> isTerminatedChildExecution(Execution parentExecution, Flow flow) {
        return e -> e.getParentId() != null && e.getParentId().equals(parentExecution.getId()) && this.executionService.isTerminated(flow, (Execution)e);
    }
}

