/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import io.airlift.concurrent.Threads;
import io.airlift.units.Duration;
import io.trino.execution.scheduler.NodeSchedulerConfig;
import io.trino.metadata.AllNodes;
import io.trino.metadata.InternalNodeManager;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.weakref.jmx.Managed;

/**
 * Tracks how many active nodes the {@link InternalNodeManager} currently reports and lets callers
 * wait until at least a given number of them is available.
 *
 * <p>Coordinators are counted only when {@code includeCoordinator} is {@code true}. The mutable
 * state ({@code currentCount} and {@code futuresQueue}) is guarded by this instance's monitor.
 * Pending futures are completed (successfully or with a timeout error) from the scheduled
 * {@code executor} owned by this class.
 */
public class ClusterSizeMonitor {
    private final InternalNodeManager nodeManager;
    private final boolean includeCoordinator;
    private final ScheduledExecutorService executor;
    // Kept in a field so the exact same instance can be unregistered in stop().
    private final Consumer<AllNodes> listener = this::updateAllNodes;
    @GuardedBy("this")
    private int currentCount;
    // Ordered by required node count, so the head is always the easiest request to satisfy.
    @GuardedBy("this")
    private final PriorityQueue<MinNodesFuture> futuresQueue = new PriorityQueue<>(Comparator.comparingInt(MinNodesFuture::executionMinCount));

    /**
     * Creates a monitor whose coordinator-inclusion policy is taken from the scheduler config.
     *
     * @param nodeManager source of node membership information; must not be null
     * @param nodeSchedulerConfig configuration supplying {@code isIncludeCoordinator()}
     */
    @Inject
    public ClusterSizeMonitor(InternalNodeManager nodeManager, NodeSchedulerConfig nodeSchedulerConfig) {
        this(nodeManager, nodeSchedulerConfig.isIncludeCoordinator());
    }

    /**
     * Creates a monitor with an explicit coordinator-inclusion policy.
     *
     * @param nodeManager source of node membership information; must not be null
     * @param includeCoordinator whether active coordinators count towards the node total
     * @throws NullPointerException if {@code nodeManager} is null
     */
    public ClusterSizeMonitor(InternalNodeManager nodeManager, boolean includeCoordinator) {
        this.nodeManager = Objects.requireNonNull(nodeManager, "nodeManager is null");
        this.includeCoordinator = includeCoordinator;
        this.executor = Executors.newSingleThreadScheduledExecutor(Threads.daemonThreadsNamed("node-monitor-%s"));
    }

    /**
     * Registers for node change notifications and seeds the count from the current node set.
     */
    @PostConstruct
    public void start() {
        this.nodeManager.addNodeChangeListener(this.listener);
        this.updateAllNodes(this.nodeManager.getAllNodes());
    }

    /**
     * Unregisters the node change listener and shuts down the internal executor.
     */
    @PreDestroy
    public void stop() {
        this.nodeManager.removeNodeChangeListener(this.listener);
        this.executor.shutdown();
    }

    /**
     * Returns a future that completes once at least {@code executionMinCount} nodes are active.
     *
     * <p>If the requirement is already met, an already-completed future is returned. Otherwise the
     * returned future fails with a {@link TrinoException} carrying
     * {@code GENERIC_INSUFFICIENT_RESOURCES} if the requirement is not met within
     * {@code executionMaxWait}.
     *
     * @param executionMinCount minimum number of active nodes required; must be greater than 0
     * @param executionMaxWait maximum time to wait before failing the future; must not be null
     * @return a future completed when enough nodes are active, or failed on timeout
     * @throws IllegalArgumentException if {@code executionMinCount} is not positive
     * @throws NullPointerException if {@code executionMaxWait} is null
     */
    public synchronized ListenableFuture<Void> waitForMinimumWorkers(int executionMinCount, Duration executionMaxWait) {
        Preconditions.checkArgument(executionMinCount > 0, "executionMinCount should be greater than 0");
        Objects.requireNonNull(executionMaxWait, "executionMaxWait is null");
        if (this.currentCount >= executionMinCount) {
            return Futures.immediateVoidFuture();
        }

        SettableFuture<Void> future = SettableFuture.create();
        MinNodesFuture minNodesFuture = new MinNodesFuture(executionMinCount, future);

        // If the future does not finish within the wait period, complete it with an exception.
        // The timeout is scheduled before the entry is queued so that a rejected schedule
        // (e.g. the executor was already shut down) does not leave an orphaned queue entry.
        ScheduledFuture<?> timeoutTask = this.executor.schedule(() -> {
            // Only the read of the guarded counter needs the lock; completing the future outside
            // of it keeps any directly-executed listeners from running while the monitor is held.
            int activeWorkers;
            synchronized (this) {
                activeWorkers = this.currentCount;
            }
            future.setException(new TrinoException(
                    StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES,
                    String.format("Insufficient active worker nodes. Waited %s for at least %s workers, but only %s workers are active", executionMaxWait, executionMinCount, activeWorkers)));
        }, executionMaxWait.toMillis(), TimeUnit.MILLISECONDS);
        this.futuresQueue.add(minNodesFuture);

        // Once the future is done for any reason (satisfied, timed out, cancelled by the caller),
        // cancel the pending timeout and drop the queue entry.
        future.addListener(() -> {
            timeoutTask.cancel(true);
            this.removeFuture(minNodesFuture);
        }, this.executor);
        return future;
    }

    /** Removes a pending request from the queue; a no-op if it is no longer queued. */
    private synchronized void removeFuture(MinNodesFuture minNodesFuture) {
        this.futuresQueue.remove(minNodesFuture);
    }

    /**
     * Recomputes the active node count from {@code allNodes} and releases every queued request
     * whose minimum is now satisfied. The futures are completed on the executor rather than on
     * the calling thread.
     */
    private synchronized void updateAllNodes(AllNodes allNodes) {
        if (this.includeCoordinator) {
            this.currentCount = allNodes.getActiveNodes().size();
        }
        else {
            this.currentCount = Sets.difference(allNodes.getActiveNodes(), allNodes.getActiveCoordinators()).size();
        }

        ImmutableList.Builder<SettableFuture<Void>> satisfied = ImmutableList.builder();
        while (true) {
            MinNodesFuture head = this.futuresQueue.peek();
            // The queue is ordered by required count, so the first unsatisfied head ends the scan.
            if (head == null || head.executionMinCount() > this.currentCount) {
                break;
            }
            satisfied.add(head.future());
            // Should always hold because the monitor is held for the whole method.
            Preconditions.checkState(this.futuresQueue.poll() == head, "Unexpected modifications to MinNodesFuture queue");
        }

        List<SettableFuture<Void>> listeners = satisfied.build();
        this.executor.submit(() -> listeners.forEach(satisfiedFuture -> satisfiedFuture.set(null)));
    }

    /**
     * Returns the largest node count any pending request is waiting for, or 0 when nothing is
     * pending.
     */
    @Managed
    public synchronized int getRequiredWorkers() {
        return this.futuresQueue.stream()
                .mapToInt(MinNodesFuture::executionMinCount)
                .max()
                .orElse(0);
    }

    /** A pending request: the node count it needs and the future to complete when it is met. */
    private record MinNodesFuture(int executionMinCount, SettableFuture<Void> future) {
        MinNodesFuture {
            Objects.requireNonNull(future, "future is null");
        }
    }
}

