/*
 * Decompiled with CFR 0.152.
 */
package net.pincette.rs;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import net.pincette.rs.BlockingSubscriber;
import net.pincette.rs.Chain;
import net.pincette.rs.First;
import net.pincette.rs.LambdaSubscriber;
import net.pincette.rs.Last;
import net.pincette.rs.PassThrough;
import net.pincette.rs.Reducer;
import net.pincette.rs.Source;
import net.pincette.util.ScheduledCompletionStage;
import net.pincette.util.State;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * Static helpers for working with Reactive Streams publishers: collecting a stream into values,
 * generating streams, iterating over them and retrying failed publishers.
 */
public class Util {
    private Util() {
    }

    /**
     * Collects all values of the publisher in a list, blocking the calling thread until the
     * stream has completed.
     *
     * @param publisher the publisher to drain.
     * @param <T> the value type.
     * @return the emitted values in the order they were received.
     */
    public static <T> List<T> asList(Publisher<T> publisher) {
        return Util.asListAsync(publisher).toCompletableFuture().join();
    }

    /**
     * Collects all values of the publisher in a list without blocking.
     *
     * @param publisher the publisher to drain.
     * @param <T> the value type.
     * @return a stage that completes with the list when the stream completes, or exceptionally
     *     when the stream signals an error.
     */
    public static <T> CompletionStage<List<T>> asListAsync(Publisher<T> publisher) {
        CompletableFuture<List<T>> future = new CompletableFuture<>();
        publisher.subscribe(Util.completerList(future));
        return future;
    }

    /**
     * Returns the first value of the publisher, blocking the calling thread until it is
     * available.
     *
     * @param publisher the publisher to read from.
     * @param <T> the value type.
     * @return the first value, or <code>null</code> when the stream completes without a value.
     */
    public static <T> T asValue(Publisher<T> publisher) {
        return Util.asValueAsync(publisher).toCompletableFuture().join();
    }

    /**
     * Returns the first value of the publisher without blocking.
     *
     * @param publisher the publisher to read from.
     * @param <T> the value type.
     * @return a stage that completes with the first value, with <code>null</code> when the stream
     *     completes without emitting a value, or exceptionally when the stream signals an error.
     */
    public static <T> CompletionStage<T> asValueAsync(Publisher<T> publisher) {
        CompletableFuture<T> future = new CompletableFuture<>();
        Chain.with(publisher).first().get().subscribe(Util.completerFirst(future));
        return future;
    }

    /**
     * Blocks the calling thread until the given publisher has completed.
     *
     * @param publisher the publisher to wait for.
     */
    public static void empty(Publisher<Void> publisher) {
        Util.emptyAsync(publisher).toCompletableFuture().join();
    }

    /**
     * Subscribes to the given publisher and reports its termination.
     *
     * @param publisher the publisher to wait for.
     * @return a stage that completes with <code>null</code> when the stream completes, or
     *     exceptionally when the stream signals an error.
     */
    public static CompletionStage<Void> emptyAsync(Publisher<Void> publisher) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        publisher.subscribe(Util.completerEmpty(future));
        return future;
    }

    /** Creates a subscriber that ignores values and completes the future on termination. */
    private static LambdaSubscriber<Void> completerEmpty(CompletableFuture<Void> future) {
        return new LambdaSubscriber<Void>(v -> {}, () -> future.complete(null), future::completeExceptionally);
    }

    /**
     * Creates a subscriber that completes the future with the first value it receives.
     * Completion of the stream also completes the future, with <code>null</code>. That call has
     * no effect when a value has already been delivered, but it prevents the future from staying
     * pending forever when the stream is empty.
     */
    private static <T> LambdaSubscriber<T> completerFirst(CompletableFuture<T> future) {
        return new LambdaSubscriber<T>(future::complete, () -> future.complete(null), future::completeExceptionally);
    }

    /** Creates a subscriber that accumulates values and completes the future with the list. */
    private static <T> LambdaSubscriber<T> completerList(CompletableFuture<List<T>> future) {
        List<T> result = new ArrayList<>();
        return new LambdaSubscriber<T>(result::add, () -> future.complete(result), future::completeExceptionally);
    }

    /**
     * Connects the publisher to the terminator and reduces the result by keeping the most
     * recently received value, starting from <code>null</code>.
     */
    private static <T> CompletionStage<Optional<T>> element(Publisher<T> publisher, Processor<T, T> terminator) {
        return Reducer.reduce(Util.subscribe(publisher, terminator), () -> null, (result, value) -> value).thenApply(Optional::ofNullable);
    }

    /**
     * Creates a publisher backed by an empty list.
     *
     * @param <T> the value type.
     * @return the publisher.
     */
    public static <T> Publisher<T> empty() {
        return Source.of(Collections.emptyList());
    }

    /**
     * Passes the publisher through a {@link First} processor and returns the resulting value.
     *
     * @param publisher the publisher to read from.
     * @param <T> the value type.
     * @return a stage with the value, which is empty when no value was produced.
     */
    public static <T> CompletionStage<Optional<T>> first(Publisher<T> publisher) {
        return Util.element(publisher, new First<>());
    }

    /**
     * Creates a publisher that emits the values produced by the supplier. Each call to
     * <code>request(n)</code> synchronously emits up to <code>n</code> values; the stream never
     * completes by itself. Emission stops as soon as the subscription is cancelled. A
     * non-positive <code>n</code> emits nothing.
     *
     * @param supplier the function that produces the values.
     * @param <T> the value type.
     * @return the publisher.
     */
    public static <T> Publisher<T> generate(final Supplier<T> supplier) {
        return subscriber -> subscriber.onSubscribe(new Subscription(){
            // Volatile because cancel() may be called from another thread than request().
            private volatile boolean cancelled;

            @Override
            public void cancel() {
                this.cancelled = true;
            }

            @Override
            public void request(long n) {
                // Re-check the flag before every emission so that a cancel issued from within
                // onNext, or concurrently, ends even a request for Long.MAX_VALUE values.
                for (long i = 0L; i < n && !this.cancelled; ++i) {
                    subscriber.onNext(supplier.get());
                }
            }
        });
    }

    /**
     * Creates a publisher that emits a sequence in which each value is derived from the previous
     * one. The current value is held in a state object that is created once per call of this
     * method, so all subscribers of the returned publisher share it. A <code>null</code> current
     * value means "not started", which causes <code>initial</code> to be used.
     *
     * @param initial produces the first value.
     * @param next produces the next value from the current one.
     * @param <T> the value type.
     * @return the publisher.
     */
    public static <T> Publisher<T> generate(Supplier<T> initial, UnaryOperator<T> next) {
        State<T> state = new State<>();
        return Util.generate(() -> state.set(state.get() == null ? initial.get() : next.apply(state.get())));
    }

    /**
     * Exposes the publisher as an iterable, using a request size of 100.
     *
     * @param publisher the publisher to iterate over.
     * @param <T> the value type.
     * @return the iterable.
     */
    public static <T> Iterable<T> iterate(Publisher<T> publisher) {
        return Util.iterate(publisher, 100L);
    }

    /**
     * Exposes the publisher as an iterable by subscribing a {@link BlockingSubscriber} to it.
     *
     * @param publisher the publisher to iterate over.
     * @param requestSize the request size handed to the blocking subscriber.
     * @param <T> the value type.
     * @return the iterable.
     */
    public static <T> Iterable<T> iterate(Publisher<T> publisher, long requestSize) {
        BlockingSubscriber<T> subscriber = new BlockingSubscriber<>(requestSize);
        publisher.subscribe(subscriber);
        return subscriber;
    }

    /**
     * Blocks the calling thread until the reduction of the publisher has finished. The values
     * themselves are discarded.
     *
     * @param publisher the publisher to wait for.
     * @param <T> the value type.
     */
    public static <T> void join(Publisher<T> publisher) {
        Reducer.reduce(publisher, (v1, v2) -> v1).toCompletableFuture().join();
    }

    /**
     * Passes the publisher through a {@link Last} processor and returns the resulting value.
     *
     * @param publisher the publisher to read from.
     * @param <T> the value type.
     * @return a stage with the value, which is empty when no value was produced.
     */
    public static <T> CompletionStage<Optional<T>> last(Publisher<T> publisher) {
        return Util.element(publisher, new Last<>());
    }

    /**
     * Parks the current thread, either indefinitely or for a bounded time.
     *
     * @param blocker the synchronization object responsible for the parking.
     * @param timeout <code>-1</code> to park without a time limit; any other value is multiplied
     *     by 1000 and used as the number of nanoseconds to park.
     */
    static void parking(Object blocker, long timeout) {
        if (timeout != -1L) {
            // NOTE(review): the factor 1000 implies the timeout is in microseconds. Confirm this
            // against the callers; a millisecond timeout would need a factor of 1000000.
            LockSupport.parkNanos(blocker, timeout * 1000L);
        } else {
            LockSupport.park(blocker);
        }
    }

    /**
     * Creates a publisher that asks the supplier for a new publisher each time the current one
     * signals an error.
     *
     * @param publisher supplies the publisher to subscribe to, initially and after each error.
     * @param retryInterval the time to wait before asking for a new publisher.
     * @param <T> the value type.
     * @return the retrying publisher.
     */
    public static <T> Publisher<T> retryPublisher(Supplier<Publisher<T>> publisher, Duration retryInterval) {
        return Util.retryPublisher(publisher, retryInterval, null);
    }

    /**
     * Creates a publisher that asks the supplier for a new publisher each time the current one
     * signals an error.
     *
     * @param publisher supplies the publisher to subscribe to, initially and after each error.
     * @param retryInterval the time to wait before asking for a new publisher.
     * @param onException receives each error before the retry is scheduled. It may be
     *     <code>null</code>.
     * @param <T> the value type.
     * @return the retrying publisher.
     */
    public static <T> Publisher<T> retryPublisher(Supplier<Publisher<T>> publisher, Duration retryInterval, Consumer<Throwable> onException) {
        Retry<T> result = new Retry<T>(publisher, retryInterval, onException);
        publisher.get().subscribe(result);
        return result;
    }

    /**
     * Subscribes the processor to the publisher and returns the processor as a publisher, which
     * allows calls to be chained.
     *
     * @param publisher the upstream publisher.
     * @param processor the processor to connect.
     * @param <T> the incoming value type.
     * @param <R> the outgoing value type.
     * @return the processor.
     */
    public static <T, R> Publisher<R> subscribe(Publisher<T> publisher, Processor<T, R> processor) {
        publisher.subscribe(processor);
        return processor;
    }

    /**
     * Subscribes the subscriber to the processor and returns the processor as a subscriber, which
     * allows calls to be chained.
     *
     * @param processor the processor to connect.
     * @param subscriber the downstream subscriber.
     * @param <T> the incoming value type.
     * @param <R> the outgoing value type.
     * @return the processor.
     */
    public static <T, R> Subscriber<T> subscribe(Processor<T, R> processor, Subscriber<R> subscriber) {
        processor.subscribe(subscriber);
        return processor;
    }

    /**
     * A pass-through processor that reacts to an upstream error by obtaining a new publisher from
     * the supplier after the retry interval and subscribing itself to it.
     */
    private static class Retry<T>
    extends PassThrough<T> {
        private final Consumer<Throwable> onException;
        private final Supplier<Publisher<T>> publisher;
        private final Duration retryInterval;
        // The most recently registered downstream subscriber; null until subscribe is called.
        private Subscriber<? super T> subscriber;

        private Retry(Supplier<Publisher<T>> publisher, Duration retryInterval, Consumer<Throwable> onException) {
            this.publisher = publisher;
            this.retryInterval = retryInterval;
            this.onException = onException;
        }

        /**
         * Reports the error to the optional handler and schedules a new subscription instead of
         * forwarding the error downstream.
         */
        @Override
        public void onError(Throwable t) {
            this.setError(true);
            if (this.onException != null) {
                this.onException.accept(t);
            }
            ScheduledCompletionStage.composeAsyncAfter(() -> CompletableFuture.completedFuture(this.publisher.get()), this.retryInterval).thenAccept(p -> {
                this.setError(false);
                p.subscribe(this);
                // The upstream can fail before anybody has subscribed to this processor. In that
                // case there is no downstream subscriber to reconnect.
                if (this.subscriber != null) {
                    this.subscribe(this.subscriber);
                }
            });
        }

        /** Remembers the downstream subscriber so it can be reconnected after a retry. */
        @Override
        public void subscribe(Subscriber<? super T> subscriber) {
            super.subscribe(subscriber);
            this.subscriber = subscriber;
        }
    }
}

