/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.cluster.resourcemanager;

import com.antgroup.geaflow.cluster.clustermanager.ClusterContext;
import com.antgroup.geaflow.cluster.clustermanager.ContainerExecutorInfo;
import com.antgroup.geaflow.cluster.clustermanager.ExecutorRegisterException;
import com.antgroup.geaflow.cluster.clustermanager.ExecutorRegisteredCallback;
import com.antgroup.geaflow.cluster.clustermanager.IClusterManager;
import com.antgroup.geaflow.cluster.resourcemanager.IResourceManager;
import com.antgroup.geaflow.cluster.resourcemanager.ReleaseResourceRequest;
import com.antgroup.geaflow.cluster.resourcemanager.ReleaseResponse;
import com.antgroup.geaflow.cluster.resourcemanager.RequireResourceRequest;
import com.antgroup.geaflow.cluster.resourcemanager.RequireResponse;
import com.antgroup.geaflow.cluster.resourcemanager.ResourceManagerContext;
import com.antgroup.geaflow.cluster.resourcemanager.ResourceSession;
import com.antgroup.geaflow.cluster.resourcemanager.WorkerInfo;
import com.antgroup.geaflow.cluster.resourcemanager.WorkerSnapshot;
import com.antgroup.geaflow.cluster.resourcemanager.allocator.IAllocator;
import com.antgroup.geaflow.cluster.resourcemanager.allocator.RoundRobinAllocator;
import com.antgroup.geaflow.cluster.system.ClusterMetaStore;
import com.antgroup.geaflow.common.errorcode.RuntimeErrors;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.common.utils.SleepUtils;
import com.google.common.annotations.VisibleForTesting;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultResourceManager
implements IResourceManager,
ExecutorRegisteredCallback,
Serializable {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultResourceManager.class);
    private static final String OPERATION_REQUIRE = "require";
    private static final String OPERATION_RELEASE = "release";
    private static final String OPERATION_ALLOCATE = "allocate";
    private static final int DEFAULT_SLEEP_MS = 10;
    private static final int MAX_REQUIRE_RETRY_TIMES = 1;
    private static final int MAX_RELEASE_RETRY_TIMES = 100;
    private final AtomicReference<Throwable> allocateWorkerErr = new AtomicReference();
    private final AtomicInteger pendingWorkerCounter = new AtomicInteger(0);
    private final AtomicBoolean resourceLock = new AtomicBoolean(true);
    private final AtomicBoolean recovering = new AtomicBoolean(false);
    private final AtomicBoolean inited = new AtomicBoolean(false);
    private final Map<IAllocator.AllocateStrategy, IAllocator<String, WorkerInfo>> allocators = new HashMap<IAllocator.AllocateStrategy, IAllocator<String, WorkerInfo>>();
    protected final IClusterManager clusterManager;
    protected final ClusterMetaStore metaKeeper;
    private final Map<WorkerInfo.WorkerId, WorkerInfo> availableWorkers = new HashMap<WorkerInfo.WorkerId, WorkerInfo>();
    private final Map<String, ResourceSession> sessions = new HashMap<String, ResourceSession>();

    public DefaultResourceManager(IClusterManager clusterManager) {
        this.clusterManager = clusterManager;
        this.metaKeeper = ClusterMetaStore.getInstance();
    }

    @Override
    public void init(ResourceManagerContext context) {
        this.allocators.put(IAllocator.AllocateStrategy.ROUND_ROBIN, new RoundRobinAllocator());
        ClusterContext clusterContext = context.getClusterContext();
        clusterContext.addExecutorRegisteredCallback(this);
        boolean isRecover = context.isRecover();
        this.recovering.set(isRecover);
        int workerNum = clusterContext.getClusterConfig().getContainerNum() * clusterContext.getClusterConfig().getContainerWorkerNum();
        this.pendingWorkerCounter.set(workerNum);
        LOGGER.info("init worker number {}, isRecover {}", (Object)workerNum, (Object)isRecover);
        if (isRecover) {
            this.recover();
        } else {
            this.clusterManager.allocateWorkers(workerNum);
        }
        this.inited.set(true);
        LOGGER.info("init worker manager finish");
    }

    @Override
    public RequireResponse requireResource(RequireResourceRequest requireRequest) {
        String requireId = requireRequest.getRequireId();
        int requiredNum = requireRequest.getRequiredNum();
        if (this.sessions.containsKey(requireId)) {
            Map<WorkerInfo.WorkerId, WorkerInfo> sessionWorkers = this.sessions.get(requireId).getWorkers();
            if (requiredNum != sessionWorkers.size()) {
                String msg = "require number mismatch, old " + sessionWorkers.size() + " new " + requiredNum;
                LOGGER.error("[{}] require from session err: {}", (Object)requireId, (Object)msg);
                return RequireResponse.fail(requireId, msg);
            }
            ArrayList<WorkerInfo> workers = new ArrayList<WorkerInfo>(sessionWorkers.values());
            LOGGER.info("[{}] require from session with {} worker", (Object)requireId, (Object)workers.size());
            return RequireResponse.success(requireId, workers);
        }
        if (requiredNum <= 0) {
            String msg = RuntimeErrors.INST.resourceIllegalRequireNumError("illegal num " + requiredNum);
            LOGGER.error("[{}] {}", (Object)requireId, (Object)msg);
            return RequireResponse.fail(requireId, msg);
        }
        if (this.recovering.get()) {
            String msg = "resource manager still recovering";
            LOGGER.warn("[{}] {}", (Object)requireId, (Object)msg);
            return RequireResponse.fail(requireId, msg);
        }
        if (this.pendingWorkerCounter.get() > 0) {
            String msg = "some worker still pending creation";
            LOGGER.warn("[{}] {}", (Object)requireId, (Object)msg);
            return RequireResponse.fail(requireId, msg);
        }
        Optional<List> optional = this.withLock(OPERATION_REQUIRE, num -> {
            if (this.availableWorkers.size() < num) {
                LOGGER.warn("[{}] require {}, available {}, return empty", new Object[]{requireId, num, this.availableWorkers.size()});
                return Collections.emptyList();
            }
            IAllocator.AllocateStrategy strategy = requireRequest.getAllocateStrategy();
            List<WorkerInfo> allocated = this.allocators.get((Object)strategy).allocate(this.availableWorkers.values(), (int)num);
            for (WorkerInfo worker : allocated) {
                WorkerInfo.WorkerId workerId = worker.generateWorkerId();
                this.availableWorkers.remove(workerId);
                ResourceSession session = this.sessions.computeIfAbsent(requireId, ResourceSession::new);
                session.addWorker(workerId, worker);
            }
            LOGGER.info("[{}] require {} allocated {} available {}", new Object[]{requireId, num, allocated.size(), this.availableWorkers.size()});
            if (!allocated.isEmpty()) {
                this.persist();
            }
            return allocated;
        }, requiredNum, 1);
        List<WorkerInfo> allocated = optional.orElse(Collections.emptyList());
        return RequireResponse.success(requireId, allocated);
    }

    @Override
    public ReleaseResponse releaseResource(ReleaseResourceRequest releaseRequest) {
        int actualSize;
        String releaseId = String.valueOf(releaseRequest.getReleaseId());
        if (!this.sessions.containsKey(releaseId)) {
            String msg = "release fail, session not exists: " + releaseId;
            LOGGER.error(msg);
            return ReleaseResponse.fail(releaseId, msg);
        }
        int expectSize = this.sessions.get(releaseId).getWorkers().size();
        if (expectSize != (actualSize = releaseRequest.getWorkers().size())) {
            String msg = String.format("release fail, worker num of session %s mismatch, expected %d, actual %d", releaseId, expectSize, actualSize);
            LOGGER.error(msg);
            return ReleaseResponse.fail(releaseId, msg);
        }
        Optional<Boolean> optional = this.withLock(OPERATION_RELEASE, workers -> {
            for (WorkerInfo worker : workers) {
                WorkerInfo.WorkerId workerId = worker.generateWorkerId();
                this.availableWorkers.put(workerId, worker);
                if (this.sessions.get(releaseId).removeWorker(workerId)) continue;
                String msg = String.format("worker %s not exists in session %s", workerId, releaseId);
                LOGGER.error(msg);
                throw new GeaflowRuntimeException(msg);
            }
            this.sessions.remove(releaseId);
            LOGGER.info("[{}] release {} available {}", new Object[]{releaseId, workers.size(), this.availableWorkers.size()});
            this.persist();
            return true;
        }, releaseRequest.getWorkers(), 100);
        if (!optional.orElse(false).booleanValue()) {
            String msg = "release fail after 100 times";
            LOGGER.error(msg);
            return ReleaseResponse.fail(releaseId, msg);
        }
        return ReleaseResponse.success(releaseId);
    }

    @Override
    public void onSuccess(ContainerExecutorInfo containerExecutorInfo) {
        this.waitForInit();
        this.withLock(OPERATION_ALLOCATE, container -> {
            String containerName = container.getContainerName();
            String host = container.getHost();
            int rpcPort = container.getRpcPort();
            int shufflePort = container.getShufflePort();
            int processId = container.getProcessId();
            List<Integer> executorIds = container.getExecutorIds();
            this.onRegister(containerName, host, rpcPort, shufflePort, processId, executorIds);
            return true;
        }, containerExecutorInfo, Integer.MAX_VALUE);
    }

    private void onRegister(String containerName, String host, int rpcPort, int shufflePort, int processId, List<Integer> executorIds) {
        for (Integer workerIndex : executorIds) {
            int pending;
            WorkerInfo.WorkerId workerId = new WorkerInfo.WorkerId(containerName, workerIndex);
            WorkerInfo worker = null;
            if (this.availableWorkers.containsKey(workerId)) {
                worker = this.availableWorkers.get(workerId);
            }
            for (ResourceSession session : this.sessions.values()) {
                if (!session.getWorkers().containsKey(workerId)) continue;
                worker = session.getWorkers().get(workerId);
            }
            if (worker == null) {
                worker = WorkerInfo.build(host, rpcPort, shufflePort, processId, workerIndex, containerName);
                this.availableWorkers.put(worker.generateWorkerId(), worker);
                pending = this.pendingWorkerCounter.addAndGet(-1);
                LOGGER.info("allocate {} worker from cluster manager container {}, host {}, processId {}, workerIndex {}, pending {}", new Object[]{executorIds.size(), containerName, host, processId, workerIndex, pending});
                continue;
            }
            worker.setHost(host);
            worker.setProcessId(processId);
            worker.setRpcPort(rpcPort);
            worker.setShufflePort(shufflePort);
            pending = this.pendingWorkerCounter.get();
            LOGGER.info("recover {} worker from cluster manager container {}, host {}, processId {}, workerIndex {}, pending {}", new Object[]{executorIds.size(), containerName, host, processId, workerIndex, pending});
        }
        int pending = this.pendingWorkerCounter.get();
        int used = this.sessions.values().stream().mapToInt(s -> s.getWorkers().size()).sum();
        if (pending <= 0) {
            this.recovering.set(false);
            LOGGER.info("register worker over, available/used : {}/{}, pending {}", new Object[]{this.availableWorkers.size(), used, pending});
        } else {
            LOGGER.info("still pending : {}, available/used : {}/{}", new Object[]{pending, this.availableWorkers.size(), used});
        }
        this.persist();
    }

    @Override
    public void onFailure(ExecutorRegisterException e) {
        LOGGER.error("create worker err", (Throwable)((Object)e));
        this.allocateWorkerErr.compareAndSet(null, (Throwable)((Object)e));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T, R> Optional<R> withLock(String operation, Function<T, R> function, T input, int maxRetryTimes) {
        this.checkError();
        try {
            int retry = 0;
            while (!this.resourceLock.compareAndSet(true, false)) {
                SleepUtils.sleepMilliSecond((long)10L);
                if (++retry >= maxRetryTimes) {
                    LOGGER.warn("[{}] lock not ready, return empty", (Object)operation);
                    Optional optional = Optional.empty();
                    return optional;
                }
                if (retry % 100 != 0) continue;
                LOGGER.warn("[{}] lock not ready after {} times", (Object)operation, (Object)retry);
            }
            Optional<R> optional = Optional.of(function.apply(input));
            return optional;
        }
        finally {
            this.resourceLock.set(true);
        }
    }

    private void persist() {
        long start = System.currentTimeMillis();
        ArrayList<WorkerInfo> available = new ArrayList<WorkerInfo>(this.availableWorkers.values());
        ArrayList<ResourceSession> sessions = new ArrayList<ResourceSession>(this.sessions.values());
        int used = sessions.stream().mapToInt(s -> s.getWorkers().size()).sum();
        this.metaKeeper.saveWorkers(new WorkerSnapshot(available, sessions));
        this.metaKeeper.flush();
        LOGGER.info("persist {}/{} workers costs {}ms", new Object[]{this.availableWorkers.size(), used, System.currentTimeMillis() - start});
    }

    private void recover() {
        long start = System.currentTimeMillis();
        WorkerSnapshot workerSnapshot = this.metaKeeper.getWorkers();
        List<WorkerInfo> available = workerSnapshot.getAvailableWorkers();
        List<ResourceSession> sessions = workerSnapshot.getSessions();
        for (WorkerInfo workerInfo : available) {
            WorkerInfo.WorkerId workerId = workerInfo.generateWorkerId();
            this.availableWorkers.put(workerId, workerInfo);
        }
        int usedWorkerNum = 0;
        for (ResourceSession session : sessions) {
            String sessionId = session.getId();
            this.sessions.put(sessionId, session);
            usedWorkerNum += session.getWorkers().size();
        }
        int n = this.availableWorkers.size();
        this.pendingWorkerCounter.addAndGet(-n - usedWorkerNum);
        LOGGER.info("recover {}/{} workers, pending {}, costs {}ms", new Object[]{n, usedWorkerNum, this.pendingWorkerCounter.get(), System.currentTimeMillis() - start});
        if (this.pendingWorkerCounter.get() <= 0) {
            this.recovering.set(false);
            LOGGER.info("recover worker over, available/used : {}/{}", (Object)this.availableWorkers.size(), (Object)usedWorkerNum);
        }
    }

    private void waitForInit() {
        int count = 0;
        while (!this.inited.get()) {
            if (++count % 100 == 0) {
                LOGGER.warn("resource manager not inited, wait {}ms and retry", (Object)10);
            }
            SleepUtils.sleepMilliSecond((long)10L);
        }
    }

    @VisibleForTesting
    protected AtomicInteger getPendingWorkerCounter() {
        return this.pendingWorkerCounter;
    }

    @VisibleForTesting
    protected AtomicBoolean getResourceLock() {
        return this.resourceLock;
    }

    private void checkError() {
        Throwable firstException = this.allocateWorkerErr.get();
        if (firstException != null) {
            throw new GeaflowRuntimeException(firstException);
        }
    }
}

