/*
 * Decompiled with CFR 0.152.
 */
package reactor.ipc.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufHolder;
import io.netty.util.ReferenceCounted;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.Objects;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSource;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.ByteBufMono;

/**
 * A {@link Flux} of Netty {@link ByteBuf} that adds conversion helpers
 * (to {@link ByteBuffer}, {@code byte[]}, {@link InputStream} and {@link String}),
 * aggregation into a single buffer, and reference-count retention.
 *
 * <p>Instances are created through the static {@code fromInbound} and
 * {@code fromPath} factories and carry the {@link ByteBufAllocator} used by
 * {@link #aggregate()}.
 */
public final class ByteBufFlux
extends FluxSource<ByteBuf, ByteBuf> {
    /**
     * Converts an inbound object to a {@link ByteBuf}: a {@code ByteBuf} is returned
     * as is, a {@link ByteBufHolder} yields its {@code content()}, and anything else
     * is rejected with an {@link IllegalArgumentException}.
     */
    static final Function<Object, ByteBuf> bytebufExtractor = o -> {
        if (o instanceof ByteBuf) {
            return (ByteBuf)o;
        }
        if (o instanceof ByteBufHolder) {
            return ((ByteBufHolder)o).content();
        }
        throw new IllegalArgumentException("Object " + o + " of type " + o.getClass() + " cannot be converted to ByteBuf");
    };

    /** Default maximum number of bytes read from a file per emitted buffer (512 KiB). */
    static final int MAX_CHUNK_SIZE = 524288;

    /** Allocator associated with this flux; used to create the composite buffer in {@link #aggregate()}. */
    final ByteBufAllocator alloc;

    /**
     * Wraps an inbound publisher using {@link ByteBufAllocator#DEFAULT}.
     *
     * @param source publisher of {@link ByteBuf} or {@link ByteBufHolder} elements
     * @return a {@code ByteBufFlux} over the converted elements
     */
    public static ByteBufFlux fromInbound(Publisher<?> source) {
        return ByteBufFlux.fromInbound(source, ByteBufAllocator.DEFAULT);
    }

    /**
     * Wraps an inbound publisher, converting every element with {@link #bytebufExtractor}.
     *
     * @param source publisher of {@link ByteBuf} or {@link ByteBufHolder} elements
     * @param allocator allocator to associate with the resulting flux
     * @return a {@code ByteBufFlux} over the converted elements
     * @throws NullPointerException if {@code source} or {@code allocator} is null
     */
    public static ByteBufFlux fromInbound(Publisher<?> source, ByteBufAllocator allocator) {
        Objects.requireNonNull(source, "source");
        Objects.requireNonNull(allocator, "allocator");
        return new ByteBufFlux(Flux.from(source).map(bytebufExtractor), allocator);
    }

    /**
     * Streams the content of a file in chunks of at most {@link #MAX_CHUNK_SIZE} bytes,
     * using {@link ByteBufAllocator#DEFAULT}.
     *
     * @param path the file to read
     * @return a {@code ByteBufFlux} emitting the file content
     */
    public static ByteBufFlux fromPath(Path path) {
        return ByteBufFlux.fromPath(path, MAX_CHUNK_SIZE);
    }

    /**
     * Streams the content of a file using {@link ByteBufAllocator#DEFAULT}.
     *
     * @param path the file to read
     * @param maxChunkSize maximum number of bytes per emitted buffer, strictly positive
     * @return a {@code ByteBufFlux} emitting the file content
     */
    public static ByteBufFlux fromPath(Path path, int maxChunkSize) {
        return ByteBufFlux.fromPath(path, maxChunkSize, ByteBufAllocator.DEFAULT);
    }

    /**
     * Streams the content of a file in chunks of at most {@link #MAX_CHUNK_SIZE} bytes.
     *
     * @param path the file to read
     * @param allocator allocator used to create each chunk buffer
     * @return a {@code ByteBufFlux} emitting the file content
     */
    public static ByteBufFlux fromPath(Path path, ByteBufAllocator allocator) {
        return ByteBufFlux.fromPath(path, MAX_CHUNK_SIZE, allocator);
    }

    /**
     * Streams the content of a file. The file is opened once per subscription and
     * one buffer is produced per downstream request; the channel is closed when the
     * sequence completes, fails or is cancelled.
     *
     * @param path the file to read
     * @param maxChunkSize maximum number of bytes per emitted buffer, strictly positive
     * @param allocator allocator used to create each chunk buffer
     * @return a {@code ByteBufFlux} emitting the file content
     * @throws NullPointerException if {@code path} or {@code allocator} is null
     * @throws IllegalArgumentException if {@code maxChunkSize} is less than 1
     */
    public static ByteBufFlux fromPath(Path path, int maxChunkSize, ByteBufAllocator allocator) {
        Objects.requireNonNull(path, "path");
        Objects.requireNonNull(allocator, "allocator");
        if (maxChunkSize < 1) {
            throw new IllegalArgumentException("chunk size must be strictly positive, was: " + maxChunkSize);
        }
        return new ByteBufFlux(Flux.generate(() -> FileChannel.open(path), (fc, sink) -> {
            ByteBuf buf = allocator.buffer();
            try {
                int bytesRead = buf.writeBytes(fc, maxChunkSize);
                if (bytesRead < 0) {
                    // End of file: nothing will be emitted, so the unused buffer must be released here.
                    buf.release();
                    sink.complete();
                } else {
                    // Ownership of the buffer passes to the downstream consumer.
                    sink.next(buf);
                }
            }
            catch (IOException e) {
                // The buffer never reaches downstream on failure; release it before signalling.
                buf.release();
                sink.error(e);
            }
            return fc;
        }, ByteBufFlux::closeQuietly), allocator);
    }

    /**
     * Closes the file channel backing a {@code fromPath} sequence once that sequence
     * has terminated or been cancelled.
     */
    private static void closeQuietly(FileChannel channel) {
        try {
            channel.close();
        }
        catch (IOException ignored) {
            // The sequence is already terminated at this point; there is no subscriber left to notify.
        }
    }

    /**
     * Maps each buffer to the result of {@link ByteBuf#nioBuffer()}.
     *
     * @return a flux of {@link ByteBuffer}
     */
    public final Flux<ByteBuffer> asByteBuffer() {
        return this.map(ByteBuf::nioBuffer);
    }

    /**
     * Reads the readable bytes of each buffer into a newly allocated array.
     *
     * @return a flux of byte arrays, one per source buffer
     */
    public final Flux<byte[]> asByteArray() {
        return this.map(bb -> {
            byte[] bytes = new byte[bb.readableBytes()];
            bb.readBytes(bytes);
            return bytes;
        });
    }

    /**
     * Wraps each buffer in a {@code ByteBufMono.ReleasingInputStream}.
     *
     * @return a flux of input streams, one per source buffer
     */
    public Flux<InputStream> asInputStream() {
        return this.map(ByteBufMono.ReleasingInputStream::new);
    }

    /**
     * Decodes each buffer to a string using {@link Charset#defaultCharset()}.
     *
     * @return a flux of strings, one per source buffer
     */
    public final Flux<String> asString() {
        return this.asString(Charset.defaultCharset());
    }

    /**
     * Decodes each buffer to a string using the given charset.
     *
     * @param charset the charset passed to {@link ByteBuf#toString(Charset)}
     * @return a flux of strings, one per source buffer
     */
    public final Flux<String> asString(Charset charset) {
        return this.map(s -> s.toString(charset));
    }

    /**
     * Collects every buffer of this flux into a single composite buffer obtained from
     * this flux's allocator. Each incoming buffer is retained and added as a component;
     * the writer index is then moved to the composite's capacity, and a composite with
     * no readable bytes is filtered out. The composite buffer is released by the
     * {@code Mono.using} cleanup callback.
     *
     * @return a {@code ByteBufMono} wrapping the aggregated buffer
     */
    public ByteBufMono aggregate() {
        return (ByteBufMono)((Object)Mono.using(() -> ((ByteBufAllocator)this.alloc).compositeBuffer(), b -> this.reduce(b, (prev, next) -> prev.addComponent(next.retain())).doOnNext(cbb -> cbb.writerIndex(cbb.capacity())).filter(ByteBuf::isReadable), ReferenceCounted::release).as(ByteBufMono::new));
    }

    /**
     * Not implemented.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    public ByteBufMono multicast() {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    /**
     * Returns a flux that calls {@link ByteBuf#retain()} on every buffer before it is
     * passed downstream.
     *
     * @return a new {@code ByteBufFlux} sharing this flux's allocator
     */
    public ByteBufFlux retain() {
        return new ByteBufFlux(this.doOnNext(ByteBuf::retain), this.alloc);
    }

    /**
     * Creates a flux over the given source.
     *
     * @param source the upstream sequence of buffers
     * @param allocator allocator to associate with this flux
     */
    ByteBufFlux(Flux<ByteBuf> source, ByteBufAllocator allocator) {
        super(source);
        this.alloc = allocator;
    }

    /** Subscribes the given subscriber directly to the wrapped source. */
    @Override
    public void subscribe(Subscriber<? super ByteBuf> s) {
        this.source.subscribe(s);
    }
}

