/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector;

import io.axoniq.axonserver.grpc.command.CommandProviderInbound;
import io.axoniq.axonserver.grpc.command.CommandProviderOutbound;
import io.axoniq.axonserver.grpc.command.CommandServiceGrpc;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.axoniq.axonserver.grpc.control.NodeInfo;
import io.axoniq.axonserver.grpc.control.PlatformInboundInstruction;
import io.axoniq.axonserver.grpc.control.PlatformInfo;
import io.axoniq.axonserver.grpc.control.PlatformOutboundInstruction;
import io.axoniq.axonserver.grpc.control.PlatformServiceGrpc;
import io.axoniq.axonserver.grpc.query.QueryProviderInbound;
import io.axoniq.axonserver.grpc.query.QueryProviderOutbound;
import io.axoniq.axonserver.grpc.query.QueryServiceGrpc;
import io.grpc.Channel;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.netty.handler.ssl.SslContext;
import java.io.File;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.EnumMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.net.ssl.SSLException;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.SynchronizedStreamObserver;
import org.axonframework.axonserver.connector.util.ContextAddingInterceptor;
import org.axonframework.axonserver.connector.util.TokenAddingInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PlatformConnectionManager {
    private static final Logger logger = LoggerFactory.getLogger(PlatformConnectionManager.class);
    private volatile ManagedChannel channel;
    private volatile StreamObserver<PlatformInboundInstruction> inputStream;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private volatile ScheduledFuture<?> reconnectTask;
    private final List<Runnable> disconnectListeners = new CopyOnWriteArrayList<Runnable>();
    private final List<Function<Runnable, Runnable>> reconnectInterceptors = new CopyOnWriteArrayList<Function<Runnable, Runnable>>();
    private final List<Runnable> reconnectListeners = new CopyOnWriteArrayList<Runnable>();
    private final AxonServerConfiguration connectInformation;
    private final Map<PlatformOutboundInstruction.RequestCase, Collection<Consumer<PlatformOutboundInstruction>>> handlers = new EnumMap<PlatformOutboundInstruction.RequestCase, Collection<Consumer<PlatformOutboundInstruction>>>(PlatformOutboundInstruction.RequestCase.class);

    public PlatformConnectionManager(AxonServerConfiguration connectInformation) {
        this.connectInformation = connectInformation;
    }

    public synchronized Channel getChannel() {
        if (this.channel == null || this.channel.isShutdown()) {
            this.channel = null;
            logger.info("Connecting {}using SSL...", (Object)(this.connectInformation.isSslEnabled() ? "" : "not "));
            boolean unavailable = false;
            for (NodeInfo nodeInfo : this.connectInformation.routingServers()) {
                ManagedChannel candidate = this.createChannel(nodeInfo.getHostName(), nodeInfo.getGrpcPort());
                PlatformServiceGrpc.PlatformServiceBlockingStub stub = (PlatformServiceGrpc.PlatformServiceBlockingStub)PlatformServiceGrpc.newBlockingStub((Channel)candidate).withInterceptors(new ClientInterceptor[]{new ContextAddingInterceptor(this.connectInformation.getContext()), new TokenAddingInterceptor(this.connectInformation.getToken())});
                try {
                    PlatformInfo clusterInfo = stub.getPlatformServer(ClientIdentification.newBuilder().setClientName(this.connectInformation.getClientName()).setComponentName(this.connectInformation.getComponentName()).build());
                    if (this.isPrimary(nodeInfo, clusterInfo)) {
                        this.channel = candidate;
                    } else {
                        this.shutdown(candidate);
                        logger.info("Connecting to {} ({}:{})", new Object[]{clusterInfo.getPrimary().getNodeName(), clusterInfo.getPrimary().getHostName(), clusterInfo.getPrimary().getGrpcPort()});
                        this.channel = this.createChannel(clusterInfo.getPrimary().getHostName(), clusterInfo.getPrimary().getGrpcPort());
                    }
                    this.startInstructionStream(clusterInfo.getPrimary().getNodeName());
                    unavailable = false;
                    logger.info("Re-subscribing commands and queries");
                    this.reconnectListeners.forEach(Runnable::run);
                    break;
                }
                catch (StatusRuntimeException sre) {
                    this.shutdown(candidate);
                    logger.warn("Connecting to AxonServer node {}:{} failed: {}", new Object[]{nodeInfo.getHostName(), nodeInfo.getGrpcPort(), sre.getMessage()});
                    if (!sre.getStatus().getCode().equals((Object)Status.Code.UNAVAILABLE)) continue;
                    unavailable = true;
                }
            }
            if (unavailable) {
                this.scheduleReconnect();
                throw new RuntimeException("No connection to AxonServer available");
            }
        }
        return this.channel;
    }

    private void shutdown(ManagedChannel managedChannel) {
        try {
            managedChannel.shutdownNow().awaitTermination(1L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            logger.debug("Interrupted during shutdown");
        }
    }

    private boolean isPrimary(NodeInfo nodeInfo, PlatformInfo clusterInfo) {
        return clusterInfo.getPrimary().getGrpcPort() == nodeInfo.getGrpcPort() && clusterInfo.getPrimary().getHostName().equals(nodeInfo.getHostName());
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private ManagedChannel createChannel(String hostName, int port) {
        NettyChannelBuilder builder = NettyChannelBuilder.forAddress((String)hostName, (int)port);
        if (this.connectInformation.getKeepAliveTime() > 0L) {
            builder.keepAliveTime(this.connectInformation.getKeepAliveTime(), TimeUnit.MILLISECONDS).keepAliveTimeout(this.connectInformation.getKeepAliveTimeout(), TimeUnit.MILLISECONDS).keepAliveWithoutCalls(true);
        }
        if (this.connectInformation.isSslEnabled()) {
            try {
                if (this.connectInformation.getCertFile() == null) return builder.build();
                File certFile = new File(this.connectInformation.getCertFile());
                if (!certFile.exists()) {
                    throw new RuntimeException("Certificate file " + this.connectInformation.getCertFile() + " does not exist");
                }
                SslContext sslContext = GrpcSslContexts.forClient().trustManager(new File(this.connectInformation.getCertFile())).build();
                builder.sslContext(sslContext);
                return builder.build();
            }
            catch (SSLException e) {
                throw new RuntimeException("Couldn't set up SSL context", e);
            }
        } else {
            builder.usePlaintext();
        }
        return builder.build();
    }

    private synchronized void startInstructionStream(final String name) {
        logger.debug("Start instruction stream to {}", (Object)name);
        this.inputStream = new SynchronizedStreamObserver<PlatformInboundInstruction>(((PlatformServiceGrpc.PlatformServiceStub)PlatformServiceGrpc.newStub((Channel)this.channel).withInterceptors(new ClientInterceptor[]{new ContextAddingInterceptor(this.connectInformation.getContext()), new TokenAddingInterceptor(this.connectInformation.getToken())})).openStream(new StreamObserver<PlatformOutboundInstruction>(){

            public void onNext(PlatformOutboundInstruction messagePlatformOutboundInstruction) {
                ((Collection)PlatformConnectionManager.this.handlers.getOrDefault((Object)messagePlatformOutboundInstruction.getRequestCase(), new ArrayDeque())).forEach(consumer -> consumer.accept(messagePlatformOutboundInstruction));
                switch (messagePlatformOutboundInstruction.getRequestCase()) {
                    case NODE_NOTIFICATION: {
                        logger.debug("Received: {}", (Object)messagePlatformOutboundInstruction.getNodeNotification());
                        break;
                    }
                    case REQUEST_RECONNECT: {
                        Runnable reconnect = () -> {
                            PlatformConnectionManager.this.disconnectListeners.forEach(Runnable::run);
                            PlatformConnectionManager.this.inputStream.onCompleted();
                            PlatformConnectionManager.this.scheduleReconnect();
                        };
                        for (Function interceptor : PlatformConnectionManager.this.reconnectInterceptors) {
                            reconnect = (Runnable)interceptor.apply(reconnect);
                        }
                        reconnect.run();
                        break;
                    }
                }
            }

            public void onError(Throwable throwable) {
                StatusRuntimeException sre;
                logger.debug("Lost instruction stream from {} - {}", (Object)name, (Object)throwable.getMessage());
                PlatformConnectionManager.this.disconnectListeners.forEach(Runnable::run);
                if (throwable instanceof StatusRuntimeException && (sre = (StatusRuntimeException)throwable).getStatus().getCode().equals((Object)Status.Code.PERMISSION_DENIED)) {
                    return;
                }
                PlatformConnectionManager.this.scheduleReconnect();
            }

            public void onCompleted() {
                logger.warn("Closed instruction stream to {}", (Object)name);
                PlatformConnectionManager.this.disconnectListeners.forEach(Runnable::run);
                PlatformConnectionManager.this.scheduleReconnect();
            }
        }));
        this.inputStream.onNext((Object)PlatformInboundInstruction.newBuilder().setRegister(ClientIdentification.newBuilder().setClientName(this.connectInformation.getClientName()).setComponentName(this.connectInformation.getComponentName())).build());
    }

    private synchronized void tryReconnect() {
        if (this.channel != null) {
            return;
        }
        try {
            this.reconnectTask = null;
            this.getChannel();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    public void addReconnectListener(Runnable action) {
        this.reconnectListeners.add(action);
    }

    public void addDisconnectListener(Runnable action) {
        this.disconnectListeners.add(action);
    }

    public void addReconnectInterceptor(Function<Runnable, Runnable> interceptor) {
        this.reconnectInterceptors.add(interceptor);
    }

    private synchronized void scheduleReconnect() {
        if (this.reconnectTask == null || this.reconnectTask.isDone()) {
            if (this.channel != null) {
                try {
                    this.channel.shutdownNow().awaitTermination(1L, TimeUnit.SECONDS);
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
            this.channel = null;
            this.reconnectTask = this.scheduler.schedule(this::tryReconnect, 1L, TimeUnit.SECONDS);
        }
    }

    public StreamObserver<CommandProviderOutbound> getCommandStream(StreamObserver<CommandProviderInbound> commandsFromRoutingServer, ClientInterceptor[] interceptors) {
        return ((CommandServiceGrpc.CommandServiceStub)CommandServiceGrpc.newStub(this.getChannel()).withInterceptors(interceptors)).openStream(commandsFromRoutingServer);
    }

    public StreamObserver<QueryProviderOutbound> getQueryStream(StreamObserver<QueryProviderInbound> queryProviderInboundStreamObserver, ClientInterceptor[] interceptors) {
        return ((QueryServiceGrpc.QueryServiceStub)QueryServiceGrpc.newStub(this.getChannel()).withInterceptors(interceptors)).openStream(queryProviderInboundStreamObserver);
    }

    public void onOutboundInstruction(PlatformOutboundInstruction.RequestCase requestCase, Consumer<PlatformOutboundInstruction> consumer) {
        Collection consumers = this.handlers.computeIfAbsent(requestCase, rc -> new LinkedList());
        consumers.add(consumer);
    }

    public void send(PlatformInboundInstruction instruction) {
        this.inputStream.onNext((Object)instruction);
    }
}

