/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.scheduler;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.trino.Session;
import io.trino.connector.CatalogName;
import io.trino.execution.scheduler.NodeAllocator;
import io.trino.execution.scheduler.NodeAllocatorService;
import io.trino.execution.scheduler.NodeRequirements;
import io.trino.execution.scheduler.NodeScheduler;
import io.trino.execution.scheduler.NodeSelector;
import io.trino.metadata.InternalNode;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;

@ThreadSafe
public class FixedCountNodeAllocatorService
implements NodeAllocatorService {
    private static final Logger log = Logger.get(FixedCountNodeAllocatorService.class);
    private static final int MAXIMUM_ALLOCATIONS_PER_NODE = 1;
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, Threads.daemonThreadsNamed((String)"fixed-count-node-allocator"));
    private final NodeScheduler nodeScheduler;
    private final Set<FixedCountNodeAllocator> allocators = Sets.newConcurrentHashSet();
    private final AtomicBoolean started = new AtomicBoolean();

    @Inject
    public FixedCountNodeAllocatorService(NodeScheduler nodeScheduler) {
        this.nodeScheduler = Objects.requireNonNull(nodeScheduler, "nodeScheduler is null");
    }

    @PostConstruct
    public void start() {
        if (!this.started.compareAndSet(false, true)) {
            return;
        }
        this.executor.scheduleWithFixedDelay(() -> {
            try {
                this.updateNodes();
            }
            catch (Throwable e) {
                log.warn(e, "Error updating nodes");
            }
        }, 5L, 5L, TimeUnit.SECONDS);
    }

    @PreDestroy
    public void stop() {
        this.executor.shutdownNow();
    }

    @VisibleForTesting
    void updateNodes() {
        this.allocators.forEach(FixedCountNodeAllocator::updateNodes);
    }

    @Override
    public NodeAllocator getNodeAllocator(Session session) {
        Objects.requireNonNull(session, "session is null");
        return this.getNodeAllocator(session, 1);
    }

    @VisibleForTesting
    NodeAllocator getNodeAllocator(Session session, int maximumAllocationsPerNode) {
        FixedCountNodeAllocator allocator = new FixedCountNodeAllocator(session, maximumAllocationsPerNode);
        this.allocators.add(allocator);
        return allocator;
    }

    private static class PendingAcquire {
        private final NodeRequirements nodeRequirements;
        private final SettableFuture<InternalNode> future;

        private PendingAcquire(NodeRequirements nodeRequirements, SettableFuture<InternalNode> future) {
            this.nodeRequirements = Objects.requireNonNull(nodeRequirements, "nodeRequirements is null");
            this.future = Objects.requireNonNull(future, "future is null");
        }

        public NodeRequirements getNodeRequirements() {
            return this.nodeRequirements;
        }

        public SettableFuture<InternalNode> getFuture() {
            return this.future;
        }
    }

    private class FixedCountNodeAllocator
    implements NodeAllocator {
        private final Session session;
        private final int maximumAllocationsPerNode;
        @GuardedBy(value="this")
        private final Map<Optional<CatalogName>, NodeSelector> nodeSelectorCache = new HashMap<Optional<CatalogName>, NodeSelector>();
        @GuardedBy(value="this")
        private final Map<InternalNode, Integer> allocationCountMap = new HashMap<InternalNode, Integer>();
        @GuardedBy(value="this")
        private final List<PendingAcquire> pendingAcquires = new LinkedList<PendingAcquire>();

        public FixedCountNodeAllocator(Session session, int maximumAllocationsPerNode) {
            this.session = Objects.requireNonNull(session, "session is null");
            this.maximumAllocationsPerNode = maximumAllocationsPerNode;
        }

        @Override
        public synchronized NodeAllocator.NodeLease acquire(NodeRequirements requirements) {
            try {
                Optional<InternalNode> node = this.tryAcquireNode(requirements);
                if (node.isPresent()) {
                    return new FixedCountNodeLease((ListenableFuture<InternalNode>)Futures.immediateFuture((Object)node.get()));
                }
            }
            catch (RuntimeException e) {
                return new FixedCountNodeLease((ListenableFuture<InternalNode>)Futures.immediateFailedFuture((Throwable)e));
            }
            SettableFuture future = SettableFuture.create();
            PendingAcquire pendingAcquire = new PendingAcquire(requirements, (SettableFuture<InternalNode>)future);
            this.pendingAcquires.add(pendingAcquire);
            return new FixedCountNodeLease((ListenableFuture<InternalNode>)future);
        }

        public void updateNodes() {
            this.processPendingAcquires();
        }

        private synchronized Optional<InternalNode> tryAcquireNode(NodeRequirements requirements) {
            NodeSelector nodeSelector = this.nodeSelectorCache.computeIfAbsent(requirements.getCatalogName(), catalogName -> FixedCountNodeAllocatorService.this.nodeScheduler.createNodeSelector(this.session, (Optional<CatalogName>)catalogName));
            List<InternalNode> nodes = nodeSelector.allNodes();
            if (nodes.isEmpty()) {
                throw new TrinoException((ErrorCodeSupplier)StandardErrorCode.NO_NODES_AVAILABLE, "No nodes available to run query");
            }
            List nodesMatchingRequirements = (List)nodes.stream().filter(node -> requirements.getAddresses().isEmpty() || requirements.getAddresses().contains(node.getHostAndPort())).collect(ImmutableList.toImmutableList());
            if (nodesMatchingRequirements.isEmpty()) {
                throw new TrinoException((ErrorCodeSupplier)StandardErrorCode.NO_NODES_AVAILABLE, "No nodes available to run query");
            }
            Optional<InternalNode> selectedNode = nodesMatchingRequirements.stream().filter(node -> this.allocationCountMap.getOrDefault(node, 0) < this.maximumAllocationsPerNode).min(Comparator.comparing(node -> this.allocationCountMap.getOrDefault(node, 0)));
            if (selectedNode.isEmpty()) {
                return Optional.empty();
            }
            this.allocationCountMap.compute(selectedNode.get(), (key, value) -> value == null ? 1 : value + 1);
            return selectedNode;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void releaseNode(InternalNode node) {
            FixedCountNodeAllocator fixedCountNodeAllocator = this;
            synchronized (fixedCountNodeAllocator) {
                int allocationCount = this.allocationCountMap.compute(node, (key, value) -> value == null ? 0 : value - 1);
                Preconditions.checkState((allocationCount >= 0 ? 1 : 0) != 0, (String)"allocation count for node %s is expected to be greater than or equal to zero: %s", (Object)node, (int)allocationCount);
            }
            this.processPendingAcquires();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void processPendingAcquires() {
            Verify.verify((!Thread.holdsLock(this) ? 1 : 0) != 0);
            IdentityHashMap<PendingAcquire, InternalNode> assignedNodes = new IdentityHashMap<PendingAcquire, InternalNode>();
            IdentityHashMap<PendingAcquire, RuntimeException> failures = new IdentityHashMap<PendingAcquire, RuntimeException>();
            FixedCountNodeAllocator fixedCountNodeAllocator = this;
            synchronized (fixedCountNodeAllocator) {
                Iterator<PendingAcquire> iterator = this.pendingAcquires.iterator();
                while (iterator.hasNext()) {
                    PendingAcquire pendingAcquire2 = iterator.next();
                    if (pendingAcquire2.getFuture().isCancelled()) {
                        iterator.remove();
                        continue;
                    }
                    try {
                        Optional<InternalNode> node2 = this.tryAcquireNode(pendingAcquire2.getNodeRequirements());
                        if (!node2.isPresent()) continue;
                        iterator.remove();
                        assignedNodes.put(pendingAcquire2, node2.get());
                    }
                    catch (RuntimeException e) {
                        iterator.remove();
                        failures.put(pendingAcquire2, e);
                    }
                }
            }
            assignedNodes.forEach((pendingAcquire, node) -> {
                SettableFuture<InternalNode> future = pendingAcquire.getFuture();
                future.set(node);
                if (future.isCancelled()) {
                    this.releaseNode((InternalNode)node);
                }
            });
            failures.forEach((pendingAcquire, failure) -> {
                SettableFuture<InternalNode> future = pendingAcquire.getFuture();
                future.setException((Throwable)failure);
            });
        }

        @Override
        public synchronized void close() {
            FixedCountNodeAllocatorService.this.allocators.remove(this);
        }

        private class FixedCountNodeLease
        implements NodeAllocator.NodeLease {
            private final ListenableFuture<InternalNode> node;
            private final AtomicBoolean released = new AtomicBoolean();

            private FixedCountNodeLease(ListenableFuture<InternalNode> node) {
                this.node = Objects.requireNonNull(node, "node is null");
            }

            @Override
            public ListenableFuture<InternalNode> getNode() {
                return this.node;
            }

            @Override
            public void release() {
                if (this.released.compareAndSet(false, true)) {
                    this.node.cancel(true);
                    if (this.node.isDone() && !this.node.isCancelled()) {
                        FixedCountNodeAllocator.this.releaseNode((InternalNode)MoreFutures.getFutureValue(this.node));
                    }
                } else {
                    throw new IllegalStateException("Node " + this.node + " already released");
                }
            }
        }
    }
}

