/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.impl.ObjectUtils;
import io.axoniq.axonserver.connector.impl.ReconnectConfiguration;
import io.axoniq.axonserver.connector.impl.ServerAddress;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.axoniq.axonserver.grpc.control.NodeInfo;
import io.axoniq.axonserver.grpc.control.PlatformInfo;
import io.axoniq.axonserver.grpc.control.PlatformServiceGrpc;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AxonServerManagedChannel
extends ManagedChannel {
    private static final Logger logger = LoggerFactory.getLogger(AxonServerManagedChannel.class);
    private final List<ServerAddress> routingServers;
    private final long reconnectInterval;
    private final String context;
    private final ClientIdentification clientIdentification;
    private final ScheduledExecutorService executor;
    private final boolean forcePlatformReconnect;
    private final BiFunction<ServerAddress, String, ManagedChannel> connectionFactory;
    private final long connectTimeout;
    private final AtomicReference<ManagedChannel> activeChannel = new AtomicReference();
    private final AtomicBoolean shutdown = new AtomicBoolean();
    private final AtomicBoolean suppressErrors = new AtomicBoolean();
    private final Queue<Runnable> connectListeners = new LinkedBlockingQueue<Runnable>();
    private final AtomicLong nextAttemptTime = new AtomicLong();
    private final AtomicLong connectionDeadline = new AtomicLong();
    private final AtomicReference<Exception> lastConnectException = new AtomicReference();
    private final AtomicBoolean scheduleGate = new AtomicBoolean();

    public AxonServerManagedChannel(List<ServerAddress> routingServers, ReconnectConfiguration reconnectConfiguration, String context, ClientIdentification clientIdentification, ScheduledExecutorService executor, BiFunction<ServerAddress, String, ManagedChannel> connectionFactory) {
        this.routingServers = new ArrayList<ServerAddress>(routingServers);
        this.reconnectInterval = reconnectConfiguration.getTimeUnit().toMillis(reconnectConfiguration.getReconnectInterval());
        this.context = context;
        this.clientIdentification = clientIdentification;
        this.executor = executor;
        this.forcePlatformReconnect = reconnectConfiguration.isForcePlatformReconnect();
        this.connectionFactory = connectionFactory;
        this.connectTimeout = reconnectConfiguration.getTimeUnit().toMillis(reconnectConfiguration.getConnectTimeout());
    }

    private ManagedChannel connectChannel() {
        ManagedChannel connection = null;
        for (ServerAddress nodeInfo : this.routingServers) {
            ManagedChannel candidate = null;
            try {
                candidate = this.connectionFactory.apply(nodeInfo, this.context);
                PlatformServiceGrpc.PlatformServiceBlockingStub stub = (PlatformServiceGrpc.PlatformServiceBlockingStub)PlatformServiceGrpc.newBlockingStub((Channel)candidate).withDeadlineAfter(this.connectTimeout, TimeUnit.MILLISECONDS);
                logger.info("Requesting connection details from {}:{}", (Object)nodeInfo.getHostName(), (Object)nodeInfo.getGrpcPort());
                PlatformInfo clusterInfo = stub.getPlatformServer(this.clientIdentification);
                NodeInfo primaryClusterInfo = clusterInfo.getPrimary();
                logger.debug("Received PlatformInfo suggesting [{}] ({}:{}), {}", new Object[]{primaryClusterInfo.getNodeName(), primaryClusterInfo.getHostName(), primaryClusterInfo.getGrpcPort(), clusterInfo.getSameConnection() ? "allowing use of existing connection" : "requiring new connection"});
                if (clusterInfo.getSameConnection() || primaryClusterInfo.getGrpcPort() == nodeInfo.getGrpcPort() && primaryClusterInfo.getHostName().equals(nodeInfo.getHostName())) {
                    logger.debug("Reusing existing channel");
                    connection = candidate;
                } else {
                    candidate.shutdown();
                    logger.info("Connecting to [{}] ({}:{})", new Object[]{primaryClusterInfo.getNodeName(), primaryClusterInfo.getHostName(), primaryClusterInfo.getGrpcPort()});
                    ServerAddress serverAddress = new ServerAddress(primaryClusterInfo.getHostName(), primaryClusterInfo.getGrpcPort());
                    connection = this.connectionFactory.apply(serverAddress, this.context);
                }
                this.suppressErrors.set(false);
                this.lastConnectException.set(null);
                break;
            }
            catch (Exception e) {
                this.lastConnectException.set(e);
                ObjectUtils.doIfNotNull(candidate, this::shutdownNow);
                if (!this.suppressErrors.getAndSet(true)) {
                    logger.warn("Connecting to AxonServer node [{}] failed.", (Object)nodeInfo, (Object)e);
                    continue;
                }
                logger.warn("Connecting to AxonServer node [{}] failed: {}", (Object)nodeInfo, (Object)e.getMessage());
            }
        }
        return connection;
    }

    private void shutdownNow(ManagedChannel managedChannel) {
        try {
            managedChannel.shutdownNow().awaitTermination(1L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.debug("Interrupted during shutdown");
        }
    }

    public ManagedChannel shutdown() {
        this.shutdown.set(true);
        ObjectUtils.doIfNotNull(this.activeChannel.get(), ManagedChannel::shutdown);
        return this;
    }

    public boolean isShutdown() {
        return this.shutdown.get();
    }

    public boolean isTerminated() {
        if (!this.shutdown.get()) {
            return false;
        }
        ManagedChannel current = this.activeChannel.get();
        return current == null || current.isTerminated();
    }

    public ManagedChannel shutdownNow() {
        this.shutdown.set(true);
        ObjectUtils.doIfNotNull(this.activeChannel.get(), ManagedChannel::shutdownNow);
        return this;
    }

    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        ManagedChannel current = this.activeChannel.get();
        if (current != null) {
            current.awaitTermination(timeout, unit);
        }
        return true;
    }

    public <REQ, RESP> ClientCall<REQ, RESP> newCall(MethodDescriptor<REQ, RESP> methodDescriptor, CallOptions callOptions) {
        ManagedChannel current = this.activeChannel.get();
        if (current == null || current.getState(false) != ConnectivityState.READY) {
            this.ensureConnected();
            current = this.activeChannel.get();
        }
        if (current == null) {
            return new FailingCall();
        }
        return current.newCall(methodDescriptor, callOptions);
    }

    public String authority() {
        return this.routingServers.get(0).toString();
    }

    public ConnectivityState getState(boolean requestConnection) {
        ManagedChannel current;
        if (this.shutdown.get()) {
            return ConnectivityState.SHUTDOWN;
        }
        if (requestConnection) {
            this.ensureConnected();
        }
        if ((current = this.activeChannel.get()) == null) {
            if (this.lastConnectException.get() == null) {
                return ConnectivityState.IDLE;
            }
            return ConnectivityState.TRANSIENT_FAILURE;
        }
        ConnectivityState state = current.getState(requestConnection);
        return state == ConnectivityState.SHUTDOWN ? ConnectivityState.TRANSIENT_FAILURE : state;
    }

    public void notifyWhenStateChanged(ConnectivityState source, Runnable callback) {
        ManagedChannel current = this.activeChannel.get();
        logger.debug("Registering state change listener for {} on channel {}", (Object)source, (Object)current);
        switch (source) {
            case SHUTDOWN: 
            case READY: 
            case IDLE: 
            case CONNECTING: {
                if (current != null) {
                    current.notifyWhenStateChanged(source, callback);
                    break;
                }
                callback.run();
                break;
            }
            case TRANSIENT_FAILURE: {
                if (current == null) {
                    this.connectListeners.add(callback);
                    break;
                }
                callback.run();
            }
        }
    }

    public void resetConnectBackoff() {
        ObjectUtils.doIfNotNull(this.activeChannel.get(), ManagedChannel::resetConnectBackoff);
    }

    public void enterIdle() {
        ObjectUtils.doIfNotNull(this.activeChannel.get(), ManagedChannel::enterIdle);
    }

    private synchronized void ensureConnected() {
        if (this.shutdown.get()) {
            return;
        }
        logger.debug("Checking connection state");
        ManagedChannel current = this.activeChannel.get();
        ConnectivityState state = current == null ? ConnectivityState.SHUTDOWN : current.getState(true);
        long now = System.currentTimeMillis();
        switch (state) {
            case SHUTDOWN: 
            case TRANSIENT_FAILURE: {
                long deadline = this.nextAttemptTime.getAndUpdate(d -> d > now ? d : now + this.reconnectInterval);
                if (deadline > now) {
                    long timeLeft = Math.min(500L, deadline - now);
                    logger.debug("Reconnect timeout still enforced. Scheduling a new connection check in {}ms", (Object)timeLeft);
                    this.scheduleConnectionCheck(timeLeft);
                    return;
                }
                if (current != null) {
                    logger.info("Connection to AxonServer lost. Attempting to reconnect...");
                }
                this.createConnection(current);
                break;
            }
            case CONNECTING: {
                long connectDeadline = this.connectionDeadline.getAndUpdate(d -> d > now ? d : now + this.connectTimeout);
                if (connectDeadline != 0L && connectDeadline < now) {
                    logger.info("Unable to recover current connection to AxonServer. Attempting to reconnect...");
                    this.createConnection(current);
                    break;
                }
                this.scheduleConnectionCheck(Math.min(500L, connectDeadline - now));
                break;
            }
            case READY: {
                if (this.forcePlatformReconnect) {
                    this.connectionDeadline.set(1L);
                }
                logger.debug("Connection is {}", (Object)state);
                break;
            }
            default: {
                logger.debug("Connection is {}, checking again in 50ms", (Object)state);
                this.scheduleConnectionCheck(50L);
            }
        }
    }

    private void createConnection(ManagedChannel current) {
        ManagedChannel newConnection;
        if (this.forcePlatformReconnect && current != null && !current.isShutdown()) {
            logger.debug("Shut down current connection");
            current.shutdown();
        }
        if ((newConnection = this.connectChannel()) != null) {
            Runnable listener;
            if (!this.activeChannel.compareAndSet(current, newConnection)) {
                logger.debug("A successful Connection was concurrently set up. Closing this one.");
                newConnection.shutdown();
                return;
            }
            ObjectUtils.doIfNotNull(current, ManagedChannel::shutdown);
            if (logger.isInfoEnabled()) {
                logger.info("Successfully connected to {}", (Object)newConnection.authority());
            }
            this.connectionDeadline.set(0L);
            this.nextAttemptTime.set(0L);
            logger.debug("Registering state change handler");
            newConnection.notifyWhenStateChanged(ConnectivityState.READY, () -> this.verifyConnectionStateChange(newConnection));
            while ((listener = this.connectListeners.poll()) != null) {
                listener.run();
            }
        } else {
            logger.info("Failed to get connection to AxonServer. Scheduling a reconnect in {}ms", (Object)this.reconnectInterval);
            this.scheduleConnectionCheck(this.reconnectInterval);
        }
    }

    private void verifyConnectionStateChange(ManagedChannel channel) {
        ConnectivityState currentState = channel.getState(false);
        logger.debug("Connection state changed to {} scheduling connection check.}", (Object)currentState);
        if (currentState != ConnectivityState.SHUTDOWN) {
            logger.debug("Registering new state change handler");
            channel.notifyWhenStateChanged(currentState, () -> this.verifyConnectionStateChange(channel));
        }
        this.scheduleConnectionCheck(10L);
    }

    private void scheduleConnectionCheck(long interval) {
        try {
            if (this.scheduleGate.compareAndSet(false, true)) {
                this.executor.schedule(() -> {
                    this.scheduleGate.set(false);
                    this.ensureConnected();
                }, interval, TimeUnit.MILLISECONDS);
            }
        }
        catch (RejectedExecutionException e) {
            this.scheduleGate.set(false);
            logger.debug("Did not schedule reconnect attempt. Connector is shut down");
        }
    }

    public void requestReconnect() {
        ObjectUtils.doIfNotNull(this.activeChannel.getAndSet(null), currentChannel -> {
            logger.info("Reconnect for context {} requested. Closing current connection.", (Object)this.context);
            this.nextAttemptTime.set(0L);
            currentChannel.shutdown();
            this.executor.schedule(() -> ((ManagedChannel)currentChannel).shutdownNow(), 5L, TimeUnit.SECONDS);
        });
    }

    public boolean isReady() {
        return this.getState(false) == ConnectivityState.READY;
    }

    private static class FailingCall<REQ, RESP>
    extends ClientCall<REQ, RESP> {
        private FailingCall() {
        }

        public void start(ClientCall.Listener<RESP> responseListener, Metadata headers) {
            responseListener.onClose(Status.UNAVAILABLE, null);
        }

        public void request(int numMessages) {
        }

        public void cancel(String message, Throwable cause) {
        }

        public void halfClose() {
        }

        public void sendMessage(REQ message) {
        }
    }
}

