/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.rsocket.DuplexConnection;
import io.rsocket.core.ClientServerInputMultiplexer;
import io.rsocket.exceptions.RejectedResumeException;
import io.rsocket.exceptions.UnsupportedSetupException;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.ResumeFrameCodec;
import io.rsocket.frame.SetupFrameCodec;
import io.rsocket.keepalive.KeepAliveHandler;
import io.rsocket.resume.ResumableDuplexConnection;
import io.rsocket.resume.ResumableFramesStore;
import io.rsocket.resume.ServerRSocketSession;
import io.rsocket.resume.SessionManager;
import java.time.Duration;
import java.util.function.BiFunction;
import java.util.function.Function;
import reactor.core.publisher.Mono;

abstract class ServerSetup {
    ServerSetup() {
    }

    abstract Mono<Void> acceptRSocketSetup(ByteBuf var1, ClientServerInputMultiplexer var2, BiFunction<KeepAliveHandler, ClientServerInputMultiplexer, Mono<Void>> var3);

    abstract Mono<Void> acceptRSocketResume(ByteBuf var1, ClientServerInputMultiplexer var2);

    void dispose() {
    }

    Mono<Void> sendError(ClientServerInputMultiplexer multiplexer, Exception exception) {
        DuplexConnection duplexConnection = multiplexer.asSetupConnection();
        return duplexConnection.sendOne(ErrorFrameCodec.encode(duplexConnection.alloc(), 0, exception)).onErrorResume(err -> Mono.empty());
    }

    static class ResumableServerSetup
    extends ServerSetup {
        private final SessionManager sessionManager;
        private final Duration resumeSessionDuration;
        private final Duration resumeStreamTimeout;
        private final Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory;
        private final boolean cleanupStoreOnKeepAlive;

        ResumableServerSetup(SessionManager sessionManager, Duration resumeSessionDuration, Duration resumeStreamTimeout, Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory, boolean cleanupStoreOnKeepAlive) {
            this.sessionManager = sessionManager;
            this.resumeSessionDuration = resumeSessionDuration;
            this.resumeStreamTimeout = resumeStreamTimeout;
            this.resumeStoreFactory = resumeStoreFactory;
            this.cleanupStoreOnKeepAlive = cleanupStoreOnKeepAlive;
        }

        @Override
        public Mono<Void> acceptRSocketSetup(ByteBuf frame, ClientServerInputMultiplexer multiplexer, BiFunction<KeepAliveHandler, ClientServerInputMultiplexer, Mono<Void>> then) {
            if (SetupFrameCodec.resumeEnabled(frame)) {
                ByteBuf resumeToken = SetupFrameCodec.resumeToken(frame);
                ResumableDuplexConnection connection = this.sessionManager.save(new ServerRSocketSession(multiplexer.asClientServerConnection(), this.resumeSessionDuration, this.resumeStreamTimeout, this.resumeStoreFactory, resumeToken, this.cleanupStoreOnKeepAlive)).resumableConnection();
                return then.apply(new KeepAliveHandler.ResumableKeepAliveHandler(connection), new ClientServerInputMultiplexer(connection));
            }
            return then.apply(new KeepAliveHandler.DefaultKeepAliveHandler(multiplexer), multiplexer);
        }

        @Override
        public Mono<Void> acceptRSocketResume(ByteBuf frame, ClientServerInputMultiplexer multiplexer) {
            ServerRSocketSession session = this.sessionManager.get(ResumeFrameCodec.token(frame));
            if (session != null) {
                return session.continueWith(multiplexer.asClientServerConnection()).resumeWith(frame).onClose().then();
            }
            return this.sendError(multiplexer, new RejectedResumeException("unknown resume token")).doFinally(s -> {
                frame.release();
                multiplexer.dispose();
            });
        }

        @Override
        public void dispose() {
            this.sessionManager.dispose();
        }
    }

    static class DefaultServerSetup
    extends ServerSetup {
        DefaultServerSetup() {
        }

        @Override
        public Mono<Void> acceptRSocketSetup(ByteBuf frame, ClientServerInputMultiplexer multiplexer, BiFunction<KeepAliveHandler, ClientServerInputMultiplexer, Mono<Void>> then) {
            if (SetupFrameCodec.resumeEnabled(frame)) {
                return this.sendError(multiplexer, new UnsupportedSetupException("resume not supported")).doFinally(signalType -> {
                    frame.release();
                    multiplexer.dispose();
                });
            }
            return then.apply(new KeepAliveHandler.DefaultKeepAliveHandler(multiplexer), multiplexer);
        }

        @Override
        public Mono<Void> acceptRSocketResume(ByteBuf frame, ClientServerInputMultiplexer multiplexer) {
            return this.sendError(multiplexer, new RejectedResumeException("resume not supported")).doFinally(signalType -> {
                frame.release();
                multiplexer.dispose();
            });
        }
    }
}

