/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.server.netty.websocket;

import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.bind.BoundExecutable;
import io.micronaut.core.bind.DefaultExecutableBinder;
import io.micronaut.core.convert.value.ConvertibleValues;
import io.micronaut.core.propagation.PropagatedContext;
import io.micronaut.core.type.Argument;
import io.micronaut.core.type.Executable;
import io.micronaut.core.util.KotlinUtils;
import io.micronaut.http.HttpAttributes;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.bind.binders.ContinuationArgumentBinder;
import io.micronaut.http.context.ServerRequestContext;
import io.micronaut.http.netty.websocket.AbstractNettyWebSocketHandler;
import io.micronaut.http.netty.websocket.NettyWebSocketSession;
import io.micronaut.http.netty.websocket.WebSocketSessionRepository;
import io.micronaut.http.server.CoroutineHelper;
import io.micronaut.http.server.netty.NettyEmbeddedServices;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.MethodExecutionHandle;
import io.micronaut.web.router.UriRouteMatch;
import io.micronaut.websocket.CloseReason;
import io.micronaut.websocket.WebSocketPongMessage;
import io.micronaut.websocket.WebSocketSession;
import io.micronaut.websocket.bind.WebSocketState;
import io.micronaut.websocket.context.WebSocketBean;
import io.micronaut.websocket.event.WebSocketMessageProcessedEvent;
import io.micronaut.websocket.event.WebSocketSessionClosedEvent;
import io.micronaut.websocket.event.WebSocketSessionOpenEvent;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateEvent;
import java.security.Principal;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.ContextView;

/**
 * Server-side Netty handler for a single WebSocket connection.
 *
 * <p>On construction it creates the {@link NettyWebSocketSession}, works out which
 * unbound argument of the message / pong handler methods receives the payload,
 * invokes the open callback and publishes a {@link WebSocketSessionOpenEvent}.
 * During its lifetime it publishes a {@link WebSocketMessageProcessedEvent} per
 * handled message and a {@link WebSocketSessionClosedEvent} when the handler is
 * removed from the pipeline.</p>
 */
@Internal
public class NettyServerWebSocketHandler extends AbstractNettyWebSocketHandler {
    /**
     * Identifier of this handler.
     * NOTE(review): presumably the name used when installing the handler in the
     * Netty pipeline — confirm against the code that registers it.
     */
    public static final String ID = "websocket-handler";

    private final NettyWebSocketSession serverSession;
    private final NettyEmbeddedServices nettyEmbeddedServices;
    @Nullable
    private final CoroutineHelper coroutineHelper;
    private final Argument<?> bodyArgument;
    private final Argument<?> pongArgument;

    /**
     * Creates the handler, opens the session and fires the open callback and event.
     *
     * @param nettyEmbeddedServices      source of binder registry, codecs, conversion service and event publishers
     * @param webSocketSessionRepository repository tracking the channels of open sessions
     * @param handshaker                 completed handshaker; supplies the protocol version and selected subprotocol
     * @param webSocketBean              the bean holding the WebSocket callbacks
     * @param request                    the HTTP request that initiated the upgrade
     * @param routeMatch                 the matched route; supplies the URI variables and is stored on the request
     * @param ctx                        the channel handler context
     * @param coroutineHelper            helper for Kotlin suspend handlers, or {@code null} if unavailable
     */
    NettyServerWebSocketHandler(
            NettyEmbeddedServices nettyEmbeddedServices,
            WebSocketSessionRepository webSocketSessionRepository,
            WebSocketServerHandshaker handshaker,
            WebSocketBean<?> webSocketBean,
            HttpRequest<?> request,
            UriRouteMatch<Object, Object> routeMatch,
            ChannelHandlerContext ctx,
            @Nullable CoroutineHelper coroutineHelper) {
        // The superclass constructor call must be the first statement of the constructor.
        super(
                ctx,
                nettyEmbeddedServices.getRequestArgumentSatisfier().getBinderRegistry(),
                nettyEmbeddedServices.getMediaTypeCodecRegistry(),
                webSocketBean,
                request,
                routeMatch.getVariableValues(),
                handshaker.version(),
                handshaker.selectedSubprotocol(),
                webSocketSessionRepository,
                nettyEmbeddedServices.getApplicationContext().getConversionService());

        this.serverSession = createWebSocketSession(ctx);

        DefaultExecutableBinder<WebSocketState> binder = new DefaultExecutableBinder<>();

        if (this.messageHandler != null) {
            BoundExecutable<?, ?> bound = binder.tryBind(
                    this.messageHandler.getExecutableMethod(),
                    this.webSocketBinder,
                    new WebSocketState(this.serverSession, this.originatingRequest));
            List<Argument<?>> unboundArguments = bound.getUnboundArguments();
            if (unboundArguments.size() == 1) {
                // The single argument the binder could not satisfy is the message body.
                this.bodyArgument = unboundArguments.get(0);
            } else {
                this.bodyArgument = null;
                if (this.LOG.isErrorEnabled()) {
                    // Report the real count: this branch is reached for zero as well as for several candidates.
                    this.LOG.error("WebSocket @OnMessage method " + webSocketBean.getTarget() + "."
                            + this.messageHandler.getExecutableMethod()
                            + " should define exactly 1 message parameter, but found "
                            + unboundArguments.size() + " possible candidates: " + unboundArguments);
                }
                if (this.serverSession.isOpen()) {
                    this.serverSession.close(CloseReason.INTERNAL_ERROR);
                }
            }
        } else {
            this.bodyArgument = null;
        }

        if (this.pongHandler != null) {
            BoundExecutable<?, ?> bound = binder.tryBind(
                    this.pongHandler.getExecutableMethod(),
                    this.webSocketBinder,
                    new WebSocketState(this.serverSession, this.originatingRequest));
            List<Argument<?>> unboundArguments = bound.getUnboundArguments();
            if (unboundArguments.size() == 1
                    && unboundArguments.get(0).isAssignableFrom(WebSocketPongMessage.class)) {
                this.pongArgument = unboundArguments.get(0);
            } else {
                this.pongArgument = null;
                if (this.LOG.isErrorEnabled()) {
                    this.LOG.error("WebSocket @OnMessage pong handler method " + webSocketBean.getTarget() + "."
                            + this.pongHandler.getExecutableMethod()
                            + " should define exactly 1 message parameter assignable from a WebSocketPongMessage, but found: "
                            + unboundArguments);
                }
                if (this.serverSession.isOpen()) {
                    this.serverSession.close(CloseReason.INTERNAL_ERROR);
                }
            }
        } else {
            this.pongArgument = null;
        }

        this.nettyEmbeddedServices = nettyEmbeddedServices;
        this.coroutineHelper = coroutineHelper;
        request.setAttribute(HttpAttributes.ROUTE_MATCH, routeMatch);

        // Invoke the open callback; failures go to the user's error handling, with logging as the fallback.
        Flux.from(callOpenMethod(ctx)).subscribe(
                v -> { },
                t -> forwardErrorToUser(ctx, e -> {
                    if (this.LOG.isErrorEnabled()) {
                        this.LOG.error("Error Opening WebSocket [" + webSocketBean + "]: " + e.getMessage(), e);
                    }
                }, t));

        ApplicationEventPublisher<WebSocketSessionOpenEvent> eventPublisher =
                nettyEmbeddedServices.getEventPublisher(WebSocketSessionOpenEvent.class);
        try {
            eventPublisher.publishEvent(new WebSocketSessionOpenEvent(this.serverSession));
        } catch (Exception e) {
            // A failing event listener is logged and does not abort construction of the handler.
            if (this.LOG.isErrorEnabled()) {
                this.LOG.error("Error publishing WebSocket opened event: " + e.getMessage(), e);
            }
        }
    }

    /**
     * @return the session created for this connection
     */
    @Override
    public NettyWebSocketSession getSession() {
        return this.serverSession;
    }

    /**
     * @return the message handler argument that receives the message body, or
     *         {@code null} if there is no message handler or it did not leave exactly one argument unbound
     */
    @Override
    public Argument<?> getBodyArgument() {
        return this.bodyArgument;
    }

    /**
     * @return the pong handler argument that receives the pong message, or
     *         {@code null} if there is no pong handler or its signature was not accepted
     */
    @Override
    public Argument<?> getPongArgument() {
        return this.pongArgument;
    }

    /**
     * Closes the connection with {@link CloseReason#GOING_AWAY} on an idle-state event;
     * every other event is passed to the superclass.
     *
     * @param ctx the channel handler context
     * @param evt the user event
     * @throws Exception if the superclass handling fails
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            writeCloseFrameAndTerminate(ctx, CloseReason.GOING_AWAY);
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * @param msg the inbound message
     * @return {@code true} only for {@link WebSocketFrame} instances
     */
    @Override
    public boolean acceptInboundMessage(Object msg) {
        return msg instanceof WebSocketFrame;
    }

    /**
     * Builds the session for this connection and registers its channel with the
     * session repository. The session id is the value of the request's
     * {@code Sec-WebSocket-Key} header; the session is flagged secure when an
     * {@link SslHandler} is present in the pipeline.
     *
     * @param ctx the channel handler context
     * @return the new session
     */
    @Override
    protected NettyWebSocketSession createWebSocketSession(final ChannelHandlerContext ctx) {
        String id = this.originatingRequest.getHeaders().get(HttpHeaderNames.SEC_WEBSOCKET_KEY);
        Channel channel = ctx.channel();
        boolean secure = ctx.pipeline().get(SslHandler.class) != null;

        NettyWebSocketSession session = new NettyWebSocketSession(
                id,
                channel,
                this.originatingRequest,
                this.mediaTypeCodecRegistry,
                this.webSocketVersion.toHttpHeaderValue(),
                secure) {

            private final ConvertibleValues<Object> uriVars =
                    ConvertibleValues.of(NettyServerWebSocketHandler.this.uriVariables);

            @Override
            public Optional<String> getSubprotocol() {
                return Optional.ofNullable(NettyServerWebSocketHandler.this.subProtocol);
            }

            @Override
            public Set<? extends WebSocketSession> getOpenSessions() {
                // Collect the sessions attached to the repository's channels, keeping only those still open.
                return NettyServerWebSocketHandler.this.webSocketSessionRepository.getChannelGroup().stream()
                        .map(ch -> ch.attr(NettyWebSocketSession.WEB_SOCKET_SESSION_KEY).get())
                        .filter(s -> s != null && s.isOpen())
                        .collect(Collectors.toSet());
            }

            @Override
            public void close(CloseReason closeReason) {
                super.close(closeReason);
                NettyServerWebSocketHandler.this.webSocketSessionRepository.removeChannel(ctx.channel());
            }

            @Override
            public Optional<Principal> getUserPrincipal() {
                return NettyServerWebSocketHandler.this.originatingRequest
                        .getAttribute(HttpAttributes.PRINCIPAL, Principal.class);
            }

            @Override
            public ConvertibleValues<Object> getUriVariables() {
                return this.uriVars;
            }
        };

        this.webSocketSessionRepository.addChannel(channel);
        return session;
    }

    /**
     * Converts a handler result to a {@link Publisher} whose subscription and every
     * signal delivered downstream run inside {@link ServerRequestContext} for the
     * originating request, and subscribes to it on the channel's event loop.
     *
     * @param ctx    the channel handler context
     * @param result the value returned by the handler method
     * @return the instrumented publisher
     */
    @Override
    protected Publisher<?> instrumentPublisher(ChannelHandlerContext ctx, Object result) {
        Publisher<?> actual = Publishers.convertPublisher(this.conversionService, result, Publisher.class);
        Publisher<Object> traced = subscriber -> ServerRequestContext.with(
                this.originatingRequest,
                () -> actual.subscribe(new Subscriber<Object>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        ServerRequestContext.with(
                                NettyServerWebSocketHandler.this.originatingRequest,
                                () -> subscriber.onSubscribe(s));
                    }

                    @Override
                    public void onNext(Object object) {
                        ServerRequestContext.with(
                                NettyServerWebSocketHandler.this.originatingRequest,
                                () -> subscriber.onNext(object));
                    }

                    @Override
                    public void onError(Throwable t) {
                        ServerRequestContext.with(
                                NettyServerWebSocketHandler.this.originatingRequest,
                                () -> subscriber.onError(t));
                    }

                    @Override
                    public void onComplete() {
                        ServerRequestContext.with(
                                NettyServerWebSocketHandler.this.originatingRequest,
                                subscriber::onComplete);
                    }
                }));
        return Flux.from(traced).subscribeOn(Schedulers.fromExecutorService(ctx.channel().eventLoop()));
    }

    /**
     * Invokes a bound handler method.
     *
     * <p>When a {@link CoroutineHelper} is available and the target is a suspend
     * method, the invocation is deferred into a {@link Flux}: on subscription the
     * coroutine context is set up, the method is invoked, and if the immediate
     * return value indicates a suspended coroutine the result is taken from the
     * continuation's future; otherwise the publisher completes empty. Exceptions
     * raised during that sequence are emitted as an error signal. In every other
     * case the method is invoked directly and its return value is returned.</p>
     *
     * @param boundExecutable the bound handler method
     * @param messageHandler  the execution handle supplying the target bean
     * @return the handler's return value, or a publisher for suspend handlers
     */
    @Override
    protected Object invokeExecutable(BoundExecutable boundExecutable, MethodExecutionHandle<?, ?> messageHandler) {
        if (this.coroutineHelper != null) {
            Executable<?, ?> target = boundExecutable.getTarget();
            if (target instanceof ExecutableMethod && ((ExecutableMethod<?, ?>) target).isSuspend()) {
                return Flux.deferContextual(ctx -> {
                    try {
                        this.coroutineHelper.setupCoroutineContext(
                                this.originatingRequest, ctx, PropagatedContext.getOrEmpty());
                        Object immediateReturnValue = invokeExecutable0(boundExecutable, messageHandler);
                        if (KotlinUtils.isKotlinCoroutineSuspended(immediateReturnValue)) {
                            return Mono.fromCompletionStage(
                                    ContinuationArgumentBinder.extractContinuationCompletableFutureSupplier(
                                            this.originatingRequest));
                        }
                        return Mono.empty();
                    } catch (Exception e) {
                        return Flux.error(e);
                    }
                });
            }
        }
        return invokeExecutable0(boundExecutable, messageHandler);
    }

    /**
     * Invokes the bound method on the handle's target inside
     * {@link ServerRequestContext} for the originating request.
     *
     * @param boundExecutable the bound handler method
     * @param messageHandler  the execution handle supplying the target bean
     * @return the value returned by the handler method
     */
    private Object invokeExecutable0(BoundExecutable boundExecutable, MethodExecutionHandle<?, ?> messageHandler) {
        // The explicit Supplier target type selects the Supplier overload of ServerRequestContext.with;
        // a bare value-returning lambda would also match the Callable overload and be ambiguous.
        return ServerRequestContext.with(
                this.originatingRequest,
                (Supplier<Object>) () -> boundExecutable.invoke(messageHandler.getTarget()));
    }

    /**
     * Publishes a {@link WebSocketMessageProcessedEvent} for the handled message
     * as a task submitted to the context's executor. Listener failures are logged.
     *
     * @param ctx     the channel handler context
     * @param message the message that was handled
     */
    @Override
    protected void messageHandled(ChannelHandlerContext ctx, Object message) {
        ctx.executor().execute(() -> {
            try {
                this.nettyEmbeddedServices.getEventPublisher(WebSocketMessageProcessedEvent.class)
                        .publishEvent(new WebSocketMessageProcessedEvent(getSession(), message));
            } catch (Exception e) {
                if (this.LOG.isErrorEnabled()) {
                    this.LOG.error("Error publishing WebSocket message processed event: " + e.getMessage(), e);
                }
            }
        });
    }

    /**
     * Detaches the session from the channel, removes the channel from the session
     * repository, publishes a {@link WebSocketSessionClosedEvent} (listener failures
     * are logged) and then delegates to the superclass.
     *
     * @param ctx the channel handler context
     * @throws Exception if the superclass handling fails
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channel.attr(NettyWebSocketSession.WEB_SOCKET_SESSION_KEY).set(null);
        if (this.LOG.isDebugEnabled()) {
            this.LOG.debug("Removing WebSocket Server session: {}", this.serverSession);
        }
        this.webSocketSessionRepository.removeChannel(channel);
        try {
            this.nettyEmbeddedServices.getEventPublisher(WebSocketSessionClosedEvent.class)
                    .publishEvent(new WebSocketSessionClosedEvent(this.serverSession));
        } catch (Exception e) {
            if (this.LOG.isErrorEnabled()) {
                this.LOG.error("Error publishing WebSocket closed event: " + e.getMessage(), e);
            }
        }
        super.handlerRemoved(ctx);
    }
}

