/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.web.reactive.socket.adapter;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.server.reactive.AbstractListenerReadPublisher;
import org.springframework.http.server.reactive.AbstractListenerWriteProcessor;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.adapter.AbstractWebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;

public abstract class AbstractListenerWebSocketSession<T>
extends AbstractWebSocketSession<T>
implements Subscriber<Void> {
    private static final int RECEIVE_BUFFER_SIZE = 8192;
    @Nullable
    private final Sinks.Empty<Void> handlerCompletionSink;
    private final WebSocketReceivePublisher receivePublisher;
    @Nullable
    private volatile WebSocketSendProcessor sendProcessor;
    private final AtomicBoolean sendCalled = new AtomicBoolean();
    private final Sinks.One<CloseStatus> closeStatusSink = Sinks.one();

    public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo info, DataBufferFactory bufferFactory) {
        this(delegate, id, info, bufferFactory, null);
    }

    public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo info, DataBufferFactory bufferFactory, @Nullable Sinks.Empty<Void> handlerCompletionSink) {
        super(delegate, id, info, bufferFactory);
        this.receivePublisher = new WebSocketReceivePublisher();
        this.handlerCompletionSink = handlerCompletionSink;
    }

    protected WebSocketSendProcessor getSendProcessor() {
        WebSocketSendProcessor sendProcessor = this.sendProcessor;
        Assert.state((sendProcessor != null ? 1 : 0) != 0, (String)"No WebSocketSendProcessor available");
        return sendProcessor;
    }

    @Override
    public Flux<WebSocketMessage> receive() {
        return this.canSuspendReceiving() ? Flux.from((Publisher)this.receivePublisher) : Flux.from((Publisher)this.receivePublisher).onBackpressureBuffer(8192);
    }

    @Override
    public Mono<Void> send(Publisher<WebSocketMessage> messages) {
        if (this.sendCalled.compareAndSet(false, true)) {
            WebSocketSendProcessor sendProcessor;
            this.sendProcessor = sendProcessor = new WebSocketSendProcessor();
            return Mono.from(subscriber -> {
                messages.subscribe((Subscriber)sendProcessor);
                sendProcessor.subscribe(subscriber);
            });
        }
        return Mono.error((Throwable)new IllegalStateException("send() has already been called"));
    }

    @Override
    public Mono<CloseStatus> closeStatus() {
        return this.closeStatusSink.asMono();
    }

    protected abstract boolean canSuspendReceiving();

    protected abstract void suspendReceiving();

    protected abstract void resumeReceiving();

    protected abstract boolean sendMessage(WebSocketMessage var1) throws IOException;

    void handleMessage(WebSocketMessage.Type type, WebSocketMessage message) {
        this.receivePublisher.handleMessage(message);
    }

    void handleError(Throwable ex) {
        this.closeStatusSink.tryEmitEmpty();
        this.receivePublisher.onError(ex);
        WebSocketSendProcessor sendProcessor = this.sendProcessor;
        if (sendProcessor != null) {
            sendProcessor.cancel();
            sendProcessor.onError(ex);
        }
    }

    void handleClose(CloseStatus closeStatus) {
        this.closeStatusSink.tryEmitValue((Object)closeStatus);
        this.receivePublisher.onAllDataRead();
        WebSocketSendProcessor sendProcessor = this.sendProcessor;
        if (sendProcessor != null) {
            sendProcessor.cancel();
            sendProcessor.onComplete();
        }
    }

    public void onSubscribe(Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
    }

    public void onNext(Void aVoid) {
    }

    public void onError(Throwable ex) {
        if (this.handlerCompletionSink != null) {
            this.handlerCompletionSink.tryEmitError(ex);
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)"WebSocket session completed with error", ex);
        } else if (this.logger.isInfoEnabled()) {
            this.logger.info((Object)("WebSocket session completed with error: " + ex.getMessage()));
        }
        this.close(CloseStatus.SERVER_ERROR);
    }

    public void onComplete() {
        if (this.handlerCompletionSink != null) {
            this.handlerCompletionSink.tryEmitEmpty();
        }
        this.close();
    }

    private final class WebSocketReceivePublisher
    extends AbstractListenerReadPublisher<WebSocketMessage> {
        private volatile Queue<Object> pendingMessages;

        WebSocketReceivePublisher() {
            super(AbstractListenerWebSocketSession.this.getLogPrefix());
            this.pendingMessages = (Queue)Queues.unbounded((int)Queues.SMALL_BUFFER_SIZE).get();
        }

        protected void checkOnDataAvailable() {
            AbstractListenerWebSocketSession.this.resumeReceiving();
            int size = this.pendingMessages.size();
            if (rsReadLogger.isTraceEnabled()) {
                rsReadLogger.trace((Object)(this.getLogPrefix() + "checkOnDataAvailable (" + size + " pending)"));
            }
            if (size > 0) {
                this.onDataAvailable();
            }
        }

        protected void readingPaused() {
            AbstractListenerWebSocketSession.this.suspendReceiving();
        }

        @Nullable
        protected WebSocketMessage read() {
            return (WebSocketMessage)this.pendingMessages.poll();
        }

        void handleMessage(WebSocketMessage message) {
            if (AbstractListenerWebSocketSession.this.logger.isTraceEnabled()) {
                AbstractListenerWebSocketSession.this.logger.trace((Object)(this.getLogPrefix() + "Received " + message));
            } else if (rsReadLogger.isTraceEnabled()) {
                rsReadLogger.trace((Object)(this.getLogPrefix() + "Received " + message));
            }
            if (!this.pendingMessages.offer(message)) {
                this.discardData();
                throw new IllegalStateException("Too many messages. Please ensure WebSocketSession.receive() is subscribed to.");
            }
            this.onDataAvailable();
        }

        protected void discardData() {
            Queue<Object> queue = this.pendingMessages;
            this.pendingMessages = (Queue)Queues.empty().get();
            WebSocketMessage message;
            while ((message = (WebSocketMessage)queue.poll()) != null) {
                message.release();
            }
            return;
        }
    }

    protected final class WebSocketSendProcessor
    extends AbstractListenerWriteProcessor<WebSocketMessage> {
        private volatile boolean isReady;

        WebSocketSendProcessor() {
            super(AbstractListenerWebSocketSession.this.receivePublisher.getLogPrefix());
            this.isReady = true;
        }

        protected boolean write(WebSocketMessage message) throws IOException {
            if (AbstractListenerWebSocketSession.this.logger.isTraceEnabled()) {
                AbstractListenerWebSocketSession.this.logger.trace((Object)(this.getLogPrefix() + "Sending " + message));
            } else if (rsWriteLogger.isTraceEnabled()) {
                rsWriteLogger.trace((Object)(this.getLogPrefix() + "Sending " + message));
            }
            return AbstractListenerWebSocketSession.this.sendMessage(message);
        }

        protected boolean isDataEmpty(WebSocketMessage message) {
            return message.getPayload().readableByteCount() == 0;
        }

        protected boolean isWritePossible() {
            return this.isReady;
        }

        public void setReadyToSend(boolean ready) {
            if (ready && rsWriteLogger.isTraceEnabled()) {
                rsWriteLogger.trace((Object)(this.getLogPrefix() + "Ready to send"));
            }
            this.isReady = ready;
        }

        protected void discardData(WebSocketMessage message) {
            message.release();
        }
    }
}

