/*
 * Decompiled with CFR 0.152.
 */
package io.vlingo.wire.fdx.inbound.rsocket;

import io.rsocket.AbstractRSocket;
import io.rsocket.Closeable;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ServerTransport;
import io.vlingo.actors.Logger;
import io.vlingo.wire.channel.ChannelMessageDispatcher;
import io.vlingo.wire.channel.ChannelReader;
import io.vlingo.wire.channel.ChannelReaderConsumer;
import io.vlingo.wire.message.RawMessageBuilder;
import java.nio.ByteBuffer;
import reactor.core.publisher.Mono;

/**
 * Inbound channel reader backed by an RSocket server.
 *
 * <p>{@link #openFor(ChannelReaderConsumer)} binds an RSocket server on the supplied
 * {@link ServerTransport}. Every fire-and-forget payload received on an accepted connection is
 * copied into a per-connection {@link RawMessageBuilder} and handed to
 * {@link ChannelMessageDispatcher#dispatchMessagesFor(RawMessageBuilder)} on this instance
 * (presumably delivering the parsed messages to {@link #consumer()} -- the dispatching logic
 * lives in the interface, not in this class).
 */
public class RSocketChannelInboundReader implements ChannelReader, ChannelMessageDispatcher {
    private final Logger logger;
    private final String name;
    /** Reported by {@link #port()} and used in log messages only; binding is driven by the transport. */
    private final int port;
    private final int maxMessageSize;
    /** Handle of the currently bound server, or {@code null} while the channel was never opened. */
    private Closeable serverSocket;
    private ChannelReaderConsumer consumer;
    private final ServerTransport<? extends Closeable> serverTransport;

    /**
     * Creates a reader; no socket is bound until {@link #openFor(ChannelReaderConsumer)} is called.
     *
     * @param serverTransport the transport the RSocket server is bound to
     * @param port the port reported by {@link #port()} and mentioned in log messages
     * @param name the channel name reported by {@link #name()} and mentioned in log messages
     * @param maxMessageSize the size passed to each per-connection {@link RawMessageBuilder}
     * @param logger the logger used by this reader and by its connection handlers
     */
    public RSocketChannelInboundReader(ServerTransport<? extends Closeable> serverTransport, int port, String name, int maxMessageSize, Logger logger) {
        this.logger = logger;
        this.name = name;
        this.port = port;
        this.maxMessageSize = maxMessageSize;
        this.serverTransport = serverTransport;
    }

    /**
     * @return the consumer given to the most recent {@link #openFor(ChannelReaderConsumer)} call,
     *         or {@code null} if the channel was never opened
     */
    @Override
    public ChannelReaderConsumer consumer() {
        return this.consumer;
    }

    /** @return the logger supplied at construction time */
    @Override
    public Logger logger() {
        return this.logger;
    }

    /**
     * Disposes the server socket if one is bound and not yet disposed. Failures are logged,
     * never thrown.
     */
    @Override
    public void close() {
        this.disposeServerSocket();
    }

    /** @return the channel name supplied at construction time */
    @Override
    public String name() {
        return this.name;
    }

    /** @return the port supplied at construction time */
    @Override
    public int port() {
        return this.port;
    }

    /**
     * Registers {@code consumer} and binds a new RSocket server on the configured transport,
     * blocking the calling thread until the bind completes.
     *
     * <p>A previously bound server is disposed first, using the same guarded disposal as
     * {@link #close()}, so a failure to dispose the old socket is logged instead of aborting
     * the re-open.
     *
     * <p>A bind failure is logged and then propagates to the caller, because
     * {@code Mono.block()} rethrows the error signal.
     *
     * @param consumer the consumer returned by {@link #consumer()} from now on
     */
    @Override
    public void openFor(ChannelReaderConsumer consumer) {
        this.consumer = consumer;
        // In case the channel was already open, close the previous socket before binding again.
        this.disposeServerSocket();

        // ZERO_COPY hands over the received buffers without copying; the connection handler
        // releases each payload explicitly once it has been processed.
        this.serverSocket = RSocketServer.create()
            .payloadDecoder(PayloadDecoder.ZERO_COPY)
            .acceptor(new SocketAcceptorImpl(this, this.name, this.maxMessageSize, this.logger))
            .bind(this.serverTransport)
            .doOnError(throwable -> this.logger.error("Failed to create RSocket inbound channel {} at port {}", new Object[]{this.name, this.port, throwable}))
            .block();

        if (this.serverSocket != null) {
            // Log the eventual shutdown of the server, whether it completes normally or with an error.
            this.serverSocket.onClose()
                .doFinally(signalType -> this.logger.info("RSocket inbound channel {} at port {} is closed", new Object[]{this.name, this.port}))
                .subscribe(ignored -> {}, throwable -> this.logger.error("Unexpected error on closing inbound channel {}", new Object[]{this.name, throwable}));
            this.logger().info("RSocket inbound channel {} opened at port {}", new Object[]{this.name, this.port});
        }
    }

    /** Does nothing: this reader has no polling step, messages are handled by the connection handlers. */
    @Override
    public void probeChannel() {
    }

    /** Disposes the bound server socket, if any, logging (not propagating) any failure. */
    private void disposeServerSocket() {
        if (this.serverSocket == null || this.serverSocket.isDisposed()) {
            return;
        }
        try {
            this.serverSocket.dispose();
        }
        catch (Throwable t) {
            this.logger.error("Unexpected error on closing inbound channel {}", new Object[]{this.name, t});
        }
    }

    /**
     * Accepts incoming RSocket connections and creates, for each of them, a handler with its own
     * {@link RawMessageBuilder}.
     */
    private static class SocketAcceptorImpl implements SocketAcceptor {
        private final ChannelMessageDispatcher dispatcher;
        private final String name;
        private final int maxMessageSize;
        private final Logger logger;

        private SocketAcceptorImpl(ChannelMessageDispatcher dispatcher, String name, int maxMessageSize, Logger logger) {
            this.dispatcher = dispatcher;
            this.name = name;
            this.maxMessageSize = maxMessageSize;
            this.logger = logger;
        }

        /**
         * Accepts every connection unconditionally; neither the setup payload nor the requester
         * socket is inspected.
         *
         * @return a {@link Mono} emitting the handler for the new connection
         */
        @Override
        public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) {
            // One builder per connection, so connections never share a work buffer.
            RawMessageBuilder rawMessageBuilder = new RawMessageBuilder(this.maxMessageSize);
            // No cast here: Mono.just must infer Mono<RSocket> from the return type.
            return Mono.just(this.buildAcceptor(rawMessageBuilder));
        }

        /**
         * Builds the connection handler. Only fire-and-forget is overridden; every other
         * interaction model keeps the {@link AbstractRSocket} behaviour.
         */
        private AbstractRSocket buildAcceptor(final RawMessageBuilder rawMessageBuilder) {
            return new AbstractRSocket() {

                @Override
                public Mono<Void> fireAndForget(Payload payload) {
                    logger.trace("Message received on inbound channel {}", new Object[]{name});
                    try {
                        ByteBuffer payloadData = payload.getData();
                        rawMessageBuilder.workBuffer().put(payloadData);
                        dispatcher.dispatchMessagesFor(rawMessageBuilder);
                    }
                    catch (Throwable t) {
                        // A bad message must not break the connection: log it and drop the message.
                        logger.error("Unexpected error in inbound channel {}. Message ignored.", new Object[]{name, t});
                    }
                    finally {
                        // Always release the payload and reset the work buffer for the next message.
                        payload.release();
                        rawMessageBuilder.workBuffer().clear();
                    }
                    return Mono.empty();
                }
            };
        }
    }
}

