/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.cluster.container;

import com.antgroup.geaflow.cluster.collector.EmitterService;
import com.antgroup.geaflow.cluster.common.AbstractContainer;
import com.antgroup.geaflow.cluster.container.ContainerContext;
import com.antgroup.geaflow.cluster.container.ContainerInfo;
import com.antgroup.geaflow.cluster.container.IContainer;
import com.antgroup.geaflow.cluster.fetcher.FetcherService;
import com.antgroup.geaflow.cluster.protocol.ICommand;
import com.antgroup.geaflow.cluster.protocol.IEvent;
import com.antgroup.geaflow.cluster.protocol.OpenContainerEvent;
import com.antgroup.geaflow.cluster.protocol.OpenContainerResponseEvent;
import com.antgroup.geaflow.cluster.rpc.impl.ContainerEndpoint;
import com.antgroup.geaflow.cluster.rpc.impl.RpcServiceImpl;
import com.antgroup.geaflow.cluster.task.service.TaskService;
import com.antgroup.geaflow.cluster.worker.Dispatcher;
import com.antgroup.geaflow.cluster.worker.DispatcherService;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.common.utils.ProcessUtil;
import com.antgroup.geaflow.shuffle.service.ShuffleManager;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Container
extends AbstractContainer
implements IContainer<IEvent, IEvent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(Container.class);
    private static final String CONTAINER_NAME_PREFIX = "container-";
    private ContainerContext containerContext;
    private Dispatcher dispatcher;
    protected FetcherService fetcherService;
    protected EmitterService emitterService;
    protected TaskService workerService;
    protected DispatcherService dispatcherService;

    public Container() {
        this(0);
    }

    public Container(int rpcPort) {
        super(rpcPort);
    }

    @Override
    public void init(ContainerContext containerContext) {
        try {
            this.containerContext = containerContext;
            super.init(containerContext.getId(), CONTAINER_NAME_PREFIX, containerContext.getConfig());
            this.registerToMaster();
            LOGGER.info("container {} init finish", (Object)this.name);
        }
        catch (Throwable t) {
            LOGGER.error("init container err", t);
            throw new GeaflowRuntimeException(t);
        }
    }

    @Override
    protected void startRpcService() {
        this.rpcService = new RpcServiceImpl(this.rpcPort, this.configuration);
        this.rpcService.addEndpoint(new ContainerEndpoint(this));
        this.rpcPort = this.rpcService.startService();
    }

    @Override
    public OpenContainerResponseEvent open(OpenContainerEvent event) {
        try {
            int num = event.getExecutorNum();
            Preconditions.checkArgument((num > 0 ? 1 : 0) != 0, (Object)"worker num should > 0");
            LOGGER.info("open container {} with {} executors", (Object)this.name, (Object)num);
            this.fetcherService = new FetcherService(num, this.configuration);
            this.emitterService = new EmitterService(num, this.configuration);
            this.workerService = new TaskService(this.id, num, this.configuration, this.metricGroup, this.fetcherService, this.emitterService);
            this.dispatcher = new Dispatcher(this.workerService);
            this.dispatcherService = new DispatcherService(this.dispatcher);
            this.fetcherService.start();
            this.emitterService.start();
            this.workerService.start();
            this.dispatcherService.start();
            if (this.containerContext.getReliableEvents() != null) {
                for (IEvent reliableEvent : this.containerContext.getReliableEvents()) {
                    LOGGER.info("{} replay event {}", (Object)this.name, (Object)reliableEvent);
                    this.dispatcher.add((ICommand)reliableEvent);
                }
            }
            this.registerHAService();
            return new OpenContainerResponseEvent(this.id, 0);
        }
        catch (Throwable throwable) {
            LOGGER.error("{} open error", (Object)this.name, (Object)throwable);
            throw throwable;
        }
    }

    @Override
    public IEvent process(IEvent input) {
        LOGGER.info("{} process event {}", (Object)this.name, (Object)input);
        try {
            this.containerContext.addEvent(input);
            this.containerContext.checkpoint(new ContainerContext.EventCheckpointFunction());
            this.dispatcher.add((ICommand)input);
            return null;
        }
        catch (Throwable throwable) {
            LOGGER.error("{} process error", (Object)this.name, (Object)throwable);
            throw throwable;
        }
    }

    @Override
    public void close() {
        super.close();
        if (this.fetcherService != null) {
            this.fetcherService.shutdown();
        }
        if (this.workerService != null) {
            this.workerService.shutdown();
        }
        if (this.dispatcherService != null) {
            this.dispatcherService.shutdown();
        }
        if (this.emitterService != null) {
            this.emitterService.shutdown();
        }
        LOGGER.info("container {} closed", (Object)this.name);
    }

    @Override
    protected ContainerInfo buildComponentInfo() {
        ContainerInfo containerInfo = new ContainerInfo();
        containerInfo.setId(this.id);
        containerInfo.setName(this.name);
        containerInfo.setPid(ProcessUtil.getProcessId());
        containerInfo.setHost(ProcessUtil.getHostIp());
        containerInfo.setRpcPort(this.rpcPort);
        containerInfo.setShufflePort(ShuffleManager.getInstance().getShufflePort());
        return containerInfo;
    }
}

