/*
 * Decompiled with CFR 0.152.
 */
package com.github.davidmoten.rx2;

import com.github.davidmoten.guavamini.Optional;
import com.github.davidmoten.rx2.flowable.CachedFlowable;
import com.github.davidmoten.rx2.flowable.CloseableFlowableWithReset;
import com.github.davidmoten.rx2.internal.flowable.FlowableFetchPagesByRequest;
import com.github.davidmoten.rx2.internal.flowable.FlowableMatch;
import com.github.davidmoten.rx2.internal.flowable.FlowableMergeInterleave;
import com.github.davidmoten.rx2.internal.flowable.FlowableRepeat;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

public final class Flowables {
    private static final int DEFAULT_MATCH_BATCH_SIZE = 128;
    private static final int DEFAULT_RING_BUFFER_SIZE = 128;

    private Flowables() {
    }

    public static <A, B, K, C> Flowable<C> match(Flowable<A> a, Flowable<B> b, Function<? super A, K> aKey, Function<? super B, K> bKey, BiFunction<? super A, ? super B, C> combiner, int requestSize) {
        return new FlowableMatch<A, B, K, C>(a, b, aKey, bKey, combiner, requestSize);
    }

    public static <A, B, K, C> Flowable<C> match(Flowable<A> a, Flowable<B> b, Function<? super A, K> aKey, Function<? super B, K> bKey, BiFunction<? super A, ? super B, C> combiner) {
        return Flowables.match(a, b, aKey, bKey, combiner, 128);
    }

    public static <T> Flowable<T> repeat(T t) {
        return new FlowableRepeat<T>(t, -1L);
    }

    public static <T> Flowable<T> repeat(T t, long count) {
        return new FlowableRepeat<T>(t, count);
    }

    public static <T> Flowable<T> fetchPagesByRequest(BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch, long start, int maxConcurrent) {
        return FlowableFetchPagesByRequest.create(fetch, start, maxConcurrent);
    }

    public static <T> Flowable<T> fetchPagesByRequest(BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch, long start) {
        return Flowables.fetchPagesByRequest(fetch, start, 2);
    }

    public static <T> Flowable<T> fetchPagesByRequest(BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch) {
        return Flowables.fetchPagesByRequest(fetch, 0L, 2);
    }

    public static <T> CachedFlowable<T> cache(Flowable<T> source) {
        return new CachedFlowable<T>(source);
    }

    public static <T> Flowable<T> cache(Flowable<T> source, final long duration, final TimeUnit unit, final Scheduler.Worker worker) {
        final AtomicReference<CachedFlowable<T>> cacheRef = new AtomicReference<CachedFlowable<T>>();
        CachedFlowable<T> cache = new CachedFlowable<T>(source);
        cacheRef.set(cache);
        return cache.doOnSubscribe((Consumer)new Consumer<Subscription>(){

            public void accept(Subscription d) {
                Runnable action = new Runnable(){

                    @Override
                    public void run() {
                        ((CachedFlowable)((Object)cacheRef.get())).reset();
                    }
                };
                worker.schedule(action, duration, unit);
            }
        });
    }

    public static <T> CloseableFlowableWithReset<T> cache(Flowable<T> source, final long duration, final TimeUnit unit, final Scheduler scheduler) {
        final AtomicReference<CachedFlowable<T>> cacheRef = new AtomicReference<CachedFlowable<T>>();
        final AtomicReference<Optional> workerRef = new AtomicReference<Optional>(Optional.absent());
        CachedFlowable<T> cache = new CachedFlowable<T>(source);
        cacheRef.set(cache);
        Runnable closeAction = new Runnable(){

            @Override
            public void run() {
                Optional w;
                while ((w = (Optional)workerRef.get()) != null) {
                    if (!workerRef.compareAndSet(w, null)) continue;
                    if (w.isPresent()) {
                        ((Scheduler.Worker)w.get()).dispose();
                    }
                    workerRef.set(null);
                    break;
                }
            }
        };
        Runnable resetAction = new Runnable(){

            @Override
            public void run() {
                Flowables.startScheduledResetAgain(duration, unit, scheduler, cacheRef, workerRef);
            }
        };
        return new CloseableFlowableWithReset<T>(cache, closeAction, resetAction);
    }

    private static <T> void startScheduledResetAgain(long duration, TimeUnit unit, Scheduler scheduler, final AtomicReference<CachedFlowable<T>> cacheRef, AtomicReference<Optional<Scheduler.Worker>> workerRef) {
        Optional w;
        Optional<Scheduler.Worker> wOld;
        Runnable action = new Runnable(){

            @Override
            public void run() {
                ((CachedFlowable)((Object)cacheRef.get())).reset();
            }
        };
        do {
            if ((wOld = workerRef.get()) != null) continue;
            return;
        } while (!workerRef.compareAndSet(wOld, (Optional<Scheduler.Worker>)(w = Optional.of((Object)scheduler.createWorker()))));
        if (wOld.isPresent()) {
            ((Scheduler.Worker)wOld.get()).dispose();
        }
        ((Scheduler.Worker)w.get()).schedule(action, duration, unit);
    }

    public static <T> Flowable<T> mergeInterleaved(Publisher<? extends Publisher<? extends T>> publishers, int maxConcurrency, int batchSize, boolean delayErrors) {
        return new FlowableMergeInterleave(publishers, maxConcurrency, batchSize, delayErrors);
    }

    public static <T> Flowable<T> mergeInterleaved(Publisher<? extends Publisher<? extends T>> publishers, int maxConcurrency) {
        return Flowables.mergeInterleaved(publishers, maxConcurrency, 128, false);
    }

    public static <T> MergeInterleaveBuilder<T> mergeInterleaved(Publisher<? extends Publisher<? extends T>> publishers) {
        return new MergeInterleaveBuilder(publishers);
    }

    public static final class MergeInterleaveBuilder<T> {
        private final Publisher<? extends Publisher<? extends T>> publishers;
        private int maxConcurrency = 4;
        private int batchSize = 128;
        private boolean delayErrors = false;

        MergeInterleaveBuilder(Publisher<? extends Publisher<? extends T>> publishers) {
            this.publishers = publishers;
        }

        public MergeInterleaveBuilder<T> maxConcurrency(int maxConcurrency) {
            this.maxConcurrency = maxConcurrency;
            return this;
        }

        public MergeInterleaveBuilder<T> batchSize(int batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        public MergeInterleaveBuilder<T> delayErrors(boolean delayErrors) {
            this.delayErrors = delayErrors;
            return this;
        }

        public Flowable<T> build() {
            return Flowables.mergeInterleaved(this.publishers, this.maxConcurrency, this.batchSize, this.delayErrors);
        }
    }
}

