/*
 * Decompiled with CFR 0.152.
 */
package com.wavefront.agent.queueing;

import com.google.common.annotations.VisibleForTesting;
import com.wavefront.agent.data.DataSubmissionTask;
import com.wavefront.agent.handlers.HandlerKey;
import com.wavefront.agent.queueing.QueueProcessor;
import com.wavefront.agent.queueing.TaskQueue;
import com.wavefront.common.Managed;
import com.wavefront.common.NamedThreadFactory;
import com.wavefront.common.Pair;
import com.wavefront.common.TaggedMetricName;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

public class QueueController<T extends DataSubmissionTask<T>>
extends TimerTask
implements Managed {
    private static final Logger logger = Logger.getLogger(QueueController.class.getCanonicalName());
    private static final int TIME_DIFF_THRESHOLD_SECS = 60;
    private static final double MIN_ADJ_FACTOR = 0.25;
    private static final double MAX_ADJ_FACTOR = 1.5;
    private static ScheduledExecutorService executor;
    protected final HandlerKey handlerKey;
    protected final List<QueueProcessor<T>> processorTasks;
    protected final Supplier<Long> timeProvider;
    private final Consumer<Integer> backlogSizeSink;
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    private final AtomicLong currentWeight = new AtomicLong();
    private final AtomicInteger queueSize = new AtomicInteger();

    public QueueController(HandlerKey handlerKey, List<QueueProcessor<T>> processorTasks, @Nullable Consumer<Integer> backlogSizeSink) {
        this(handlerKey, processorTasks, backlogSizeSink, System::currentTimeMillis);
    }

    QueueController(HandlerKey handlerKey, List<QueueProcessor<T>> processorTasks, @Nullable Consumer<Integer> backlogSizeSink, Supplier<Long> timeProvider) {
        this.handlerKey = handlerKey;
        this.processorTasks = processorTasks;
        this.backlogSizeSink = backlogSizeSink;
        this.timeProvider = timeProvider == null ? System::currentTimeMillis : timeProvider;
        Metrics.newGauge((MetricName)new TaggedMetricName("buffer", "task-count", new String[]{"port", handlerKey.getHandle(), "content", handlerKey.getEntityType().toString()}), (Gauge)new Gauge<Integer>(){

            public Integer value() {
                return QueueController.this.queueSize.get();
            }
        });
        Metrics.newGauge((MetricName)new TaggedMetricName("buffer", handlerKey.getEntityType() + "-count", new String[]{"port", handlerKey.getHandle()}), (Gauge)new Gauge<Long>(){

            public Long value() {
                return QueueController.this.currentWeight.get();
            }
        });
    }

    @VisibleForTesting
    static <T extends DataSubmissionTask<T>> void adjustTimingFactors(List<QueueProcessor<T>> processors) {
        List<Pair> sortedProcessors = processors.stream().map(x -> new Pair(x, (Object)x.getHeadTaskTimestamp())).filter(x -> (Long)x._2 < Long.MAX_VALUE).sorted(Comparator.comparing(o -> (Long)o._2)).collect(Collectors.toList());
        if (sortedProcessors.size() > 1) {
            long minTs = (Long)((Pair)sortedProcessors.get((int)0))._2;
            long maxTs = (Long)((Pair)sortedProcessors.get((int)(sortedProcessors.size() - 1)))._2;
            if (maxTs - minTs > 60000L) {
                sortedProcessors.forEach(x -> ((QueueProcessor)x._1).setTimingFactor(0.25 + (double)((Long)x._2 - minTs) / (double)(maxTs - minTs) * 1.25));
            } else {
                processors.forEach(x -> x.setTimingFactor(1.0));
            }
        }
    }

    @Override
    public void run() {
        int backlog = this.processorTasks.stream().mapToInt(x -> x.getTaskQueue().size()).sum();
        if (this.backlogSizeSink != null) {
            this.backlogSizeSink.accept(backlog);
        }
        QueueController.adjustTimingFactors(this.processorTasks);
    }

    private void printQueueStats() {
        int backlog = this.processorTasks.stream().mapToInt(x -> x.getTaskQueue().size()).sum();
        this.queueSize.set(backlog);
        long actualWeight = 0L;
        for (QueueProcessor<T> task : this.processorTasks) {
            TaskQueue<T> taskQueue = task.getTaskQueue();
            if (taskQueue == null || taskQueue.weight() == null) continue;
            actualWeight += taskQueue.weight().longValue();
        }
        long previousWeight = this.currentWeight.getAndSet(actualWeight);
        if (previousWeight != 0L || actualWeight != 0L) {
            logger.info("[" + this.handlerKey.getHandle() + "] " + this.handlerKey.getEntityType() + " backlog status: " + this.queueSize + " tasks, " + this.currentWeight + " " + this.handlerKey.getEntityType());
            if (actualWeight == 0L) {
                logger.info("[" + this.handlerKey.getHandle() + "] " + this.handlerKey.getEntityType() + " backlog has been cleared!");
            }
        }
    }

    @Override
    public void start() {
        if (this.isRunning.compareAndSet(false, true)) {
            if (executor == null || executor.isShutdown()) {
                executor = Executors.newScheduledThreadPool(2, (ThreadFactory)new NamedThreadFactory("QueueController"));
            }
            executor.scheduleAtFixedRate(this::run, 1L, 1L, TimeUnit.SECONDS);
            executor.scheduleAtFixedRate(this::printQueueStats, 10L, 10L, TimeUnit.SECONDS);
            this.processorTasks.forEach(QueueProcessor::start);
        }
    }

    @Override
    public void stop() {
        if (this.isRunning.compareAndSet(true, false)) {
            executor.shutdown();
            this.processorTasks.forEach(QueueProcessor::stop);
        }
    }

    public void truncateBuffers() {
        this.processorTasks.forEach(tQueueProcessor -> {
            logger.info("[" + this.handlerKey.getHandle() + "] " + this.handlerKey.getEntityType() + "-- size before truncate: " + tQueueProcessor.getTaskQueue().size());
            try {
                tQueueProcessor.getTaskQueue().clear();
            }
            catch (IOException e) {
                e.printStackTrace();
            }
            logger.info("[" + this.handlerKey.getHandle() + "] " + this.handlerKey.getEntityType() + "--> size after truncate: " + tQueueProcessor.getTaskQueue().size());
        });
    }
}

