/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.container;

import akka.actor.ActorSelection;
import akka.actor.Address;
import com.alibaba.schedulerx.common.domain.InstanceStatus;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.shade.org.apache.commons.collections.CollectionUtils;
import com.alibaba.schedulerx.shade.org.apache.commons.lang.StringUtils;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandlerPool;
import com.alibaba.schedulerx.worker.container.Container;
import com.alibaba.schedulerx.worker.container.ContainerPool;
import com.alibaba.schedulerx.worker.container.MarkedRunnable;
import com.alibaba.schedulerx.worker.container.ThreadContainerPool;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.listener.ListenerServiceLoader;
import com.alibaba.schedulerx.worker.listener.ThreadContainerListener;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.ClientLoggerMessage;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.processor.JobProcessor;
import com.alibaba.schedulerx.worker.processor.JobProcessorEx;
import com.alibaba.schedulerx.worker.processor.MapJobProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import com.alibaba.schedulerx.worker.util.JobProcessorUtil;
import com.alibaba.schedulerx.worker.util.WorkerConfigUtil;
import com.alibaba.schedulerx.worker.util.WorkerIdGenerator;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.Future;

/**
 * A {@link Container} that executes one task, described by a {@link JobContext}, on the thread
 * that calls {@link #run()} / {@link #start()}, and reports the task status back to the task
 * master.
 *
 * <p>Status reports are first offered to {@link ContainerStatusReqHandlerPool}; if that submit
 * returns {@code false} the request is sent directly to the master actor selection.
 *
 * <p>{@link #kill(boolean)} is expected to be invoked from a thread other than the one executing
 * the task (it cancels the {@link Future} of this container), so the fields it reads that are
 * written after construction are {@code volatile}.
 */
public class ThreadContainer implements MarkedRunnable<Long>, Container {
    /** Length limit handed to {@code ExceptionUtil.getFixedErrMsgByThrowable} for failure results. */
    private static final int RESULT_SIZE_MAX = 1000;
    private static final Logger LOGGER = LogFactory.getLogger(ThreadContainer.class);

    private JobContext context;
    /** Resolved in {@link #start()}; read by {@link #kill(boolean)} from another thread. */
    private volatile JobProcessor jobProcessor;
    protected ContainerPool containerPool;
    protected ActorSelection masterActorSelection;
    private LogCollector logCollector = LogCollectorFactory.get();
    /** Handle of the submitted execution; set by the submitter, read by {@link #kill(boolean)}. */
    private volatile Future future;

    /** Creates an unconfigured container; the context must be supplied via {@link #setContext}. */
    public ThreadContainer() {
    }

    /**
     * Creates a container for the given task and resolves the master actor selection from
     * {@code context.getInstanceMasterActorPath()}.
     *
     * @param context       description of the task to run
     * @param containerPool pool this container registers its context with and removes itself from
     * @throws Exception an {@link IOException} if the actor selection resolves to {@code null}
     */
    public ThreadContainer(JobContext context, ContainerPool containerPool) throws Exception {
        this.context = context;
        this.containerPool = containerPool;
        this.masterActorSelection = SchedulerxWorker.actorSystem.actorSelection(context.getInstanceMasterActorPath());
        if (this.masterActorSelection == null) {
            String errMsg = "get taskMaster akka path error, path=" + context.getInstanceMasterActorPath();
            LOGGER.error(errMsg);
            throw new IOException(errMsg);
        }
    }

    /**
     * Runs {@link #start()} and, if it throws, removes this container from the pool and reports
     * a failed {@link ProcessResult} instead of letting the error escape silently.
     */
    @Override
    public void run() {
        try {
            this.start();
        }
        catch (Throwable t) {
            LOGGER.error("start processor thread fail uniqueId={}, serialNum={}", this.context.getUniqueId(), this.context.getSerialNum(), t);
            try {
                this.containerPool.remove(this.context.getUniqueId());
                String workerAddr = ThreadContainer.localWorkerAddr();
                this.reportTaskStatus(new ProcessResult(false, "start processor thread failed, uniqueId=" + this.context.getUniqueId()), workerAddr);
            }
            finally {
                // Always undo the setContext() done in start(), even when the failure report
                // itself fails; otherwise the context stays registered for this thread.
                // NOTE(review): presumably per-thread state -- confirm against ContainerPool.
                this.containerPool.removeContext();
            }
        }
    }

    /**
     * Invokes {@code before(context)} on every registered {@link ThreadContainerListener}.
     * Failures (of a single listener or of the listener lookup) are logged and never propagated,
     * mirroring {@link #executeAfterListener(JobContext, ProcessResult)}.
     *
     * @param context context passed to each listener
     */
    public void executeBeforeListener(JobContext context) {
        try {
            Collection<ThreadContainerListener> listeners = ListenerServiceLoader.INSTANCE.getListeners(ThreadContainerListener.class);
            if (CollectionUtils.isNotEmpty(listeners)) {
                for (ThreadContainerListener listener : listeners) {
                    try {
                        listener.before(context);
                    }
                    catch (Throwable t) {
                        LOGGER.warn("ThreadContainerListener<{}> before exec failed.", listener.getClass().getSimpleName(), t);
                    }
                }
            }
        }
        catch (Throwable t) {
            LOGGER.warn("ThreadContainerListener before exec failed. ", t);
        }
    }

    /**
     * Invokes {@code after(context, result2)} on every registered {@link ThreadContainerListener}.
     * Failures are logged and never propagated.
     *
     * @param context context passed to each listener
     * @param result2 final result of the task, passed to each listener
     */
    public void executeAfterListener(JobContext context, ProcessResult result2) {
        try {
            Collection<ThreadContainerListener> listeners = ListenerServiceLoader.INSTANCE.getListeners(ThreadContainerListener.class);
            if (CollectionUtils.isNotEmpty(listeners)) {
                for (ThreadContainerListener listener : listeners) {
                    try {
                        listener.after(context, result2);
                    }
                    catch (Throwable t) {
                        LOGGER.warn("ThreadContainerListener<{}> after exec failed.", listener.getClass().getSimpleName(), t);
                    }
                }
            }
        }
        catch (Throwable t) {
            LOGGER.warn("ThreadContainerListener after exec failed. ", t);
        }
    }

    /**
     * Executes the task on the current thread.
     *
     * <p>Steps: rename the thread, fire the "before" listeners, register the context with the
     * pool, report {@code RUNNING} on the first attempt, resolve and invoke the job processor,
     * optionally retry, then remove the container from the pool, report the final status, clear
     * the context and fire the "after" listeners.
     *
     * <p>A failed result is retried (after sleeping {@code taskAttemptInterval} seconds) by
     * calling this method recursively while {@code taskAttempt < taskMaxAttempt}; only the
     * innermost invocation performs the final report and fires the "after" listeners.
     */
    @Override
    public void start() {
        String threadName = ThreadContainerPool.getInstance().genThreadName(this.context.getJobId(), this.context.getJobInstanceId(), this.context.getTaskId());
        Thread.currentThread().setName(threadName);
        this.executeBeforeListener(this.context);
        this.containerPool.setContext(this.context);
        long startMillis = System.currentTimeMillis();
        LOGGER.debug("start run container, uniqueId={}, cost={}ms", this.context.getUniqueId(), startMillis - this.context.getScheduleTime().getMillis());
        ProcessResult result = new ProcessResult(false);
        String workerAddr = ThreadContainer.localWorkerAddr();
        String uniqueId = this.context.getUniqueId();
        boolean interrupted = false;
        try {
            int taskAttempt;
            if (this.context.getTaskAttempt() == 0) {
                // Only the first attempt announces RUNNING; retries re-enter start() with attempt > 0.
                this.reportTaskStatus(new ProcessResult(InstanceStatus.RUNNING), workerAddr);
            }
            this.jobProcessor = JobProcessorUtil.getJobProcessor(this.context);
            if (this.jobProcessor != null) {
                try {
                    if ("java".equalsIgnoreCase(this.context.getJobType()) && (this.jobProcessor instanceof MapJobProcessor || this.context.getExecuteMode().equals("broadcast"))) {
                        // For java map/broadcast jobs the pre/post hooks are not invoked here.
                        result = this.jobProcessor.process(this.context);
                    } else if (this.jobProcessor instanceof JobProcessorEx) {
                        ((JobProcessorEx)this.jobProcessor).preProcess(this.context);
                        result = this.jobProcessor.process(this.context);
                        ((JobProcessorEx)this.jobProcessor).postProcess(this.context);
                    } else {
                        result = this.jobProcessor.process(this.context);
                    }
                }
                catch (InterruptedException e1) {
                    // Let the outer handler deal with interruption so it is never retried as a plain failure here.
                    throw e1;
                }
                catch (Throwable t) {
                    LOGGER.error("run fail uniqueId={}, serialNum={}", uniqueId, this.context.getSerialNum(), t);
                    String fixedErrMsg = ExceptionUtil.getFixedErrMsgByThrowable(t, RESULT_SIZE_MAX);
                    result = new ProcessResult(InstanceStatus.FAILED, fixedErrMsg);
                    this.logCollector.collect(this.context.getAppGroupId(), uniqueId, "job processor exec fail:", t);
                }
                long end = System.currentTimeMillis();
                LOGGER.debug("container run finished, uniqueId={}, cost={}ms", uniqueId, end - startMillis);
                if (result == null) {
                    result = new ProcessResult(InstanceStatus.FAILED, "result can't be null");
                }
            } else {
                result = new ProcessResult(InstanceStatus.FAILED, "jobProcessor is null");
                this.logCollector.collect(this.context.getAppGroupId(), uniqueId, ClientLoggerMessage.appendMessage("job processor exec fail:", result.getResult()));
            }
            // The former "current thread isAlive()" check was dropped: it is always true for the running thread.
            if (this.context.getTaskMaxAttempt() > 0 && this.context.getTaskId() > 0L && result.getStatus().equals(InstanceStatus.FAILED) && (taskAttempt = this.context.getTaskAttempt()) < this.context.getTaskMaxAttempt()) {
                // 1000L forces long arithmetic so a large interval cannot overflow int.
                Thread.sleep(1000L * this.context.getTaskAttemptInterval());
                this.context.setTaskAttempt(++taskAttempt);
                this.start();
                return;
            }
        }
        catch (Throwable t) {
            interrupted = t instanceof InterruptedException;
            LOGGER.error("run fail uniqueId={}, serialNum={}", uniqueId, this.context.getSerialNum(), t);
            String fixedErrMsg = ExceptionUtil.getFixedErrMsgByThrowable(t, RESULT_SIZE_MAX);
            result = new ProcessResult(InstanceStatus.FAILED, fixedErrMsg);
            this.logCollector.collect(this.context.getAppGroupId(), uniqueId, "job processor exec fail:", t);
        }
        this.containerPool.remove(this.context.getUniqueId());
        this.reportTaskStatus(result, workerAddr);
        this.containerPool.removeContext();
        this.executeAfterListener(this.context, result);
        if (interrupted) {
            // Restore the interrupt status that the caught InterruptedException cleared. Done last
            // so the status report above is not disturbed by a pending interrupt.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stops this container: asks a {@link JobProcessorEx} processor to kill the task, cancels the
     * execution {@link Future} if one was set, and removes the container from the pool.
     *
     * <p>The cancel and the pool removal are performed even if the processor's {@code kill}
     * throws; the exception still propagates to the caller afterwards.
     *
     * @param mayInterruptIfRunning forwarded to {@link Future#cancel(boolean)}
     */
    @Override
    public void kill(boolean mayInterruptIfRunning) {
        LOGGER.info("kill container, jobInstanceId={}", this.context.getJobInstanceId());
        try {
            // Single volatile read; instanceof is already false for null.
            JobProcessor processor = this.jobProcessor;
            if (processor instanceof JobProcessorEx) {
                ((JobProcessorEx)processor).kill(this.context);
            }
        }
        finally {
            try {
                Future running = this.future;
                if (running != null) {
                    running.cancel(mayInterruptIfRunning);
                }
            }
            finally {
                this.containerPool.remove(this.context.getUniqueId());
            }
        }
    }

    /** Builds the {@code host:port} string of this worker from the actor system's default address. */
    private static String localWorkerAddr() {
        Address address = SchedulerxWorker.actorSystem.provider().getDefaultAddress();
        return address.host().get() + ":" + address.port().get();
    }

    /**
     * Builds a {@code ContainerReportTaskStatusRequest} from the context and the given result and
     * delivers it: first via {@link ContainerStatusReqHandlerPool}, falling back to a direct
     * {@code tell} to the master actor selection when the submit returns {@code false}.
     *
     * @param result     status (and optional result text) to report
     * @param workerAddr {@code host:port} of this worker
     */
    private void reportTaskStatus(ProcessResult result, String workerAddr) {
        Worker.ContainerReportTaskStatusRequest.Builder resultBuilder = Worker.ContainerReportTaskStatusRequest.newBuilder();
        resultBuilder.setJobId(this.context.getJobId());
        resultBuilder.setJobInstanceId(this.context.getJobInstanceId());
        resultBuilder.setTaskId(this.context.getTaskId());
        resultBuilder.setStatus(result.getStatus().getValue());
        resultBuilder.setWorkerAddr(workerAddr);
        resultBuilder.setWorkerId(WorkerIdGenerator.get());
        resultBuilder.setSerialNum(this.context.getSerialNum());
        if (StringUtils.isNotBlank(this.context.getTraceId())) {
            resultBuilder.setTraceId(this.context.getTraceId());
        }
        resultBuilder.setInstanceMasterActorPath(this.context.getInstanceMasterActorPath());
        if (this.context.getTaskName() != null) {
            resultBuilder.setTaskName(this.context.getTaskName());
        }
        if (result.getResult() != null) {
            resultBuilder.setResult(result.getResult());
        }
        // Build the message once and reuse it for both delivery paths.
        Worker.ContainerReportTaskStatusRequest request = resultBuilder.build();
        boolean submitResult;
        if (WorkerConfigUtil.isEnableShareContainerPool()) {
            // With the shared container pool enabled, requests are submitted under key 0.
            submitResult = ContainerStatusReqHandlerPool.INSTANCE.submitReq(0L, request);
        } else {
            submitResult = ContainerStatusReqHandlerPool.INSTANCE.submitReq(this.context.getJobInstanceId(), request);
        }
        LOGGER.info("reportTaskStatus instanceId={}, serialNum={}, submitResult={}, processResult={}", this.context.getUniqueId(), this.context.getSerialNum(), submitResult, result);
        if (!submitResult) {
            // Fallback: send straight to the master, with no sender.
            this.masterActorSelection.tell(request, null);
        }
    }

    /** @return the context of the task this container runs */
    public JobContext getContext() {
        return this.context;
    }

    /** @param context the context of the task this container runs */
    public void setContext(JobContext context) {
        this.context = context;
    }

    /** @return the execution handle set via {@link #setFuture(Future)}, or {@code null} if none */
    public Future getFuture() {
        return this.future;
    }

    /** @param future2 execution handle that {@link #kill(boolean)} will cancel */
    public void setFuture(Future future2) {
        this.future = future2;
    }

    /** @return the processor resolved by {@link #start()}, or {@code null} if not resolved */
    public JobProcessor getJobProcessor() {
        return this.jobProcessor;
    }

    /** @return the job instance id of the context, used as this runnable's mark */
    @Override
    public Long identify() {
        return this.context.getJobInstanceId();
    }
}

