/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.reactive;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RFuture;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/**
 * Adapts a repeatable asynchronous "take one element" operation into a Reactor {@link Flux}.
 *
 * <p>Each element is obtained by invoking a caller-supplied factory that returns an
 * {@link RFuture}; the next invocation is only made after the previous future has completed
 * and while the subscriber still has outstanding demand.
 */
public class ElementsStream {
    /**
     * Starts one asynchronous take and, when it succeeds, emits the value and re-arms itself
     * while demand remains.
     *
     * @param factory   produces the future for the next element; invoked once per element
     * @param emitter   sink that receives elements and errors
     * @param counter   outstanding demand; decremented once per emitted element
     * @param futureRef holder for the in-flight future so that it can be cancelled on dispose
     */
    private static <V> void take(final Callable<RFuture<V>> factory, final FluxSink<V> emitter, final AtomicLong counter, final AtomicReference<RFuture<V>> futureRef) {
        if (emitter.isCancelled()) {
            // Do not pull (and then drop) another element for a subscriber that has gone away.
            return;
        }
        RFuture<V> future;
        try {
            future = factory.call();
        }
        catch (Exception e) {
            emitter.error(e);
            return;
        }
        futureRef.set(future);
        if (emitter.isCancelled()) {
            // The dispose callback may have run before this future was published; cancel it here.
            future.cancel(true);
            return;
        }
        future.addListener(new FutureListener<V>(){

            @Override
            public void operationComplete(Future<V> completed) throws Exception {
                if (emitter.isCancelled()) {
                    // Cancelling the in-flight future lands here; nothing should be signalled.
                    return;
                }
                if (!completed.isSuccess()) {
                    emitter.error(completed.cause());
                    return;
                }
                emitter.next(completed.getNow());
                if (counter.decrementAndGet() == 0L) {
                    // Demand is exhausted: stop taking, but do NOT complete the stream, because
                    // the subscriber may request more later. The next request restarts the chain.
                    return;
                }
                ElementsStream.take(factory, emitter, counter, futureRef);
            }
        });
    }

    /**
     * Adds two non-negative demand amounts, saturating at {@link Long#MAX_VALUE} on overflow.
     */
    private static long addCapped(long current, long added) {
        long sum = current + added;
        return sum < 0L ? Long.MAX_VALUE : sum;
    }

    /**
     * Creates a {@link Flux} that emits the results of repeatedly invoking {@code callable}.
     *
     * <p>Elements are taken one at a time and only while the subscriber has outstanding demand.
     * The flux does not complete on its own; it ends when the subscriber cancels or when a take
     * fails, in which case the failure is signalled as an error. Cancelling the subscription
     * cancels the take that is currently in flight, if any.
     *
     * @param callable factory for the future that yields the next element
     * @return a flux of the taken elements
     */
    public static <V> Flux<V> takeElements(Callable<RFuture<V>> callable) {
        return Flux.create(emitter -> {
            AtomicReference<RFuture<V>> futureRef = new AtomicReference<>();
            AtomicLong demand = new AtomicLong();
            // A sink accepts a single dispose callback, so register it once, before any take
            // can start, and tolerate the case where no future has been created yet.
            emitter.onDispose(() -> {
                RFuture<V> pending = futureRef.get();
                if (pending != null) {
                    pending.cancel(true);
                }
            });
            emitter.onRequest(n -> {
                // Accumulate demand in one counter; only the request that moves it away from
                // zero starts a take chain, so at most one take is in flight at a time.
                if (demand.getAndAccumulate(n, ElementsStream::addCapped) == 0L) {
                    ElementsStream.take(callable, emitter, demand, futureRef);
                }
            });
        });
    }
}

