/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.pull;

import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.shade.com.google.common.collect.Maps;
import com.alibaba.schedulerx.shade.com.google.common.collect.Sets;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandler;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandlerPool;
import com.alibaba.schedulerx.worker.batch.ReqQueue;
import com.alibaba.schedulerx.worker.container.ContainerFactory;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.pull.BlockingContainerQueue;
import com.alibaba.schedulerx.worker.pull.ConsumerThread;
import com.alibaba.schedulerx.worker.pull.PullThread;
import java.util.Map;
import java.util.Set;

public enum PullManager {
    INSTANCE;

    private Map<Long, BlockingContainerQueue> queueMap = Maps.newConcurrentMap();
    private Map<Long, PullThread> pullThreadMap = Maps.newConcurrentMap();
    private Map<Long, ConsumerThread[]> consumerThreadMap = Maps.newConcurrentMap();
    private ContainerStatusReqHandlerPool statusReqBatchHandlerPool = ContainerStatusReqHandlerPool.INSTANCE;
    private Set<Long> crashedInstanceSet = Sets.newConcurrentHashSet();
    private static Logger LOGGER;

    public void init(long jobInstanceId, int pageSize, int queueSize, int consumerSize, String taskMasterAkkaPath) throws Exception {
        if (!this.queueMap.containsKey(jobInstanceId)) {
            BlockingContainerQueue queue2 = new BlockingContainerQueue(queueSize);
            this.queueMap.put(jobInstanceId, queue2);
            PullThread pullThread = new PullThread(jobInstanceId, pageSize, taskMasterAkkaPath, queue2);
            pullThread.start();
            this.pullThreadMap.put(jobInstanceId, pullThread);
            if (!this.statusReqBatchHandlerPool.contains(jobInstanceId)) {
                ReqQueue reqQueue = new ReqQueue(jobInstanceId, 100000);
                reqQueue.init();
                this.statusReqBatchHandlerPool.start(jobInstanceId, new ContainerStatusReqHandler<Worker.ContainerReportTaskStatusRequest>(jobInstanceId, 1, 1, 3000, reqQueue, taskMasterAkkaPath));
            }
            ConsumerThread[] consumers = new ConsumerThread[consumerSize];
            for (int i = 0; i < consumerSize; ++i) {
                consumers[i] = new ConsumerThread(queue2, ContainerFactory.getContainerPool(), taskMasterAkkaPath);
                new Thread((Runnable)consumers[i], "Schedulerx-ConsumerThread-" + jobInstanceId + "-" + i).start();
            }
            this.consumerThreadMap.put(jobInstanceId, consumers);
        }
    }

    public void crash(long jobInstanceId) {
        this.crashedInstanceSet.add(jobInstanceId);
    }

    public void stop(long jobInstanceId) {
        if (this.pullThreadMap.containsKey(jobInstanceId)) {
            this.pullThreadMap.get(jobInstanceId).stopRunning();
            this.pullThreadMap.remove(jobInstanceId);
        }
        if (this.consumerThreadMap.containsKey(jobInstanceId)) {
            ConsumerThread[] consumers;
            for (ConsumerThread consumer : consumers = this.consumerThreadMap.get(jobInstanceId)) {
                consumer.stopRunning();
            }
            this.consumerThreadMap.remove(jobInstanceId);
        }
        if (this.queueMap.containsKey(jobInstanceId)) {
            this.queueMap.get(jobInstanceId).clear();
            this.queueMap.remove(jobInstanceId);
        }
        this.crashedInstanceSet.remove(jobInstanceId);
    }

    public boolean contains(long jobInstanceId) {
        return this.queueMap.containsKey(jobInstanceId);
    }

    public boolean isCrashed(long jobInstanceId) {
        return this.crashedInstanceSet.contains(jobInstanceId);
    }

    static {
        LOGGER = LogFactory.getLogger(PullManager.class);
    }
}

