/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.ha.service;

import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.common.utils.SleepUtils;
import com.antgroup.geaflow.ha.service.IHAService;
import com.antgroup.geaflow.ha.service.ResourceData;
import com.antgroup.geaflow.store.api.key.IKVStore;
import com.antgroup.geaflow.utils.NetworkUtil;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base implementation of {@link IHAService} that keeps resource registrations in a
 * key-value store and caches resolved entries in memory.
 *
 * <p>This class never assigns {@link #kvStore}; presumably concrete subclasses create it
 * in their own {@code open} implementation (TODO confirm against subclasses). While the
 * store is unset, {@link #register} is a no-op and lookups behave as if the resource were
 * missing.
 */
public abstract class AbstractHAService implements IHAService {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractHAService.class);

    /** Not referenced in this class; presumably a table-name prefix for subclasses. */
    protected static final String TABLE_PREFIX = "WORKERS_";

    /** Pause, in milliseconds, between two consecutive attempts to load a resource. */
    protected static final int LOAD_INTERVAL_MS = 200;

    /** Timeout passed to the service availability probe; read from RPC_CONNECT_TIMEOUT_MS. */
    protected int connectTimeout;

    /** Maximum time, in milliseconds, to keep retrying a load; read from FO_TIMEOUT_MS. */
    protected int recoverTimeout;

    /** In-memory cache of resources that were already loaded, keyed by resource id. */
    protected Map<String, ResourceData> resourceDataCache = new ConcurrentHashMap<>();

    /** Backing store for resource registrations; may be {@code null} until a subclass sets it. */
    protected IKVStore<String, ResourceData> kvStore;

    /**
     * Reads the recover and connect timeouts from the given configuration.
     *
     * @param configuration source of {@code FO_TIMEOUT_MS} and {@code RPC_CONNECT_TIMEOUT_MS}
     */
    @Override
    public void open(Configuration configuration) {
        this.recoverTimeout = configuration.getInteger(ExecutionConfigKeys.FO_TIMEOUT_MS);
        this.connectTimeout = configuration.getInteger(ExecutionConfigKeys.RPC_CONNECT_TIMEOUT_MS);
    }

    /**
     * Writes the resource data to the backing store under the given id.
     * Does nothing when no store has been set.
     *
     * @param resourceId   key to register the resource under
     * @param resourceData data to store
     */
    @Override
    public void register(String resourceId, ResourceData resourceData) {
        if (this.kvStore != null) {
            this.kvStore.put(resourceId, resourceData);
        }
    }

    /**
     * Returns the cached resource, or loads it from the store and additionally waits until
     * its RPC port passes the availability probe.
     *
     * <p>NOTE(review): the load runs inside {@code computeIfAbsent} and can block for up to
     * the recover timeout; the JDK documents that other updates to the map may be blocked
     * while the mapping function runs.
     *
     * @param resourceId id of the resource to resolve
     * @return the cached or freshly loaded resource data
     * @throws GeaflowRuntimeException if the resource cannot be loaded within the recover timeout
     */
    @Override
    public ResourceData resolveResource(String resourceId) {
        return this.resourceDataCache.computeIfAbsent(resourceId,
            key -> this.loadDataFromStore(key, true));
    }

    /**
     * Returns the cached resource, or loads it from the store without probing the service.
     *
     * @param resourceId id of the resource to load
     * @return the cached or freshly loaded resource data
     * @throws GeaflowRuntimeException if the resource cannot be loaded within the recover timeout
     */
    @Override
    public ResourceData loadResource(String resourceId) {
        return this.resourceDataCache.computeIfAbsent(resourceId,
            key -> this.loadDataFromStore(key, false));
    }

    /**
     * Drops the cached entry for the given id so that the next lookup reads the store again.
     *
     * @param resourceId id of the resource to evict
     * @return the evicted entry, or {@code null} if nothing was cached
     */
    @Override
    public ResourceData invalidateResource(String resourceId) {
        return this.resourceDataCache.remove(resourceId);
    }

    /** Closes the backing store if one has been set. */
    @Override
    public void close() {
        if (this.kvStore != null) {
            this.kvStore.close();
        }
    }

    /**
     * Reads the resource data directly from the backing store, bypassing the cache.
     *
     * @param resourceId id of the resource to read
     * @return the value returned by the store, or {@code null} when no store has been set
     */
    protected ResourceData getResourceData(String resourceId) {
        if (this.kvStore != null) {
            return this.kvStore.get(resourceId);
        }
        return null;
    }

    /** Loads with the recover timeout, probing the resource's RPC port when resolving. */
    private ResourceData loadDataFromStore(String resourceId, boolean resolve) {
        return this.loadDataFromStore(resourceId, resolve, this.recoverTimeout, ResourceData::getRpcPort);
    }

    /**
     * Loads a resource from the store, retrying until the recover timeout elapses.
     *
     * @param resourceId id of the resource to load
     * @param resolve    when {@code true}, also wait until the service answers the availability probe
     * @param portFunc   selects which port of the resource to probe
     * @return the loaded resource data
     * @throws GeaflowRuntimeException if the resource cannot be loaded within the recover timeout
     */
    public ResourceData loadDataFromStore(String resourceId, boolean resolve, Function<ResourceData, Integer> portFunc) {
        return this.loadDataFromStore(resourceId, resolve, this.recoverTimeout, portFunc);
    }

    /**
     * Polls the store until the resource is present (and, when {@code resolve} is set, until
     * the selected port passes the availability probe) or {@code timeoutMs} is exceeded.
     *
     * <p>Every unsuccessful attempt is followed by a {@link #LOAD_INTERVAL_MS} pause so a
     * missing entry, an unset store or a refused connection does not turn into a tight loop.
     */
    private ResourceData loadDataFromStore(String resourceId, boolean resolve, int timeoutMs, Function<ResourceData, Integer> portFunc) {
        final long startTime = System.currentTimeMillis();
        IOException lastFailure = null;
        while (true) {
            ResourceData resourceData = this.getResourceData(resourceId);
            if (resourceData != null) {
                if (!resolve) {
                    return resourceData;
                }
                try {
                    int port = portFunc.apply(resourceData);
                    NetworkUtil.checkServiceAvailable(resourceData.getHost(), port, this.connectTimeout);
                    return resourceData;
                } catch (IOException ex) {
                    // Remember the most recent probe failure for the timeout report.
                    lastFailure = ex;
                }
            }

            long elapsedTime = System.currentTimeMillis() - startTime;
            if (elapsedTime > (long) timeoutMs) {
                String reason = lastFailure != null ? lastFailure.getMessage() : null;
                String msg = String.format("load resource %s timeout after %sms, reason:%s", resourceId, elapsedTime, reason);
                // Log the last failure with its stack trace; the thrown exception carries
                // only the message because no cause-accepting constructor is visible here.
                if (lastFailure != null) {
                    LOGGER.error(msg, lastFailure);
                } else {
                    LOGGER.error(msg);
                }
                throw new GeaflowRuntimeException(msg);
            }
            SleepUtils.sleepMilliSecond(LOAD_INTERVAL_MS);
        }
    }
}

