/*
 * Decompiled with CFR 0.152.
 */
package com.pivovarit.collectors;

import com.pivovarit.collectors.AsyncParallelCollector;
import com.pivovarit.collectors.CompletionOrderSpliterator;
import com.pivovarit.collectors.Dispatcher;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class ParallelStreamCollector<T, R>
implements Collector<T, List<CompletableFuture<R>>, Stream<R>> {
    private final Dispatcher<R> dispatcher;
    private final Function<T, R> function;
    private final Function<List<CompletableFuture<R>>, Stream<R>> processor;
    private final Set<Collector.Characteristics> characteristics;

    private ParallelStreamCollector(Function<T, R> function, Function<List<CompletableFuture<R>>, Stream<R>> processor, Set<Collector.Characteristics> characteristics, Executor executor, int parallelism) {
        this.processor = processor;
        this.characteristics = characteristics;
        this.dispatcher = new Dispatcher(executor, parallelism);
        this.function = function;
    }

    private ParallelStreamCollector(Function<T, R> function, Function<List<CompletableFuture<R>>, Stream<R>> processor, Set<Collector.Characteristics> characteristics, Executor executor) {
        this.characteristics = characteristics;
        this.dispatcher = new Dispatcher(executor);
        this.function = function;
        this.processor = processor;
    }

    private void startConsuming() {
        if (!this.dispatcher.isRunning()) {
            this.dispatcher.start();
        }
    }

    @Override
    public Supplier<List<CompletableFuture<R>>> supplier() {
        return ArrayList::new;
    }

    @Override
    public BiConsumer<List<CompletableFuture<R>>, T> accumulator() {
        return (acc, e) -> {
            this.startConsuming();
            acc.add(this.dispatcher.enqueue(() -> this.function.apply(e)));
        };
    }

    @Override
    public BinaryOperator<List<CompletableFuture<R>>> combiner() {
        return (left, right) -> {
            left.addAll(right);
            return left;
        };
    }

    @Override
    public Function<List<CompletableFuture<R>>, Stream<R>> finisher() {
        return this.processor.compose(i -> {
            this.dispatcher.stop();
            return i;
        });
    }

    @Override
    public Set<Collector.Characteristics> characteristics() {
        return this.characteristics;
    }

    static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, Executor executor) {
        Objects.requireNonNull(executor, "executor can't be null");
        Objects.requireNonNull(mapper, "mapper can't be null");
        return new ParallelStreamCollector<T, R>(mapper, ParallelStreamCollector.streamInCompletionOrderStrategy(), EnumSet.of(Collector.Characteristics.UNORDERED), executor);
    }

    static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, Executor executor, int parallelism) {
        Objects.requireNonNull(executor, "executor can't be null");
        Objects.requireNonNull(mapper, "mapper can't be null");
        AsyncParallelCollector.requireValidParallelism(parallelism);
        return new ParallelStreamCollector<T, R>(mapper, ParallelStreamCollector.streamInCompletionOrderStrategy(), EnumSet.of(Collector.Characteristics.UNORDERED), executor, parallelism);
    }

    static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor) {
        Objects.requireNonNull(executor, "executor can't be null");
        Objects.requireNonNull(mapper, "mapper can't be null");
        return new ParallelStreamCollector<T, R>(mapper, ParallelStreamCollector.streamOrderedStrategy(), Collections.emptySet(), executor);
    }

    static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor, int parallelism) {
        Objects.requireNonNull(executor, "executor can't be null");
        Objects.requireNonNull(mapper, "mapper can't be null");
        AsyncParallelCollector.requireValidParallelism(parallelism);
        return new ParallelStreamCollector<T, R>(mapper, ParallelStreamCollector.streamOrderedStrategy(), Collections.emptySet(), executor, parallelism);
    }

    private static <R> Function<List<CompletableFuture<R>>, Stream<R>> streamInCompletionOrderStrategy() {
        return futures -> StreamSupport.stream(new CompletionOrderSpliterator(futures), false);
    }

    private static <R> Function<List<CompletableFuture<R>>, Stream<R>> streamOrderedStrategy() {
        return futures -> futures.stream().map(CompletableFuture::join);
    }
}

