/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.implementation.util;

import com.azure.core.implementation.util.TypeUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Operators;

public final class FluxUtil {
    private static final int DEFAULT_CHUNK_SIZE = 65536;

    public static boolean isFluxByteBuf(Type entityType) {
        Type innerType;
        return TypeUtil.isTypeOrSubTypeOf(entityType, Flux.class) && TypeUtil.isTypeOrSubTypeOf(innerType = TypeUtil.getTypeArguments(entityType)[0], ByteBuf.class);
    }

    public static Mono<byte[]> collectBytesInByteBufStream(Flux<ByteBuf> stream, boolean autoReleaseEnabled) {
        if (autoReleaseEnabled) {
            return Mono.using(Unpooled::compositeBuffer, cbb -> stream.collect(() -> cbb, (cbb1, buffer) -> cbb1.addComponent(true, Unpooled.wrappedBuffer((ByteBuf)buffer).retain())), ReferenceCountUtil::release).filter(cbb -> cbb.isReadable()).map(FluxUtil::byteBufToArray);
        }
        return stream.collect(Unpooled::compositeBuffer, (cbb1, buffer) -> cbb1.addComponent(true, Unpooled.wrappedBuffer((ByteBuf)buffer))).filter(cbb -> cbb.isReadable()).map(FluxUtil::byteBufToArray);
    }

    public static Flux<ByteBuf> split(ByteBuf whole, int chunkSize) {
        return Flux.generate(() -> ((ByteBuf)whole).readerIndex(), (readFromIndex, synchronousSync) -> {
            int writerIndex = whole.writerIndex();
            if (readFromIndex >= writerIndex) {
                synchronousSync.complete();
                return writerIndex;
            }
            int readSize = Math.min(writerIndex - readFromIndex, chunkSize);
            synchronousSync.next((Object)whole.slice(readFromIndex.intValue(), readSize).retain());
            return readFromIndex + readSize;
        });
    }

    public static byte[] byteBufToArray(ByteBuf byteBuf) {
        int length = byteBuf.readableBytes();
        byte[] byteArray = new byte[length];
        byteBuf.getBytes(byteBuf.readerIndex(), byteArray);
        return byteArray;
    }

    public static Mono<ByteBuf> collectByteBufStream(Flux<ByteBuf> stream, boolean autoReleaseEnabled) {
        if (autoReleaseEnabled) {
            Mono mergedCbb = Mono.using(() -> {
                CompositeByteBuf initialCbb = Unpooled.compositeBuffer();
                return initialCbb;
            }, initialCbb -> {
                Mono reducedCbb = stream.reduce(initialCbb, (currentCbb, nextBb) -> {
                    CompositeByteBuf updatedCbb = currentCbb.addComponent(nextBb.retain());
                    return updatedCbb;
                });
                return reducedCbb.doOnNext(cbb -> cbb.writerIndex(cbb.capacity())).filter(cbb -> cbb.isReadable());
            }, finalCbb -> finalCbb.release());
            return mergedCbb;
        }
        return stream.collect(Unpooled::compositeBuffer, (cbb1, buffer) -> cbb1.addComponent(true, Unpooled.wrappedBuffer((ByteBuf)buffer))).filter(cbb -> cbb.isReadable()).map(bb -> bb);
    }

    public static Mono<Void> bytebufStreamToFile(Flux<ByteBuf> content, AsynchronousFileChannel outFile) {
        return FluxUtil.bytebufStreamToFile(content, outFile, 0L);
    }

    public static Mono<Void> bytebufStreamToFile(Flux<ByteBuf> content, AsynchronousFileChannel outFile, long position) {
        return Mono.create(emitter -> content.subscribe((Subscriber)new ByteBufToFileSubscriber(outFile, position, (MonoSink)emitter)));
    }

    public static Flux<ByteBuf> byteBufStreamFromFile(AsynchronousFileChannel fileChannel, int chunkSize, long offset, long length) {
        return new ByteBufStreamFromFile(fileChannel, chunkSize, offset, length);
    }

    public static Flux<ByteBuf> byteBufStreamFromFile(AsynchronousFileChannel fileChannel, long offset, long length) {
        return FluxUtil.byteBufStreamFromFile(fileChannel, 65536, offset, length);
    }

    public static Flux<ByteBuf> byteBufStreamFromFile(AsynchronousFileChannel fileChannel) {
        try {
            long size = fileChannel.size();
            return FluxUtil.byteBufStreamFromFile(fileChannel, 65536, 0L, size);
        }
        catch (IOException e) {
            return Flux.error((Throwable)e);
        }
    }

    private FluxUtil() {
    }

    private static final class ByteBufStreamFromFile
    extends Flux<ByteBuf> {
        private final ByteBufAllocator alloc = ByteBufAllocator.DEFAULT;
        private final AsynchronousFileChannel fileChannel;
        private final int chunkSize;
        private final long offset;
        private final long length;

        ByteBufStreamFromFile(AsynchronousFileChannel fileChannel, int chunkSize, long offset, long length) {
            this.fileChannel = fileChannel;
            this.chunkSize = chunkSize;
            this.offset = offset;
            this.length = length;
        }

        public void subscribe(CoreSubscriber<? super ByteBuf> actual) {
            FileReadSubscription subscription = new FileReadSubscription((Subscriber<? super ByteBuf>)actual, this.fileChannel, this.alloc, this.chunkSize, this.offset, this.length);
            actual.onSubscribe((Subscription)subscription);
        }

        static final class FileReadSubscription
        implements Subscription,
        CompletionHandler<Integer, ByteBuf> {
            private static final int NOT_SET = -1;
            private static final long serialVersionUID = -6831808726875304256L;
            private final Subscriber<? super ByteBuf> subscriber;
            private volatile long position;
            private final AsynchronousFileChannel fileChannel;
            private final ByteBufAllocator alloc;
            private final int chunkSize;
            private final long offset;
            private final long length;
            private volatile boolean done;
            private Throwable error;
            private volatile ByteBuf next;
            private volatile boolean cancelled;
            volatile int wip;
            static final AtomicIntegerFieldUpdater<FileReadSubscription> WIP = AtomicIntegerFieldUpdater.newUpdater(FileReadSubscription.class, "wip");
            volatile long requested;
            static final AtomicLongFieldUpdater<FileReadSubscription> REQUESTED = AtomicLongFieldUpdater.newUpdater(FileReadSubscription.class, "requested");

            FileReadSubscription(Subscriber<? super ByteBuf> subscriber, AsynchronousFileChannel fileChannel, ByteBufAllocator alloc, int chunkSize, long offset, long length) {
                this.subscriber = subscriber;
                this.fileChannel = fileChannel;
                this.alloc = alloc;
                this.chunkSize = chunkSize;
                this.offset = offset;
                this.length = length;
                this.position = -1L;
            }

            public void request(long n) {
                if (Operators.validate((long)n)) {
                    Operators.addCap(REQUESTED, (Object)this, (long)n);
                    this.drain();
                }
            }

            public void cancel() {
                this.cancelled = true;
            }

            @Override
            public void completed(Integer bytesRead, ByteBuf buffer) {
                if (!this.cancelled) {
                    if (bytesRead == -1) {
                        this.done = true;
                    } else {
                        long position2;
                        long pos = this.position;
                        int bytesWanted = Math.min(bytesRead, this.maxRequired(pos));
                        buffer.writerIndex(bytesWanted);
                        this.position = position2 = pos + (long)bytesWanted;
                        this.next = buffer;
                        if (position2 >= this.offset + this.length) {
                            this.done = true;
                        }
                    }
                    this.drain();
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuf attachment) {
                if (!this.cancelled) {
                    this.error = exc;
                    this.done = true;
                    this.drain();
                }
            }

            private void drain() {
                if (WIP.getAndIncrement(this) != 0) {
                    return;
                }
                if (this.position == -1L) {
                    this.position = this.offset;
                    this.doRead();
                }
                int missed = 1;
                do {
                    if (this.cancelled) {
                        return;
                    }
                    if (REQUESTED.get(this) <= 0L) continue;
                    boolean emitted = false;
                    boolean d = this.done;
                    ByteBuf bb = this.next;
                    if (bb != null) {
                        this.next = null;
                        this.subscriber.onNext((Object)bb);
                        emitted = true;
                    } else {
                        emitted = false;
                    }
                    if (d) {
                        if (this.error != null) {
                            this.subscriber.onError(this.error);
                            return;
                        }
                        this.subscriber.onComplete();
                        return;
                    }
                    if (!emitted) continue;
                    Operators.produced(REQUESTED, (Object)this, (long)1L);
                    this.doRead();
                } while ((missed = WIP.addAndGet(this, -missed)) != 0);
            }

            private void doRead() {
                long pos = this.position;
                int readSize = Math.min(this.chunkSize, this.maxRequired(pos));
                ByteBuf innerBuf = this.alloc.buffer(readSize, readSize);
                this.fileChannel.read(innerBuf.nioBuffer(0, readSize), pos, innerBuf, this);
            }

            private int maxRequired(long pos) {
                long maxRequired = this.offset + this.length - pos;
                if (maxRequired <= 0L) {
                    return 0;
                }
                int m = (int)maxRequired;
                if (m < 0) {
                    return Integer.MAX_VALUE;
                }
                return m;
            }
        }
    }

    private static class ByteBufToFileSubscriber
    implements Subscriber<ByteBuf> {
        volatile boolean isWriting = false;
        volatile boolean isCompleted = false;
        volatile Subscription subscription;
        volatile long pos;
        AsynchronousFileChannel outFile;
        MonoSink<Void> emitter;
        CompletionHandler<Integer, Object> onWriteCompleted = new CompletionHandler<Integer, Object>(){

            @Override
            public void completed(Integer bytesWritten, Object attachment) {
                isWriting = false;
                if (isCompleted) {
                    emitter.success();
                }
                pos += (long)bytesWritten.intValue();
                subscription.request(1L);
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                subscription.cancel();
                emitter.error(exc);
            }
        };

        private ByteBufToFileSubscriber(AsynchronousFileChannel outFile, long position, MonoSink<Void> emitter) {
            this.outFile = outFile;
            this.pos = position;
            this.emitter = emitter;
        }

        public void onSubscribe(Subscription s) {
            this.subscription = s;
            s.request(1L);
        }

        public void onNext(ByteBuf bytes) {
            this.isWriting = true;
            this.outFile.write(bytes.nioBuffer(), this.pos, null, this.onWriteCompleted);
        }

        public void onError(Throwable throwable) {
            this.subscription.cancel();
            this.emitter.error(throwable);
        }

        public void onComplete() {
            this.isCompleted = true;
            if (!this.isWriting) {
                this.emitter.success();
            }
        }
    }
}

