/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.metaserver.client;

import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys;
import com.antgroup.geaflow.common.rpc.ConfigurableClientOption;
import com.antgroup.geaflow.common.rpc.HostAndPort;
import com.antgroup.geaflow.common.serialize.SerializerFactory;
import com.antgroup.geaflow.common.utils.RetryCommand;
import com.antgroup.geaflow.metaserver.model.protocal.MetaRequest;
import com.antgroup.geaflow.metaserver.model.protocal.request.RequestPBConverter;
import com.antgroup.geaflow.metaserver.model.protocal.response.ResponsePBConverter;
import com.antgroup.geaflow.metaserver.service.MetaServerService;
import com.antgroup.geaflow.rpc.proto.MetaServer;
import com.antgroup.geaflow.service.discovery.ServiceBuilder;
import com.antgroup.geaflow.service.discovery.ServiceBuilderFactory;
import com.antgroup.geaflow.service.discovery.ServiceConsumer;
import com.antgroup.geaflow.service.discovery.ServiceListener;
import com.baidu.brpc.client.BrpcProxy;
import com.baidu.brpc.client.RpcClient;
import com.baidu.brpc.client.channel.Endpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseClient
implements ServiceListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(BaseClient.class);
    private static final int MAX_RETRY = 5;
    private static final int RETRY_MS = 10000;
    protected ServiceConsumer serviceConsumer;
    protected RpcClient rpcClient;
    protected MetaServerService metaServerService;
    private HostAndPort currentServiceInfo;
    protected Configuration configuration;

    public BaseClient() {
    }

    public BaseClient(Configuration configuration) {
        this.configuration = configuration;
        ServiceBuilder serviceBuilder = ServiceBuilderFactory.build((String)configuration.getString(ExecutionConfigKeys.SERVICE_DISCOVERY_TYPE));
        this.serviceConsumer = serviceBuilder.buildConsumer(configuration);
        this.serviceConsumer.register((ServiceListener)this);
        this.buildServerConnect(true);
    }

    protected synchronized void buildServerConnect(boolean force) {
        boolean exits = this.serviceConsumer.exists("meta_server");
        if (!exits) {
            if (!force) {
                return;
            }
            throw new IllegalStateException("not find meta server info");
        }
        byte[] bytes = this.serviceConsumer.getDataAndWatch("meta_server");
        HostAndPort serviceInfo = (HostAndPort)SerializerFactory.getKryoSerializer().deserialize(bytes);
        if (this.currentServiceInfo != null && this.currentServiceInfo.equals((Object)serviceInfo)) {
            LOGGER.info("service info {} is same, skip update", (Object)this.currentServiceInfo);
            return;
        }
        if (this.rpcClient != null) {
            this.rpcClient.stop();
        }
        LOGGER.info("connect to meta server {}", (Object)serviceInfo);
        this.rpcClient = new RpcClient(new Endpoint(serviceInfo.getHost(), serviceInfo.getPort()), ConfigurableClientOption.build((Configuration)this.configuration));
        this.metaServerService = (MetaServerService)BrpcProxy.getProxy((RpcClient)this.rpcClient, MetaServerService.class);
        this.currentServiceInfo = serviceInfo;
    }

    protected <T> T process(MetaRequest request) {
        return (T)RetryCommand.run(() -> {
            MetaServer.ServiceResultPb result = this.metaServerService.process(RequestPBConverter.convert(request));
            return ResponsePBConverter.convert(result);
        }, () -> {
            this.buildServerConnect(false);
            return true;
        }, (int)5, (long)10000L, (boolean)true);
    }

    public void nodeCreated(String path) {
        this.checkAndUpdateConnector(path);
    }

    public void nodeDeleted(String path) {
        this.checkAndUpdateConnector(path);
    }

    public void nodeDataChanged(String path) {
        this.checkAndUpdateConnector(path);
    }

    public void nodeChildrenChanged(String path) {
        this.checkAndUpdateConnector(path);
    }

    private void checkAndUpdateConnector(String path) {
        if (path.contains("meta_server")) {
            this.buildServerConnect(false);
        }
    }

    public void close() {
        this.serviceConsumer.close();
        this.rpcClient.stop();
    }
}

