/*
 * Decompiled with CFR 0.152.
 */
package io.gravitee.cockpit.connectors.ws;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.gravitee.cockpit.api.CockpitConnector;
import io.gravitee.cockpit.api.command.Command;
import io.gravitee.cockpit.api.command.CommandProducer;
import io.gravitee.cockpit.api.command.Payload;
import io.gravitee.cockpit.api.command.Reply;
import io.gravitee.cockpit.api.command.hello.HelloCommand;
import io.gravitee.cockpit.api.command.hello.HelloReply;
import io.gravitee.cockpit.api.command.installation.InstallationPayload;
import io.gravitee.cockpit.api.command.installation.InstallationReply;
import io.gravitee.cockpit.connectors.core.internal.CommandHandlerWrapper;
import io.gravitee.cockpit.connectors.ws.channel.ClientChannel;
import io.gravitee.cockpit.connectors.ws.endpoints.WebSocketEndpoint;
import io.gravitee.cockpit.connectors.ws.http.HttpClientFactory;
import io.gravitee.common.service.AbstractService;
import io.gravitee.node.api.Node;
import io.gravitee.plugin.core.api.PluginManifest;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableEmitter;
import io.reactivex.rxjava3.core.CompletableSource;
import io.reactivex.rxjava3.core.Single;
import io.vertx.circuitbreaker.CircuitBreaker;
import io.vertx.circuitbreaker.CircuitBreakerOptions;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.WebSocket;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;

/**
 * {@link CockpitConnector} implementation that talks to the Cockpit Controller over a WebSocket.
 *
 * <p>When {@code cockpit.enabled} is {@code true}, {@link #doStart()} registers a JVM shutdown hook,
 * wires the command handlers' callbacks and opens the WebSocket through a Vert.x circuit breaker.
 * Once connected, a {@link ClientChannel} is created on top of the socket, a periodic ping is
 * scheduled and the registered listeners are notified of connect / disconnect / ready / primary /
 * replica events. When the socket is closed for a reason other than a Cockpit-initiated close or a
 * shutdown of this service, a new connection attempt is started.
 *
 * <p>NOTE(review): state flags are {@code volatile} because they are written by the shutdown hook
 * thread and read from Vert.x callbacks; this only provides visibility, the class makes no further
 * thread-safety guarantee.
 */
public class WebSocketCockpitConnector
extends AbstractService<CockpitConnector>
implements CockpitConnector {
    private static final Logger log = LoggerFactory.getLogger(WebSocketCockpitConnector.class);
    /** Period, in milliseconds, between two pings written on the WebSocket. */
    private static final long PING_HANDLER_DELAY = 5000L;
    /** Delay, in milliseconds, returned by the circuit breaker retry policy between two attempts. */
    private static final long RECONNECT_DELAY = 5000L;
    /**
     * Marker meaning "no ping timer is scheduled". An explicit sentinel is used rather than 0
     * because nothing visible here guarantees that 0 is never handed out as a timer id
     * (NOTE(review): confirm against the Vert.x timer documentation).
     */
    private static final long NO_TIMER = -1L;
    private static final String COCKPIT_ACCEPTED_STATUS = "ACCEPTED";
    @Value(value="${cockpit.enabled:false}")
    private boolean enabled;
    @Value(value="${cockpit.ws.discovery:true}")
    private boolean discovery;
    @Autowired
    private HttpClientFactory httpClientFactory;
    @Autowired
    private Vertx vertx;
    @Autowired
    private Node node;
    @Autowired
    @Qualifier(value="cockpitCommandHandlers")
    private Map<Command.Type, CommandHandlerWrapper<Command<?>, Reply>> commandHandlers;
    @Autowired(required=false)
    @Qualifier(value="cockpitHelloCommandProducer")
    private CommandProducer helloCommandProducer;
    @Autowired
    private PluginManifest pluginManifest;
    @Autowired
    @Qualifier(value="cockpitObjectMapper")
    private ObjectMapper objectMapper;
    private volatile boolean isPrimary = false;
    private volatile boolean closedByCockpit = false;
    private volatile boolean shuttingDown = false;
    private volatile long pongHandlerId = NO_TIMER;
    private CircuitBreaker circuitBreaker;
    private final String path;
    private volatile HttpClient httpClient;
    private volatile ClientChannel clientChannel;
    private final Collection<Runnable> onConnectListeners = new ConcurrentLinkedQueue<>();
    private final Collection<Runnable> onDisconnectListeners = new ConcurrentLinkedQueue<>();
    private final Collection<Runnable> onReadyListeners = new ConcurrentLinkedQueue<>();
    private final Collection<Runnable> onPrimaryListeners = new ConcurrentLinkedQueue<>();
    private final Collection<Runnable> onReplicaListeners = new ConcurrentLinkedQueue<>();

    /** Creates a connector targeting the {@code /ws/controller} path of the selected endpoint. */
    public WebSocketCockpitConnector() {
        this.path = "/ws/controller";
    }

    /**
     * Starts the service and, when the connector is enabled, registers the shutdown hook,
     * configures the circuit breaker and triggers the first connection attempt.
     *
     * @throws Exception if the parent service fails to start
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        if (!this.enabled) {
            log.info("Cockpit connector is disabled.");
            return;
        }
        log.info("Cockpit connector is enabled. Starting connector.");
        Runtime.getRuntime().addShutdownHook(new ContainerShutdownHook(this));
        // Every handled command may change the installation status, hence the "ready" callback.
        this.commandHandlers.values().forEach(commandHandler -> commandHandler.setCallback(this::handleOnReadyNotification));
        this.circuitBreaker = CircuitBreaker.create("cockpit-connector", this.vertx, new CircuitBreakerOptions().setMaxRetries(Integer.MAX_VALUE).setNotificationAddress(null));
        this.circuitBreaker.retryPolicy(retryCount -> RECONNECT_DELAY);
        this.connect();
    }

    /**
     * Stops the service: flags the connector as shutting down (which disables reconnection),
     * cancels the ping timer and, when a channel exists, closes the channel and then the HTTP
     * client, blocking until both are done.
     *
     * @throws Exception if the parent service fails to stop
     */
    @Override
    protected void doStop() throws Exception {
        this.shuttingDown = true;
        super.doStop();
        this.cancelPingTimer();
        ClientChannel channel = this.clientChannel;
        if (channel != null) {
            HttpClient client = this.httpClient;
            channel.close()
                .andThen(Completable.create(emitter -> client.close().onFailure(emitter::onError).onSuccess(result -> emitter.onComplete())))
                .blockingAwait();
        }
    }

    /** Registers a listener invoked each time the WebSocket connection is established. */
    public void registerOnConnectListener(Runnable runnable) {
        this.onConnectListeners.add(runnable);
    }

    /** Registers a listener invoked each time the WebSocket connection is closed. */
    public void registerOnDisconnectListener(Runnable runnable) {
        this.onDisconnectListeners.add(runnable);
    }

    /** Registers a listener invoked when the installation status becomes {@code ACCEPTED}. */
    public void registerOnReadyListener(Runnable runnable) {
        this.onReadyListeners.add(runnable);
    }

    /** Registers a listener invoked when the channel reports this node as primary. */
    public void registerOnPrimary(Runnable runnable) {
        this.onPrimaryListeners.add(runnable);
    }

    /** Registers a listener invoked when the channel reports this node as replica. */
    public void registerOnReplica(Runnable runnable) {
        this.onReplicaListeners.add(runnable);
    }

    /**
     * Runs {@link #doConnect(Promise)} through the circuit breaker. On success the channel is set
     * up on the obtained socket; on failure a new attempt is started unless the connector is
     * shutting down.
     */
    private void connect() {
        this.circuitBreaker.<WebSocket>execute(this::doConnect).onComplete(result -> {
            if (result.succeeded()) {
                this.onConnected(result.result());
            } else if (!this.shuttingDown) {
                this.connect();
            }
        });
    }

    /**
     * Builds the {@link ClientChannel} on top of a freshly connected socket, notifies the connect
     * listeners, initializes the channel, schedules the periodic ping and installs the socket
     * handlers (including the reconnect-on-close logic).
     *
     * @param webSocket the socket that has just been connected
     */
    private void onConnected(WebSocket webSocket) {
        if (this.shuttingDown) {
            // The connection completed after doStop(): do not keep a socket nobody will close.
            webSocket.close();
            return;
        }
        ClientChannel channel = new ClientChannel(webSocket, this.node, (CommandProducer<HelloCommand, HelloReply>)this.helloCommandProducer, this.commandHandlers, this.pluginManifest, this.objectMapper);
        this.clientChannel = channel;
        this.notifyOnConnectListeners();
        channel.onClose(() -> {
            // Remember the close was requested through the channel so that we do not reconnect.
            this.closedByCockpit = true;
            webSocket.close();
        });
        channel.onPrimary(() -> {
            this.isPrimary = true;
            this.notifyOnPrimaryListeners();
        });
        channel.onReplica(() -> {
            this.isPrimary = false;
            this.notifyOnReplicaListeners();
        });
        // The error consumer is passed to subscribe() so that a failure is logged once and is not
        // additionally reported as an unhandled error.
        channel.init().subscribe(
            helloReply -> this.handleOnReadyNotification(null, helloReply),
            throwable -> log.error("An error occurred when initializing the web socket channel.", throwable));
        this.pongHandlerId = this.vertx.setPeriodic(PING_HANDLER_DELAY, timerId -> {
            if (!webSocket.isClosed()) {
                webSocket.writePing(Buffer.buffer("ping_pong: " + this.node.id() + " - " + this.node.hostname()));
            }
        });
        if (this.discovery) {
            log.info("Discovery mode is enabled, listening for cockpit instances...");
        }
        webSocket.exceptionHandler(throwable -> log.error("An error occurs on the websocket connection", throwable));
        webSocket.pongHandler(data -> log.debug("Got a pong from Cockpit Controller"));
        webSocket.closeHandler(closeEvent -> {
            log.debug("Connection to Cockpit Controller has been closed.");
            this.cancelPingTimer();
            channel.cleanup();
            this.notifyOnDisconnectListeners();
            if (!this.closedByCockpit && !this.shuttingDown) {
                this.connect();
            }
        });
    }

    /** Cancels the periodic ping timer, if one is currently scheduled. */
    private void cancelPingTimer() {
        long timerId = this.pongHandlerId;
        if (timerId != NO_TIMER) {
            this.vertx.cancelTimer(timerId);
            this.pongHandlerId = NO_TIMER;
        }
    }

    private void notifyOnConnectListeners() {
        log.debug("Notifying all OnConnect listeners.");
        this.onConnectListeners.forEach(Runnable::run);
    }

    private void notifyOnDisconnectListeners() {
        log.debug("Notifying all OnDisconnect listeners.");
        this.onDisconnectListeners.forEach(Runnable::run);
    }

    private void notifyOnReadyListeners() {
        log.debug("Notifying all OnReady listeners.");
        this.onReadyListeners.forEach(Runnable::run);
    }

    private void notifyOnPrimaryListeners() {
        log.debug("Notifying all OnPrimary listeners.");
        this.onPrimaryListeners.forEach(Runnable::run);
    }

    private void notifyOnReplicaListeners() {
        log.debug("Notifying all OnReplica listeners.");
        this.onReplicaListeners.forEach(Runnable::run);
    }

    /**
     * Performs one connection attempt against the next endpoint returned by the HTTP client
     * factory and completes or fails {@code promise} accordingly.
     *
     * @param promise completed with the connected socket, or failed when no endpoint is defined,
     *                when the connection fails, or when an unexpected exception is raised
     */
    private void doConnect(Promise<WebSocket> promise) {
        try {
            WebSocketEndpoint webSocketEndpoint = this.httpClientFactory.nextEndpoint();
            if (webSocketEndpoint == null) {
                log.warn("No Cockpit endpoint is defined. Please check that 'cockpit.ws.endpoints' property has been properly defined.");
                promise.fail("No Cockpit endpoint is defined.");
                return;
            }
            // The URL is passed as an argument so that the '{}' placeholder is actually filled in.
            log.debug("Trying to connect to Cockpit Controller WebSocket (endpoint [{}]).", webSocketEndpoint.getUrl());
            HttpClient client = this.httpClientFactory.getHttpClient(webSocketEndpoint);
            this.httpClient = client;
            client.webSocket(webSocketEndpoint.getPort(), webSocketEndpoint.getHost(), webSocketEndpoint.resolvePath(this.path), result -> {
                if (result.succeeded()) {
                    webSocketEndpoint.reinitRetryCount();
                    log.info("Channel is now connected to Cockpit Controller through websocket from {}", webSocketEndpoint + this.path);
                    promise.complete(result.result());
                } else {
                    Throwable throwable = result.cause();
                    log.error("An error occurs while trying to connect to the Cockpit Controller: {} [{} times]", throwable.getMessage(), webSocketEndpoint.getRetryCount(), throwable);
                    promise.fail(throwable);
                    // Close the client created for this attempt, not whatever the field holds now.
                    client.close();
                }
            });
        }
        catch (Exception e) {
            log.error("An error occurred when trying to connect to Cockpit Controller.", e);
            promise.fail(e);
        }
    }

    /**
     * Sends a command to Cockpit through the current channel.
     *
     * @param command the command to send
     * @return the reply emitted by the channel, or a {@link Single} failing with
     *         {@link IllegalStateException} when no channel has been established yet
     */
    public Single<Reply> sendCommand(Command<? extends Payload> command) {
        ClientChannel channel = this.clientChannel;
        if (channel == null) {
            return Single.error(new IllegalStateException("Cockpit connector is not connected: no channel available to send the command."));
        }
        return channel.send(command);
    }

    /**
     * Notifies the "ready" listeners when the installation status carried by the reply (hello
     * reply) or by the command payload (installation reply) is {@code ACCEPTED}.
     *
     * @param command the command that produced the reply; may be {@code null} (it is for the
     *                hello reply obtained at channel initialization)
     * @param reply   the reply to inspect
     */
    private void handleOnReadyNotification(Command<?> command, Reply reply) {
        String installationStatus = null;
        if (reply instanceof HelloReply) {
            installationStatus = ((HelloReply)reply).getInstallationStatus();
        } else if (reply instanceof InstallationReply && command != null && command.getPayload() instanceof InstallationPayload) {
            installationStatus = ((InstallationPayload)command.getPayload()).getStatus();
        }
        if (COCKPIT_ACCEPTED_STATUS.equals(installationStatus)) {
            this.notifyOnReadyListeners();
        }
    }

    /** @return {@code true} when the channel last reported this node as primary */
    public boolean isPrimary() {
        return this.isPrimary;
    }

    /** JVM shutdown hook that stops the connector it was created for. */
    private class ContainerShutdownHook
    extends Thread {
        private final CockpitConnector connector;

        private ContainerShutdownHook(CockpitConnector connector) {
            super("graviteeio-cockpit-connector-finalizer");
            this.connector = connector;
        }

        @Override
        public void run() {
            try {
                this.connector.stop();
            }
            catch (Exception ex) {
                LoggerFactory.getLogger(this.getClass()).error("Unexpected error while stopping {}", WebSocketCockpitConnector.this.name(), ex);
            }
        }
    }
}

