/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.util.executor;

import com.hazelcast.instance.OutOfMemoryErrorDispatcher;
import com.hazelcast.internal.util.counters.SwCounter;
import com.hazelcast.logging.ILogger;
import com.hazelcast.util.HashUtil;
import com.hazelcast.util.Preconditions;
import com.hazelcast.util.executor.StripedRunnable;
import com.hazelcast.util.executor.TimeoutRunnable;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public final class StripedExecutor
implements Executor {
    public static final AtomicLong THREAD_ID_GENERATOR = new AtomicLong();
    private final int size;
    private final ILogger logger;
    private final Worker[] workers;
    private final Random rand = new Random();
    private volatile boolean live = true;

    public StripedExecutor(ILogger logger, String threadNamePrefix, int threadCount, int maximumQueueCapacity) {
        Preconditions.checkPositive(threadCount, "threadCount should be positive but found " + threadCount);
        Preconditions.checkPositive(maximumQueueCapacity, "maximumQueueCapacity should be positive but found " + maximumQueueCapacity);
        this.logger = logger;
        this.size = threadCount;
        this.workers = new Worker[threadCount];
        int perThreadMaxQueueCapacity = (int)Math.ceil(1.0 * (double)maximumQueueCapacity / (double)threadCount);
        for (int i = 0; i < threadCount; ++i) {
            Worker worker = new Worker(threadNamePrefix, perThreadMaxQueueCapacity);
            worker.start();
            this.workers[i] = worker;
        }
    }

    public int getWorkQueueSize() {
        int size = 0;
        for (Worker worker : this.workers) {
            size += worker.workQueue.size();
        }
        return size;
    }

    public long processedCount() {
        long size = 0L;
        for (Worker worker : this.workers) {
            size += worker.processed.inc();
        }
        return size;
    }

    public void shutdown() {
        this.live = false;
        for (Worker worker : this.workers) {
            worker.workQueue.clear();
            worker.interrupt();
        }
    }

    public boolean isLive() {
        return this.live;
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException("command can't be null");
        }
        if (!this.live) {
            throw new RejectedExecutionException("Executor is terminated!");
        }
        Worker worker = this.getWorker(command);
        worker.schedule(command);
    }

    private Worker getWorker(Runnable command) {
        int key = command instanceof StripedRunnable ? ((StripedRunnable)command).getKey() : this.rand.nextInt();
        int index = HashUtil.hashToIndex(key, this.size);
        return this.workers[index];
    }

    Worker[] getWorkers() {
        return this.workers;
    }

    final class Worker
    extends Thread {
        private final BlockingQueue<Runnable> workQueue;
        private final SwCounter processed;
        private final int queueCapacity;

        private Worker(String threadNamePrefix, int queueCapacity) {
            super(threadNamePrefix + "-" + THREAD_ID_GENERATOR.incrementAndGet());
            this.processed = SwCounter.newSwCounter();
            this.workQueue = new LinkedBlockingQueue<Runnable>(queueCapacity);
            this.queueCapacity = queueCapacity;
        }

        private void schedule(Runnable command) {
            boolean offered;
            long timeout = 0L;
            TimeUnit timeUnit = TimeUnit.SECONDS;
            if (command instanceof TimeoutRunnable) {
                TimeoutRunnable timeoutRunnable = (TimeoutRunnable)command;
                timeout = timeoutRunnable.getTimeout();
                timeUnit = timeoutRunnable.getTimeUnit();
            }
            try {
                offered = timeout == 0L ? this.workQueue.offer(command) : this.workQueue.offer(command, timeout, timeUnit);
            }
            catch (InterruptedException e) {
                throw new RejectedExecutionException("Thread is interrupted while offering work");
            }
            if (!offered) {
                throw new RejectedExecutionException("Task: " + command + " is rejected, the worker queue is full!");
            }
        }

        @Override
        public void run() {
            block4: while (true) {
                try {
                    while (true) {
                        try {
                            Runnable task = this.workQueue.take();
                            this.process(task);
                            continue block4;
                        }
                        catch (InterruptedException e) {
                            if (StripedExecutor.this.live) continue;
                            return;
                        }
                        break;
                    }
                }
                catch (Throwable t) {
                    StripedExecutor.this.logger.severe(this.getName() + " caught an exception", t);
                    continue;
                }
                break;
            }
        }

        private void process(Runnable task) {
            this.processed.inc();
            try {
                task.run();
            }
            catch (Throwable e) {
                OutOfMemoryErrorDispatcher.inspectOutOfMemoryError(e);
                StripedExecutor.this.logger.severe(this.getName() + " caught an exception while processing task:" + task, e);
            }
        }

        int getQueueCapacity() {
            return this.queueCapacity;
        }
    }
}

