/*
 * Decompiled with CFR 0.152.
 */
package reactor.ipc.netty.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.Loopback;
import reactor.core.Receiver;
import reactor.core.Trackable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.ipc.netty.common.NettyChannel;
import reactor.ipc.netty.common.NettyChannelHandler;
import reactor.ipc.netty.common.NettyOutbound;

/**
 * Reactive wrapper around a Netty {@link Channel}.
 *
 * <p>Inbound data is exposed through the {@link Flux} handed to the constructor;
 * outbound data is written by subscribing to the {@link Mono} returned from
 * {@link #send(Publisher)} or {@link #sendObject(Publisher)}. Subscribing to this
 * instance itself (it is a {@code Mono<Void>}) simply completes without emitting.
 */
public class TcpChannel extends Mono<Void> implements NettyChannel, Loopback, Trackable {

    /** The underlying Netty channel every operation is delegated to. */
    final Channel ioChannel;

    /** The inbound sequence returned by {@link #receiveObject()}. */
    final Flux<Object> input;

    /**
     * When {@code true}, writers are created with
     * {@link NettyChannelHandler.FlushMode#AUTO_EACH}; otherwise with
     * {@link NettyChannelHandler.FlushMode#MANUAL_COMPLETE}. Set by {@link #flushEach()}.
     */
    volatile boolean flushEach;

    /**
     * Creates a channel wrapper.
     *
     * @param ioChannel the Netty channel to wrap
     * @param input     the inbound sequence to expose via {@link #receiveObject()}
     */
    public TcpChannel(Channel ioChannel, Flux<Object> input) {
        this.input = input;
        this.ioChannel = ioChannel;
    }

    /**
     * Returns a {@link Mono} that, once subscribed, writes {@code dataStream} to the
     * channel and signals completion or error according to the write outcome.
     *
     * @param dataStream the buffers to write
     * @return a deferred write; nothing is written until it is subscribed
     */
    @Override
    public Mono<Void> send(Publisher<? extends ByteBuf> dataStream) {
        return new PostWritePublisher(dataStream);
    }

    /**
     * Same as {@link #send(Publisher)} but accepts a publisher of arbitrary objects.
     *
     * @param dataStream the objects to write
     * @return a deferred write; nothing is written until it is subscribed
     */
    @Override
    public Mono<Void> sendObject(Publisher<?> dataStream) {
        return new PostWritePublisher(dataStream);
    }

    /**
     * @return the inbound sequence supplied at construction time
     */
    public Flux<Object> receiveObject() {
        return this.input;
    }

    /**
     * @return the inbound sequence supplied at construction time
     */
    public Object connectedInput() {
        return this.input;
    }

    /**
     * Builds a {@code "source:destination"} description from the channel addresses.
     *
     * <p>By default the local address is the source and the remote address the
     * destination; when the channel has no parent the two are swapped. A missing
     * address is rendered as an empty string and the first occurrence of
     * {@code "localhost"} is stripped from each side.
     *
     * @return the textual description of both endpoints
     */
    public Object connectedOutput() {
        Channel parent = this.ioChannel.parent();
        SocketAddress remote = this.ioChannel.remoteAddress();
        SocketAddress local = this.ioChannel.localAddress();

        String src = local != null ? local.toString() : "";
        String dst = remote != null ? remote.toString() : "";

        // NOTE(review): a null parent presumably identifies a client-side channel,
        // for which the remote peer is reported first - confirm against callers.
        if (parent == null) {
            String swap = src;
            src = dst;
            dst = swap;
        }
        return src.replaceFirst("localhost", "") + ":" + dst.replaceFirst("localhost", "");
    }

    /**
     * Completes the subscriber immediately without emitting a value.
     *
     * @param subscriber the subscriber to complete
     */
    @Override
    public void subscribe(Subscriber<? super Void> subscriber) {
        // The explicit type witness is required: without it T is inferred as Object
        // and a Subscriber<? super Void> is not accepted by subscribe(...).
        Mono.<Void>empty().subscribe(subscriber);
    }

    /**
     * @return {@code true} when the underlying channel reports itself active
     */
    public boolean isStarted() {
        return this.ioChannel.isActive();
    }

    /**
     * @return {@code true} when the underlying channel is no longer open
     */
    public boolean isTerminated() {
        return !this.ioChannel.isOpen();
    }

    /**
     * Wraps {@code encodedWriter} in a {@link NettyChannelHandler.ChannelWriter} and
     * writes it to the channel from the channel's event loop.
     *
     * <p>When the resulting write future finishes, {@code postWriter} receives
     * {@code onSubscribe} with an empty subscription followed by either
     * {@code onComplete} (success) or {@code onError} with the future's cause.
     *
     * @param encodedWriter the outbound data to write
     * @param postWriter    the subscriber notified of the write outcome
     */
    protected final void emitWriter(Publisher<?> encodedWriter, Subscriber<? super Void> postWriter) {
        ChannelFutureListener postWriteListener = future -> {
            postWriter.onSubscribe(Operators.emptySubscription());
            if (future.isSuccess()) {
                postWriter.onComplete();
            } else {
                postWriter.onError(future.cause());
            }
        };

        NettyChannelHandler.FlushMode mode = this.flushEach
                ? NettyChannelHandler.FlushMode.AUTO_EACH
                : NettyChannelHandler.FlushMode.MANUAL_COMPLETE;
        NettyChannelHandler.ChannelWriter writer =
                new NettyChannelHandler.ChannelWriter(encodedWriter, mode);

        Runnable writeTask = () -> this.ioChannel.write(writer).addListener(postWriteListener);

        // Run inline when already on the event loop, otherwise hand over to it.
        if (this.ioChannel.eventLoop().inEventLoop()) {
            writeTask.run();
        } else {
            this.ioChannel.eventLoop().execute(writeTask);
        }
    }

    /**
     * @return the string representation of the underlying channel
     */
    @Override
    public String toString() {
        return this.ioChannel.toString();
    }

    /**
     * @return the channel's remote address
     * @throws ClassCastException if the remote address is not an {@link InetSocketAddress}
     */
    @Override
    public InetSocketAddress remoteAddress() {
        return (InetSocketAddress) this.ioChannel.remoteAddress();
    }

    /**
     * @return a new lifecycle registrar bound to this channel
     */
    @Override
    public NettyChannel.Lifecycle on() {
        return new NettyLifecycle();
    }

    /**
     * @return the wrapped Netty channel
     */
    @Override
    public Channel delegate() {
        return this.ioChannel;
    }

    /**
     * Switches subsequent writes to {@link NettyChannelHandler.FlushMode#AUTO_EACH}.
     *
     * @return this instance
     */
    @Override
    public NettyOutbound flushEach() {
        this.flushEach = true;
        return this;
    }

    /**
     * Deferred write: subscribing triggers {@link TcpChannel#emitWriter(Publisher, Subscriber)}
     * for the captured publisher.
     */
    final class PostWritePublisher extends Mono<Void> implements Receiver, Loopback {

        /** The outbound data to write when subscribed. */
        private final Publisher<?> dataStream;

        /**
         * @param dataStream the outbound data to write when subscribed
         */
        public PostWritePublisher(Publisher<?> dataStream) {
            this.dataStream = dataStream;
        }

        /**
         * Starts the write; any exception thrown while scheduling it is reported to
         * {@code s} through {@link Operators#error(Subscriber, Throwable)}.
         *
         * @param s the subscriber notified of the write outcome
         */
        @Override
        public void subscribe(Subscriber<? super Void> s) {
            try {
                TcpChannel.this.emitWriter(this.dataStream, s);
            } catch (Throwable error) {
                Operators.error(s, error);
            }
        }

        /**
         * @return the publisher whose data will be written
         */
        public Object upstream() {
            return this.dataStream;
        }

        /**
         * @return the enclosing {@link TcpChannel}
         */
        public Object connectedInput() {
            return TcpChannel.this;
        }

        /**
         * @return the enclosing {@link TcpChannel}
         */
        public Object connectedOutput() {
            return TcpChannel.this;
        }
    }

    /**
     * Registers lifecycle callbacks by installing handlers into the channel pipeline.
     * Every call adds a new handler; none are removed by this class.
     */
    final class NettyLifecycle implements NettyChannel.Lifecycle {

        NettyLifecycle() {
        }

        /**
         * Appends a handler that runs {@code onClose} when the channel becomes
         * inactive, then forwards the event down the pipeline.
         *
         * @param onClose the callback to run on channel inactivity
         * @return this instance
         */
        @Override
        public NettyChannel.Lifecycle close(final Runnable onClose) {
            TcpChannel.this.ioChannel.pipeline().addLast(new ChannelDuplexHandler() {

                @Override
                public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                    onClose.run();
                    super.channelInactive(ctx);
                }
            });
            return this;
        }

        /**
         * Prepends an {@link IdleStateHandler} with a reader-idle timeout and runs
         * {@code onReadIdle} for each {@link IdleState#READER_IDLE} event before
         * delegating to the default idle handling.
         *
         * @param idleTimeout the reader-idle timeout in milliseconds
         * @param onReadIdle  the callback to run when the channel is read-idle
         * @return this instance
         */
        @Override
        public NettyChannel.Lifecycle readIdle(long idleTimeout, final Runnable onReadIdle) {
            TcpChannel.this.ioChannel.pipeline().addFirst(
                    new IdleStateHandler(idleTimeout, 0L, 0L, TimeUnit.MILLISECONDS) {

                        @Override
                        protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
                            if (evt.state() == IdleState.READER_IDLE) {
                                onReadIdle.run();
                            }
                            super.channelIdle(ctx, evt);
                        }
                    });
            return this;
        }

        /**
         * Appends an {@link IdleStateHandler} with a writer-idle timeout and runs
         * {@code onWriteIdle} for each {@link IdleState#WRITER_IDLE} event before
         * delegating to the default idle handling.
         *
         * @param idleTimeout the writer-idle timeout in milliseconds
         * @param onWriteIdle the callback to run when the channel is write-idle
         * @return this instance
         */
        @Override
        public NettyChannel.Lifecycle writeIdle(long idleTimeout, final Runnable onWriteIdle) {
            TcpChannel.this.ioChannel.pipeline().addLast(
                    new IdleStateHandler(0L, idleTimeout, 0L, TimeUnit.MILLISECONDS) {

                        @Override
                        protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
                            if (evt.state() == IdleState.WRITER_IDLE) {
                                onWriteIdle.run();
                            }
                            super.channelIdle(ctx, evt);
                        }
                    });
            return this;
        }
    }
}

