/*
 * Decompiled with CFR 0.152.
 */
package io.vlingo.wire.fdx.bidirectional.rsocket;

import io.rsocket.AbstractRSocket;
import io.rsocket.Closeable;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ServerTransport;
import io.vlingo.actors.Actor;
import io.vlingo.actors.ActorInstantiator;
import io.vlingo.actors.Logger;
import io.vlingo.actors.Stoppable;
import io.vlingo.common.Completes;
import io.vlingo.wire.channel.RequestChannelConsumerProvider;
import io.vlingo.wire.fdx.bidirectional.ServerRequestResponseChannel;
import io.vlingo.wire.fdx.bidirectional.rsocket.RSocketChannelContext;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class RSocketServerChannelActor
extends Actor
implements ServerRequestResponseChannel {
    private final String name;
    private final Closeable serverSocket;
    private final Integer port;

    public RSocketServerChannelActor(RequestChannelConsumerProvider provider, ServerTransport<? extends Closeable> serverTransport, int port, String name, int maxBufferPoolSize, int messageBufferSize) {
        this.name = name;
        this.port = port;
        this.serverSocket = (Closeable)RSocketServer.create().payloadDecoder(PayloadDecoder.ZERO_COPY).acceptor((SocketAcceptor)new SocketAcceptorImpl(provider, maxBufferPoolSize, messageBufferSize, this.logger())).bind(serverTransport).block();
        if (this.serverSocket != null) {
            this.logger().info("RSocket server channel opened at port {}", new Object[]{this.port});
            this.serverSocket.onClose().doFinally(signalType -> this.logger().info("RSocket server channel closed")).subscribe();
        }
    }

    @Override
    public void close() {
        if (this.isStopped()) {
            return;
        }
        ((Stoppable)this.selfAs(Stoppable.class)).stop();
    }

    @Override
    public Completes<Integer> port() {
        return this.completes().with((Object)this.port);
    }

    public void stop() {
        if (this.serverSocket != null) {
            try {
                this.serverSocket.dispose();
            }
            catch (Exception e) {
                this.logger().error("Failed to close receive socket for: {}", new Object[]{this.name, e});
            }
        }
        super.stop();
    }

    public static class Instantiator
    implements ActorInstantiator<RSocketServerChannelActor> {
        private static final long serialVersionUID = -1999865617618138682L;
        private final RequestChannelConsumerProvider provider;
        private final ServerTransport<? extends Closeable> serverTransport;
        private final int port;
        private final String name;
        private final int maxBufferPoolSize;
        private final int messageBufferSize;

        public Instantiator(RequestChannelConsumerProvider provider, ServerTransport<? extends Closeable> serverTransport, int port, String name, int maxBufferPoolSize, int messageBufferSize) {
            this.provider = provider;
            this.serverTransport = serverTransport;
            this.port = port;
            this.name = name;
            this.maxBufferPoolSize = maxBufferPoolSize;
            this.messageBufferSize = messageBufferSize;
        }

        public RSocketServerChannelActor instantiate() {
            return new RSocketServerChannelActor(this.provider, this.serverTransport, this.port, this.name, this.maxBufferPoolSize, this.messageBufferSize);
        }

        public Class<RSocketServerChannelActor> type() {
            return RSocketServerChannelActor.class;
        }
    }

    private static class SocketAcceptorImpl
    implements SocketAcceptor {
        private final RSocket acceptor;

        private SocketAcceptorImpl(final RequestChannelConsumerProvider consumerProvider, final int maxBufferPoolSize, final int maxMessageSize, final Logger logger) {
            this.acceptor = new AbstractRSocket(){

                public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
                    RSocketChannelContext context = new RSocketChannelContext(consumerProvider, maxBufferPoolSize, maxMessageSize, logger);
                    Flux.from(payloads).subscribeOn(Schedulers.single()).doOnNext(context::consume).doOnError(throwable -> logger.error("Unexpected error when consuming channel request", throwable)).subscribe();
                    return Flux.from(context.processor());
                }
            };
        }

        public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) {
            return Mono.just((Object)this.acceptor);
        }
    }
}

