/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.CharsetUtil;
import io.rsocket.DuplexConnection;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.exceptions.RejectedResumeException;
import io.rsocket.frame.ResumeFrameCodec;
import io.rsocket.frame.ResumeOkFrameCodec;
import io.rsocket.keepalive.KeepAliveSupport;
import io.rsocket.resume.RSocketSession;
import io.rsocket.resume.ResumableDuplexConnection;
import io.rsocket.resume.ResumableFramesStore;
import io.rsocket.resume.ResumeStateHolder;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

/**
 * Server-side state for one resumable session, identified by a resume token.
 *
 * <p>The session watches the currently attached transport connection. When that connection
 * closes, keep-alive handling is stopped and a {@link Mono#delay(Duration)} timer of
 * {@code resumeSessionDuration} is started with this object as its subscriber. If the timer
 * fires before {@link #resumeWith(ByteBuf, DuplexConnection)} cancels it, the session is
 * terminated and its resources are disposed.
 *
 * <p>The {@link #s} field encodes the timer state:
 * <ul>
 *   <li>{@code null} - no expiry timer is pending;</li>
 *   <li>a live {@link Subscription} - an expiry timer is running;</li>
 *   <li>{@link Operators#cancelledSubscription()} - the session has been terminated.</li>
 * </ul>
 */
public class ServerRSocketSession
        implements RSocketSession, ResumeStateHolder, CoreSubscriber<Long> {
    private static final Logger logger = LoggerFactory.getLogger(ServerRSocketSession.class);

    final ResumableDuplexConnection resumableConnection;
    final Duration resumeSessionDuration;
    final ResumableFramesStore resumableFramesStore;
    final String resumeToken;
    final ByteBufAllocator allocator;
    final boolean cleanupStoreOnKeepAlive;

    /** Expiry-timer subscription; see the class documentation for the meaning of each state. */
    volatile Subscription s;
    static final AtomicReferenceFieldUpdater<ServerRSocketSession, Subscription> S =
            AtomicReferenceFieldUpdater.newUpdater(ServerRSocketSession.class, Subscription.class, "s");

    /** Assigned via {@link #setKeepAliveSupport(KeepAliveSupport)}; {@code null} until then. */
    KeepAliveSupport keepAliveSupport;

    /**
     * Creates the session and starts observing the initial transport connection.
     *
     * @param resumeToken token identifying the session; decoded as UTF-8 and kept as a string
     * @param initialDuplexConnection first transport connection; supplies the allocator and is
     *     observed for closure
     * @param resumableDuplexConnection resumable connection owned by this session; when it
     *     closes, the session is disposed
     * @param resumeSessionDuration delay after a disconnect before the session is terminated
     * @param resumableFramesStore store of frames kept for resumption
     * @param cleanupStoreOnKeepAlive whether {@link #onImpliedPosition(long)} releases frames
     */
    public ServerRSocketSession(
            ByteBuf resumeToken,
            DuplexConnection initialDuplexConnection,
            ResumableDuplexConnection resumableDuplexConnection,
            Duration resumeSessionDuration,
            ResumableFramesStore resumableFramesStore,
            boolean cleanupStoreOnKeepAlive) {
        this.resumeToken = resumeToken.toString(CharsetUtil.UTF_8);
        this.allocator = initialDuplexConnection.alloc();
        this.resumeSessionDuration = resumeSessionDuration;
        this.resumableFramesStore = resumableFramesStore;
        this.cleanupStoreOnKeepAlive = cleanupStoreOnKeepAlive;
        this.resumableConnection = resumableDuplexConnection;

        resumableDuplexConnection.onClose().doFinally(__ -> this.dispose()).subscribe();
        this.observeDisconnection(initialDuplexConnection);
    }

    /**
     * Starts the expiry timer when {@code activeConnection} closes, whether it completes
     * normally or with an error.
     */
    void observeDisconnection(DuplexConnection activeConnection) {
        activeConnection
                .onClose()
                .subscribe(null, e -> this.tryTimeoutSession(), () -> this.tryTimeoutSession());
    }

    /** Stops keep-alive handling (if configured) and starts the session expiry timer. */
    void tryTimeoutSession() {
        // The close signal may arrive before setKeepAliveSupport(...) has been called; the
        // expiry timer must still be started in that case, so do not let an NPE abort it.
        final KeepAliveSupport support = this.keepAliveSupport;
        if (support != null) {
            support.stop();
        }
        Mono.delay(this.resumeSessionDuration).subscribe(this);
        logger.debug("Connection is lost. Trying to timeout the active session[{}]", this.resumeToken);
    }

    /**
     * Attempts to resume this session on {@code nextDuplexConnection}.
     *
     * <p>The pending expiry timer is cancelled first. If the session has already been
     * terminated, or the positions carried by the RESUME frame are inconsistent with the local
     * frame store, an error is sent on the new connection and it is closed. Otherwise a
     * RESUME_OK frame is sent and the new connection is attached to the resumable connection.
     *
     * @param resumeFrame RESUME frame received from the client
     * @param nextDuplexConnection connection on which the RESUME frame arrived
     * @return {@code nextDuplexConnection.onClose()}
     */
    public synchronized Mono<Void> resumeWith(ByteBuf resumeFrame, DuplexConnection nextDuplexConnection) {
        final long remotePos = ResumeFrameCodec.firstAvailableClientPos(resumeFrame);
        final long remoteImpliedPos = ResumeFrameCodec.lastReceivedServerPos(resumeFrame);
        final long impliedPosition = this.resumableFramesStore.frameImpliedPosition();
        final long position = this.resumableFramesStore.framePosition();

        logger.debug(
                "Resume FRAME received. ClientResumeState{observedFramesPosition[{}], sentFramesPosition[{}]}, ServerResumeState{observedFramesPosition[{}], sentFramesPosition[{}]}",
                remoteImpliedPos, remotePos, impliedPosition, position);

        // Atomically take the pending timer out of the field (resetting it to "no timer"),
        // unless the session has already been terminated.
        Subscription subscription;
        for (;;) {
            subscription = this.s;
            if (subscription == Operators.cancelledSubscription()) {
                logger.debug("Session has already been expired. Terminating received connection");
                RejectedResumeException rejectedResumeException =
                        new RejectedResumeException("resume_internal_error: Session Expired");
                nextDuplexConnection.sendErrorAndClose(rejectedResumeException);
                return nextDuplexConnection.onClose();
            }
            if (S.compareAndSet(this, subscription, null)) {
                break;
            }
        }
        // NOTE(review): this throws NullPointerException when no timer was pending, i.e. when
        // the RESUME arrives before the previous connection's close was observed. Left as-is
        // because the correct handling of that case cannot be determined from this class alone.
        subscription.cancel();

        final boolean positionsAreConsistent = remotePos <= impliedPosition && position <= remoteImpliedPos;
        if (!positionsAreConsistent) {
            logger.debug(
                    "Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}] and RemotePosition[{}] to be less or equal to LocalImpliedPosition[{}]. Terminating received connection",
                    remoteImpliedPos, position, remotePos, impliedPosition);
            this.resumableConnection.dispose();
            RejectedResumeException rejectedResumeException = new RejectedResumeException(
                    String.format(
                            "resumption_pos=[ remote: { pos: %d, impliedPos: %d }, local: { pos: %d, impliedPos: %d }]",
                            remotePos, remoteImpliedPos, position, impliedPosition));
            nextDuplexConnection.sendErrorAndClose(rejectedResumeException);
            return nextDuplexConnection.onClose();
        }

        try {
            // Drop frames the client has confirmed receiving, if there are any.
            if (position != remoteImpliedPos) {
                this.resumableFramesStore.releaseFrames(remoteImpliedPos);
            }
            nextDuplexConnection.sendFrame(
                    0, ResumeOkFrameCodec.encode(this.allocator, this.resumableFramesStore.frameImpliedPosition()));
            logger.debug("ResumeOK Frame has been sent");
        }
        catch (Throwable t) {
            logger.debug("Exception occurred while releasing frames in the frameStore", t);
            this.resumableConnection.dispose();
            nextDuplexConnection.sendErrorAndClose(new RejectedResumeException(t.getMessage(), t));
            return nextDuplexConnection.onClose();
        }

        if (this.resumableConnection.connect(nextDuplexConnection)) {
            this.observeDisconnection(nextDuplexConnection);
            // Keep-alive support may not have been configured yet; nothing to restart then.
            final KeepAliveSupport support = this.keepAliveSupport;
            if (support != null) {
                support.start();
            }
            logger.debug("Session[{}] has been resumed successfully", this.resumeToken);
        } else {
            logger.debug("Session has already been expired. Terminating received connection");
            ConnectionErrorException connectionErrorException =
                    new ConnectionErrorException("resume_internal_error: Session Expired");
            nextDuplexConnection.sendErrorAndClose(connectionErrorException);
        }
        return nextDuplexConnection.onClose();
    }

    /** Returns the implied position reported by the frame store. */
    @Override
    public long impliedPosition() {
        return this.resumableFramesStore.frameImpliedPosition();
    }

    /**
     * Releases stored frames up to {@code remoteImpliedPos}, but only when the session was
     * created with {@code cleanupStoreOnKeepAlive} enabled.
     */
    @Override
    public void onImpliedPosition(long remoteImpliedPos) {
        if (this.cleanupStoreOnKeepAlive) {
            this.resumableFramesStore.releaseFrames(remoteImpliedPos);
        }
    }

    /** Registers the expiry timer's subscription and requests its signal. */
    public void onSubscribe(Subscription s) {
        if (Operators.setOnce(S, this, s)) {
            s.request(Long.MAX_VALUE);
        }
    }

    /**
     * Expiry timer fired: terminates the session unless it has been terminated already.
     */
    public void onNext(Long aLong) {
        if (!Operators.terminate(S, this)) {
            return;
        }
        // The field is now in the cancelled state, so the dispose() call triggered by the
        // resumable connection's onClose will be a no-op; the frame store therefore has to be
        // released here, in the same order dispose() uses.
        this.resumableFramesStore.dispose();
        this.resumableConnection.dispose();
    }

    /** Nothing to do: termination is handled in {@link #onNext(Long)}. */
    public void onComplete() {
    }

    /** Reports a failure of the expiry timer instead of dropping it silently. */
    public void onError(Throwable t) {
        logger.warn("Session[{}] expiration timer failed", this.resumeToken, t);
    }

    @Override
    public void setKeepAliveSupport(KeepAliveSupport keepAliveSupport) {
        this.keepAliveSupport = keepAliveSupport;
    }

    /**
     * Terminates the session, cancelling any pending expiry timer, and disposes the frame store
     * and the resumable connection. Only the first termination has an effect.
     */
    public void dispose() {
        if (Operators.terminate(S, this)) {
            this.resumableFramesStore.dispose();
            this.resumableConnection.dispose();
        }
    }

    /** Returns whether the underlying resumable connection has been disposed. */
    public boolean isDisposed() {
        return this.resumableConnection.isDisposed();
    }
}

