/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.ha.service;

import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.common.utils.SleepUtils;
import com.antgroup.geaflow.ha.service.IHAService;
import com.antgroup.geaflow.ha.service.ResourceData;
import com.antgroup.geaflow.store.api.key.IKVStore;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractHAService
implements IHAService {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractHAService.class);
    private static final int DEFAULT_TIMEOUT = 3000;
    protected static final String TABLE_PREFIX = "WORKERS_";
    protected int recoverTimeout;
    protected Map<String, ResourceData> resourceDataCache = new ConcurrentHashMap<String, ResourceData>();
    protected IKVStore<String, ResourceData> kvStore;

    @Override
    public void open(Configuration configuration) {
        this.recoverTimeout = configuration.getInteger(ExecutionConfigKeys.FO_TIMEOUT_MS);
    }

    @Override
    public void register(String resourceId, ResourceData resourceData) {
        this.kvStore.put((Object)resourceId, (Object)resourceData);
    }

    @Override
    public ResourceData resolveResource(String resourceId) {
        return this.resourceDataCache.computeIfAbsent(resourceId, this::loadDataFromStore);
    }

    @Override
    public void invalidateResource(String resourceId) {
        this.resourceDataCache.remove(resourceId);
    }

    @Override
    public void close() {
        if (this.kvStore != null) {
            this.kvStore.close();
        }
    }

    protected ResourceData getResourceData(String resourceId) {
        return (ResourceData)this.kvStore.get((Object)resourceId);
    }

    private ResourceData loadDataFromStore(String resourceId) {
        long currentTime;
        long startTime = currentTime = System.currentTimeMillis();
        long checkTime = currentTime;
        IOException throwable = null;
        ResourceData resourceData = null;
        while (true) {
            if ((currentTime = System.currentTimeMillis()) - checkTime > 2000L) {
                long elapsedTime = currentTime - startTime;
                LOGGER.warn("failed to resolve resource:{} resourceData:{}", new Object[]{resourceId, resourceData, throwable});
                checkTime = currentTime;
                if (elapsedTime > (long)this.recoverTimeout) {
                    String msg = String.format("load resource %s timeout after %sms", resourceId, elapsedTime);
                    LOGGER.error(msg);
                    throw new GeaflowRuntimeException(msg);
                }
                SleepUtils.sleepMilliSecond((long)200L);
            }
            if ((resourceData = this.getResourceData(resourceId)) == null) continue;
            try {
                this.checkServiceAvailable(resourceData.getHost(), resourceData.getRpcPort());
            }
            catch (IOException ex) {
                throwable = ex;
                continue;
            }
            break;
        }
        return resourceData;
    }

    private void checkServiceAvailable(String hostName, int port) throws IOException {
        try (Socket socket = new Socket();){
            InetSocketAddress socketAddress = new InetSocketAddress(hostName, port);
            socket.connect(socketAddress, 3000);
        }
    }
}

