/*
 * Decompiled with CFR 0.152.
 */
package com.github.xgp.util;

import com.github.xgp.util.Managed;
import com.github.xgp.util.Reactor;
import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/**
 * A {@link Reactor} that runs each scheduled element through a user-supplied
 * {@link Function} on a fixed-size {@link ScheduledExecutorService}.
 *
 * <p>Elements are recorded in {@link #getProcessing()} while the function is
 * being applied to them and are removed again once the function returns or
 * throws. Failures are reported to the optional {@link Reactor.FailureHandler}
 * supplied at scheduling time; without a handler they are dropped.
 *
 * <p>Thread-safety: the in-flight set is touched by every worker thread of the
 * pool, so it is wrapped in a synchronized view.
 *
 * @param <T> type of the elements handed to the processing function
 */
public class ScheduledReactor<T>
implements Reactor<T>,
Managed {
    /** Executor that holds delayed tasks and runs them on its worker threads. */
    protected final ScheduledExecutorService pending;
    /** Function applied to every scheduled element; its result is discarded. */
    protected final Function<T, ? extends Object> function;
    /** Elements whose function call is currently in progress (synchronized, weak-keyed). */
    protected final Set<T> processing;
    /** Size of the worker pool; also scales the {@link #await()} timeout. */
    protected final int threads;

    /**
     * Creates a reactor with one worker thread per available processor.
     *
     * @param function function to apply to every scheduled element
     */
    public ScheduledReactor(Function<T, ? extends Object> function) {
        this(function, Runtime.getRuntime().availableProcessors());
    }

    /**
     * Creates a reactor backed by a scheduled thread pool of the given size.
     *
     * @param function function to apply to every scheduled element
     * @param threads  core size of the scheduled thread pool
     */
    public ScheduledReactor(Function<T, ? extends Object> function, int threads) {
        this.function = function;
        this.threads = threads;
        // WeakHashMap is not thread-safe and this set is mutated concurrently by
        // all pool workers, so every access must go through a synchronized view.
        this.processing = Collections.synchronizedSet(
                Collections.newSetFromMap(new WeakHashMap<T, Boolean>()));
        this.pending = Executors.newScheduledThreadPool(threads);
    }

    /**
     * Returns the live set of elements currently being processed.
     *
     * <p>The returned set is a synchronized view; callers that iterate over it
     * must synchronize on the returned set for the duration of the iteration.
     *
     * @return the in-flight elements (live view, not a copy)
     */
    @Override
    public Set<T> getProcessing() {
        return this.processing;
    }

    /**
     * Reports whether the underlying executor still accepts work.
     *
     * @return {@code true} until {@link #stop()} has shut the executor down
     */
    @Override
    public boolean isRunning() {
        return !this.pending.isShutdown();
    }

    /**
     * Hands this instance to {@code Managed.addShutdownHook}.
     *
     * <p>NOTE(review): presumably this arranges for the reactor to be stopped
     * when the JVM exits; confirm against {@code Managed}.
     *
     * @throws Exception declared by the overridden method; nothing visible
     *                   here throws a checked exception
     */
    @Override
    public void start() throws Exception {
        Managed.addShutdownHook(this);
    }

    /** Requests a shutdown of the underlying executor without waiting for it to finish. */
    @Override
    public void stop() {
        this.pending.shutdown();
    }

    /**
     * Blocks until the executor has terminated, for at most one second per
     * configured worker thread. If the calling thread is interrupted while
     * waiting, the interrupt flag is restored and the method returns.
     */
    @Override
    public void await() {
        try {
            this.pending.awaitTermination((long)this.threads * 1000L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            // Preserve the interruption for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Schedules an element for immediate processing with no failure handler.
     *
     * @param e element to process
     */
    @Override
    public void schedule(T e) {
        this.schedule(e, 0L, TimeUnit.MILLISECONDS);
    }

    /**
     * Schedules an element for immediate processing.
     *
     * @param e       element to process
     * @param handler callback notified if processing throws; may be {@code null}
     */
    @Override
    public void schedule(T e, Reactor.FailureHandler<T> handler) {
        this.schedule(e, 0L, TimeUnit.MILLISECONDS, handler);
    }

    /**
     * Schedules an element for processing after a delay, with no failure handler.
     *
     * @param e     element to process
     * @param delay time to wait before processing
     * @param unit  unit of {@code delay}
     */
    @Override
    public void schedule(T e, long delay, TimeUnit unit) {
        this.schedule(e, delay, unit, null);
    }

    /**
     * Schedules an element for processing after a delay.
     *
     * @param e       element to process
     * @param delay   time to wait before processing
     * @param unit    unit of {@code delay}
     * @param handler callback notified if processing throws; may be {@code null}
     */
    @Override
    public void schedule(T e, long delay, TimeUnit unit, Reactor.FailureHandler<T> handler) {
        this.schedule(new Reactor.Task<T>(e, handler), delay, unit);
    }

    /**
     * Submits a prepared task to the executor.
     *
     * <p>When the task runs, its element is added to the in-flight set, the
     * processing function is applied, and the element is removed again in all
     * cases. Any {@link Throwable} raised while doing so is passed to the
     * task's failure handler, if one is present.
     *
     * <p>NOTE(review): this method is package-private yet marked
     * {@code @Override}; the signature is kept exactly as found. Verify it
     * against the declaration in {@code Reactor}.
     *
     * @param task  element plus optional failure handler
     * @param delay time to wait before processing
     * @param unit  unit of {@code delay}
     */
    @Override
    void schedule(final Reactor.Task<T> task, long delay, TimeUnit unit) {
        this.pending.schedule(() -> {
            try {
                this.processing.add(task.getTask());
                this.function.apply(task.getTask());
            }
            catch (Throwable t) {
                if (task.getHandler() != null) {
                    try {
                        task.getHandler().onFailure(t, this, task.getTask());
                    }
                    catch (Exception ignored) {
                        // Best effort: a failing failure-handler must not take
                        // down the worker thread, and there is nobody left to
                        // report the secondary failure to.
                    }
                }
            }
            finally {
                // Always clear the in-flight marker, whether processing succeeded or failed.
                this.processing.remove(task.getTask());
            }
        }, delay, unit);
    }
}

