/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.configuration.graphql.ws;

import graphql.ExecutionResult;
import io.micronaut.configuration.graphql.GraphQLInvocation;
import io.micronaut.configuration.graphql.GraphQLInvocationData;
import io.micronaut.configuration.graphql.ws.CompleteMessage;
import io.micronaut.configuration.graphql.ws.ConnectionAckMessage;
import io.micronaut.configuration.graphql.ws.ConnectionInitMessage;
import io.micronaut.configuration.graphql.ws.ErrorMessage;
import io.micronaut.configuration.graphql.ws.GraphQLWsConfiguration;
import io.micronaut.configuration.graphql.ws.Message;
import io.micronaut.configuration.graphql.ws.NextMessage;
import io.micronaut.configuration.graphql.ws.PingMessage;
import io.micronaut.configuration.graphql.ws.PongMessage;
import io.micronaut.configuration.graphql.ws.SubscribeMessage;
import io.micronaut.context.annotation.Requires;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.codec.CodecException;
import io.micronaut.scheduling.ScheduledExecutorTaskScheduler;
import io.micronaut.websocket.CloseReason;
import io.micronaut.websocket.WebSocketSession;
import io.micronaut.websocket.annotation.OnClose;
import io.micronaut.websocket.annotation.OnError;
import io.micronaut.websocket.annotation.OnMessage;
import io.micronaut.websocket.annotation.OnOpen;
import io.micronaut.websocket.annotation.ServerWebSocket;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * WebSocket endpoint speaking the {@code graphql-transport-ws} subprotocol.
 *
 * <p>A session must send a {@link ConnectionInitMessage} before it may subscribe; sessions that
 * fail to do so within the configured timeout are closed with code 4408. All per-session state
 * (acknowledged connections and running subscriptions) is keyed by the session id, so one handler
 * instance can serve several sessions, and the state is released again in
 * {@link #onClose(WebSocketSession, CloseReason)}.
 */
@ServerWebSocket(value="${graphql.graphql-ws.path:/graphql-ws}", subprotocols="graphql-transport-ws")
@Requires(property="graphql.graphql-ws.enabled", value="true", defaultValue="false")
public class GraphQLWsHandler {
    static final String HTTP_REQUEST_KEY = "httpRequest";
    private static final Logger LOG = LoggerFactory.getLogger(GraphQLWsHandler.class);
    private final ScheduledExecutorTaskScheduler scheduler;
    private final GraphQLInvocation graphQLInvocation;
    private final GraphQLWsConfiguration configuration;
    /** Ids of the sessions that have completed the connection initialisation handshake. */
    private final ConcurrentSkipListSet<String> connections = new ConcurrentSkipListSet<>();
    /**
     * Running subscriptions: session id -> (client supplied subscription id -> subscription).
     * Subscription ids are chosen by the client, so they are only meaningful within one session.
     */
    private final ConcurrentMap<String, ConcurrentMap<String, Publisher<? extends Message>>> subscriptions = new ConcurrentHashMap<>();

    /**
     * @param scheduler         scheduler used to enforce the connection initialisation timeout
     * @param graphQLInvocation executes the GraphQL operations carried by subscribe messages
     * @param configuration     supplies the connection initialisation wait timeout
     */
    public GraphQLWsHandler(ScheduledExecutorTaskScheduler scheduler, GraphQLInvocation graphQLInvocation, GraphQLWsConfiguration configuration) {
        this.scheduler = scheduler;
        this.graphQLInvocation = graphQLInvocation;
        this.configuration = configuration;
    }

    /**
     * Stores the upgrade request on the session and schedules the initialisation timeout check.
     *
     * @param session the newly opened session
     * @param request the HTTP request that opened the WebSocket; reused later for invocations
     */
    @OnOpen
    public void onOpen(WebSocketSession session, HttpRequest request) {
        session.put(HTTP_REQUEST_KEY, request);
        this.scheduler.schedule(this.configuration.getConnectionInitWaitTimeout(), () -> {
            // A session that already went away must not be closed a second time; without the
            // isOpen check every session closed before the timeout fires would be hit here,
            // because its id is removed from the connections set on close.
            if (session.isOpen() && !this.connections.contains(session.getId())) {
                session.close(new CloseReason(4408, "Connection initialisation timeout."));
            }
        });
        if (LOG.isTraceEnabled()) {
            LOG.trace("Opened websocket connection with id {}", session.getId());
        }
    }

    /**
     * Dispatches an incoming protocol message.
     *
     * @param message the decoded message
     * @param session the session the message arrived on
     * @return the publisher producing the reply (or replies); empty when nothing is sent back
     */
    @OnMessage
    public Publisher<Message> onMessage(Message message, WebSocketSession session) {
        if (message instanceof ConnectionInitMessage) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Received connection initialisation request for session id {}", session.getId());
            }
            // add() returns false when the session was already initialised.
            if (this.connections.add(session.getId())) {
                return session.send(new ConnectionAckMessage());
            }
            return this.tooManyInitialisationRequests(session);
        }
        if (message instanceof PingMessage) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Received a ping message for session id {}", session.getId());
            }
            return session.send(new PongMessage());
        }
        if (message instanceof SubscribeMessage) {
            SubscribeMessage subscribe = (SubscribeMessage) message;
            if (LOG.isTraceEnabled()) {
                LOG.trace("Received subscription message for session id {}", session.getId());
            }
            if (!this.connections.contains(session.getId())) {
                return this.unauthorized(session);
            }
            if (this.isSubscribed(session, subscribe.getId())) {
                return this.subscriberAlreadyExists(subscribe.getId(), session);
            }
            Mono<Message> subscription = this.executeSubscribe(subscribe, session)
                    .doFinally(signal -> this.removeSubscription(session, subscribe.getId()));
            this.subscriptions
                    .computeIfAbsent(session.getId(), key -> new ConcurrentHashMap<>())
                    .put(subscribe.getId(), subscription);
            return subscription;
        }
        if (message instanceof CompleteMessage) {
            CompleteMessage complete = (CompleteMessage) message;
            if (LOG.isTraceEnabled()) {
                LOG.trace("Received complete message for session id {}", session.getId());
            }
            // Dropping the entry makes the running stream stop at its next element.
            this.removeSubscription(session, complete.getId());
        }
        return Mono.empty();
    }

    /**
     * Executes the operation of a subscribe message and streams its results to the session.
     *
     * @return a mono that sends every result and, if the last thing sent was a
     *         {@link NextMessage}, finishes by sending a {@link CompleteMessage}
     */
    private Mono<Message> executeSubscribe(SubscribeMessage subscribeMessage, WebSocketSession session) {
        GraphQLInvocationData invocationData = new GraphQLInvocationData(
                subscribeMessage.getSubscribePayload().getQuery(),
                subscribeMessage.getSubscribePayload().getOperationName(),
                subscribeMessage.getSubscribePayload().getVariables());
        Optional<HttpRequest> httpRequest = session.get(HTTP_REQUEST_KEY, HttpRequest.class);
        if (httpRequest.isEmpty()) {
            return Mono.error(new IllegalStateException("The HTTP request from the original WebSocket connection could not be retrieved."));
        }
        return Flux.from(this.graphQLInvocation.invoke(invocationData, httpRequest.get(), null))
                .flatMap(executionResult -> {
                    // A result whose data is itself a publisher is expanded into its elements.
                    if (executionResult.isDataPresent()) {
                        Object data = executionResult.getData();
                        if (data instanceof Publisher) {
                            return this.handleExecutionResultPublisher((Publisher<?>) data);
                        }
                    }
                    return Flux.just(executionResult);
                })
                .takeUntil(result -> !this.isSubscribed(session, subscribeMessage.getId()))
                .flatMap(result -> this.handleExecutionResult(subscribeMessage, session, result))
                // takeLast(1).next() instead of last(): when nothing was sent (for example the
                // client completed before the first result) the stream is legitimately empty and
                // must finish quietly rather than fail with NoSuchElementException.
                .takeLast(1)
                .next()
                .filter(NextMessage.class::isInstance)
                .flatMap(m -> this.completeSubscription(subscribeMessage, session));
    }

    /**
     * Adapts a publisher found as execution result data into a flux of execution results.
     *
     * @throws IllegalArgumentException (signalled through the flux) for any element that is not
     *                                  an {@link ExecutionResult}
     */
    private Flux<ExecutionResult> handleExecutionResultPublisher(Publisher<?> p) {
        return Flux.from(p).map(o -> {
            if (o instanceof ExecutionResult) {
                return (ExecutionResult) o;
            }
            throw new IllegalArgumentException("Subscription data is an invalid type " + o.getClass().getName() + "- expected to be an ExecutionResult");
        });
    }

    /**
     * Sends one execution result to the client, as a {@link NextMessage} when it carries no
     * errors and as an {@link ErrorMessage} otherwise.
     *
     * @return the send publisher, or an empty mono when the session is closed or the subscription
     *         is no longer registered (same condition, negated, as {@link #completeSubscription})
     */
    private Publisher<Message> handleExecutionResult(SubscribeMessage subscribeMessage, WebSocketSession session, ExecutionResult executionResult) {
        if (!session.isOpen() || !this.isSubscribed(session, subscribeMessage.getId())) {
            return Mono.empty();
        }
        if (executionResult.getErrors().isEmpty()) {
            return session.send(new NextMessage(subscribeMessage.getId(), executionResult));
        }
        return session.send(ErrorMessage.of(subscribeMessage.getId(), executionResult.getErrors()));
    }

    /**
     * Sends the {@link CompleteMessage} for a subscription, provided the session is still open
     * and the subscription is still registered; otherwise sends nothing.
     */
    private Mono<CompleteMessage> completeSubscription(SubscribeMessage subscribeMessage, WebSocketSession session) {
        if (session.isOpen() && this.isSubscribed(session, subscribeMessage.getId())) {
            return Mono.from(session.send(new CompleteMessage(subscribeMessage.getId())));
        }
        return Mono.empty();
    }

    /** Tells whether the given session currently has a subscription registered under the id. */
    private boolean isSubscribed(WebSocketSession session, String subscriptionId) {
        ConcurrentMap<String, Publisher<? extends Message>> active = this.subscriptions.get(session.getId());
        return active != null && active.containsKey(subscriptionId);
    }

    /** Unregisters a subscription of the given session; does nothing when it is not registered. */
    private void removeSubscription(WebSocketSession session, String subscriptionId) {
        ConcurrentMap<String, Publisher<? extends Message>> active = this.subscriptions.get(session.getId());
        if (active != null) {
            active.remove(subscriptionId);
        }
    }

    /** Closes the session with code 4401 (subscribe received before initialisation). */
    private Publisher<Message> unauthorized(WebSocketSession session) {
        session.close(new CloseReason(4401, "Unauthorized."));
        return Mono.empty();
    }

    /** Closes the session with code 4403 (more than one initialisation request). */
    private Publisher<Message> tooManyInitialisationRequests(WebSocketSession session) {
        session.close(new CloseReason(4403, "Too many initialisation requests."));
        return Mono.empty();
    }

    /** Closes the session with code 4409 (subscription id already in use on this session). */
    private Publisher<Message> subscriberAlreadyExists(String id, WebSocketSession session) {
        session.close(new CloseReason(4409, "Subscriber for " + id + " already exists."));
        return Mono.empty();
    }

    /**
     * Releases everything tracked for the closed session so that neither its id nor its
     * subscriptions outlive it; running streams notice the removal at their next element.
     *
     * @param session     the session that was closed
     * @param closeReason the reason reported for the close
     */
    @OnClose
    public void onClose(WebSocketSession session, CloseReason closeReason) {
        this.connections.remove(session.getId());
        this.subscriptions.remove(session.getId());
        if (LOG.isTraceEnabled()) {
            LOG.trace("Closed websocket connection with id {} with reason {}", session.getId(), closeReason);
        }
    }

    /**
     * Logs the error and closes the session with code 4400 when the failure is a
     * {@link CodecException} or an {@link InstantiationError}.
     *
     * @param session the session the error occurred on
     * @param t       the error
     */
    @OnError
    public void onError(WebSocketSession session, Throwable t) {
        if (LOG.isDebugEnabled()) {
            // The trailing throwable argument makes SLF4J append the stack trace.
            LOG.debug("Error websocket connection with id {} with error {}", session.getId(), t.getMessage(), t);
        }
        if (t instanceof CodecException || t instanceof InstantiationError) {
            session.close(new CloseReason(4400, "Invalid message."));
        }
    }
}

