/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.container;

import akka.actor.ActorSelection;
import akka.actor.Address;
import com.alibaba.schedulerx.common.domain.InstanceStatus;
import com.alibaba.schedulerx.common.util.ConfigUtil;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandlerPool;
import com.alibaba.schedulerx.worker.container.Container;
import com.alibaba.schedulerx.worker.container.ContainerPool;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.ClientLoggerMessage;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.processor.JobProcessor;
import com.alibaba.schedulerx.worker.processor.MapJobProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import com.alibaba.schedulerx.worker.processor.ProcessorFactory;
import com.alibaba.schedulerx.worker.util.JavaProcessorProfileUtil;
import com.alibaba.schedulerx.worker.util.WorkerIdGenerator;
import java.io.IOException;

public class ThreadContainer
implements Runnable,
Container {
    private JobContext context;
    private JobProcessor jobProcessor;
    protected ContainerPool containerPool;
    protected ActorSelection masterActorSelection;
    private static final int RESULT_SIZE_MAX = 1000;
    private LogCollector logCollector = LogCollectorFactory.get();
    private static final Logger LOGGER = LogFactory.getLogger(ThreadContainer.class);

    public ThreadContainer() {
    }

    public ThreadContainer(JobContext context, ContainerPool containerPool) throws Exception {
        this.context = context;
        this.containerPool = containerPool;
        this.masterActorSelection = SchedulerxWorker.actorSystem.actorSelection(context.getInstanceMasterActorPath());
        if (this.masterActorSelection == null) {
            String errMsg = "get taskMaster akka path error, path=" + context.getInstanceMasterActorPath();
            LOGGER.error(errMsg);
            throw new IOException(errMsg);
        }
    }

    @Override
    public void run() {
        this.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void start() {
        block17: {
            this.containerPool.setContext(this.context);
            long start2 = System.currentTimeMillis();
            LOGGER.debug("start run container, uniqueId={}, cost={}ms", this.context.getUniqueId(), start2 - this.context.getScheduleTime().getMillis());
            ProcessResult result2 = new ProcessResult(false);
            Address address = SchedulerxWorker.actorSystem.provider().getDefaultAddress();
            String workerAddr = address.host().get() + ":" + address.port().get();
            String uniqueId = this.context.getUniqueId();
            try {
                if (this.context.getTaskAttempt() == 0) {
                    this.reportTaskStatus(new ProcessResult(InstanceStatus.RUNNING), workerAddr);
                }
                this.jobProcessor = "java".equalsIgnoreCase(this.context.getJobType()) ? JavaProcessorProfileUtil.getJavaProcessor(this.context.getContent()) : ProcessorFactory.create(this.context.getJobType());
                if (this.jobProcessor != null) {
                    if ("java".equalsIgnoreCase(this.context.getJobType()) && (this.jobProcessor instanceof MapJobProcessor || this.context.getExecuteMode().equals("broadcast"))) {
                        result2 = this.jobProcessor.process(this.context);
                    } else {
                        this.jobProcessor.preProcess(this.context);
                        result2 = this.jobProcessor.process(this.context);
                        this.jobProcessor.postProcess(this.context);
                    }
                    long end = System.currentTimeMillis();
                    LOGGER.debug("container run finished, uniqueId={}, cost={}ms", uniqueId, end - start2);
                    if (result2.getStatus() != null && result2.getStatus().getValue() == InstanceStatus.FAILED.getValue()) {
                        this.logCollector.collect(uniqueId, ClientLoggerMessage.appendMessage("[ThreadContainer-run]job processor exec fail.", result2.getResult()));
                    }
                } else {
                    result2 = new ProcessResult(InstanceStatus.FAILED, "jobProcessor is null!");
                    this.logCollector.collect(uniqueId, ClientLoggerMessage.appendMessage("[ThreadContainer-run]job processor exec fail.", result2.getResult()));
                }
                if (this.context.getTaskMaxAttempt() > 0 && this.context.getTaskId() > 0L && result2.getStatus().equals((Object)InstanceStatus.FAILED)) {
                    int taskAttempt = this.context.getTaskAttempt();
                    if (taskAttempt < this.context.getTaskMaxAttempt()) {
                        ++taskAttempt;
                        try {
                            Thread.sleep(1000 * this.context.getTaskAttemptInterval());
                        }
                        catch (InterruptedException e) {
                            LOGGER.error("", e);
                        }
                        this.context.setTaskAttempt(taskAttempt);
                        this.start();
                        break block17;
                    }
                    this.reportTaskStatus(result2, workerAddr);
                    break block17;
                }
                this.reportTaskStatus(result2, workerAddr);
            }
            catch (Throwable t) {
                LOGGER.error("run fail uniqueId={}, serialNum={}", uniqueId, this.context.getSerialNum(), t);
                String errMsg = ExceptionUtil.getTrace(t);
                if (errMsg.getBytes().length > 1000) {
                    byte[] tmp = new byte[1000];
                    System.arraycopy(errMsg.getBytes(), 0, tmp, 0, 1000);
                    String fixedErrMsg = new String(tmp);
                    result2 = new ProcessResult(InstanceStatus.FAILED, fixedErrMsg);
                } else {
                    result2 = new ProcessResult(InstanceStatus.FAILED, errMsg);
                }
                this.logCollector.collect(uniqueId, "[ThreadContainer-run]job processor exec fail.", t);
                this.reportTaskStatus(result2, workerAddr);
            }
            finally {
                this.containerPool.remove(this.context.getUniqueId());
                this.containerPool.removeContext();
            }
        }
    }

    @Override
    public void kill() {
        LOGGER.info("kill container, jobInstanceId={}", this.context.getJobInstanceId());
        if (this.jobProcessor != null) {
            this.jobProcessor.kill(this.context);
        }
        Address address = SchedulerxWorker.actorSystem.provider().getDefaultAddress();
        String workerAddr = address.host().get() + ":" + address.port().get();
        Worker.ContainerReportTaskStatusRequest.Builder builder = Worker.ContainerReportTaskStatusRequest.newBuilder().setJobId(this.context.getJobId()).setJobInstanceId(this.context.getJobInstanceId()).setTaskId(this.context.getTaskId()).setStatus(InstanceStatus.FAILED.getValue()).setWorkerAddr(workerAddr).setWorkerId(WorkerIdGenerator.get()).setResult("killed");
        if (this.context.getTaskName() != null) {
            builder.setTaskName(this.context.getTaskName());
        }
        this.masterActorSelection.tell(builder.build(), null);
        this.containerPool.remove(this.context.getUniqueId());
    }

    private void reportTaskStatus(ProcessResult result2, String workerAddr) {
        Worker.ContainerReportTaskStatusRequest.Builder resultBuilder = Worker.ContainerReportTaskStatusRequest.newBuilder();
        resultBuilder.setJobId(this.context.getJobId());
        resultBuilder.setJobInstanceId(this.context.getJobInstanceId());
        resultBuilder.setTaskId(this.context.getTaskId());
        resultBuilder.setStatus(result2.getStatus().getValue());
        resultBuilder.setWorkerAddr(workerAddr);
        resultBuilder.setWorkerId(WorkerIdGenerator.get());
        resultBuilder.setSerialNum(this.context.getSerialNum());
        resultBuilder.setInstanceMasterActorPath(this.context.getInstanceMasterActorPath());
        if (this.context.getTaskName() != null) {
            resultBuilder.setTaskName(this.context.getTaskName());
        }
        if (result2.getResult() != null) {
            resultBuilder.setResult(result2.getResult());
        }
        boolean enableShareContainerPool = ConfigUtil.getWorkerConfig().getBoolean("share.container.pool", false);
        boolean submitResult = false;
        submitResult = enableShareContainerPool ? ContainerStatusReqHandlerPool.INSTANCE.submitReq(0L, resultBuilder.build()) : ContainerStatusReqHandlerPool.INSTANCE.submitReq(this.context.getJobInstanceId(), resultBuilder.build());
        LOGGER.info("reportTaskStatus instanceId={} submitResult={}, processResult={}", this.context.getUniqueId(), submitResult, result2);
        if (!submitResult) {
            this.masterActorSelection.tell(resultBuilder.build(), null);
        }
    }

    public JobContext getContext() {
        return this.context;
    }

    public void setContext(JobContext context) {
        this.context = context;
    }
}

