/*
 * Decompiled with CFR 0.152.
 */
package io.dingodb.server.client.connector.impl;

import io.dingodb.common.Location;
import io.dingodb.common.concurrent.Executors;
import io.dingodb.common.error.CommonError;
import io.dingodb.common.util.ByteArrayUtils;
import io.dingodb.common.util.NoBreakFunctions;
import io.dingodb.net.Channel;
import io.dingodb.net.Message;
import io.dingodb.net.NetService;
import io.dingodb.net.NetServiceProvider;
import io.dingodb.server.api.CoordinatorServerApi;
import io.dingodb.server.client.config.ClientConfiguration;
import io.dingodb.server.client.connector.Connector;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CoordinatorConnector
implements Connector,
Supplier<Location> {
    private static final Logger log = LoggerFactory.getLogger(CoordinatorConnector.class);
    private static final CoordinatorConnector DEFAULT_CONNECTOR;
    private final NetService netService = ServiceLoader.load(NetServiceProvider.class).iterator().next().get();
    private final AtomicReference<Channel> leaderChannel = new AtomicReference();
    private final AtomicReference<Location> leaderAddress = new AtomicReference();
    private final Set<Location> coordinatorAddresses = new HashSet<Location>();
    private final Map<Location, Channel> listenLeaderChannels = new ConcurrentHashMap<Location, Channel>();
    private long lastUpdateLeaderTime;
    private long lastUpdateNotLeaderChannelsTime;

    public static CoordinatorConnector defaultConnector() {
        return DEFAULT_CONNECTOR;
    }

    public CoordinatorConnector(List<Location> coordinatorAddresses) {
        this.coordinatorAddresses.addAll(coordinatorAddresses);
        this.refresh();
    }

    @Override
    public Channel newChannel() {
        this.get();
        return this.netService.newChannel(this.leaderChannel.get().remoteLocation());
    }

    @Override
    public boolean verify() {
        return this.leaderChannel.get() != null && this.leaderChannel.get().status() == Channel.Status.ACTIVE;
    }

    @Override
    public void refresh() {
        Executors.submit("coordinator-connector-refresh", this::initChannels);
    }

    @Override
    public Location get() {
        int times = 10;
        int sleep = 200;
        while (!this.verify() && times-- > 0) {
            try {
                Thread.sleep(sleep);
                this.refresh();
                sleep *= 10 - times;
            }
            catch (InterruptedException e) {
                log.error("Wait coordinator connector ready, but interrupted.");
            }
        }
        if (!this.verify()) {
            CommonError.EXEC_TIMEOUT.throwFormatError("wait connector available", Thread.currentThread().getName(), "");
        }
        return this.leaderChannel.get().remoteLocation();
    }

    private void initChannels() {
        for (Location address : this.coordinatorAddresses) {
            if (this.verify()) {
                return;
            }
            try {
                CoordinatorServerApi api = this.netService.apiRegistry().proxy(CoordinatorServerApi.class, () -> address);
                Location leaderAddress = api.leader();
                Channel channel = this.netService.newChannel(leaderAddress);
                this.connectedLeader(channel);
                return;
            }
            catch (Exception e) {
                log.error("Open coordinator channel error, address: {}", (Object)address, (Object)e);
            }
        }
    }

    private void connected(Message message, Channel channel) {
        log.info("Connected coordinator [{}] channel.", (Object)channel.remoteLocation());
        this.coordinatorAddresses.add(channel.remoteLocation());
        channel.setCloseListener(this::listenClose);
        channel.setMessageListener(this::listenLeader);
    }

    private void connectedLeader(Channel channel) {
        try {
            if (!this.leaderChange(channel)) {
                channel.close();
                return;
            }
            this.lastUpdateLeaderTime = System.currentTimeMillis();
            Supplier<Location> leaderLocation = channel::remoteLocation;
            this.coordinatorAddresses.addAll(this.netService.apiRegistry().proxy(CoordinatorServerApi.class, leaderLocation).getAll().stream().map(location -> new Location(location.getHost(), location.getPort())).collect(Collectors.toList()));
            this.coordinatorAddresses.stream().filter(address -> !address.equals(channel.remoteLocation())).forEach(address -> Executors.submit("CoordinatorConnector", () -> this.listenLeaderChannels.computeIfAbsent((Location)address, NoBreakFunctions.wrap(this::connectFollow, e -> log.error("Open follow channel error, address: {}", address, e)))));
            this.lastUpdateNotLeaderChannelsTime = System.currentTimeMillis();
            log.info("Connected coordinator leader success, remote: [{}]", (Object)channel.remoteLocation());
        }
        catch (Exception e) {
            log.error("Connected coordinator leader error, address: {}", (Object)channel, (Object)e);
        }
    }

    @Nonnull
    private Channel connectFollow(Location address) {
        Channel ch = this.netService.newChannel(address);
        ch.setMessageListener(this::connected);
        ch.send(new Message("RAFT_SERVICE", ByteArrayUtils.EMPTY_BYTES));
        log.info("Open coordinator channel, address: [{}]", (Object)address);
        return ch;
    }

    private void closeChannel(Channel channel) {
        try {
            channel.close();
        }
        catch (Exception e) {
            log.error("Close coordinator channel error, address: [{}].", (Object)channel.remoteLocation(), (Object)e);
        }
    }

    private void listenLeader(Message message, Channel channel) {
        log.info("Receive leader message from [{}]", (Object)channel.remoteLocation().getUrl());
        this.leaderChange(channel);
    }

    private synchronized boolean leaderChange(Channel channel) {
        Channel oldLeader = this.leaderChannel.get();
        if (oldLeader != null && oldLeader.isActive() && channel.remoteLocation().equals(oldLeader.remoteLocation())) {
            log.info("Coordinator leader not changed, remote: {}", (Object)channel.remoteLocation());
            return false;
        }
        if (!this.leaderChannel.compareAndSet(oldLeader, channel) && !this.leaderChannel.compareAndSet(null, channel)) {
            return false;
        }
        this.lastUpdateLeaderTime = System.currentTimeMillis();
        log.info("Coordinator leader channel changed, new leader remote: [{}], old leader remote: [{}]", (Object)channel.remoteLocation(), (Object)(oldLeader == null ? null : oldLeader.remoteLocation()));
        if (oldLeader != null) {
            this.closeChannel(oldLeader);
        }
        channel.setCloseListener(this::listenClose);
        return true;
    }

    private void listenClose(Channel channel) {
        this.listenLeaderChannels.remove(channel.remoteLocation());
        log.info("Coordinator channel closed, remote: [{}]", (Object)channel.remoteLocation());
        if (this.leaderChannel.compareAndSet(channel, null)) {
            this.lastUpdateLeaderTime = System.currentTimeMillis();
            log.info("Coordinator leader channel closed, remote: [{}].", (Object)channel.remoteLocation());
        }
    }

    static {
        if (ClientConfiguration.instance() == null) {
            DEFAULT_CONNECTOR = null;
        } else {
            String coordSrvList = ClientConfiguration.coordinatorExchangeSvrList();
            List<String> servers = Arrays.asList(coordSrvList.split(","));
            List<Location> addrList = servers.stream().map(s2 -> s2.split(":")).map(ss -> new Location(ss[0], Integer.parseInt(ss[1]))).collect(Collectors.toList());
            DEFAULT_CONNECTOR = new CoordinatorConnector(addrList);
        }
    }
}

