/*
 * Decompiled with CFR 0.152.
 */
package one.microstream.functional;

import java.util.function.Consumer;
import one.microstream.X;
import one.microstream.collections.BulkList;
import one.microstream.concurrency.XThreads;
import one.microstream.math.XMath;
import one.microstream.typing.XTypes;

public interface ParallelProcedure<E>
extends Consumer<E> {
    @Override
    public void accept(E var1);

    public static final class Default<E>
    implements ParallelProcedure<E> {
        private static final int DEFAULT_THREAD_TIMEOUT = 1000;
        private static final ThreadTimeoutProvider DEFAULT_THREAD_TIMEOUT_PROVIDER = () -> 1000;
        private final LogicProvider<? super E, ? extends Consumer<? super E>> logicProvider;
        private final BulkList<WorkerThread> threads;
        private final ThreadCountProvider threadCountProvider;
        private final ThreadTimeoutProvider threadTimeoutProvider;
        private Entry<E> head;
        private Entry<E> tail;
        private long lastTouched = System.currentTimeMillis();

        public Default(LogicProvider<? super E, ? extends Consumer<? super E>> logicProvider, int threadCount) {
            this(logicProvider, new ThreadCountProvider.Constant(threadCount), DEFAULT_THREAD_TIMEOUT_PROVIDER);
        }

        public Default(LogicProvider<? super E, ? extends Consumer<? super E>> logicProvider, int threadCount, int threadTimeout) {
            this(logicProvider, new ThreadCountProvider.Constant(threadCount), new ThreadTimeoutProvider.Constant(threadTimeout));
        }

        public Default(LogicProvider<? super E, ? extends Consumer<? super E>> logicProvider, ThreadCountProvider threadCountProvider) {
            this(logicProvider, threadCountProvider, null);
        }

        public Default(LogicProvider<? super E, ? extends Consumer<? super E>> logicProvider, ThreadCountProvider threadCountProvider, ThreadTimeoutProvider threadTimeout) {
            this.logicProvider = X.notNull(logicProvider);
            this.threadCountProvider = X.notNull(threadCountProvider);
            this.threadTimeoutProvider = X.coalesce(threadTimeout, DEFAULT_THREAD_TIMEOUT_PROVIDER);
            this.threads = new BulkList();
        }

        private void touch() {
            this.lastTouched = System.currentTimeMillis();
        }

        private void checkWorkerCreation() {
            if (XTypes.to_int(this.threads.size()) < this.threadCountProvider.maxThreadCount()) {
                this.threads.add(XThreads.start(this.createWorkerThread()));
            }
        }

        private WorkerThread createWorkerThread() {
            return new WorkerThread(XTypes.to_int(this.threads.size()));
        }

        private boolean isTimedOut() {
            return System.currentTimeMillis() - this.lastTouched > (long)this.threadTimeoutProvider.threadTimeout();
        }

        private boolean isOversized() {
            return this.threadCountProvider.maxThreadCount() < XTypes.to_int(this.threads.size());
        }

        private void checkThreadTimeout() {
            if ((this.isTimedOut() || this.isOversized()) && XTypes.to_int(this.threads.size()) > 0) {
                this.touch();
                this.threads.last().interrupt();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public int currentThreadCount() {
            BulkList<WorkerThread> bulkList = this.threads;
            synchronized (bulkList) {
                return XTypes.to_int(this.threads.size());
            }
        }

        @Override
        public final void accept(E element) {
            this.enqueueEntry(new Entry<E>(element));
        }

        private synchronized void enqueueEntry(Entry<E> newEntry) {
            Entry<E> entry;
            if (this.head == null) {
                this.head = newEntry;
                entry = this.head;
            } else {
                this.tail.next = newEntry;
                entry = this.tail.next;
            }
            this.tail = entry;
            this.touch();
            this.checkWorkerCreation();
            this.notifyAll();
        }

        final <S, E1 extends S, P extends Consumer<? super S>> void runWorker(WorkerThread worker) {
            Default.delegateRun(this, worker, this.logicProvider);
        }

        static <S, E extends S, P extends Consumer<? super S>> void delegateRun(Default<E> instance, WorkerThread thread, LogicProvider<S, P> provider) {
            try {
                P logic = provider.provideLogic();
                thread.setName(Default.buildThreadName(logic.getClass(), thread.number));
                try {
                    while (true) {
                        logic.accept(instance.get());
                    }
                }
                catch (Throwable t) {
                    provider.disposeLogic(logic, t);
                }
            }
            finally {
                instance.removeThread(thread);
            }
        }

        static String buildThreadName(Class<?> logicClass, int threadNumber) {
            String logicClassName = logicClass.getName();
            int i = logicClassName.length();
            while (i-- > 0) {
                if (logicClassName.charAt(i) != '.') continue;
                logicClassName = logicClassName.substring(i + 1);
                break;
            }
            return "Worker-" + threadNumber + '-' + logicClassName;
        }

        final synchronized E get() throws InterruptedException {
            while (this.head == null) {
                this.wait(this.threadTimeoutProvider.threadTimeout());
                this.checkThreadTimeout();
            }
            Object element = this.head.element;
            this.head = this.head.next;
            return element;
        }

        final synchronized void removeThread(WorkerThread thread) {
            this.threads.removeOne(thread);
        }

        private static final class Entry<E> {
            final E element;
            Entry<E> next;

            Entry(E element) {
                this.element = element;
            }
        }

        private final class WorkerThread
        extends Thread {
            final int number;

            WorkerThread(int number) {
                this.number = number;
            }

            @Override
            public void run() {
                Default.this.runWorker(this);
            }
        }
    }

    public static interface LogicProvider<S, P extends Consumer<? super S>> {
        public P provideLogic();

        public void disposeLogic(P var1, Throwable var2);

        public static final class SingletonLogic<S, P extends Consumer<? super S>>
        implements LogicProvider<S, P> {
            private final P logic;

            public SingletonLogic(P logic) {
                this.logic = logic;
            }

            @Override
            public P provideLogic() {
                return this.logic;
            }

            @Override
            public void disposeLogic(P logic, Throwable cause) {
            }
        }
    }

    public static interface ThreadCountProvider {
        public int maxThreadCount();

        public static class Constant
        implements ThreadCountProvider {
            private final int maxThreadCount;

            public Constant(int maxThreadCount) {
                this.maxThreadCount = XMath.positive(maxThreadCount);
            }

            @Override
            public int maxThreadCount() {
                return this.maxThreadCount;
            }
        }
    }

    public static interface ThreadTimeoutProvider {
        public int threadTimeout();

        public static class Constant
        implements ThreadTimeoutProvider {
            private final int threadTimeout;

            public Constant(int threadTimeout) {
                this.threadTimeout = XMath.positive(threadTimeout);
            }

            @Override
            public int threadTimeout() {
                return this.threadTimeout;
            }
        }
    }
}

