/*
 * Decompiled with CFR 0.152.
 */
package com.wavefront.agent.queueing;

import com.google.common.annotations.VisibleForTesting;
import com.wavefront.agent.api.APIContainer;
import com.wavefront.agent.data.DataSubmissionTask;
import com.wavefront.agent.data.EntityPropertiesFactory;
import com.wavefront.agent.data.EventDataSubmissionTask;
import com.wavefront.agent.data.LineDelimitedDataSubmissionTask;
import com.wavefront.agent.data.LogDataSubmissionTask;
import com.wavefront.agent.data.SourceTagSubmissionTask;
import com.wavefront.agent.data.TaskInjector;
import com.wavefront.agent.handlers.HandlerKey;
import com.wavefront.agent.queueing.QueueController;
import com.wavefront.agent.queueing.QueueProcessor;
import com.wavefront.agent.queueing.QueueingFactory;
import com.wavefront.agent.queueing.TaskQueue;
import com.wavefront.agent.queueing.TaskQueueFactory;
import com.wavefront.common.NamedThreadFactory;
import com.wavefront.data.ReportableEntityType;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;

public class QueueingFactoryImpl
implements QueueingFactory {
    private final Map<HandlerKey, ScheduledExecutorService> executors = new ConcurrentHashMap<HandlerKey, ScheduledExecutorService>();
    private final Map<HandlerKey, Map<Integer, QueueProcessor<?>>> queueProcessors = new ConcurrentHashMap();
    private final Map<HandlerKey, QueueController<?>> queueControllers = new ConcurrentHashMap();
    private final TaskQueueFactory taskQueueFactory;
    private final APIContainer apiContainer;
    private final UUID proxyId;
    private final Map<String, EntityPropertiesFactory> entityPropsFactoryMap;

    public QueueingFactoryImpl(APIContainer apiContainer, UUID proxyId, TaskQueueFactory taskQueueFactory, Map<String, EntityPropertiesFactory> entityPropsFactoryMap) {
        this.apiContainer = apiContainer;
        this.proxyId = proxyId;
        this.taskQueueFactory = taskQueueFactory;
        this.entityPropsFactoryMap = entityPropsFactoryMap;
    }

    <T extends DataSubmissionTask<T>> QueueProcessor<T> getQueueProcessor(@Nonnull HandlerKey handlerKey, ScheduledExecutorService executorService, int threadNum) {
        TaskQueue taskQueue = this.taskQueueFactory.getTaskQueue(handlerKey, threadNum);
        return this.queueProcessors.computeIfAbsent(handlerKey, x -> new TreeMap()).computeIfAbsent(threadNum, x -> new QueueProcessor(handlerKey, taskQueue, this.getTaskInjector(handlerKey, taskQueue), executorService, this.entityPropsFactoryMap.get(handlerKey.getTenantName()).get(handlerKey.getEntityType()), this.entityPropsFactoryMap.get(handlerKey.getTenantName()).getGlobalProperties()));
    }

    @Override
    public <T extends DataSubmissionTask<T>> QueueController<T> getQueueController(@Nonnull HandlerKey handlerKey, int numThreads) {
        ScheduledExecutorService executor = this.executors.computeIfAbsent(handlerKey, x -> Executors.newScheduledThreadPool(numThreads, (ThreadFactory)new NamedThreadFactory("queueProcessor-" + handlerKey.getEntityType() + "-" + handlerKey.getHandle())));
        List queueProcessors = IntStream.range(0, numThreads).mapToObj(i -> this.getQueueProcessor(handlerKey, executor, i)).collect(Collectors.toList());
        return this.queueControllers.computeIfAbsent(handlerKey, x -> new QueueController(handlerKey, queueProcessors, backlogSize -> this.entityPropsFactoryMap.get(handlerKey.getTenantName()).get(handlerKey.getEntityType()).reportBacklogSize(handlerKey.getHandle(), (int)backlogSize)));
    }

    private <T extends DataSubmissionTask<T>> TaskInjector<T> getTaskInjector(HandlerKey handlerKey, TaskQueue<T> queue) {
        ReportableEntityType entityType = handlerKey.getEntityType();
        String tenantName = handlerKey.getTenantName();
        switch (entityType) {
            case POINT: 
            case DELTA_COUNTER: 
            case HISTOGRAM: 
            case TRACE: 
            case TRACE_SPAN_LOGS: {
                return task -> ((LineDelimitedDataSubmissionTask)task).injectMembers(this.apiContainer.getProxyV2APIForTenant(tenantName), this.proxyId, this.entityPropsFactoryMap.get(tenantName).get(entityType), queue);
            }
            case SOURCE_TAG: {
                return task -> ((SourceTagSubmissionTask)task).injectMembers(this.apiContainer.getSourceTagAPIForTenant(tenantName), this.entityPropsFactoryMap.get(tenantName).get(entityType), queue);
            }
            case EVENT: {
                return task -> ((EventDataSubmissionTask)task).injectMembers(this.apiContainer.getEventAPIForTenant(tenantName), this.proxyId, this.entityPropsFactoryMap.get(tenantName).get(entityType), queue);
            }
            case LOGS: {
                return task -> ((LogDataSubmissionTask)task).injectMembers(this.apiContainer.getLogAPI(), this.proxyId, this.entityPropsFactoryMap.get(tenantName).get(entityType), queue);
            }
        }
        throw new IllegalArgumentException("Unexpected entity type: " + entityType);
    }

    @VisibleForTesting
    public void flushNow(@Nonnull HandlerKey handlerKey) {
        ReportableEntityType entityType = handlerKey.getEntityType();
        String handle = handlerKey.getHandle();
        for (String tenantName : this.apiContainer.getTenantNameList()) {
            HandlerKey tenantHandlerKey = HandlerKey.of(entityType, handle, tenantName);
            this.queueProcessors.get(tenantHandlerKey).values().forEach(QueueProcessor::run);
        }
    }
}

