/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.timer;

import akka.actor.ActorSelection;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.protocol.utils.FutureUtils;
import com.alibaba.schedulerx.shade.com.google.common.collect.Lists;
import com.alibaba.schedulerx.shade.com.google.common.collect.Maps;
import com.alibaba.schedulerx.shade.org.apache.commons.collections.CollectionUtils;
import com.alibaba.schedulerx.shade.org.apache.commons.lang.StringUtils;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandler;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandlerPool;
import com.alibaba.schedulerx.worker.container.ContainerFactory;
import com.alibaba.schedulerx.worker.container.ContainerPool;
import com.alibaba.schedulerx.worker.timer.AbstractTimerTask;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ZombieContainerCheckTimer
extends AbstractTimerTask {
    private ContainerStatusReqHandlerPool statusReqBatchHandlerPool = ContainerStatusReqHandlerPool.INSTANCE;
    private ContainerPool containerPool = ContainerFactory.getContainerPool();

    @Override
    public String getName() {
        return "ZombieContainerCheckTimer";
    }

    @Override
    public long getInitialDelay() {
        return 300L;
    }

    @Override
    public long getPeriod() {
        return 600L;
    }

    @Override
    public void run() {
        HashMap<String, ArrayList<Long>> masterPath2JobInstanceIds = Maps.newHashMap();
        for (Map.Entry<Long, ContainerStatusReqHandler<Worker.ContainerReportTaskStatusRequest>> entry : this.statusReqBatchHandlerPool.getHandlers().entrySet()) {
            Long jobInstanceId = entry.getKey();
            String masterPath = entry.getValue().getTaskMasterAkkaPath();
            if (!masterPath2JobInstanceIds.containsKey(masterPath)) {
                masterPath2JobInstanceIds.put(masterPath, Lists.newArrayList(jobInstanceId));
                continue;
            }
            ((List)masterPath2JobInstanceIds.get(masterPath)).add(jobInstanceId);
        }
        for (Map.Entry<Long, ContainerStatusReqHandler<Worker.ContainerReportTaskStatusRequest>> entry : masterPath2JobInstanceIds.entrySet()) {
            String masterCheckPath = ((String)((Object)entry.getKey())).replace("/user/task_routing", "/user/heartbeat_routing");
            List jobInstanceIds = (List)((Object)entry.getValue());
            Worker.ContainerCheckZombieRequest request2 = Worker.ContainerCheckZombieRequest.newBuilder().addAllJobInstanceId(jobInstanceIds).build();
            ActorSelection selection = SchedulerxWorker.actorSystem.actorSelection(masterCheckPath);
            try {
                Worker.ContainerCheckZombieResponse response = (Worker.ContainerCheckZombieResponse)FutureUtils.awaitResult(selection, (Object)request2, 10L);
                List<Long> zombieJobInstanceIds = response.getZombieJobInstanceIdList();
                if (CollectionUtils.isEmpty(zombieJobInstanceIds)) continue;
                LOGGER.warn("detect zombieJobInstanceIds:{}, clean...", StringUtils.join(zombieJobInstanceIds, ","));
                for (Long zombieJobInstanceId : zombieJobInstanceIds) {
                    if (zombieJobInstanceId == 0L) continue;
                    this.statusReqBatchHandlerPool.stop(zombieJobInstanceId);
                    this.containerPool.destroyByInstance(zombieJobInstanceId);
                }
            }
            catch (Throwable e) {
                LOGGER.error("ZombieContainerCheckTimer check error", e);
            }
        }
    }
}

