/*
 * Decompiled with CFR 0.152.
 */
package reactor.ipc.netty.udp;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.ProtocolFamily;
import java.net.SocketAddress;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Schedulers;
import reactor.ipc.netty.common.ChannelBridge;
import reactor.ipc.netty.common.DuplexSocket;
import reactor.ipc.netty.common.MonoChannelFuture;
import reactor.ipc.netty.common.NettyChannel;
import reactor.ipc.netty.common.NettyChannelHandler;
import reactor.ipc.netty.config.ServerOptions;
import reactor.ipc.netty.tcp.TcpChannel;
import reactor.ipc.netty.util.NettyNativeDetector;
import reactor.util.Logger;
import reactor.util.Loggers;

/**
 * A UDP server backed by a Netty {@link Bootstrap}.
 *
 * <p>Starting the server binds a {@link DatagramChannel} and installs a
 * {@link NettyChannelHandler} ("reactiveBridge") on its pipeline so that the
 * user-supplied handler function is driven by the channel. Once bound, the
 * server can join and leave multicast groups.
 */
public final class UdpServer
extends DuplexSocket<ByteBuf, ByteBuf, NettyChannel>
implements ChannelBridge<TcpChannel> {
    /**
     * Number of IO threads used when this server creates its own event loop
     * group. Read from the {@code reactor.udp.ioThreadCount} system property,
     * defaulting to {@link Schedulers#DEFAULT_POOL_SIZE}.
     */
    public static final int DEFAULT_UDP_THREAD_COUNT = Integer.parseInt(System.getProperty("reactor.udp.ioThreadCount", "" + Schedulers.DEFAULT_POOL_SIZE));

    /** Port bound on localhost when the options carry no listen address. */
    private static final int FALLBACK_PORT = 3000;

    final Bootstrap bootstrap;
    final EventLoopGroup ioGroup;
    /** Configured listen address; may be {@code null}, in which case localhost:{@value #FALLBACK_PORT} is bound. */
    final InetSocketAddress listenAddress;
    /** Default multicast interface; may be {@code null}. */
    final NetworkInterface multicastInterface;
    final ServerOptions options;
    /** The bound channel; {@code null} until a bind has completed successfully. */
    volatile DatagramChannel channel;
    static final Logger log = Loggers.getLogger(UdpServer.class);
    /** Source of the numeric suffix in {@code reactor-udp-io-N} thread names. */
    static final AtomicLong COUNTER = new AtomicLong();

    /**
     * Creates a server listening on {@code 127.0.0.1} and {@code DEFAULT_PORT}.
     *
     * @return a new, not yet started server
     */
    public static UdpServer create() {
        return UdpServer.create("127.0.0.1");
    }

    /**
     * Creates a server listening on the given address and {@code DEFAULT_PORT}.
     *
     * @param bindAddress the address to listen on
     * @return a new, not yet started server
     */
    public static UdpServer create(String bindAddress) {
        return UdpServer.create(bindAddress, DEFAULT_PORT);
    }

    /**
     * Creates a server listening on {@code 127.0.0.1} and the given port.
     *
     * @param port the port to listen on
     * @return a new, not yet started server
     */
    public static UdpServer create(int port) {
        return UdpServer.create("127.0.0.1", port);
    }

    /**
     * Creates a server listening on the given address and port.
     *
     * @param bindAddress the address to listen on
     * @param port the port to listen on
     * @return a new, not yet started server
     */
    public static UdpServer create(String bindAddress, int port) {
        return UdpServer.create(ServerOptions.create().listen(bindAddress, port));
    }

    /**
     * Creates a server configured from the given options.
     *
     * @param options the server options
     * @return a new, not yet started server
     */
    public static UdpServer create(ServerOptions options) {
        return new UdpServer(options);
    }

    /**
     * Captures the options and prepares (but does not bind) the bootstrap.
     *
     * <p>If the options supply an event loop group it is used as-is; otherwise
     * a group of {@link #DEFAULT_UDP_THREAD_COUNT} threads named
     * {@code reactor-udp-io-N} is created.
     *
     * @param options the server options
     */
    UdpServer(ServerOptions options) {
        this.listenAddress = options.listenAddress();
        this.multicastInterface = options.multicastInterface();
        this.options = options.toImmutable();
        if (null != options.eventLoopGroup()) {
            this.ioGroup = options.eventLoopGroup();
        } else {
            int ioThreadCount = DEFAULT_UDP_THREAD_COUNT;
            ThreadFactory tf = r -> {
                Thread t = new Thread(r, "reactor-udp-io-" + COUNTER.incrementAndGet());
                t.setDaemon(options.daemon());
                return t;
            };
            // An explicit protocol family forces the NIO transport.
            this.ioGroup = options.protocolFamily() == null ? NettyNativeDetector.instance().newEventLoopGroup(ioThreadCount, tf) : new NioEventLoopGroup(ioThreadCount, tf);
        }
        this.bootstrap = new Bootstrap().group(this.ioGroup).option(ChannelOption.AUTO_READ, false);
        if (options.protocolFamily() == null && NettyNativeDetector.instance().getDatagramChannel(this.ioGroup).getSimpleName().startsWith("Epoll")) {
            this.bootstrap.channel(NettyNativeDetector.instance().getDatagramChannel(this.ioGroup));
        } else {
            this.bootstrap.channelFactory(() -> new NioDatagramChannel(this.toNettyFamily(options.protocolFamily())));
        }
        this.bootstrap.option(ChannelOption.SO_RCVBUF, options.rcvbuf())
                .option(ChannelOption.SO_SNDBUF, options.sndbuf())
                .option(ChannelOption.SO_REUSEADDR, options.reuseAddr())
                // The timeout is clamped because the channel option is an int.
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int)Math.min(Integer.MAX_VALUE, options.timeoutMillis()));
        if (null != this.listenAddress) {
            this.bootstrap.localAddress(this.listenAddress);
        } else {
            this.bootstrap.localAddress(NetUtil.LOCALHOST, FALLBACK_PORT);
        }
        if (null != this.multicastInterface) {
            this.bootstrap.option(ChannelOption.IP_MULTICAST_IF, this.multicastInterface);
        }
    }

    /**
     * @return the configured listen address, or {@code null} if none was given
     */
    public InetSocketAddress getListenAddress() {
        return this.listenAddress;
    }

    /**
     * @return the configured multicast interface, or {@code null} if none was given
     */
    public NetworkInterface getMulticastInterface() {
        return this.multicastInterface;
    }

    /**
     * @return the immutable snapshot of the options taken at construction
     */
    public ServerOptions getOptions() {
        return this.options;
    }

    /**
     * Joins a multicast group, using the configured multicast interface if any.
     *
     * @param multicastAddress the group to join
     * @return a {@link Mono} backed by the join operation's future
     * @throws IllegalStateException if the server has not been bound
     */
    public Mono<Void> join(InetAddress multicastAddress) {
        return this.join(multicastAddress, null);
    }

    /**
     * Joins a multicast group on the given interface.
     *
     * @param multicastAddress the group to join
     * @param iface the interface to join on; when {@code null} the configured
     *        multicast interface is used, and if that is also {@code null} the
     *        channel's interface-less join is used
     * @return a {@link Mono} backed by the join operation's future
     * @throws IllegalStateException if the server has not been bound
     */
    public Mono<Void> join(final InetAddress multicastAddress, NetworkInterface iface) {
        // Read the volatile field once so the null check and the use agree.
        final DatagramChannel ch = this.channel;
        if (null == ch) {
            throw new IllegalStateException("UdpServer not running.");
        }
        if (null == iface && null != this.getMulticastInterface()) {
            iface = this.getMulticastInterface();
        }
        ChannelFuture future = null != iface ? ch.joinGroup(new InetSocketAddress(multicastAddress, this.groupPort(ch)), iface) : ch.joinGroup(multicastAddress);
        return new MonoChannelFuture<Future<?>>(future){

            @Override
            protected void doComplete(Future<?> future, Subscriber<? super Void> s) {
                log.info("JOIN {}", multicastAddress);
                super.doComplete(future, s);
            }
        };
    }

    /**
     * Leaves a multicast group, using the configured multicast interface if any.
     *
     * @param multicastAddress the group to leave
     * @return a {@link Mono} backed by the leave operation's future
     * @throws IllegalStateException if the server has not been bound
     */
    public Mono<Void> leave(InetAddress multicastAddress) {
        return this.leave(multicastAddress, null);
    }

    /**
     * Leaves a multicast group on the given interface.
     *
     * @param multicastAddress the group to leave
     * @param iface the interface to leave on; when {@code null} the configured
     *        multicast interface is used, and if that is also {@code null} the
     *        channel's interface-less leave is used
     * @return a {@link Mono} backed by the leave operation's future
     * @throws IllegalStateException if the server has not been bound
     */
    public Mono<Void> leave(final InetAddress multicastAddress, NetworkInterface iface) {
        // Read the volatile field once so the null check and the use agree.
        final DatagramChannel ch = this.channel;
        if (null == ch) {
            throw new IllegalStateException("UdpServer not running.");
        }
        if (null == iface && null != this.getMulticastInterface()) {
            iface = this.getMulticastInterface();
        }
        ChannelFuture future = null != iface ? ch.leaveGroup(new InetSocketAddress(multicastAddress, this.groupPort(ch)), iface) : ch.leaveGroup(multicastAddress);
        return new MonoChannelFuture<Future<?>>(future){

            @Override
            protected void doComplete(Future<?> future, Subscriber<? super Void> s) {
                log.info("LEAVE {}", multicastAddress);
                super.doComplete(future, s);
            }
        };
    }

    /**
     * Binds the bootstrap each time the returned {@link Mono} is subscribed.
     *
     * <p>On a successful bind the channel is stored and the subscriber is
     * completed; on failure the bind's cause is signalled via {@code onError}.
     *
     * @param channelHandler the handler function wired into the channel pipeline
     * @return a {@link Mono} that signals the outcome of the bind
     */
    @Override
    protected Mono<Void> doStart(final Function<? super NettyChannel, ? extends Publisher<Void>> channelHandler) {
        return new Mono<Void>(){

            @Override
            public void subscribe(final Subscriber<? super Void> subscriber) {
                ChannelFuture future = UdpServer.this.bootstrap.handler(new ChannelInitializer<DatagramChannel>(){

                    @Override
                    public void initChannel(DatagramChannel ch) throws Exception {
                        // User pipeline customisation runs before the bridge is added.
                        if (null != UdpServer.this.getOptions() && null != UdpServer.this.getOptions().pipelineConfigurer()) {
                            UdpServer.this.getOptions().pipelineConfigurer().accept(ch.pipeline());
                        }
                        UdpServer.this.bindChannel(channelHandler, ch);
                    }
                }).bind();
                future.addListener(new ChannelFutureListener(){

                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        subscriber.onSubscribe(Operators.emptySubscription());
                        if (future.isSuccess()) {
                            log.info("BIND {}", future.channel().localAddress());
                            UdpServer.this.channel = (DatagramChannel)future.channel();
                            subscriber.onComplete();
                        } else {
                            Exceptions.throwIfFatal(future.cause());
                            subscriber.onError(future.cause());
                        }
                    }
                });
            }
        };
    }

    /**
     * Closes the bound channel and, when this server created its own event
     * loop group, shuts that group down afterwards.
     *
     * <p>If no channel was ever bound (for example the bind failed), there is
     * nothing to close: an owned event loop group is still shut down so its
     * threads are released, otherwise an empty {@link Mono} is returned.
     *
     * @return a {@link Mono} backed by the close / shutdown futures
     */
    @Override
    protected Mono<Void> doShutdown() {
        final DatagramChannel ch = this.channel;
        if (null == ch) {
            if (this.ownsEventLoopGroup()) {
                return MonoChannelFuture.from(this.ioGroup.shutdownGracefully());
            }
            return Mono.empty();
        }
        return new MonoChannelFuture<ChannelFuture>(ch.close()){

            @Override
            protected void doComplete(ChannelFuture future, Subscriber<? super Void> s) {
                if (UdpServer.this.ownsEventLoopGroup()) {
                    MonoChannelFuture.from(UdpServer.this.ioGroup.shutdownGracefully()).subscribe(s);
                } else {
                    super.doComplete(future, s);
                }
            }
        };
    }

    /**
     * Installs the handlers on a freshly created channel: a logging handler
     * when debug logging is enabled, then the reactive bridge.
     *
     * @param handler the handler function passed to the bridge
     * @param ioChannel the channel whose pipeline is populated
     */
    void bindChannel(Function<? super NettyChannel, ? extends Publisher<Void>> handler, DatagramChannel ioChannel) {
        ChannelPipeline pipeline = ioChannel.pipeline();
        if (log.isDebugEnabled()) {
            pipeline.addLast("loggingHandler", new LoggingHandler(UdpServer.class));
        }
        pipeline.addLast("reactiveBridge", new NettyChannelHandler<TcpChannel>(handler, this, (Channel)ioChannel));
    }

    /**
     * Maps a JDK protocol family onto Netty's equivalent by name.
     *
     * @param family the JDK family, or {@code null}
     * @return the matching Netty family, or {@code null} when {@code family} is {@code null}
     * @throws IllegalArgumentException if the family is neither {@code INET} nor {@code INET6}
     */
    InternetProtocolFamily toNettyFamily(ProtocolFamily family) {
        if (family == null) {
            return null;
        }
        switch (family.name()) {
            case "INET": {
                return InternetProtocolFamily.IPv4;
            }
            case "INET6": {
                return InternetProtocolFamily.IPv6;
            }
        }
        throw new IllegalArgumentException("Unsupported protocolFamily: " + family.name());
    }

    /**
     * Wraps the given channel and input in a {@link TcpChannel}; the extra
     * parameters are ignored.
     */
    @Override
    public TcpChannel createChannelBridge(Channel ioChannel, Flux<Object> input, Object ... parameters) {
        return new TcpChannel(ioChannel, input);
    }

    /**
     * Picks the port used to build the multicast group socket address.
     * Uses the configured listen address when present; otherwise the port the
     * channel is actually bound to, so a server created without a listen
     * address does not fail with a {@link NullPointerException}.
     */
    private int groupPort(DatagramChannel ch) {
        if (null != this.listenAddress) {
            return this.listenAddress.getPort();
        }
        InetSocketAddress bound = ch.localAddress();
        return null != bound ? bound.getPort() : FALLBACK_PORT;
    }

    /** Tells whether the event loop group was created by this server rather than supplied through the options. */
    private boolean ownsEventLoopGroup() {
        return null == this.getOptions() || null == this.getOptions().eventLoopGroup();
    }
}

