/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.transport.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.MessageCodec;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.net.Address;
import io.scalecube.transport.netty.ExceptionHandler;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.MonoSink;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.DisposableServer;
import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound;
import reactor.netty.channel.BootstrapHandlers;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpServer;

public final class TransportImpl
implements Transport {
    private static final Logger LOGGER = LoggerFactory.getLogger(Transport.class);
    private final TransportConfig config;
    private final LoopResources loopResources;
    private final DirectProcessor<Message> messagesSubject;
    private final FluxSink<Message> messageSink;
    private final Map<Address, Mono<? extends Connection>> connections;
    private final ExceptionHandler exceptionHandler;
    private final TransportChannelInitializer channelInitializer;
    private final MonoProcessor<Void> stop;
    private final MonoProcessor<Void> onStop;
    private final Address address;
    private final DisposableServer server;
    private final MessageCodec messageCodec;

    public TransportImpl(TransportConfig config) {
        this.config = config;
        this.loopResources = LoopResources.create((String)"sc-cluster-io", (int)1, (boolean)true);
        this.messagesSubject = DirectProcessor.create();
        this.messageSink = this.messagesSubject.sink();
        this.connections = new ConcurrentHashMap<Address, Mono<? extends Connection>>();
        this.exceptionHandler = new ExceptionHandler();
        this.channelInitializer = new TransportChannelInitializer();
        this.stop = MonoProcessor.create();
        this.onStop = MonoProcessor.create();
        this.messageCodec = config.messageCodec();
        this.address = null;
        this.server = null;
    }

    private TransportImpl(DisposableServer server, TransportImpl other) {
        this.server = server;
        this.address = TransportImpl.prepareAddress(server);
        this.config = other.config;
        this.loopResources = other.loopResources;
        this.messagesSubject = other.messagesSubject;
        this.messageSink = other.messageSink;
        this.connections = other.connections;
        this.exceptionHandler = other.exceptionHandler;
        this.channelInitializer = other.channelInitializer;
        this.stop = other.stop;
        this.onStop = other.onStop;
        this.messageCodec = other.messageCodec;
        this.stop.then(this.doStop()).doFinally(s -> this.onStop.onComplete()).subscribe(null, ex -> LOGGER.warn("[{}][doStop] Exception occurred: {}", (Object)this.address, (Object)ex.toString()));
    }

    private static Address prepareAddress(DisposableServer server) {
        InetAddress address = server.address().getAddress();
        int port = server.address().getPort();
        if (address.isAnyLocalAddress()) {
            return Address.create((String)Address.getLocalIpAddress().getHostAddress(), (int)port);
        }
        return Address.create((String)address.getHostAddress(), (int)port);
    }

    public static Transport bindAwait() {
        return TransportImpl.bindAwait(TransportConfig.defaultConfig());
    }

    public static Transport bindAwait(TransportConfig config) {
        try {
            return (Transport)TransportImpl.bind(config).block();
        }
        catch (Exception e) {
            throw Exceptions.propagate((Throwable)(e.getCause() != null ? e.getCause() : e));
        }
    }

    public static Mono<Transport> bind() {
        return TransportImpl.bind(TransportConfig.defaultConfig());
    }

    public static Mono<Transport> bind(TransportConfig config) {
        return new TransportImpl(config).bind0();
    }

    public Mono<Transport> bind0() {
        return this.newTcpServer().handle(this::onMessage).bind().doOnSuccess(t -> LOGGER.info("[bind0][{}] Bound cluster transport", (Object)t.address())).doOnError(ex -> LOGGER.error("[bind0][{}] Exception occurred: {}", (Object)this.config.port(), (Object)ex.toString())).map(server -> new TransportImpl((DisposableServer)server, this)).cast(Transport.class);
    }

    public Address address() {
        return this.address;
    }

    public boolean isStopped() {
        return this.onStop.isDisposed();
    }

    public final Mono<Void> stop() {
        return Mono.defer(() -> {
            this.stop.onComplete();
            return this.onStop;
        });
    }

    private Mono<Void> doStop() {
        return Mono.defer(() -> {
            LOGGER.info("[{}][doStop] Stopping", (Object)this.address);
            this.messageSink.complete();
            return Flux.concatDelayError((Publisher[])new Publisher[]{this.closeServer(), this.shutdownLoopResources()}).then().doFinally(s -> this.connections.clear()).doOnSuccess(avoid -> LOGGER.info("[{}][doStop] Stopped", (Object)this.address));
        });
    }

    public final Flux<Message> listen() {
        return this.messagesSubject.onBackpressureBuffer();
    }

    public Mono<Void> send(Address address, Message message) {
        return this.connections.computeIfAbsent(address, this::connect0).map(Connection::outbound).flatMap(out -> out.send((Publisher)Mono.just((Object)message).map(this::toByteBuf), bb -> true).then()).then();
    }

    public Mono<Message> requestResponse(Address address, Message request) {
        return Mono.create(sink -> {
            Objects.requireNonNull(request, "request must be not null");
            Objects.requireNonNull(request.correlationId(), "correlationId must be not null");
            Disposable receive = this.listen().filter(resp -> resp.correlationId() != null).filter(resp -> resp.correlationId().equals(request.correlationId())).take(1L).subscribe(arg_0 -> ((MonoSink)sink).success(arg_0), arg_0 -> ((MonoSink)sink).error(arg_0), () -> ((MonoSink)sink).success());
            Disposable send = this.send(address, request).subscribe(null, ex -> {
                receive.dispose();
                sink.error(ex);
            });
            sink.onDispose((Disposable)Disposables.composite((Disposable[])new Disposable[]{send, receive}));
        });
    }

    private Mono<Void> onMessage(NettyInbound in, NettyOutbound out) {
        return in.receive().retain().map(this::toMessage).doOnNext(arg_0 -> this.messageSink.next(arg_0)).then();
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private Message toMessage(ByteBuf byteBuf) {
        try (ByteBufInputStream stream = new ByteBufInputStream(byteBuf, true);){
            Message message = this.messageCodec.deserialize((InputStream)stream);
            return message;
        }
        catch (Exception e) {
            LOGGER.warn("[{}][toMessage] Exception occurred: {}", (Object)this.address, (Object)e.toString());
            throw new DecoderException((Throwable)e);
        }
    }

    private ByteBuf toByteBuf(Message message) {
        ByteBuf bb = ByteBufAllocator.DEFAULT.buffer();
        ByteBufOutputStream stream = new ByteBufOutputStream(bb);
        try {
            this.messageCodec.serialize(message, (OutputStream)stream);
        }
        catch (Exception e) {
            bb.release();
            LOGGER.warn("[{}][toByteBuf] Exception occurred: {}", (Object)this.address, (Object)e.toString());
            throw new EncoderException((Throwable)e);
        }
        return bb;
    }

    private Mono<? extends Connection> connect0(Address address1) {
        return this.newTcpClient(address1).doOnDisconnected(c -> {
            LOGGER.debug("[{}][disconnected][{}] Channel: {}", new Object[]{this.address, address1, c.channel()});
            this.connections.remove(address1);
        }).doOnConnected(c -> LOGGER.debug("[{}][connected][{}] Channel: {}", new Object[]{this.address, address1, c.channel()})).connect().doOnError(th -> {
            LOGGER.debug("[{}][connect0][{}] Exception occurred: {}", new Object[]{this.address, address1, th.toString()});
            this.connections.remove(address1);
        }).cache();
    }

    private Mono<Void> closeServer() {
        return Mono.defer(() -> {
            if (this.server == null) {
                return Mono.empty();
            }
            LOGGER.info("[{}][closeServer] Closing server channel", (Object)this.address);
            return Mono.fromRunnable(() -> ((DisposableServer)this.server).dispose()).then(this.server.onDispose()).doOnSuccess(avoid -> LOGGER.info("[{}][closeServer] Closed server channel", (Object)this.address)).doOnError(e -> LOGGER.warn("[{}][closeServer] Exception occurred: {}", (Object)this.address, (Object)e.toString()));
        });
    }

    private Mono<Void> shutdownLoopResources() {
        return Mono.fromRunnable(() -> ((LoopResources)this.loopResources).dispose()).then(this.loopResources.disposeLater());
    }

    private TcpServer newTcpServer() {
        TcpServer tcpServer = TcpServer.create().runOn(this.loopResources).option(ChannelOption.TCP_NODELAY, (Object)true).option(ChannelOption.SO_KEEPALIVE, (Object)true).option(ChannelOption.SO_REUSEADDR, (Object)true).port(this.config.port());
        if (this.config.host() != null) {
            tcpServer = tcpServer.host(this.config.host());
        }
        return tcpServer.bootstrap(b -> BootstrapHandlers.updateConfiguration((ServerBootstrap)b, (String)"inbound", (BiConsumer)this.channelInitializer));
    }

    private TcpClient newTcpClient(Address address) {
        return TcpClient.create((ConnectionProvider)ConnectionProvider.newConnection()).runOn(this.loopResources).host(address.host()).port(address.port()).option(ChannelOption.TCP_NODELAY, (Object)true).option(ChannelOption.SO_KEEPALIVE, (Object)true).option(ChannelOption.SO_REUSEADDR, (Object)true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)this.config.connectTimeout()).bootstrap(b -> BootstrapHandlers.updateConfiguration((Bootstrap)b, (String)"outbound", (BiConsumer)this.channelInitializer));
    }

    private final class TransportChannelInitializer
    implements BiConsumer<ConnectionObserver, Channel> {
        private static final int LENGTH_FIELD_LENGTH = 4;

        private TransportChannelInitializer() {
        }

        @Override
        public void accept(ConnectionObserver connectionObserver, Channel channel) {
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addLast(new ChannelHandler[]{new LengthFieldPrepender(4)});
            pipeline.addLast(new ChannelHandler[]{new LengthFieldBasedFrameDecoder(TransportImpl.this.config.maxFrameLength(), 0, 4, 0, 4)});
            pipeline.addLast(new ChannelHandler[]{TransportImpl.this.exceptionHandler});
        }
    }
}

