/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.google.common.base.Verify;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.airlift.stats.TDigest;
import io.airlift.units.DataSize;
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.execution.TaskId;
import io.trino.execution.scheduler.ErrorCodes;
import io.trino.execution.scheduler.NodeAllocator;
import io.trino.execution.scheduler.NodeAllocatorService;
import io.trino.execution.scheduler.NodeRequirements;
import io.trino.execution.scheduler.NodeSchedulerConfig;
import io.trino.execution.scheduler.PartitionMemoryEstimator;
import io.trino.execution.scheduler.PartitionMemoryEstimatorFactory;
import io.trino.memory.ClusterMemoryManager;
import io.trino.memory.MemoryInfo;
import io.trino.memory.MemoryManagerConfig;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.spi.ErrorCode;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.memory.MemoryPoolInfo;
import java.time.Duration;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import org.assertj.core.util.VisibleForTesting;

@ThreadSafe
public class BinPackingNodeAllocatorService
implements NodeAllocatorService,
NodeAllocator,
PartitionMemoryEstimatorFactory {
    private static final Logger log = Logger.get(BinPackingNodeAllocatorService.class);
    @VisibleForTesting
    static final int PROCESS_PENDING_ACQUIRES_DELAY_SECONDS = 5;
    private final InternalNodeManager nodeManager;
    private final Supplier<Map<String, Optional<MemoryInfo>>> workerMemoryInfoSupplier;
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, Threads.daemonThreadsNamed((String)"bin-packing-node-allocator"));
    private final AtomicBoolean started = new AtomicBoolean();
    private final AtomicBoolean stopped = new AtomicBoolean();
    private final Semaphore processSemaphore = new Semaphore(0);
    private final AtomicReference<Map<String, MemoryPoolInfo>> nodePoolMemoryInfos = new AtomicReference<ImmutableMap>(ImmutableMap.of());
    private final AtomicReference<Optional<DataSize>> maxNodePoolSize = new AtomicReference(Optional.empty());
    private final boolean scheduleOnCoordinator;
    private final boolean memoryRequirementIncreaseOnWorkerCrashEnabled;
    private final DataSize taskRuntimeMemoryEstimationOverhead;
    private final Ticker ticker;
    private final ConcurrentMap<String, Long> allocatedMemory = new ConcurrentHashMap<String, Long>();
    private final Deque<PendingAcquire> pendingAcquires = new ConcurrentLinkedDeque<PendingAcquire>();
    private final Set<BinPackingNodeLease> fulfilledAcquires = Sets.newConcurrentHashSet();
    private final Duration allowedNoMatchingNodePeriod;

    @Inject
    public BinPackingNodeAllocatorService(InternalNodeManager nodeManager, ClusterMemoryManager clusterMemoryManager, NodeSchedulerConfig nodeSchedulerConfig, MemoryManagerConfig memoryManagerConfig) {
        this(nodeManager, clusterMemoryManager::getWorkerMemoryInfo, nodeSchedulerConfig.isIncludeCoordinator(), memoryManagerConfig.isFaultTolerantExecutionMemoryRequirementIncreaseOnWorkerCrashEnabled(), Duration.ofMillis(nodeSchedulerConfig.getAllowedNoMatchingNodePeriod().toMillis()), memoryManagerConfig.getFaultTolerantExecutionTaskRuntimeMemoryEstimationOverhead(), Ticker.systemTicker());
    }

    @VisibleForTesting
    BinPackingNodeAllocatorService(InternalNodeManager nodeManager, Supplier<Map<String, Optional<MemoryInfo>>> workerMemoryInfoSupplier, boolean scheduleOnCoordinator, boolean memoryRequirementIncreaseOnWorkerCrashEnabled, Duration allowedNoMatchingNodePeriod, DataSize taskRuntimeMemoryEstimationOverhead, Ticker ticker) {
        this.nodeManager = Objects.requireNonNull(nodeManager, "nodeManager is null");
        this.workerMemoryInfoSupplier = Objects.requireNonNull(workerMemoryInfoSupplier, "workerMemoryInfoSupplier is null");
        this.scheduleOnCoordinator = scheduleOnCoordinator;
        this.memoryRequirementIncreaseOnWorkerCrashEnabled = memoryRequirementIncreaseOnWorkerCrashEnabled;
        this.allowedNoMatchingNodePeriod = Objects.requireNonNull(allowedNoMatchingNodePeriod, "allowedNoMatchingNodePeriod is null");
        this.taskRuntimeMemoryEstimationOverhead = Objects.requireNonNull(taskRuntimeMemoryEstimationOverhead, "taskRuntimeMemoryEstimationOverhead is null");
        this.ticker = Objects.requireNonNull(ticker, "ticker is null");
    }

    @PostConstruct
    public void start() {
        if (this.started.compareAndSet(false, true)) {
            this.executor.schedule(() -> {
                while (!this.stopped.get()) {
                    try {
                        this.processSemaphore.tryAcquire(5L, TimeUnit.SECONDS);
                        this.processSemaphore.drainPermits();
                        this.processPendingAcquires();
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    catch (Exception e) {
                        log.warn((Throwable)e, "Error updating nodes");
                    }
                }
            }, 0L, TimeUnit.SECONDS);
        }
        this.refreshNodePoolMemoryInfos();
        this.executor.scheduleWithFixedDelay(this::refreshNodePoolMemoryInfos, 1L, 1L, TimeUnit.SECONDS);
    }

    @PreDestroy
    public void stop() {
        this.stopped.set(true);
        this.executor.shutdownNow();
    }

    @VisibleForTesting
    void refreshNodePoolMemoryInfos() {
        ImmutableMap.Builder newNodePoolMemoryInfos = ImmutableMap.builder();
        Map<String, Optional<MemoryInfo>> workerMemoryInfos = this.workerMemoryInfoSupplier.get();
        long maxNodePoolSizeBytes = -1L;
        for (Map.Entry<String, Optional<MemoryInfo>> entry : workerMemoryInfos.entrySet()) {
            if (entry.getValue().isEmpty()) continue;
            MemoryPoolInfo poolInfo = entry.getValue().get().getPool();
            newNodePoolMemoryInfos.put((Object)entry.getKey(), (Object)poolInfo);
            maxNodePoolSizeBytes = Math.max(poolInfo.getMaxBytes(), maxNodePoolSizeBytes);
        }
        this.maxNodePoolSize.set(maxNodePoolSizeBytes == -1L ? Optional.empty() : Optional.of(DataSize.ofBytes((long)maxNodePoolSizeBytes)));
        this.nodePoolMemoryInfos.set((Map<String, MemoryPoolInfo>)newNodePoolMemoryInfos.buildOrThrow());
    }

    @VisibleForTesting
    synchronized void processPendingAcquires() {
        Iterator<PendingAcquire> iterator = this.pendingAcquires.iterator();
        BinPackingSimulation simulation = new BinPackingSimulation(this.nodeManager.getActiveNodesSnapshot(), this.nodePoolMemoryInfos.get(), this.fulfilledAcquires, this.allocatedMemory, this.scheduleOnCoordinator, this.taskRuntimeMemoryEstimationOverhead);
        while (iterator.hasNext()) {
            PendingAcquire pendingAcquire = iterator.next();
            if (pendingAcquire.getFuture().isCancelled()) {
                iterator.remove();
                continue;
            }
            BinPackingSimulation.ReserveResult result = simulation.tryReserve(pendingAcquire);
            switch (result.getStatus()) {
                case RESERVED: {
                    InternalNode reservedNode = result.getNode().orElseThrow();
                    this.fulfilledAcquires.add(pendingAcquire.getLease());
                    this.updateAllocatedMemory(reservedNode, pendingAcquire.getMemoryLease());
                    pendingAcquire.getFuture().set((Object)reservedNode);
                    if (pendingAcquire.getFuture().isCancelled()) {
                        pendingAcquire.getLease().deallocateMemory(reservedNode);
                        this.fulfilledAcquires.remove(pendingAcquire.getLease());
                        this.wakeupProcessPendingAcquires();
                    }
                    iterator.remove();
                    break;
                }
                case NONE_MATCHING: {
                    Duration noMatchingNodePeriod = pendingAcquire.markNoMatchingNodeFound();
                    if (noMatchingNodePeriod.compareTo(this.allowedNoMatchingNodePeriod) <= 0) break;
                    pendingAcquire.getFuture().setException((Throwable)new TrinoException((ErrorCodeSupplier)StandardErrorCode.NO_NODES_AVAILABLE, "No nodes available to run query"));
                    iterator.remove();
                    break;
                }
                case NOT_ENOUGH_RESOURCES_NOW: {
                    pendingAcquire.resetNoMatchingNodeFound();
                    break;
                }
                default: {
                    throw new IllegalArgumentException("unknown status: " + result.getStatus());
                }
            }
        }
    }

    private void wakeupProcessPendingAcquires() {
        this.processSemaphore.release();
    }

    @Override
    public NodeAllocator getNodeAllocator(Session session) {
        return this;
    }

    @Override
    public NodeAllocator.NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement) {
        BinPackingNodeLease nodeLease = new BinPackingNodeLease(memoryRequirement.toBytes());
        PendingAcquire pendingAcquire = new PendingAcquire(nodeRequirements, memoryRequirement, nodeLease, this.ticker);
        this.pendingAcquires.add(pendingAcquire);
        this.wakeupProcessPendingAcquires();
        return nodeLease;
    }

    @Override
    public void close() {
    }

    private void updateAllocatedMemory(InternalNode node, long delta) {
        this.allocatedMemory.compute(node.getNodeIdentifier(), (key, oldValue) -> {
            long newValue;
            Verify.verify((delta > 0L || oldValue != null && oldValue >= -delta ? 1 : 0) != 0, (String)"tried to release more than allocated (%s vs %s) for node %s", (Object)(-delta), (Object)oldValue, (Object)key);
            long l = newValue = oldValue == null ? delta : oldValue + delta;
            if (newValue == 0L) {
                return null;
            }
            return newValue;
        });
    }

    @Override
    public PartitionMemoryEstimator createPartitionMemoryEstimator() {
        return new ExponentialGrowthPartitionMemoryEstimator();
    }

    private boolean shouldIncreaseMemoryRequirement(ErrorCode errorCode) {
        return ErrorCodes.isOutOfMemoryError(errorCode) || this.memoryRequirementIncreaseOnWorkerCrashEnabled && ErrorCodes.isWorkerCrashAssociatedError(errorCode);
    }

    private static class BinPackingSimulation {
        private final InternalNodeManager.NodesSnapshot nodesSnapshot;
        private final List<InternalNode> allNodesSorted;
        private final Map<String, Long> nodesRemainingMemory;
        private final Map<String, Long> nodesRemainingMemoryRuntimeAdjusted;
        private final Map<String, MemoryPoolInfo> nodeMemoryPoolInfos;
        private final boolean scheduleOnCoordinator;

        public BinPackingSimulation(InternalNodeManager.NodesSnapshot nodesSnapshot, Map<String, MemoryPoolInfo> nodeMemoryPoolInfos, Set<BinPackingNodeLease> fulfilledAcquires, Map<String, Long> preReservedMemory, boolean scheduleOnCoordinator, DataSize taskRuntimeMemoryEstimationOverhead) {
            MemoryPoolInfo memoryPoolInfo;
            this.nodesSnapshot = Objects.requireNonNull(nodesSnapshot, "nodesSnapshot is null");
            this.allNodesSorted = (List)nodesSnapshot.getAllNodes().stream().sorted(Comparator.comparing(InternalNode::getNodeIdentifier)).collect(ImmutableList.toImmutableList());
            Objects.requireNonNull(nodeMemoryPoolInfos, "nodeMemoryPoolInfos is null");
            this.nodeMemoryPoolInfos = ImmutableMap.copyOf(nodeMemoryPoolInfos);
            Objects.requireNonNull(preReservedMemory, "preReservedMemory is null");
            this.scheduleOnCoordinator = scheduleOnCoordinator;
            HashMap<String, Object> realtimeTasksMemoryPerNode = new HashMap<String, Object>();
            for (InternalNode node : nodesSnapshot.getAllNodes()) {
                MemoryPoolInfo memoryPoolInfo2 = nodeMemoryPoolInfos.get(node.getNodeIdentifier());
                if (memoryPoolInfo2 == null) {
                    realtimeTasksMemoryPerNode.put(node.getNodeIdentifier(), ImmutableMap.of());
                    continue;
                }
                realtimeTasksMemoryPerNode.put(node.getNodeIdentifier(), memoryPoolInfo2.getTaskMemoryReservations());
            }
            HashMultimap fulfilledAcquiresByNode = HashMultimap.create();
            for (BinPackingNodeLease fulfilledAcquire : fulfilledAcquires) {
                InternalNode node = fulfilledAcquire.getAssignedNode();
                fulfilledAcquiresByNode.put((Object)node.getNodeIdentifier(), (Object)fulfilledAcquire);
            }
            this.nodesRemainingMemory = new HashMap<String, Long>();
            for (InternalNode node : nodesSnapshot.getAllNodes()) {
                memoryPoolInfo = nodeMemoryPoolInfos.get(node.getNodeIdentifier());
                if (memoryPoolInfo == null) {
                    this.nodesRemainingMemory.put(node.getNodeIdentifier(), 0L);
                    continue;
                }
                long nodeReservedMemory = preReservedMemory.getOrDefault(node.getNodeIdentifier(), 0L);
                this.nodesRemainingMemory.put(node.getNodeIdentifier(), memoryPoolInfo.getMaxBytes() - nodeReservedMemory);
            }
            this.nodesRemainingMemoryRuntimeAdjusted = new HashMap<String, Long>();
            for (InternalNode node : nodesSnapshot.getAllNodes()) {
                memoryPoolInfo = nodeMemoryPoolInfos.get(node.getNodeIdentifier());
                if (memoryPoolInfo == null) {
                    this.nodesRemainingMemoryRuntimeAdjusted.put(node.getNodeIdentifier(), 0L);
                    continue;
                }
                Map realtimeNodeMemory = (Map)realtimeTasksMemoryPerNode.get(node.getNodeIdentifier());
                Set nodeFulfilledAcquires = fulfilledAcquiresByNode.get((Object)node.getNodeIdentifier());
                long nodeUsedMemoryRuntimeAdjusted = 0L;
                for (BinPackingNodeLease lease : nodeFulfilledAcquires) {
                    long realtimeTaskMemory = 0L;
                    if (lease.getAttachedTaskId().isPresent()) {
                        realtimeTaskMemory = realtimeNodeMemory.getOrDefault(lease.getAttachedTaskId().get().toString(), 0L);
                        realtimeTaskMemory += taskRuntimeMemoryEstimationOverhead.toBytes();
                    }
                    long reservedTaskMemory = lease.getMemoryLease();
                    nodeUsedMemoryRuntimeAdjusted += Math.max(realtimeTaskMemory, reservedTaskMemory);
                }
                nodeUsedMemoryRuntimeAdjusted = Math.max(nodeUsedMemoryRuntimeAdjusted, memoryPoolInfo.getReservedBytes());
                this.nodesRemainingMemoryRuntimeAdjusted.put(node.getNodeIdentifier(), memoryPoolInfo.getMaxBytes() - nodeUsedMemoryRuntimeAdjusted);
            }
        }

        public ReserveResult tryReserve(PendingAcquire acquire) {
            NodeRequirements requirements = acquire.getNodeRequirements();
            Optional<Set> catalogNodes = requirements.getCatalogHandle().map(this.nodesSnapshot::getConnectorNodes);
            List candidates = (List)this.allNodesSorted.stream().filter(node -> catalogNodes.isEmpty() || ((Set)catalogNodes.get()).contains(node)).filter(node -> {
                if (requirements.getAddresses().contains(node.getHostAndPort())) {
                    return true;
                }
                if (requirements.getAddresses().isEmpty()) {
                    return this.scheduleOnCoordinator || !node.isCoordinator();
                }
                return false;
            }).collect(ImmutableList.toImmutableList());
            if (candidates.isEmpty()) {
                return ReserveResult.NONE_MATCHING;
            }
            InternalNode selectedNode = candidates.stream().max(Comparator.comparing(node -> this.nodesRemainingMemoryRuntimeAdjusted.get(node.getNodeIdentifier()))).orElseThrow();
            if (this.nodesRemainingMemoryRuntimeAdjusted.get(selectedNode.getNodeIdentifier()) >= acquire.getMemoryLease() || this.isNodeEmpty(selectedNode.getNodeIdentifier())) {
                this.subtractFromRemainingMemory(selectedNode.getNodeIdentifier(), acquire.getMemoryLease());
                return ReserveResult.reserved(selectedNode);
            }
            InternalNode fallbackNode = candidates.stream().max(Comparator.comparing(node -> this.nodesRemainingMemory.get(node.getNodeIdentifier()))).orElseThrow();
            this.subtractFromRemainingMemory(fallbackNode.getNodeIdentifier(), acquire.getMemoryLease());
            return ReserveResult.NOT_ENOUGH_RESOURCES_NOW;
        }

        private void subtractFromRemainingMemory(String nodeIdentifier, long memoryLease) {
            this.nodesRemainingMemoryRuntimeAdjusted.compute(nodeIdentifier, (key, free) -> free - memoryLease);
            this.nodesRemainingMemory.compute(nodeIdentifier, (key, free) -> free - memoryLease);
        }

        private boolean isNodeEmpty(String nodeIdentifier) {
            return this.nodeMemoryPoolInfos.containsKey(nodeIdentifier) && this.nodesRemainingMemory.get(nodeIdentifier).equals(this.nodeMemoryPoolInfos.get(nodeIdentifier).getMaxBytes());
        }

        public static class ReserveResult {
            public static final ReserveResult NONE_MATCHING = new ReserveResult(ReservationStatus.NONE_MATCHING, Optional.empty());
            public static final ReserveResult NOT_ENOUGH_RESOURCES_NOW = new ReserveResult(ReservationStatus.NOT_ENOUGH_RESOURCES_NOW, Optional.empty());
            private final ReservationStatus status;
            private final Optional<InternalNode> node;

            public static ReserveResult reserved(InternalNode node) {
                return new ReserveResult(ReservationStatus.RESERVED, Optional.of(node));
            }

            private ReserveResult(ReservationStatus status, Optional<InternalNode> node) {
                this.status = Objects.requireNonNull(status, "status is null");
                this.node = Objects.requireNonNull(node, "node is null");
                Preconditions.checkArgument((node.isPresent() == (status == ReservationStatus.RESERVED) ? 1 : 0) != 0, (Object)"node must be set iff status is RESERVED");
            }

            public ReservationStatus getStatus() {
                return this.status;
            }

            public Optional<InternalNode> getNode() {
                return this.node;
            }
        }

        public static enum ReservationStatus {
            NONE_MATCHING,
            NOT_ENOUGH_RESOURCES_NOW,
            RESERVED;

        }
    }

    private static class PendingAcquire {
        private final NodeRequirements nodeRequirements;
        private final DataSize memoryRequirement;
        private final BinPackingNodeLease lease;
        private final Stopwatch noMatchingNodeStopwatch;

        private PendingAcquire(NodeRequirements nodeRequirements, DataSize memoryRequirement, BinPackingNodeLease lease, Ticker ticker) {
            this.nodeRequirements = Objects.requireNonNull(nodeRequirements, "nodeRequirements is null");
            this.memoryRequirement = Objects.requireNonNull(memoryRequirement, "memoryRequirement is null");
            this.lease = Objects.requireNonNull(lease, "lease is null");
            this.noMatchingNodeStopwatch = Stopwatch.createUnstarted((Ticker)ticker);
        }

        public NodeRequirements getNodeRequirements() {
            return this.nodeRequirements;
        }

        public BinPackingNodeLease getLease() {
            return this.lease;
        }

        public SettableFuture<InternalNode> getFuture() {
            return this.lease.getNodeSettableFuture();
        }

        public long getMemoryLease() {
            return this.memoryRequirement.toBytes();
        }

        public Duration markNoMatchingNodeFound() {
            if (!this.noMatchingNodeStopwatch.isRunning()) {
                this.noMatchingNodeStopwatch.start();
            }
            return this.noMatchingNodeStopwatch.elapsed();
        }

        public void resetNoMatchingNodeFound() {
            this.noMatchingNodeStopwatch.reset();
        }
    }

    private class BinPackingNodeLease
    implements NodeAllocator.NodeLease {
        private final SettableFuture<InternalNode> node = SettableFuture.create();
        private final AtomicBoolean released = new AtomicBoolean();
        private final AtomicBoolean memoryDeallocated = new AtomicBoolean();
        private final long memoryLease;
        private final AtomicReference<TaskId> taskId = new AtomicReference();

        private BinPackingNodeLease(long memoryLease) {
            this.memoryLease = memoryLease;
        }

        @Override
        public ListenableFuture<InternalNode> getNode() {
            return this.node;
        }

        InternalNode getAssignedNode() {
            try {
                return (InternalNode)Futures.getDone(this.node);
            }
            catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

        SettableFuture<InternalNode> getNodeSettableFuture() {
            return this.node;
        }

        @Override
        public void attachTaskId(TaskId taskId) {
            if (!this.taskId.compareAndSet(null, taskId)) {
                throw new IllegalStateException("cannot attach taskId " + taskId + "; already attached to " + this.taskId.get());
            }
        }

        public Optional<TaskId> getAttachedTaskId() {
            return Optional.ofNullable(this.taskId.get());
        }

        public long getMemoryLease() {
            return this.memoryLease;
        }

        @Override
        public void release() {
            if (this.released.compareAndSet(false, true)) {
                this.node.cancel(true);
                if (this.node.isDone() && !this.node.isCancelled()) {
                    this.deallocateMemory((InternalNode)MoreFutures.getFutureValue(this.node));
                    Preconditions.checkState((boolean)BinPackingNodeAllocatorService.this.fulfilledAcquires.remove(this), (String)"node lease %s not found in fulfilledAcquires %s", (Object)this, BinPackingNodeAllocatorService.this.fulfilledAcquires);
                    BinPackingNodeAllocatorService.this.wakeupProcessPendingAcquires();
                }
            } else {
                throw new IllegalStateException("Node " + this.node + " already released");
            }
        }

        public void deallocateMemory(InternalNode node) {
            if (this.memoryDeallocated.compareAndSet(false, true)) {
                BinPackingNodeAllocatorService.this.updateAllocatedMemory(node, -this.memoryLease);
            }
        }
    }

    private class ExponentialGrowthPartitionMemoryEstimator
    implements PartitionMemoryEstimator {
        private final TDigest memoryUsageDistribution = new TDigest();

        private ExponentialGrowthPartitionMemoryEstimator() {
        }

        @Override
        public PartitionMemoryEstimator.MemoryRequirements getInitialMemoryRequirements(Session session, DataSize defaultMemoryLimit) {
            DataSize memory = (DataSize)Ordering.natural().max((Object)defaultMemoryLimit, (Object)this.getEstimatedMemoryUsage(session));
            memory = this.capMemoryToMaxNodeSize(memory);
            return new PartitionMemoryEstimator.MemoryRequirements(memory);
        }

        @Override
        public PartitionMemoryEstimator.MemoryRequirements getNextRetryMemoryRequirements(Session session, PartitionMemoryEstimator.MemoryRequirements previousMemoryRequirements, DataSize peakMemoryUsage, ErrorCode errorCode) {
            DataSize previousMemory = previousMemoryRequirements.getRequiredMemory();
            DataSize newMemory = (DataSize)Ordering.natural().max((Object)peakMemoryUsage, (Object)previousMemory);
            if (BinPackingNodeAllocatorService.this.shouldIncreaseMemoryRequirement(errorCode)) {
                double growthFactor = SystemSessionProperties.getFaultTolerantExecutionTaskMemoryGrowthFactor(session);
                newMemory = DataSize.of((long)((long)((double)newMemory.toBytes() * growthFactor)), (DataSize.Unit)DataSize.Unit.BYTE);
            }
            newMemory = (DataSize)Ordering.natural().max((Object)newMemory, (Object)this.getEstimatedMemoryUsage(session));
            newMemory = this.capMemoryToMaxNodeSize(newMemory);
            return new PartitionMemoryEstimator.MemoryRequirements(newMemory);
        }

        private DataSize capMemoryToMaxNodeSize(DataSize memory) {
            Optional<DataSize> currentMaxNodePoolSize = BinPackingNodeAllocatorService.this.maxNodePoolSize.get();
            if (currentMaxNodePoolSize.isEmpty()) {
                return memory;
            }
            return (DataSize)Ordering.natural().min((Object)memory, (Object)currentMaxNodePoolSize.get());
        }

        @Override
        public synchronized void registerPartitionFinished(Session session, PartitionMemoryEstimator.MemoryRequirements previousMemoryRequirements, DataSize peakMemoryUsage, boolean success, Optional<ErrorCode> errorCode) {
            if (success) {
                this.memoryUsageDistribution.add((double)peakMemoryUsage.toBytes());
            }
            if (!success && errorCode.isPresent() && BinPackingNodeAllocatorService.this.shouldIncreaseMemoryRequirement(errorCode.get())) {
                double growthFactor = SystemSessionProperties.getFaultTolerantExecutionTaskMemoryGrowthFactor(session);
                long previousRequiredBytes = previousMemoryRequirements.getRequiredMemory().toBytes();
                long previousPeakBytes = peakMemoryUsage.toBytes();
                this.memoryUsageDistribution.add((double)Math.max(previousRequiredBytes, previousPeakBytes) * growthFactor);
            }
        }

        private synchronized DataSize getEstimatedMemoryUsage(Session session) {
            double estimationQuantile = SystemSessionProperties.getFaultTolerantExecutionTaskMemoryEstimationQuantile(session);
            double estimation = this.memoryUsageDistribution.valueAt(estimationQuantile);
            if (Double.isNaN(estimation)) {
                return DataSize.ofBytes((long)0L);
            }
            return DataSize.ofBytes((long)((long)estimation));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private String memoryUsageDistributionInfo() {
            List values;
            ImmutableList quantiles = ImmutableList.of((Object)0.01, (Object)0.05, (Object)0.1, (Object)0.2, (Object)0.5, (Object)0.8, (Object)0.9, (Object)0.95, (Object)0.99);
            ExponentialGrowthPartitionMemoryEstimator exponentialGrowthPartitionMemoryEstimator = this;
            synchronized (exponentialGrowthPartitionMemoryEstimator) {
                values = this.memoryUsageDistribution.valuesAt((List)quantiles);
            }
            return Streams.zip(quantiles.stream(), values.stream(), (quantile, value) -> quantile + "=" + value).collect(Collectors.joining(", ", "[", "]"));
        }

        public String toString() {
            return "memoryUsageDistribution=" + this.memoryUsageDistributionInfo();
        }
    }
}

