/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.graphql.websocket;

import graphql.ExecutionResult;
import io.smallrye.graphql.execution.ExecutionResponse;
import io.smallrye.graphql.execution.ExecutionResponseWriter;
import io.smallrye.graphql.execution.ExecutionService;
import io.smallrye.graphql.spi.LookupService;
import io.smallrye.graphql.websocket.GraphQLWebSocketSession;
import io.smallrye.graphql.websocket.GraphQLWebsocketHandler;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.Cancellable;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonValue;
import javax.json.stream.JsonParsingException;
import org.jboss.logging.Logger;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * Base class for handlers of a single GraphQL-over-websocket session.
 *
 * <p>It parses incoming text frames into JSON objects and hands them to the subclass via
 * {@link #onMessage(JsonObject)}, tracks the operations that are currently active for the
 * session (keyed by the client-supplied operation id), executes operations through the
 * {@link ExecutionService}, and forwards results to the client either as a single data
 * message followed by a "complete" message, or as a stream of data messages for
 * {@link Publisher}-backed results. A periodic keep-alive ping (see {@link #getPingMessage()})
 * is sent for as long as the handler is alive.
 *
 * <p>Subclasses supply the message dispatching, the error message format, the ping payload
 * and the reaction to operations arriving before the connection was initialised.
 */
public abstract class AbstractGraphQLWebsocketHandler
implements GraphQLWebsocketHandler {
    /** Delay before the first keep-alive ping and interval between subsequent pings. */
    private static final Duration KEEP_ALIVE_INTERVAL = Duration.ofSeconds(10L);

    /**
     * Placeholder stored in {@link #activeOperations} while an operation is executing and it is
     * not yet known whether it yields a single result or a stream. It ignores every signal.
     */
    private static final Subscriber<ExecutionResult> SINGLE_RESULT_MARKER = new Subscriber<ExecutionResult>() {

        @Override
        public void onSubscribe(Subscription s) {
        }

        @Override
        public void onNext(ExecutionResult executionResult) {
        }

        @Override
        public void onError(Throwable t) {
        }

        @Override
        public void onComplete() {
        }
    };

    protected final Logger LOG = Logger.getLogger(GraphQLWebsocketHandler.class.getName());
    protected final ExecutionService executionService = LookupService.get().getInstance(ExecutionService.class).get();
    protected final GraphQLWebSocketSession session;
    /** Set to {@code true} once the connection acknowledgement has been requested. */
    protected final AtomicBoolean connectionInitialized;
    /** Pre-serialised {@code {"type":"connection_ack"}} message. */
    protected final String connectionAckMessage;
    /** Operations in flight, keyed by operation id. */
    protected final Map<String, Subscriber<ExecutionResult>> activeOperations;
    /** Handle used to stop the periodic keep-alive pings. */
    protected final Cancellable keepAliveSender;
    private final String dataMessageTypeName;
    private final Map<String, Object> context;

    /**
     * Creates the handler and immediately starts the periodic keep-alive pings.
     *
     * @param session the websocket session messages are sent to
     * @param dataMessageTypeName value of the {@code type} field used for data messages
     * @param context context passed to the execution service for every operation
     */
    public AbstractGraphQLWebsocketHandler(GraphQLWebSocketSession session, String dataMessageTypeName, Map<String, Object> context) {
        this.session = session;
        this.dataMessageTypeName = dataMessageTypeName;
        this.context = context;
        this.connectionInitialized = new AtomicBoolean(false);
        this.connectionAckMessage = createConnectionAckMessage().toString();
        this.activeOperations = new ConcurrentHashMap<>();
        this.keepAliveSender = Multi.createFrom().ticks()
                .startingAfter(KEEP_ALIVE_INTERVAL)
                .every(KEEP_ALIVE_INTERVAL)
                .subscribe().with(tick -> sendKeepAlive());
    }

    /**
     * Parses an incoming text frame and dispatches it to {@link #onMessage(JsonObject)}.
     *
     * <p>If the text cannot be parsed the session is closed with code 4400 and
     * {@link #onMessage(JsonObject)} is still invoked, with {@code null}; implementations
     * must tolerate a {@code null} argument.
     *
     * @param text the raw message received from the client
     */
    @Override
    public void onMessage(String text) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("<<< " + text);
        }
        onMessage(getMessageAsJsonObject(text));
    }

    /**
     * Logs the error and stops the keep-alive pings.
     *
     * @param t the error reported for the websocket
     */
    @Override
    public void onThrowable(Throwable t) {
        LOG.warn("Error in websocket", t);
        stopKeepAlive();
    }

    /**
     * Cancels every active operation, closes the session with code 1000 if it is still open
     * and stops the keep-alive pings.
     */
    @Override
    public void onClose() {
        LOG.debug("GraphQL-over-websocket session " + session + " closed");
        activeOperations.forEach((id, operation) -> cancelOperation(id));
        if (!session.isClosed()) {
            session.close((short) 1000, "");
        }
        stopKeepAlive();
    }

    @Override
    public void onEnd() {
    }

    /**
     * Acknowledges the connection initialisation. The first call sends the acknowledgement;
     * any further call closes the session with code 4429.
     *
     * @throws IOException if the acknowledgement cannot be sent
     */
    protected void sendConnectionAckMessage() throws IOException {
        if (connectionInitialized.getAndSet(true)) {
            session.close((short) 4429, "Too many initialisation requests");
        } else {
            session.sendMessage(connectionAckMessage);
        }
    }

    /**
     * Starts the operation described by {@code message} and forwards its result(s) to the client.
     *
     * <p>The operation id is read from the {@code id} field and the request from the
     * {@code payload} field. Nothing is executed if the connection is not initialised or the
     * id is already in use (see {@link #validSubscription(String)}).
     *
     * @param message the client message that requests the operation
     */
    protected void sendDataMessage(JsonObject message) {
        final String operationId = message.getString("id");
        if (!validSubscription(operationId)) {
            return;
        }
        JsonObject payload = message.getJsonObject("payload");
        executionService.executeAsync(payload, context, new ExecutionResponseWriter() {

            @Override
            public void write(ExecutionResponse executionResponse) {
                ExecutionResult executionResult = executionResponse.getExecutionResult();
                if (executionResult == null) {
                    return;
                }
                try {
                    if (!executionResult.isDataPresent()) {
                        AbstractGraphQLWebsocketHandler.this.sendErrorMessage(operationId, executionResponse);
                        return;
                    }
                    Object data = executionResult.getData();
                    if (data instanceof Map) {
                        AbstractGraphQLWebsocketHandler.this.sendSingleMessage(operationId, executionResponse);
                    } else if (data instanceof Publisher) {
                        AbstractGraphQLWebsocketHandler.this.sendStreamingMessage(operationId, executionResponse);
                    } else {
                        AbstractGraphQLWebsocketHandler.this.logUnknownResult(executionResult);
                        // Nothing will ever complete this operation, so release its id;
                        // otherwise the placeholder would block the id for the whole session.
                        AbstractGraphQLWebsocketHandler.this.activeOperations.remove(operationId, SINGLE_RESULT_MARKER);
                    }
                } catch (IOException ioe) {
                    fail(ioe);
                }
            }
        });
    }

    private JsonObject createConnectionAckMessage() {
        return Json.createObjectBuilder().add("type", "connection_ack").build();
    }

    /**
     * Parses {@code text}, closing the session with code 4400 when parsing fails.
     *
     * @return the parsed object, or {@code null} if the session was closed because of a failure
     */
    private JsonObject getMessageAsJsonObject(String text) {
        try {
            return parseIncomingMessage(text);
        } catch (JsonParsingException ex) {
            session.close((short) 4400, ex.getMessage());
            return null;
        } catch (IllegalArgumentException | NullPointerException ex) {
            session.close((short) 4400, "Unknown message type");
            return null;
        }
    }

    private JsonObject parseIncomingMessage(String message) {
        return Json.createReader(new StringReader(message)).readObject();
    }

    private JsonObject createCompleteMessage(String operationId) {
        return Json.createObjectBuilder()
                .add("type", "complete")
                .add("id", operationId)
                .build();
    }

    private JsonObject createDataMessage(String operationId, JsonObject payload) {
        return Json.createObjectBuilder()
                .add("type", dataMessageTypeName)
                .add("id", operationId)
                .add("payload", payload)
                .build();
    }

    private void logUnknownResult(ExecutionResult executionResult) {
        LOG.warn("Unknown execution result of type " + executionResult.getClass());
    }

    /**
     * Sends a single result followed by a "complete" message, unless the operation is no
     * longer registered (for example because the client cancelled it in the meantime).
     */
    private void sendSingleMessage(String operationId, ExecutionResponse executionResponse) throws IOException {
        if (activeOperations.remove(operationId) != null) {
            session.sendMessage(createDataMessage(operationId, executionResponse.getExecutionResultAsJsonObject()).toString());
            session.sendMessage(createCompleteMessage(operationId).toString());
        }
    }

    /**
     * Subscribes to the result stream of a streaming operation and registers the subscriber
     * so the operation can be cancelled later.
     */
    private void sendStreamingMessage(String operationId, ExecutionResponse executionResponse) {
        // The caller only routes here when the data is a Publisher, so it is never null.
        Publisher<ExecutionResult> stream = executionResponse.getExecutionResult().getData();
        SubscriptionSubscriber subscriber = new SubscriptionSubscriber(session, operationId);
        // Swap the placeholder for the real subscriber atomically. If the placeholder is gone,
        // the operation was cancelled while it was executing and must not be started (an
        // unconditional put would resurrect it).
        if (activeOperations.replace(operationId, SINGLE_RESULT_MARKER, subscriber)) {
            stream.subscribe(subscriber);
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("Operation id " + operationId + " is no longer active, not subscribing to its result stream");
        }
    }

    private void sendKeepAlive() {
        try {
            session.sendMessage(getPingMessage());
        } catch (IOException e) {
            LOG.warn(e);
        }
    }

    private void stopKeepAlive() {
        if (keepAliveSender != null) {
            keepAliveSender.cancel();
        }
    }

    /**
     * Cancels the operation whose id is given in the {@code id} field of {@code message}
     * and logs (at debug level) whether such an operation was active.
     *
     * @param message the client message that requests the cancellation
     */
    protected void sendCancelMessage(JsonObject message) {
        String opId = message.getString("id");
        boolean cancelled = cancelOperation(opId);
        if (!LOG.isDebugEnabled()) {
            return;
        }
        if (cancelled) {
            LOG.debug("Completed operation id " + opId + " per client's request");
        } else {
            LOG.debug("Client requested to complete operation id " + opId + ", but no such operation is active");
        }
    }

    /**
     * Unregisters the operation and, if it is a running stream, cancels its subscription.
     *
     * @return {@code true} if an operation with that id was registered
     */
    private boolean cancelOperation(String opId) {
        Subscriber<ExecutionResult> subscriber = activeOperations.remove(opId);
        if (subscriber == null) {
            return false;
        }
        if (subscriber instanceof SubscriptionSubscriber) {
            ((SubscriptionSubscriber) subscriber).cancel();
        }
        return true;
    }

    /**
     * Checks that a new operation may start and reserves its id.
     *
     * <p>If the connection is not initialised, {@link #closeDueToConnectionNotInitialized()} is
     * invoked. If the id is already in use, the session is closed with code 4409.
     *
     * @return {@code true} if the id was reserved and the operation may be executed
     */
    private boolean validSubscription(String operationId) {
        if (!connectionInitialized.get()) {
            closeDueToConnectionNotInitialized();
            return false;
        }
        if (activeOperations.putIfAbsent(operationId, SINGLE_RESULT_MARKER) != null) {
            session.close((short) 4409, "Subscriber for " + operationId + " already exists");
            return false;
        }
        return true;
    }

    /**
     * Handles a parsed client message.
     *
     * @param var1 the parsed message, or {@code null} if the incoming text could not be parsed
     *             (the session has already been closed in that case)
     */
    protected abstract void onMessage(JsonObject var1);

    /**
     * Reports a result that carries no data to the client.
     *
     * @param var1 the operation id
     * @param var2 the execution response to report
     * @throws IOException if the message cannot be sent
     */
    protected abstract void sendErrorMessage(String var1, ExecutionResponse var2) throws IOException;

    /** Invoked when an operation is requested before the connection was initialised. */
    protected abstract void closeDueToConnectionNotInitialized();

    /** @return the message sent to the client as the periodic keep-alive ping */
    protected abstract String getPingMessage();

    /**
     * Subscriber that relays the results of one streaming operation to the client,
     * requesting one item at a time.
     */
    private class SubscriptionSubscriber
    implements Subscriber<ExecutionResult> {
        private final AtomicReference<Subscription> subscription = new AtomicReference<>();
        // Remembers a cancel() that arrived before onSubscribe(), when there was no
        // subscription to cancel yet.
        private final AtomicBoolean cancelled = new AtomicBoolean(false);
        private final GraphQLWebSocketSession session;
        private final String operationId;

        public SubscriptionSubscriber(GraphQLWebSocketSession session, String operationId) {
            this.session = session;
            this.operationId = operationId;
        }

        @Override
        public void onSubscribe(Subscription s) {
            subscription.set(s);
            if (cancelled.get()) {
                // The operation was cancelled before the stream handed us its subscription.
                s.cancel();
            } else {
                s.request(1L);
            }
        }

        @Override
        public void onNext(ExecutionResult executionResult) {
            if (session.isClosed()) {
                return;
            }
            ExecutionResponse executionResponse = new ExecutionResponse(executionResult);
            try {
                session.sendMessage(createDataMessage(operationId, executionResponse.getExecutionResultAsJsonObject()).toString());
            } catch (IOException e) {
                LOG.warn(e);
            }
            subscription.get().request(1L);
        }

        @Override
        public void onError(Throwable t) {
            // The stream is terminated: log through the handler's logger and release the
            // operation id so it does not stay reserved for the rest of the session.
            LOG.warn("Subscription with id " + operationId + " failed", t);
            activeOperations.remove(operationId, this);
        }

        @Override
        public void onComplete() {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Subscription with id " + operationId + " completed");
            }
            try {
                session.sendMessage(createCompleteMessage(operationId).toString());
            } catch (IOException e) {
                LOG.warn(e);
            }
            // Only remove our own registration; the id may already have been reused by a
            // newer operation if this stream was cancelled earlier.
            activeOperations.remove(operationId, this);
        }

        /** Cancels the underlying subscription, now or as soon as it becomes available. */
        public void cancel() {
            cancelled.set(true);
            Subscription sub = subscription.get();
            if (sub != null) {
                sub.cancel();
            }
        }
    }
}

