/*
 * Decompiled with CFR 0.152.
 */
package com.github.paganini2008.devtools.multithreads;

import com.github.paganini2008.devtools.Sequence;
import com.github.paganini2008.devtools.multithreads.ExecutorUtils;
import com.github.paganini2008.devtools.multithreads.ThreadPool;
import com.github.paganini2008.devtools.multithreads.latch.CounterLatch;
import com.github.paganini2008.devtools.multithreads.latch.Latch;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public abstract class ForEach<E> {
    private final Latch latch;
    private final Worker worker;
    private final Executor executor;

    public ForEach(int nThreads) {
        this(Executors.newFixedThreadPool(nThreads), nThreads * 2);
    }

    public ForEach(Executor executor, int maxPermits) {
        this(executor, new ConcurrentLinkedQueue(), maxPermits);
    }

    public ForEach(Executor executor, Queue<E> workQueue, int maxPermits) {
        this.worker = new Worker(workQueue);
        this.executor = executor;
        this.latch = maxPermits > 0 ? new CounterLatch(maxPermits) : CounterLatch.newUnlimitedLatch();
    }

    public void accept(Iterable<E> iterable) {
        for (E element : iterable) {
            this.accept(element);
        }
    }

    public void accept(E element) {
        if (element != null) {
            this.worker.push(element);
            this.executor.execute(this.worker);
        }
    }

    public void join(boolean shutdown) {
        this.latch.join();
        if (shutdown) {
            if (this.executor instanceof ThreadPool) {
                ((ThreadPool)this.executor).shutdown();
            } else {
                ExecutorUtils.gracefulShutdown(this.executor, 60000L);
            }
        }
    }

    protected abstract void process(E var1);

    public static <E> void run(Iterable<E> iterable, final Consumer<E> consumer) {
        int nThreads = Runtime.getRuntime().availableProcessors() * 2;
        ForEach forEach = new ForEach<E>(nThreads){

            @Override
            protected void process(E element) {
                consumer.accept(element);
            }
        };
        forEach.accept((E)iterable);
        forEach.join(true);
    }

    public static void main(String[] args) {
        ForEach.run(Sequence.forEach(0, 10000), e -> System.out.println(e));
    }

    private class Worker
    implements Runnable {
        private final Queue<E> queue;

        Worker(Queue<E> queue) {
            this.queue = queue;
        }

        public void push(E element) {
            ForEach.this.latch.acquire();
            this.queue.add(element);
        }

        @Override
        public void run() {
            Object element = this.queue.poll();
            try {
                ForEach.this.process(element);
            }
            finally {
                ForEach.this.latch.release();
            }
        }
    }
}

