/*
 * Decompiled with CFR 0.152.
 */
package io.druid.indexing.overlord.autoscaling;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.WorkerTaskRunner;
import io.druid.indexing.overlord.autoscaling.AbstractWorkerResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.AutoScalingData;
import io.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerResourceManagementConfig;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.ResourceManagementUtil;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.indexing.worker.Worker;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

public class PendingTaskBasedWorkerResourceManagementStrategy
extends AbstractWorkerResourceManagementStrategy {
    private static final EmittingLogger log = new EmittingLogger(PendingTaskBasedWorkerResourceManagementStrategy.class);
    private final PendingTaskBasedWorkerResourceManagementConfig config;
    private final Supplier<WorkerBehaviorConfig> workerConfigRef;
    private final ScalingStats scalingStats;
    private final Object lock = new Object();
    private final Set<String> currentlyProvisioning = Sets.newHashSet();
    private final Set<String> currentlyTerminating = Sets.newHashSet();
    private DateTime lastProvisionTime = new DateTime();
    private DateTime lastTerminateTime = new DateTime();

    @Inject
    public PendingTaskBasedWorkerResourceManagementStrategy(PendingTaskBasedWorkerResourceManagementConfig config, Supplier<WorkerBehaviorConfig> workerConfigRef, ResourceManagementSchedulerConfig resourceManagementSchedulerConfig, ScheduledExecutorFactory factory) {
        this(config, workerConfigRef, resourceManagementSchedulerConfig, factory.create(1, "PendingTaskBasedResourceManagement-manager--%d"));
    }

    public PendingTaskBasedWorkerResourceManagementStrategy(PendingTaskBasedWorkerResourceManagementConfig config, Supplier<WorkerBehaviorConfig> workerConfigRef, ResourceManagementSchedulerConfig resourceManagementSchedulerConfig, ScheduledExecutorService exec) {
        super(resourceManagementSchedulerConfig, exec);
        this.config = config;
        this.workerConfigRef = workerConfigRef;
        this.scalingStats = new ScalingStats(config.getNumEventsToTrack());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean doProvision(WorkerTaskRunner runner) {
        Collection<Task> pendingTasks = runner.getPendingTaskPayloads();
        Collection<ImmutableWorkerInfo> workers = runner.getWorkers();
        Object object = this.lock;
        synchronized (object) {
            boolean didProvision = false;
            WorkerBehaviorConfig workerConfig = (WorkerBehaviorConfig)this.workerConfigRef.get();
            if (workerConfig == null || workerConfig.getAutoScaler() == null) {
                log.error("No workerConfig available, cannot provision new workers.", new Object[0]);
                return false;
            }
            Collection<String> workerNodeIds = PendingTaskBasedWorkerResourceManagementStrategy.getWorkerNodeIDs(Collections2.transform(workers, (Function)new Function<ImmutableWorkerInfo, Worker>(){

                public Worker apply(ImmutableWorkerInfo input) {
                    return input.getWorker();
                }
            }), workerConfig);
            this.currentlyProvisioning.removeAll(workerNodeIds);
            if (this.currentlyProvisioning.isEmpty()) {
                AutoScalingData provisioned;
                for (int want = this.getScaleUpNodeCount(runner.getConfig(), workerConfig, pendingTasks, workers); want > 0; want -= provisioned.getNodeIds().size()) {
                    Object newNodes;
                    provisioned = workerConfig.getAutoScaler().provision();
                    Object object2 = newNodes = provisioned == null ? ImmutableList.of() : provisioned.getNodeIds();
                    if (newNodes.isEmpty()) {
                        log.warn("NewNodes is empty, returning from provision loop", new Object[0]);
                        break;
                    }
                    this.currentlyProvisioning.addAll((Collection<String>)newNodes);
                    this.lastProvisionTime = new DateTime();
                    this.scalingStats.addProvisionEvent(provisioned);
                    didProvision = true;
                }
            } else {
                Duration durSinceLastProvision = new Duration((ReadableInstant)this.lastProvisionTime, (ReadableInstant)new DateTime());
                log.info("%s provisioning. Current wait time: %s", new Object[]{this.currentlyProvisioning, durSinceLastProvision});
                if (durSinceLastProvision.isLongerThan((ReadableDuration)this.config.getMaxScalingDuration().toStandardDuration())) {
                    log.makeAlert("Worker node provisioning taking too long!", new Object[0]).addData("millisSinceLastProvision", (Object)durSinceLastProvision.getMillis()).addData("provisioningCount", (Object)this.currentlyProvisioning.size()).emit();
                    workerConfig.getAutoScaler().terminateWithIds(Lists.newArrayList(this.currentlyProvisioning));
                    this.currentlyProvisioning.clear();
                }
            }
            return didProvision;
        }
    }

    private static Collection<String> getWorkerNodeIDs(Collection<Worker> workers, WorkerBehaviorConfig workerConfig) {
        return workerConfig.getAutoScaler().ipToIdLookup(Lists.newArrayList((Iterable)Iterables.transform(workers, (Function)new Function<Worker, String>(){

            public String apply(Worker input) {
                return input.getIp();
            }
        })));
    }

    int getScaleUpNodeCount(WorkerTaskRunnerConfig remoteTaskRunnerConfig, WorkerBehaviorConfig workerConfig, Collection<Task> pendingTasks, Collection<ImmutableWorkerInfo> workers) {
        int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers();
        int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers();
        Predicate<ImmutableWorkerInfo> isValidWorker = ResourceManagementUtil.createValidWorkerPredicate(this.config);
        int currValidWorkers = Collections2.filter(workers, isValidWorker).size();
        int moreWorkersNeeded = currValidWorkers == 0 ? minWorkerCount : this.getWorkersNeededToAssignTasks(remoteTaskRunnerConfig, workerConfig, pendingTasks, workers);
        int want = Math.max(minWorkerCount - currValidWorkers, Math.min(this.config.getMaxScalingStep(), moreWorkersNeeded));
        if (want > 0 && currValidWorkers >= maxWorkerCount) {
            log.warn("Unable to provision more workers. Current workerCount[%d] maximum workerCount[%d].", new Object[]{currValidWorkers, maxWorkerCount});
            return 0;
        }
        want = Math.min(want, maxWorkerCount - currValidWorkers);
        return want;
    }

    int getWorkersNeededToAssignTasks(WorkerTaskRunnerConfig workerTaskRunnerConfig, WorkerBehaviorConfig workerConfig, Collection<Task> pendingTasks, Collection<ImmutableWorkerInfo> workers) {
        Collection validWorkers = Collections2.filter(workers, ResourceManagementUtil.createValidWorkerPredicate(this.config));
        HashMap workersMap = Maps.newHashMap();
        for (ImmutableWorkerInfo worker : validWorkers) {
            workersMap.put(worker.getWorker().getHost(), worker);
        }
        WorkerSelectStrategy workerSelectStrategy = workerConfig.getSelectStrategy();
        int need = 0;
        int capacity = PendingTaskBasedWorkerResourceManagementStrategy.getExpectedWorkerCapacity(workers);
        for (Task task : pendingTasks) {
            ImmutableWorkerInfo workerRunningTask;
            Optional<ImmutableWorkerInfo> selectedWorker = workerSelectStrategy.findWorkerForTask(workerTaskRunnerConfig, (ImmutableMap<String, ImmutableWorkerInfo>)ImmutableMap.copyOf((Map)workersMap), task);
            if (selectedWorker.isPresent()) {
                workerRunningTask = (ImmutableWorkerInfo)selectedWorker.get();
            } else {
                workerRunningTask = PendingTaskBasedWorkerResourceManagementStrategy.createDummyWorker("dummy" + need, capacity, workerTaskRunnerConfig.getMinWorkerVersion());
                ++need;
            }
            workersMap.put(workerRunningTask.getWorker().getHost(), PendingTaskBasedWorkerResourceManagementStrategy.workerWithTask(workerRunningTask, task));
        }
        return need;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean doTerminate(WorkerTaskRunner runner) {
        Collection<ImmutableWorkerInfo> zkWorkers = runner.getWorkers();
        Object object = this.lock;
        synchronized (object) {
            WorkerBehaviorConfig workerConfig = (WorkerBehaviorConfig)this.workerConfigRef.get();
            if (workerConfig == null) {
                log.warn("No workerConfig available, cannot terminate workers.", new Object[0]);
                return false;
            }
            if (!this.currentlyProvisioning.isEmpty()) {
                log.debug("Already provisioning nodes, Not Terminating any nodes.", new Object[0]);
                return false;
            }
            boolean didTerminate = false;
            Collection<String> workerNodeIds = PendingTaskBasedWorkerResourceManagementStrategy.getWorkerNodeIDs(runner.getLazyWorkers(), workerConfig);
            HashSet stillExisting = Sets.newHashSet();
            for (String s : this.currentlyTerminating) {
                if (!workerNodeIds.contains(s)) continue;
                stillExisting.add(s);
            }
            this.currentlyTerminating.clear();
            this.currentlyTerminating.addAll(stillExisting);
            if (this.currentlyTerminating.isEmpty()) {
                int maxWorkersToTerminate = this.maxWorkersToTerminate(zkWorkers, workerConfig);
                Predicate<ImmutableWorkerInfo> isLazyWorker = ResourceManagementUtil.createLazyWorkerPredicate(this.config);
                ArrayList laziestWorkerIps = Lists.newArrayList((Iterable)Collections2.transform(runner.markWorkersLazy(isLazyWorker, maxWorkersToTerminate), (Function)new Function<Worker, String>(){

                    public String apply(Worker zkWorker) {
                        return zkWorker.getIp();
                    }
                }));
                if (laziestWorkerIps.isEmpty()) {
                    log.debug("Found no lazy workers", new Object[0]);
                } else {
                    log.info("Terminating %,d lazy workers: %s", new Object[]{laziestWorkerIps.size(), Joiner.on((String)", ").join((Iterable)laziestWorkerIps)});
                    AutoScalingData terminated = workerConfig.getAutoScaler().terminate(laziestWorkerIps);
                    if (terminated != null) {
                        this.currentlyTerminating.addAll(terminated.getNodeIds());
                        this.lastTerminateTime = new DateTime();
                        this.scalingStats.addTerminateEvent(terminated);
                        didTerminate = true;
                    }
                }
            } else {
                Duration durSinceLastTerminate = new Duration((ReadableInstant)this.lastTerminateTime, (ReadableInstant)new DateTime());
                log.info("%s terminating. Current wait time: %s", new Object[]{this.currentlyTerminating, durSinceLastTerminate});
                if (durSinceLastTerminate.isLongerThan((ReadableDuration)this.config.getMaxScalingDuration().toStandardDuration())) {
                    log.makeAlert("Worker node termination taking too long!", new Object[0]).addData("millisSinceLastTerminate", (Object)durSinceLastTerminate.getMillis()).addData("terminatingCount", (Object)this.currentlyTerminating.size()).emit();
                    this.currentlyTerminating.clear();
                }
            }
            return didTerminate;
        }
    }

    private int maxWorkersToTerminate(Collection<ImmutableWorkerInfo> zkWorkers, WorkerBehaviorConfig workerConfig) {
        Predicate<ImmutableWorkerInfo> isValidWorker = ResourceManagementUtil.createValidWorkerPredicate(this.config);
        int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size();
        int invalidWorkers = zkWorkers.size() - currValidWorkers;
        int minWorkers = workerConfig.getAutoScaler().getMinNumWorkers();
        return invalidWorkers + Math.max(0, Math.min(this.config.getMaxScalingStep(), currValidWorkers - minWorkers));
    }

    @Override
    public ScalingStats getStats() {
        return this.scalingStats;
    }

    private static int getExpectedWorkerCapacity(Collection<ImmutableWorkerInfo> workers) {
        int size = workers.size();
        if (size == 0) {
            return 1;
        }
        return workers.iterator().next().getWorker().getCapacity();
    }

    private static ImmutableWorkerInfo workerWithTask(ImmutableWorkerInfo immutableWorker, Task task) {
        return new ImmutableWorkerInfo(immutableWorker.getWorker(), immutableWorker.getCurrCapacityUsed() + 1, (Set<String>)Sets.union(immutableWorker.getAvailabilityGroups(), (Set)Sets.newHashSet((Object[])new String[]{task.getTaskResource().getAvailabilityGroup()})), (Collection<String>)Sets.union(immutableWorker.getRunningTasks(), (Set)Sets.newHashSet((Object[])new String[]{task.getId()})), DateTime.now());
    }

    private static ImmutableWorkerInfo createDummyWorker(String host, int capacity, String version) {
        return new ImmutableWorkerInfo(new Worker(host, "-2", capacity, version), 0, Sets.newHashSet(), Sets.newHashSet(), DateTime.now());
    }
}

