/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.graphql.client.vertx.websocket.graphqltransportws;

import io.smallrye.graphql.client.GraphQLClientException;
import io.smallrye.graphql.client.InvalidResponseException;
import io.smallrye.graphql.client.impl.ResponseReader;
import io.smallrye.graphql.client.vertx.websocket.WebSocketSubprotocolHandler;
import io.smallrye.graphql.client.vertx.websocket.graphqltransportws.MessageType;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.Cancellable;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.vertx.core.http.WebSocket;
import java.io.Reader;
import java.io.StringReader;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonString;
import javax.json.JsonValue;
import javax.json.stream.JsonParsingException;
import org.jboss.logging.Logger;

public class GraphQLTransportWSSubprotocolHandler
implements WebSocketSubprotocolHandler {
    private static final Logger log = Logger.getLogger(GraphQLTransportWSSubprotocolHandler.class);
    private static final String SUBSCRIPTION_ID = "1";
    private final AtomicReference<WebSocket> webSocketReference = new AtomicReference();
    private final AtomicBoolean subscriptionIsActive = new AtomicBoolean(false);
    private final Integer subscriptionInitializationTimeout;
    private JsonObject connectionInitMessage;
    private JsonObject pongMessage;

    public GraphQLTransportWSSubprotocolHandler() {
        this.subscriptionInitializationTimeout = null;
    }

    public GraphQLTransportWSSubprotocolHandler(Integer subscriptionInitializationTimeout) {
        this.subscriptionInitializationTimeout = subscriptionInitializationTimeout;
    }

    @Override
    public void handleWebSocketStart(JsonObject request, MultiEmitter<? super String> dataEmitter, WebSocket webSocket) {
        log.trace((Object)("Initializing subscription over graphql-transport-ws protocol with request: " + request.toString()));
        this.webSocketReference.set(webSocket);
        this.connectionInitMessage = Json.createObjectBuilder().add("type", "connection_init").build();
        this.pongMessage = Json.createObjectBuilder().add("type", "pong").add("payload", Json.createObjectBuilder().add("message", "keepalive")).build();
        webSocket.closeHandler(v -> {
            if (webSocket.closeStatusCode() != null) {
                if (webSocket.closeStatusCode() == 1000) {
                    log.debug((Object)"Subscription finished successfully, the server closed the connection with status code 1000");
                    dataEmitter.complete();
                } else {
                    dataEmitter.fail((Throwable)new InvalidResponseException("Server closed the websocket connection with code: " + webSocket.closeStatusCode() + " and reason: " + webSocket.closeReason()));
                }
            } else {
                dataEmitter.complete();
            }
        });
        webSocket.exceptionHandler(arg_0 -> dataEmitter.fail(arg_0));
        dataEmitter.onTermination(() -> ((WebSocket)webSocket).close());
        this.send(webSocket, this.connectionInitMessage);
        Cancellable timeoutWaitingForConnectionAckMessage = null;
        if (this.subscriptionInitializationTimeout != null) {
            timeoutWaitingForConnectionAckMessage = Uni.createFrom().item((Object)1).onItem().delayIt().by(Duration.ofMillis(this.subscriptionInitializationTimeout.intValue())).subscribe().with(timeout -> {
                dataEmitter.fail((Throwable)new InvalidResponseException("Server did not send a connection_ack message"));
                webSocket.close((short)1002, "Timeout waiting for a connection_ack message");
            });
        }
        Cancellable finalTimeoutWaitingForConnectionAckMessage = timeoutWaitingForConnectionAckMessage;
        webSocket.handler(text -> {
            if (!dataEmitter.isCancelled()) {
                if (log.isTraceEnabled()) {
                    log.trace((Object)("<<< " + text));
                }
                try {
                    JsonObject message = this.parseIncomingMessage(text.toString());
                    MessageType messageType = this.getMessageType(message);
                    switch (messageType) {
                        case PING: {
                            this.send(webSocket, this.pongMessage);
                            break;
                        }
                        case CONNECTION_ACK: {
                            if (this.subscriptionIsActive.get()) break;
                            if (finalTimeoutWaitingForConnectionAckMessage != null) {
                                finalTimeoutWaitingForConnectionAckMessage.cancel();
                            }
                            this.subscriptionIsActive.set(true);
                            this.send(webSocket, this.createSubscribeMessage(request, SUBSCRIPTION_ID));
                            break;
                        }
                        case NEXT: {
                            String id = message.getString("id");
                            if (!id.equals(SUBSCRIPTION_ID)) {
                                dataEmitter.fail((Throwable)new InvalidResponseException("Received event for an unexpected subscription ID: " + id));
                            }
                            JsonObject data = message.getJsonObject("payload");
                            dataEmitter.emit((Object)data.toString());
                            break;
                        }
                        case ERROR: {
                            List errors = message.getJsonArray("payload").stream().map(ResponseReader::readError).collect(Collectors.toList());
                            dataEmitter.fail((Throwable)new GraphQLClientException("Received an error", errors));
                            break;
                        }
                        case COMPLETE: {
                            dataEmitter.complete();
                            break;
                        }
                    }
                }
                catch (IllegalArgumentException | JsonParsingException e) {
                    log.error((Object)("Unexpected message from server: " + text));
                    dataEmitter.fail((Throwable)new InvalidResponseException("Unexpected message from server", e));
                }
            } else {
                log.warn((Object)"Received an additional item for a subscription that has already ended with a failure, dropping it.");
            }
        });
    }

    @Override
    public void handleCancel() {
        WebSocket webSocket = this.webSocketReference.get();
        if (webSocket != null && !webSocket.isClosed()) {
            if (this.subscriptionIsActive.getAndSet(false)) {
                this.send(webSocket, this.createCompleteMessage(SUBSCRIPTION_ID));
            }
            webSocket.close((short)1000);
        }
    }

    private MessageType getMessageType(JsonObject message) {
        return MessageType.fromString(message.getString("type"));
    }

    private JsonObject parseIncomingMessage(String message) {
        return Json.createReader((Reader)new StringReader(message)).readObject();
    }

    private JsonObject createSubscribeMessage(JsonObject request, String id) {
        JsonObject variables;
        JsonObjectBuilder payload = Json.createObjectBuilder();
        payload.add("query", request.getString("query"));
        JsonValue operationName = (JsonValue)request.get((Object)"operationName");
        if (operationName instanceof JsonString) {
            payload.add("operationName", operationName);
        }
        if ((variables = request.getJsonObject("variables")) != null) {
            payload.add("variables", (JsonValue)variables);
        }
        return Json.createObjectBuilder().add("type", "subscribe").add("id", id).add("payload", payload).build();
    }

    private JsonObject createCompleteMessage(String id) {
        return Json.createObjectBuilder().add("type", "complete").add("id", id).build();
    }

    private void send(WebSocket webSocket, JsonObject message) {
        String string = message.toString();
        if (log.isTraceEnabled()) {
            log.trace((Object)(">>> " + string));
        }
        webSocket.writeTextMessage(string);
    }
}

