/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.cluster.driver;

import com.antgroup.geaflow.cluster.common.AbstractContainer;
import com.antgroup.geaflow.cluster.common.ExecutionIdGenerator;
import com.antgroup.geaflow.cluster.driver.DriverContext;
import com.antgroup.geaflow.cluster.driver.DriverEventDispatcher;
import com.antgroup.geaflow.cluster.driver.DriverInfo;
import com.antgroup.geaflow.cluster.driver.IDriver;
import com.antgroup.geaflow.cluster.exception.ComponentUncaughtExceptionHandler;
import com.antgroup.geaflow.cluster.executor.IPipelineExecutor;
import com.antgroup.geaflow.cluster.executor.PipelineExecutorContext;
import com.antgroup.geaflow.cluster.executor.PipelineExecutorFactory;
import com.antgroup.geaflow.cluster.protocol.IEvent;
import com.antgroup.geaflow.cluster.rpc.impl.DriverEndpoint;
import com.antgroup.geaflow.cluster.rpc.impl.PipelineMasterEndpoint;
import com.antgroup.geaflow.cluster.rpc.impl.RpcServiceImpl;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.common.utils.ProcessUtil;
import com.antgroup.geaflow.common.utils.ThreadUtil;
import com.antgroup.geaflow.pipeline.Pipeline;
import com.antgroup.geaflow.pipeline.callback.TaskCallBack;
import com.antgroup.geaflow.pipeline.service.PipelineService;
import com.antgroup.geaflow.pipeline.task.PipelineTask;
import com.antgroup.geaflow.shuffle.service.ShuffleManager;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Driver
extends AbstractContainer
implements IDriver<IEvent, Boolean> {
    private static final Logger LOGGER = LoggerFactory.getLogger(Driver.class);
    private static final String DRIVER_PREFIX = "driver-";
    private static final String DRIVER_EXECUTOR = "driver-executor";
    private static final AtomicInteger pipelineTaskIdGenerator = new AtomicInteger(0);
    private DriverEventDispatcher eventDispatcher;
    private DriverContext driverContext;
    private ExecutorService executorService;

    public Driver() {
        this(0);
    }

    public Driver(int rpcPort) {
        super(rpcPort);
    }

    @Override
    public void init(DriverContext driverContext) {
        super.init(driverContext.getId(), DRIVER_PREFIX, driverContext.getConfig());
        this.driverContext = driverContext;
        this.eventDispatcher = new DriverEventDispatcher();
        this.executorService = Executors.newFixedThreadPool(1, ThreadUtil.namedThreadFactory((boolean)true, (String)DRIVER_EXECUTOR, (Thread.UncaughtExceptionHandler)new ComponentUncaughtExceptionHandler()));
        ExecutionIdGenerator.init(this.id);
        ShuffleManager.getInstance().initShuffleMaster();
        if (driverContext.getPipeline() != null) {
            LOGGER.info("driver {} execute pipeline from recovered context", (Object)this.name);
            this.executorService.execute(() -> this.executePipelineInternal(driverContext.getPipeline()));
        }
        this.registerToMaster();
        this.registerHAService();
        LOGGER.info("driver {} init finish", (Object)this.name);
    }

    @Override
    protected void startRpcService() {
        this.rpcService = new RpcServiceImpl(this.rpcPort, this.configuration);
        this.rpcService.addEndpoint(new DriverEndpoint(this));
        this.rpcService.addEndpoint(new PipelineMasterEndpoint(this));
        this.rpcPort = this.rpcService.startService();
    }

    @Override
    public Boolean executePipeline(Pipeline pipeline) {
        LOGGER.info("driver {} execute pipeline {}", (Object)this.name, (Object)pipeline);
        Future<Boolean> future = this.executorService.submit(() -> this.executePipelineInternal(pipeline));
        try {
            return future.get();
        }
        catch (Throwable e) {
            throw new GeaflowRuntimeException(e);
        }
    }

    public Boolean executePipelineInternal(Pipeline pipeline) {
        try {
            LOGGER.info("start execute pipeline {}", (Object)pipeline);
            this.driverContext.addPipeline(pipeline);
            this.driverContext.checkpoint(new DriverContext.PipelineCheckpointFunction());
            IPipelineExecutor pipelineExecutor = PipelineExecutorFactory.createPipelineExecutor();
            PipelineExecutorContext executorContext = new PipelineExecutorContext(this.name, this.eventDispatcher, this.configuration, pipelineTaskIdGenerator);
            pipelineExecutor.init(executorContext);
            pipelineExecutor.register(pipeline.getViewDescMap());
            List pipelineTaskList = pipeline.getPipelineTaskList();
            List taskCallBackList = pipeline.getPipelineTaskCallbacks();
            int size = pipelineTaskList.size();
            for (int i = 0; i < size; ++i) {
                if (this.driverContext.getFinishedPipelineTasks() != null && this.driverContext.getFinishedPipelineTasks().contains(i)) continue;
                pipelineExecutor.runPipelineTask((PipelineTask)pipelineTaskList.get(i), (TaskCallBack)taskCallBackList.get(i));
                this.driverContext.addFinishedPipelineTask(i);
                this.driverContext.checkpoint(new DriverContext.PipelineTaskCheckpointFunction());
            }
            List pipelineServices = pipeline.getPipelineServices();
            for (PipelineService pipelineService : pipelineServices) {
                LOGGER.info("execute service");
                pipelineExecutor.startPipelineService(pipelineService);
            }
            LOGGER.info("finish execute pipeline {}", (Object)pipeline);
            return true;
        }
        catch (Throwable e) {
            LOGGER.error("driver exception", e);
            throw e;
        }
    }

    @Override
    public Boolean process(IEvent input) {
        LOGGER.info("{} process event {}", (Object)this.name, (Object)input);
        this.eventDispatcher.dispatch(input);
        return true;
    }

    @Override
    public void close() {
        super.close();
        this.executorService.shutdownNow();
        LOGGER.info("driver {} closed", (Object)this.name);
    }

    @Override
    protected DriverInfo buildComponentInfo() {
        DriverInfo driverInfo = new DriverInfo();
        driverInfo.setId(this.id);
        driverInfo.setName(this.name);
        driverInfo.setHost(ProcessUtil.getHostIp());
        driverInfo.setPid(ProcessUtil.getProcessId());
        driverInfo.setRpcPort(this.rpcPort);
        return driverInfo;
    }
}

