/*
 * Decompiled with CFR 0.152.
 */
package com.pivovarit.collectors;

import com.pivovarit.collectors.AsyncParallelCollector;
import com.pivovarit.collectors.BatchingStream;
import com.pivovarit.collectors.CompletionStrategy;
import com.pivovarit.collectors.Dispatcher;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * A {@link Collector} that submits one asynchronous task per stream element to a
 * caller-supplied {@link Executor} and hands the resulting futures to a
 * {@link CompletionStrategy} which turns them into the final {@code Stream<R>}.
 *
 * <p>The number of submitted-but-unfinished tasks is bounded by a {@link Semaphore}
 * created with {@code parallelism} permits: the accumulator blocks the collecting
 * thread until a permit is available, and each task releases its permit when the
 * mapping function returns or throws.
 *
 * <p>The combiner always throws, so this collector cannot be used in a reduction
 * that merges partial containers (e.g. a parallel stream).
 *
 * @param <T> type of the input elements
 * @param <R> type of the mapped results
 */
class ParallelStreamCollector<T, R>
implements Collector<T, Stream.Builder<CompletableFuture<R>>, Stream<R>> {
    // Wrapped so that the set handed out by characteristics() cannot be mutated by callers;
    // the Collector contract expects the returned set to be immutable.
    private static final Set<Collector.Characteristics> UNORDERED = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.UNORDERED));
    private final Function<T, R> function;
    private final CompletionStrategy<R> completionStrategy;
    private final Set<Collector.Characteristics> characteristics;
    private final Semaphore limiter;
    private final Executor executor;

    /**
     * @param function           mapping applied to every element on the executor
     * @param completionStrategy converts the stream of futures into the result stream
     * @param characteristics    characteristics reported by {@link #characteristics()}
     * @param executor           executor the mapping tasks are submitted to
     * @param parallelism        number of permits of the limiting semaphore
     */
    private ParallelStreamCollector(Function<T, R> function, CompletionStrategy<R> completionStrategy, Set<Collector.Characteristics> characteristics, Executor executor, int parallelism) {
        this.completionStrategy = completionStrategy;
        this.characteristics = characteristics;
        this.limiter = new Semaphore(parallelism);
        this.function = function;
        this.executor = executor;
    }

    /** Returns a supplier of a fresh {@link Stream.Builder} used to accumulate the futures. */
    @Override
    public Supplier<Stream.Builder<CompletableFuture<R>>> supplier() {
        return Stream::builder;
    }

    /**
     * Returns the accumulator: it acquires a permit (blocking if none is available),
     * submits the mapping of the element to the executor and stores the resulting future.
     *
     * <p>If the collecting thread is interrupted while waiting for a permit, the interrupt
     * flag is restored and a {@link RuntimeException} wrapping the
     * {@link InterruptedException} is thrown. If the executor refuses the task, the
     * acquired permit is given back before the failure is propagated.
     */
    @Override
    public BiConsumer<Stream.Builder<CompletableFuture<R>>, T> accumulator() {
        return (acc, e) -> {
            try {
                this.limiter.acquire();
            }
            catch (InterruptedException interruptedException) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(interruptedException);
            }
            CompletableFuture<R> future;
            try {
                future = CompletableFuture.supplyAsync(() -> {
                    try {
                        return this.function.apply(e);
                    }
                    finally {
                        // The permit is returned whether the mapping succeeds or throws.
                        this.limiter.release();
                    }
                }, this.executor);
            }
            catch (RuntimeException | Error submissionFailure) {
                // Submission failed (e.g. RejectedExecutionException), so the task above
                // will not run and would never release the permit; release it here so a
                // reused collector does not permanently lose capacity.
                this.limiter.release();
                throw submissionFailure;
            }
            acc.add(future);
        };
    }

    /**
     * Returns a combiner that always throws {@link UnsupportedOperationException};
     * merging partial results is deliberately unsupported.
     */
    @Override
    public BinaryOperator<Stream.Builder<CompletableFuture<R>>> combiner() {
        return (left, right) -> {
            throw new UnsupportedOperationException("Using parallel stream with parallel collectors is a bad idea");
        };
    }

    /** Returns the finisher: builds the stream of futures and applies the completion strategy to it. */
    @Override
    public Function<Stream.Builder<CompletableFuture<R>>, Stream<R>> finisher() {
        // NOTE(review): the raw cast comes from the decompiled output and is presumably
        // redundant; confirm against CompletionStrategy's declared signature before removing.
        return acc -> (Stream)this.completionStrategy.apply(acc.build());
    }

    /** Returns the characteristics this instance was created with. */
    @Override
    public Set<Collector.Characteristics> characteristics() {
        return this.characteristics;
    }

    /**
     * Same as {@link #streaming(Function, Executor, int)} using
     * {@code Dispatcher.getDefaultParallelism()} as the parallelism.
     */
    static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, Executor executor) {
        return ParallelStreamCollector.streaming(mapper, executor, Dispatcher.getDefaultParallelism());
    }

    /**
     * Creates a collector using {@code CompletionStrategy.unordered()} and reporting the
     * {@code UNORDERED} characteristic. With {@code parallelism == 1} a collector that
     * applies {@code mapper} directly in the accumulator (no executor involved) is returned.
     *
     * @param mapper      mapping function, must not be {@code null}
     * @param executor    executor running the mapping, must not be {@code null}
     * @param parallelism validated by {@code AsyncParallelCollector.requireValidParallelism}
     * @throws NullPointerException if {@code executor} or {@code mapper} is {@code null}
     */
    static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, Executor executor, int parallelism) {
        ParallelStreamCollector.requireValidArguments(mapper, executor, parallelism);
        return parallelism == 1 ? BatchingCollectors.syncCollector(mapper) : new ParallelStreamCollector<T, R>(mapper, CompletionStrategy.unordered(), UNORDERED, executor, parallelism);
    }

    /**
     * Same as {@link #streamingOrdered(Function, Executor, int)} using
     * {@code Dispatcher.getDefaultParallelism()} as the parallelism.
     */
    static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor) {
        return ParallelStreamCollector.streamingOrdered(mapper, executor, Dispatcher.getDefaultParallelism());
    }

    /**
     * Creates a collector using {@code CompletionStrategy.ordered()} and reporting no
     * characteristics. With {@code parallelism == 1} a collector that applies
     * {@code mapper} directly in the accumulator (no executor involved) is returned.
     *
     * @param mapper      mapping function, must not be {@code null}
     * @param executor    executor running the mapping, must not be {@code null}
     * @param parallelism validated by {@code AsyncParallelCollector.requireValidParallelism}
     * @throws NullPointerException if {@code executor} or {@code mapper} is {@code null}
     */
    static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor, int parallelism) {
        ParallelStreamCollector.requireValidArguments(mapper, executor, parallelism);
        return parallelism == 1 ? BatchingCollectors.syncCollector(mapper) : new ParallelStreamCollector<T, R>(mapper, CompletionStrategy.ordered(), Collections.emptySet(), executor, parallelism);
    }

    /**
     * Shared argument validation for all factory methods. The order of the checks
     * (executor, mapper, parallelism) is kept so the first reported problem is unchanged.
     */
    private static void requireValidArguments(Function<?, ?> mapper, Executor executor, int parallelism) {
        Objects.requireNonNull(executor, "executor can't be null");
        Objects.requireNonNull(mapper, "mapper can't be null");
        AsyncParallelCollector.requireValidParallelism(parallelism);
    }

    /**
     * Factory methods for the batching variants: all elements are first gathered into a
     * list, split via {@code BatchingStream.partitioned(list, parallelism)}, and each
     * partition is then processed as a single task by a {@link ParallelStreamCollector}
     * operating on lists.
     */
    static final class BatchingCollectors {
        private BatchingCollectors() {
        }

        /**
         * Batching counterpart of {@link ParallelStreamCollector#streaming(Function, Executor, int)}.
         *
         * @throws NullPointerException if {@code executor} or {@code mapper} is {@code null}
         */
        static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, Executor executor, int parallelism) {
            ParallelStreamCollector.requireValidArguments(mapper, executor, parallelism);
            return parallelism == 1 ? BatchingCollectors.syncCollector(mapper) : BatchingCollectors.batched(new ParallelStreamCollector<List<T>, List<R>>(BatchingStream.batching(mapper), CompletionStrategy.unordered(), UNORDERED, executor, parallelism), parallelism);
        }

        /**
         * Batching counterpart of {@link ParallelStreamCollector#streamingOrdered(Function, Executor, int)}.
         *
         * @throws NullPointerException if {@code executor} or {@code mapper} is {@code null}
         */
        static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor, int parallelism) {
            ParallelStreamCollector.requireValidArguments(mapper, executor, parallelism);
            return parallelism == 1 ? BatchingCollectors.syncCollector(mapper) : BatchingCollectors.batched(new ParallelStreamCollector<List<T>, List<R>>(BatchingStream.batching(mapper), CompletionStrategy.ordered(), Collections.emptySet(), executor, parallelism), parallelism);
        }

        /**
         * Collects every element into a list, partitions it, feeds the partitions to
         * {@code downstream} and flattens the per-partition result lists into one stream.
         */
        private static <T, R> Collector<T, ?, Stream<R>> batched(ParallelStreamCollector<List<T>, List<R>> downstream, int parallelism) {
            return Collectors.collectingAndThen(Collectors.toList(), list -> BatchingStream.partitioned(list, parallelism).collect(Collectors.collectingAndThen(downstream, s -> s.flatMap(Collection::stream))));
        }

        /**
         * Collector that applies {@code mapper} directly in the accumulator, on whatever
         * thread drives the collection; its combiner always throws.
         */
        private static <T, R> Collector<T, Stream.Builder<R>, Stream<R>> syncCollector(Function<T, R> mapper) {
            return Collector.of(Stream::builder, (rs, t) -> rs.add(mapper.apply(t)), (rs, rs2) -> {
                throw new UnsupportedOperationException("Using parallel stream with parallel collectors is a bad idea");
            }, Stream.Builder::build, new Collector.Characteristics[0]);
        }
    }
}

