/*
 * Decompiled with CFR 0.152.
 */
package com.pivovarit.collectors;

import com.pivovarit.collectors.BatchingSpliterator;
import com.pivovarit.collectors.Dispatcher;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * A {@link Collector} that submits one task per stream element to a {@link Dispatcher} and
 * produces a {@link CompletableFuture} holding the finalized results.
 *
 * <p>Each accumulated element is turned into a {@code CompletableFuture<R>} by enqueueing
 * {@code task.apply(element)} on the dispatcher. The finisher stops the dispatcher, combines all
 * futures into a single future of {@code Stream<R>} and applies the {@code finalizer} to it.
 *
 * <p>The combiner always throws, so this collector cannot be used with a parallel stream whose
 * partial results need merging.
 *
 * @param <T> type of the input elements
 * @param <R> type produced by the task for each element
 * @param <C> type of the final result carried by the returned future
 */
final class AsyncParallelCollector<T, R, C>
        implements Collector<T, List<CompletableFuture<R>>, CompletableFuture<C>> {

    private final Dispatcher<R> dispatcher;
    private final Function<T, R> task;
    private final Function<Stream<R>, C> finalizer;

    private AsyncParallelCollector(Function<T, R> task, Dispatcher<R> dispatcher, Function<Stream<R>, C> finalizer) {
        this.dispatcher = dispatcher;
        this.finalizer = finalizer;
        this.task = task;
    }

    /** Supplies the mutable list that accumulates one future per processed element. */
    @Override
    public Supplier<List<CompletableFuture<R>>> supplier() {
        return ArrayList::new;
    }

    /**
     * Merging partial results is not supported.
     *
     * @return an operator that always throws {@link UnsupportedOperationException}
     */
    @Override
    public BinaryOperator<List<CompletableFuture<R>>> combiner() {
        return (left, right) -> {
            throw new UnsupportedOperationException("Using parallel stream with parallel collectors is a bad idea");
        };
    }

    /**
     * Starts the dispatcher if it reports that it is not running, then enqueues the task for the
     * element and records the resulting future.
     */
    @Override
    public BiConsumer<List<CompletableFuture<R>>, T> accumulator() {
        return (acc, e) -> {
            if (!dispatcher.isRunning()) {
                dispatcher.start();
            }
            acc.add(dispatcher.enqueue(() -> task.apply(e)));
        };
    }

    /**
     * Stops the dispatcher and returns a future that applies the finalizer to the stream of all
     * task results.
     */
    @Override
    public Function<List<CompletableFuture<R>>, CompletableFuture<C>> finisher() {
        return futures -> {
            dispatcher.stop();
            return combine(futures).thenApply(finalizer);
        };
    }

    /** This collector declares no characteristics. */
    @Override
    public Set<Collector.Characteristics> characteristics() {
        return Collections.emptySet();
    }

    /**
     * Combines the given futures into one future of their results, in list order.
     *
     * <p>The returned future completes normally once every input future has completed. If any
     * input future completes exceptionally, the returned future is completed exceptionally with
     * that exception from the failing future's own completion callback, so it does not have to
     * wait for the remaining futures.
     *
     * @param futures futures to combine
     * @param <T> result type of the individual futures
     * @return a future yielding a stream of the joined results
     */
    private static <T> CompletableFuture<Stream<T>> combine(List<CompletableFuture<T>> futures) {
        CompletableFuture<Stream<T>> combined = CompletableFuture
                .allOf(futures.toArray(CompletableFuture[]::new))
                .thenApply(ignored -> futures.stream().map(CompletableFuture::join));

        for (CompletableFuture<T> future : futures) {
            future.whenComplete((result, ex) -> {
                if (ex != null) {
                    combined.completeExceptionally(ex);
                }
            });
        }
        return combined;
    }

    /**
     * Creates a collector that maps elements via {@code Dispatcher.virtual()} and exposes the
     * results as a stream.
     *
     * @param mapper function applied to each element; must not be null
     * @throws NullPointerException if {@code mapper} is null
     */
    static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> collectingToStream(Function<T, R> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        return new AsyncParallelCollector<>(mapper, Dispatcher.virtual(), Function.identity());
    }

    /**
     * Creates a collector that maps elements via {@code Dispatcher.virtual(parallelism)} and
     * exposes the results as a stream.
     *
     * @param mapper function applied to each element; must not be null
     * @param parallelism value passed to the dispatcher; must be at least 1
     * @throws NullPointerException if {@code mapper} is null
     * @throws IllegalArgumentException if {@code parallelism} is lower than 1
     */
    static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> collectingToStream(Function<T, R> mapper, int parallelism) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        requireValidParallelism(parallelism);
        return new AsyncParallelCollector<>(mapper, Dispatcher.virtual(parallelism), Function.identity());
    }

    /**
     * Creates a collector that maps elements using the given executor and exposes the results as
     * a stream. With a parallelism of 1 the whole input is processed by a single asynchronous
     * task (see {@link #asyncCollector}).
     *
     * @param mapper function applied to each element; must not be null
     * @param executor executor running the tasks; must not be null
     * @param parallelism value passed to the dispatcher; must be at least 1
     * @throws NullPointerException if {@code mapper} or {@code executor} is null
     * @throws IllegalArgumentException if {@code parallelism} is lower than 1
     */
    static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> collectingToStream(Function<T, R> mapper, Executor executor, int parallelism) {
        Objects.requireNonNull(executor, "executor can't be null");
        Objects.requireNonNull(mapper, "mapper can't be null");
        requireValidParallelism(parallelism);

        if (parallelism == 1) {
            return asyncCollector(mapper, executor, i -> i);
        }
        return new AsyncParallelCollector<>(mapper, Dispatcher.from(executor, parallelism), Function.identity());
    }

    /**
     * Creates a collector that maps elements via {@code Dispatcher.virtual()} and reduces the
     * results with the supplied downstream collector.
     *
     * @param collector downstream collector applied to the mapped results; must not be null
     * @param mapper function applied to each element; must not be null
     * @throws NullPointerException if {@code collector} or {@code mapper} is null
     */
    static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper) {
        Objects.requireNonNull(collector, "collector can't be null");
        Objects.requireNonNull(mapper, "mapper can't be null");
        return new AsyncParallelCollector<>(mapper, Dispatcher.virtual(), s -> s.collect(collector));
    }

    /**
     * Creates a collector that maps elements via {@code Dispatcher.virtual(parallelism)} and
     * reduces the results with the supplied downstream collector. With a parallelism of 1 the
     * whole input is processed by a single asynchronous task on a newly created
     * virtual-thread-per-task executor.
     *
     * @param collector downstream collector applied to the mapped results; must not be null
     * @param mapper function applied to each element; must not be null
     * @param parallelism value passed to the dispatcher; must be at least 1
     * @throws NullPointerException if {@code collector} or {@code mapper} is null
     * @throws IllegalArgumentException if {@code parallelism} is lower than 1
     */
    static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper, int parallelism) {
        Objects.requireNonNull(collector, "collector can't be null");
        Objects.requireNonNull(mapper, "mapper can't be null");
        requireValidParallelism(parallelism);

        if (parallelism == 1) {
            // NOTE(review): this executor is created per call and never shut down here.
            return asyncCollector(mapper, Executors.newVirtualThreadPerTaskExecutor(), s -> s.collect(collector));
        }
        return new AsyncParallelCollector<>(mapper, Dispatcher.virtual(parallelism), s -> s.collect(collector));
    }

    /**
     * Creates a collector that maps elements using the given executor and reduces the results
     * with the supplied downstream collector. With a parallelism of 1 the whole input is
     * processed by a single asynchronous task (see {@link #asyncCollector}).
     *
     * @param collector downstream collector applied to the mapped results; must not be null
     * @param mapper function applied to each element; must not be null
     * @param executor executor running the tasks; must not be null
     * @param parallelism value passed to the dispatcher; must be at least 1
     * @throws NullPointerException if {@code collector}, {@code executor} or {@code mapper} is null
     * @throws IllegalArgumentException if {@code parallelism} is lower than 1
     */
    static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper, Executor executor, int parallelism) {
        Objects.requireNonNull(collector, "collector can't be null");
        Objects.requireNonNull(executor, "executor can't be null");
        Objects.requireNonNull(mapper, "mapper can't be null");
        requireValidParallelism(parallelism);

        if (parallelism == 1) {
            return asyncCollector(mapper, executor, s -> s.collect(collector));
        }
        return new AsyncParallelCollector<>(mapper, Dispatcher.from(executor, parallelism), s -> s.collect(collector));
    }

    /**
     * Validates a parallelism argument.
     *
     * @param parallelism value to validate
     * @throws IllegalArgumentException if {@code parallelism} is lower than 1
     */
    static void requireValidParallelism(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("Parallelism can't be lower than 1");
        }
    }

    /**
     * Creates a collector that gathers all elements into a list and then, in a single task
     * submitted to {@code executor}, maps them sequentially in encounter order and applies
     * {@code finisher} to the stream of results.
     *
     * @param mapper function applied to each element
     * @param executor executor running the single asynchronous task
     * @param finisher function turning the stream of mapped results into the final value
     * @return a collector producing a future of the finisher's result
     */
    static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> asyncCollector(Function<T, R> mapper, Executor executor, Function<Stream<R>, RR> finisher) {
        return Collectors.collectingAndThen(Collectors.toList(), list -> CompletableFuture.supplyAsync(() -> {
            Stream.Builder<R> acc = Stream.builder();
            for (T t : list) {
                acc.add(mapper.apply(t));
            }
            return finisher.apply(acc.build());
        }, executor));
    }

    /**
     * Factory methods for collectors that group the input into batches before dispatching, so
     * that each dispatched task processes a list of elements rather than a single element.
     */
    static final class BatchingCollectors {

        private BatchingCollectors() {
        }

        /**
         * Creates a batching collector that reduces the mapped results with the supplied
         * downstream collector. With a parallelism of 1 the whole input is processed by a single
         * asynchronous task.
         *
         * @param collector downstream collector applied to the mapped results; must not be null
         * @param mapper function applied to each element; must not be null
         * @param executor executor running the tasks; must not be null
         * @param parallelism number of batches / dispatcher parallelism; must be at least 1
         * @throws NullPointerException if {@code collector}, {@code executor} or {@code mapper} is null
         * @throws IllegalArgumentException if {@code parallelism} is lower than 1
         */
        static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper, Executor executor, int parallelism) {
            Objects.requireNonNull(collector, "collector can't be null");
            Objects.requireNonNull(executor, "executor can't be null");
            Objects.requireNonNull(mapper, "mapper can't be null");
            requireValidParallelism(parallelism);

            if (parallelism == 1) {
                return asyncCollector(mapper, executor, s -> s.collect(collector));
            }
            return batchingCollector(mapper, executor, parallelism, s -> s.collect(collector));
        }

        /**
         * Creates a batching collector that exposes the mapped results as a stream. With a
         * parallelism of 1 the whole input is processed by a single asynchronous task.
         *
         * @param mapper function applied to each element; must not be null
         * @param executor executor running the tasks; must not be null
         * @param parallelism number of batches / dispatcher parallelism; must be at least 1
         * @throws NullPointerException if {@code executor} or {@code mapper} is null
         * @throws IllegalArgumentException if {@code parallelism} is lower than 1
         */
        static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> collectingToStream(Function<T, R> mapper, Executor executor, int parallelism) {
            Objects.requireNonNull(executor, "executor can't be null");
            Objects.requireNonNull(mapper, "mapper can't be null");
            requireValidParallelism(parallelism);

            if (parallelism == 1) {
                return asyncCollector(mapper, executor, i -> i);
            }
            return batchingCollector(mapper, executor, parallelism, s -> s);
        }

        /**
         * Gathers the input into a list, then either dispatches the elements one by one (when
         * the list size equals {@code parallelism}) or partitions the list via
         * {@link BatchingSpliterator} and dispatches one task per batch, flattening the batch
         * results before applying {@code finisher}.
         */
        private static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> batchingCollector(Function<T, R> mapper, Executor executor, int parallelism, Function<Stream<R>, RR> finisher) {
            return Collectors.collectingAndThen(Collectors.toList(), list -> {
                // Presumably partitioning N items into N batches would only yield
                // single-element batches, so batching is skipped - confirm against
                // BatchingSpliterator.
                if (list.size() == parallelism) {
                    return list.stream().collect(new AsyncParallelCollector<>(
                            mapper,
                            Dispatcher.from(executor, parallelism),
                            finisher));
                }
                return BatchingSpliterator.partitioned(list, parallelism).collect(new AsyncParallelCollector<>(
                        BatchingSpliterator.batching(mapper),
                        Dispatcher.from(executor, parallelism),
                        listStream -> finisher.apply(listStream.flatMap(Collection::stream))));
            });
        }
    }
}

