/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.transport.netty;

import io.netty.buffer.ByteBuf;
import io.rsocket.DuplexConnection;
import io.rsocket.Frame;
import java.util.Objects;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;

/**
 * {@link DuplexConnection} implementation that delegates to a Reactor Netty
 * {@link Connection}.
 *
 * <p>Lifecycle calls ({@link #dispose()}, {@link #isDisposed()}, {@link #onClose()})
 * are forwarded directly to the wrapped connection. Inbound buffers are wrapped in
 * {@link Frame}s; outbound frames are written by sending their content to the
 * connection's outbound side.
 */
public final class TcpDuplexConnection implements DuplexConnection {
    /** The wrapped Reactor Netty connection; never {@code null}. */
    private final Connection connection;

    /**
     * Creates a duplex connection on top of the given Reactor Netty connection.
     *
     * @param connection the connection to wrap
     * @throws NullPointerException if {@code connection} is {@code null}
     */
    public TcpDuplexConnection(Connection connection) {
        Objects.requireNonNull(connection, "connection must not be null");
        this.connection = connection;
    }

    /** Disposes the wrapped connection. */
    public void dispose() {
        connection.dispose();
    }

    /**
     * Reports the disposal state of the wrapped connection.
     *
     * @return whatever the wrapped connection's {@code isDisposed()} returns
     */
    public boolean isDisposed() {
        return connection.isDisposed();
    }

    /**
     * Exposes the wrapped connection's disposal signal.
     *
     * @return the {@link Mono} returned by the wrapped connection's {@code onDispose()}
     */
    public Mono<Void> onClose() {
        return connection.onDispose();
    }

    /**
     * Returns the stream of frames read from the connection's inbound side.
     *
     * @return a {@link Flux} with one {@link Frame} per received buffer
     */
    public Flux<Frame> receive() {
        return connection
                .inbound()
                .receive()
                // Retain each buffer before handing it to the frame.
                // NOTE(review): presumably needed because the transport releases
                // received buffers once they have been emitted - confirm against
                // the Reactor Netty documentation.
                .map(buffer -> Frame.from(buffer.retain()));
    }

    /**
     * Sends every frame emitted by the given publisher.
     *
     * <p>Frames are mapped through {@link #sendOne(Frame)} with {@code concatMap},
     * i.e. one after another in publisher order.
     *
     * @param frames the frames to send
     * @return a {@link Mono} obtained from {@code then()} on the sequence of writes
     */
    public Mono<Void> send(Publisher<Frame> frames) {
        Flux<Frame> outgoing = Flux.from(frames);
        return outgoing.concatMap(frame -> sendOne(frame)).then();
    }

    /**
     * Sends a single frame by writing its content to the connection's outbound side.
     *
     * @param frame the frame to send
     * @return a {@link Mono} obtained from {@code then()} on the outbound write
     */
    public Mono<Void> sendOne(Frame frame) {
        return connection
                .outbound()
                // The cast pins the single-message sendObject(Object) overload.
                .sendObject((Object) frame.content())
                .then();
    }
}

