/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.reactivestreams.client.internal;

import com.mongodb.reactivestreams.client.internal.BatchCursor;
import com.mongodb.reactivestreams.client.internal.BatchCursorPublisher;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

class BatchCursorFlux<T>
implements Publisher<T> {
    private final BatchCursorPublisher<T> batchCursorPublisher;
    private final AtomicBoolean inProgress = new AtomicBoolean(false);
    private final AtomicLong demandDelta = new AtomicLong(0L);
    private volatile BatchCursor<T> batchCursor;
    private FluxSink<T> sink;

    BatchCursorFlux(BatchCursorPublisher<T> batchCursorPublisher) {
        this.batchCursorPublisher = batchCursorPublisher;
    }

    public void subscribe(Subscriber<? super T> subscriber) {
        Flux.create(sink -> {
            this.sink = sink;
            sink.onRequest(demand -> {
                if (this.calculateDemand(demand) > 0L && this.inProgress.compareAndSet(false, true)) {
                    if (this.batchCursor == null) {
                        int batchSize = this.calculateBatchSize(sink.requestedFromDownstream());
                        Context initialContext = subscriber instanceof CoreSubscriber ? ((CoreSubscriber)subscriber).currentContext() : null;
                        this.batchCursorPublisher.batchCursor(batchSize).subscribe(bc -> {
                            this.batchCursor = bc;
                            this.inProgress.set(false);
                            if (sink.isCancelled()) {
                                this.closeCursor();
                            } else {
                                this.recurseCursor();
                            }
                        }, arg_0 -> ((FluxSink)sink).error(arg_0), null, initialContext);
                    } else {
                        this.inProgress.set(false);
                        this.recurseCursor();
                    }
                }
            });
            sink.onCancel(this::closeCursor);
            sink.onDispose(this::closeCursor);
        }, (FluxSink.OverflowStrategy)FluxSink.OverflowStrategy.BUFFER).subscribe(subscriber);
    }

    private void closeCursor() {
        if (this.batchCursor != null) {
            this.batchCursor.close();
        }
    }

    private void recurseCursor() {
        if (!this.sink.isCancelled() && this.sink.requestedFromDownstream() > 0L && this.inProgress.compareAndSet(false, true)) {
            if (this.batchCursor.isClosed()) {
                this.sink.complete();
            } else {
                this.batchCursor.setBatchSize(this.calculateBatchSize(this.sink.requestedFromDownstream()));
                Mono.from(this.batchCursor.next(() -> this.sink.isCancelled())).doOnCancel(this::closeCursor).doOnError(e -> {
                    try {
                        this.closeCursor();
                    }
                    finally {
                        this.sink.error(e);
                    }
                }).doOnSuccess(results -> {
                    if (results != null) {
                        results.forEach(arg_0 -> this.sink.next(arg_0));
                        this.calculateDemand(-results.size());
                    }
                    if (this.batchCursor.isClosed()) {
                        this.sink.complete();
                    } else {
                        this.inProgress.set(false);
                        this.recurseCursor();
                    }
                }).subscribe();
            }
        }
    }

    long calculateDemand(long demand) {
        return this.demandDelta.accumulateAndGet(demand, (originalValue, update) -> {
            long newValue = originalValue + update;
            return update > 0L && newValue < originalValue ? Long.MAX_VALUE : newValue;
        });
    }

    int calculateBatchSize(long demand) {
        Integer setBatchSize = this.batchCursorPublisher.getBatchSize();
        if (setBatchSize != null) {
            return setBatchSize;
        }
        if (demand > Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        }
        return Math.max(2, (int)demand);
    }
}

