/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.reactivestreams.client.internal.gridfs;

import com.mongodb.MongoGridFSException;
import com.mongodb.assertions.Assertions;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.time.Timeout;
import com.mongodb.lang.Nullable;
import com.mongodb.reactivestreams.client.ClientSession;
import com.mongodb.reactivestreams.client.FindPublisher;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.gridfs.GridFSDownloadPublisher;
import com.mongodb.reactivestreams.client.gridfs.GridFSFindPublisher;
import com.mongodb.reactivestreams.client.internal.TimeoutHelper;
import com.mongodb.reactivestreams.client.internal.gridfs.ResizingByteBufferFlux;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.Binary;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class GridFSDownloadPublisherImpl
implements GridFSDownloadPublisher {
    private static final String TIMEOUT_ERROR_MESSAGE = "Finding chunks exceeded the timeout limit.";
    private final ClientSession clientSession;
    private final Function<Timeout, GridFSFindPublisher> gridFSFileMono;
    private final MongoCollection<Document> chunksCollection;
    private Integer bufferSizeBytes;
    private volatile GridFSFile fileInfo;
    @Nullable
    private final Long timeoutMs;

    public GridFSDownloadPublisherImpl(@Nullable ClientSession clientSession, Function<Timeout, GridFSFindPublisher> gridFSFilePublisherCreator, MongoCollection<Document> chunksCollection) {
        this.clientSession = clientSession;
        this.gridFSFileMono = (Function)Assertions.notNull((String)"gridFSFilePublisherCreator", gridFSFilePublisherCreator);
        this.chunksCollection = (MongoCollection)Assertions.notNull((String)"chunksCollection", chunksCollection);
        this.timeoutMs = chunksCollection.getTimeout(TimeUnit.MILLISECONDS);
    }

    @Override
    public Publisher<GridFSFile> getGridFSFile() {
        if (this.fileInfo != null) {
            return Mono.fromCallable(() -> this.fileInfo);
        }
        return Mono.from((Publisher)this.gridFSFileMono.apply(TimeoutContext.startTimeout((Long)this.timeoutMs))).doOnNext(gridFSFile -> {
            this.fileInfo = gridFSFile;
        });
    }

    @Override
    public GridFSDownloadPublisher bufferSizeBytes(int bufferSizeBytes) {
        this.bufferSizeBytes = bufferSizeBytes;
        return this;
    }

    public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
        Flux.defer(() -> {
            Timeout operationTimeout = TimeoutContext.startTimeout((Long)this.timeoutMs);
            return Mono.from((Publisher)this.gridFSFileMono.apply(operationTimeout)).doOnSuccess(gridFSFile -> {
                if (gridFSFile == null) {
                    throw new MongoGridFSException("File not found");
                }
                this.fileInfo = gridFSFile;
            }).flatMapMany(gridFSFile -> this.getChunkPublisher((GridFSFile)gridFSFile, operationTimeout));
        }).subscribe(subscriber);
    }

    private Flux<ByteBuffer> getChunkPublisher(GridFSFile gridFSFile, @Nullable Timeout timeout) {
        Document filter = new Document("files_id", (Object)gridFSFile.getId());
        FindPublisher<Document> chunkPublisher = this.clientSession != null ? TimeoutHelper.collectionWithTimeout(this.chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE).find(this.clientSession, (Bson)filter) : TimeoutHelper.collectionWithTimeout(this.chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE).find((Bson)filter);
        AtomicInteger chunkCounter = new AtomicInteger(0);
        int numberOfChunks = (int)Math.ceil((double)gridFSFile.getLength() / (double)gridFSFile.getChunkSize());
        Flux byteBufferFlux = Flux.from(chunkPublisher.sort((Bson)new Document("n", (Object)1))).map(chunk -> {
            int expectedChunkIndex = chunkCounter.getAndAdd(1);
            if (chunk == null || chunk.getInteger((Object)"n") != expectedChunkIndex) {
                throw new MongoGridFSException(String.format("Could not find file chunk for files_id: %s at chunk index %s.", gridFSFile.getId(), expectedChunkIndex));
            }
            if (!(chunk.get((Object)"data") instanceof Binary)) {
                throw new MongoGridFSException("Unexpected data format for the chunk");
            }
            byte[] data = ((Binary)chunk.get((Object)"data", Binary.class)).getData();
            long expectedDataLength = 0L;
            if (numberOfChunks > 0) {
                long l = expectedDataLength = expectedChunkIndex + 1 == numberOfChunks ? gridFSFile.getLength() - (long)expectedChunkIndex * (long)gridFSFile.getChunkSize() : (long)gridFSFile.getChunkSize();
            }
            if ((long)data.length != expectedDataLength) {
                throw new MongoGridFSException(String.format("Chunk size data length is not the expected size. The size was %s for file_id: %s chunk index %s it should be %s bytes.", data.length, gridFSFile.getId(), expectedChunkIndex, expectedDataLength));
            }
            return ByteBuffer.wrap(data);
        }).doOnComplete(() -> {
            if (chunkCounter.get() < numberOfChunks) {
                throw new MongoGridFSException(String.format("Could not find file chunk for files_id: %s at chunk index %s.", gridFSFile.getId(), chunkCounter.get()));
            }
        });
        return this.bufferSizeBytes == null ? byteBufferFlux : new ResizingByteBufferFlux((Publisher<ByteBuffer>)byteBufferFlux, this.bufferSizeBytes);
    }
}

