/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.transport.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.rsocket.Frame;
import io.rsocket.frame.FrameHeaderFlyweight;
import io.rsocket.internal.BaseDuplexConnection;
import io.rsocket.transport.netty.SendPublisher;
import java.util.Objects;
import java.util.Queue;
import org.reactivestreams.Publisher;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.util.concurrent.Queues;

/**
 * A {@link BaseDuplexConnection} backed by a Reactor Netty {@link Connection}.
 *
 * <p>Inbound buffers are turned into {@link Frame}s by prepending a
 * {@value #FRAME_LENGTH_SIZE}-byte length field; outbound {@link Frame}s are written as
 * {@link BinaryWebSocketFrame}s with that same length field skipped.
 */
public final class WebsocketDuplexConnection extends BaseDuplexConnection {
    /**
     * Size in bytes of the length field that {@link #receive()} prepends to every inbound
     * buffer and that {@link #toBinaryWebSocketFrame(Frame)} skips on the way out. Both
     * directions must agree on this value.
     */
    private static final int FRAME_LENGTH_SIZE = 3;

    private final Connection connection;

    /**
     * Creates a duplex connection on top of the given Reactor Netty connection and ties its
     * lifecycle to the underlying channel: once the channel's close future completes, this
     * connection is disposed (unless it already is).
     *
     * @param connection the underlying connection
     * @throws NullPointerException if {@code connection} is {@code null}
     */
    public WebsocketDuplexConnection(Connection connection) {
        this.connection = Objects.requireNonNull(connection, "connection must not be null");
        connection.channel().closeFuture().addListener(future -> {
            if (!this.isDisposed()) {
                this.dispose();
            }
        });
    }

    /**
     * Disposes the underlying connection unless it has already been disposed.
     */
    protected void doOnClose() {
        if (!this.connection.isDisposed()) {
            this.connection.dispose();
        }
    }

    /**
     * Returns the stream of inbound frames.
     *
     * <p>Each inbound buffer is wrapped in a composite buffer whose first component is a
     * {@value #FRAME_LENGTH_SIZE}-byte length field, written by
     * {@link FrameHeaderFlyweight#encodeLength} with the buffer's readable byte count, and
     * whose second component is the inbound buffer itself.
     *
     * @return a {@link Flux} emitting one {@link Frame} per inbound buffer
     */
    public Flux<Frame> receive() {
        return this.connection.inbound().receive().map(buf -> {
            CompositeByteBuf composite = this.connection.channel().alloc().compositeBuffer();
            ByteBuf length = Unpooled.wrappedBuffer(new byte[FRAME_LENGTH_SIZE]);
            FrameHeaderFlyweight.encodeLength(length, 0, buf.readableBytes());
            // The inbound buffer becomes a component of the composite, so it is retained here;
            // presumably the inbound pipeline releases its own reference after this mapping
            // (NOTE(review): confirm against reactor-netty's receive() contract).
            composite.addComponents(true, length, buf.retain());
            return Frame.from(composite);
        });
    }

    /**
     * Sends the given frames over the underlying channel as binary WebSocket frames.
     *
     * <p>The frames are handed to a {@link SendPublisher}. If the source flux is itself a
     * {@link Fuseable.QueueSubscription}, {@link Fuseable#ASYNC} fusion is requested and the
     * source is used directly as the publisher's queue; otherwise a fresh queue obtained from
     * {@link Queues#small()} is used.
     *
     * @param frames the frames to send
     * @return a {@link Mono} that terminates when the send pipeline terminates
     */
    public Mono<Void> send(Publisher<Frame> frames) {
        return Flux.from(frames).transform(frameFlux -> {
            Queue<Frame> queue;
            if (frameFlux instanceof Fuseable.QueueSubscription) {
                @SuppressWarnings("unchecked") // element type is Frame because frameFlux is a Flux<Frame>
                Fuseable.QueueSubscription<Frame> fused = (Fuseable.QueueSubscription<Frame>) frameFlux;
                // The negotiated fusion mode returned by requestFusion is intentionally not inspected.
                fused.requestFusion(Fuseable.ASYNC);
                queue = fused;
            } else {
                queue = Queues.<Frame>small().get();
            }
            return new SendPublisher<BinaryWebSocketFrame>(
                    queue,
                    frameFlux,
                    this.connection.channel(),
                    this::toBinaryWebSocketFrame,
                    binaryWebSocketFrame -> binaryWebSocketFrame.content().readableBytes());
        }).then();
    }

    /**
     * Converts a frame into a binary WebSocket frame whose payload is the frame content with
     * the leading {@value #FRAME_LENGTH_SIZE}-byte length field skipped. The content buffer is
     * retained for the returned WebSocket frame.
     *
     * @param frame the frame to convert
     * @return a {@link BinaryWebSocketFrame} carrying the frame content without its length field
     */
    private BinaryWebSocketFrame toBinaryWebSocketFrame(Frame frame) {
        return new BinaryWebSocketFrame(frame.content().skipBytes(FRAME_LENGTH_SIZE).retain());
    }
}

