/*
 * Decompiled with CFR 0.152.
 */
package org.reaktivity.reaktor;

import java.net.URL;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongFunction;
import java.util.function.ToLongFunction;
import org.agrona.CloseHelper;
import org.agrona.ErrorHandler;
import org.agrona.LangUtil;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.AgentRunner;
import org.reaktivity.reaktor.ReaktorAffinity;
import org.reaktivity.reaktor.ReaktorBuilder;
import org.reaktivity.reaktor.ReaktorConfiguration;
import org.reaktivity.reaktor.internal.LabelManager;
import org.reaktivity.reaktor.internal.context.ConfigureTask;
import org.reaktivity.reaktor.internal.context.DispatchAgent;
import org.reaktivity.reaktor.internal.stream.NamespacedId;
import org.reaktivity.reaktor.nukleus.Nukleus;

/**
 * Owns a set of {@link DispatchAgent}s (one per core index), the optional task executor
 * handed to them, and the {@link ConfigureTask} that is submitted when the instance is started.
 *
 * <p>Instances are created through {@link #builder()}. Call {@link #start()} to launch the agent
 * runners and {@link #close()} to stop them again.
 */
public final class Reaktor
implements AutoCloseable {
    private final Collection<Nukleus> nuklei;
    /** Executor passed to every dispatch agent; {@code null} when task parallelism is not positive. */
    private final ExecutorService tasks;
    private final Callable<Void> configure;
    private final Collection<AgentRunner> runners;
    private final ToLongFunction<String> counter;
    private final AtomicInteger nextTaskId = new AtomicInteger();
    private final ThreadFactory factory = Executors.defaultThreadFactory();
    /** Handle of the submitted configure task; stays {@code null} until {@link #start()} is called. */
    private Future<Void> configureRef;

    /**
     * Creates one dispatch agent per core index in {@code [0, coreCount)} and prepares the
     * configure task and aggregate counter over those agents.
     *
     * @param config       supplies the task parallelism and the directory used for the label manager
     * @param nuklei       nuklei passed to every dispatch agent and searched by {@link #nukleus(Class)}
     * @param errorHandler handler passed to every dispatch agent and to the configure task
     * @param configURL    configuration location passed to every dispatch agent and to the configure task
     * @param coreCount    number of dispatch agents to create
     * @param affinities   explicit affinity masks, keyed by namespace and binding label
     */
    Reaktor(ReaktorConfiguration config, Collection<Nukleus> nuklei, ErrorHandler errorHandler, URL configURL, int coreCount, Collection<ReaktorAffinity> affinities) {
        ExecutorService tasks = null;
        if (config.taskParallelism() > 0) {
            tasks = Executors.newFixedThreadPool(config.taskParallelism(), this::newTaskThread);
        }
        LabelManager labels = new LabelManager(config.directory());
        LinkedHashSet<DispatchAgent> dispatchers = new LinkedHashSet<DispatchAgent>();
        for (int coreIndex = 0; coreIndex < coreCount; ++coreIndex) {
            // Bits [0, coreCount) set. Built with BitSet.set rather than (1L << coreCount) - 1L,
            // because a long shift distance is masked to six bits and so breaks for coreCount >= 64.
            BitSet defaultMask = new BitSet(coreCount);
            defaultMask.set(0, coreCount);
            // Each agent gets its own map: the lookup below inserts the default mask on a miss,
            // so the map is mutated by whichever agent performs the lookup.
            Long2ObjectHashMap<BitSet> affinityMasks = new Long2ObjectHashMap<BitSet>();
            for (ReaktorAffinity affinity : affinities) {
                int namespaceId = labels.supplyLabelId(affinity.namespace);
                int bindingId = labels.supplyLabelId(affinity.binding);
                long routeId = NamespacedId.id(namespaceId, bindingId);
                affinityMasks.put(routeId, BitSet.valueOf(new long[]{affinity.mask}));
            }
            LongFunction<BitSet> defaulter = r -> defaultMask;
            LongFunction<BitSet> affinityMask = r -> affinityMasks.computeIfAbsent(r, defaulter);
            DispatchAgent agent = new DispatchAgent(config, configURL, tasks, labels, errorHandler, affinityMask, nuklei, coreIndex);
            dispatchers.add(agent);
        }
        ConfigureTask configure = new ConfigureTask(configURL, labels::supplyLabelId, dispatchers, errorHandler);
        ArrayList<AgentRunner> runners = new ArrayList<AgentRunner>(dispatchers.size());
        for (DispatchAgent dispatcher : dispatchers) {
            runners.add(dispatcher.runner());
        }
        // A named counter is reported as the sum of that counter across all dispatch agents.
        ToLongFunction<String> counter = name -> dispatchers.stream().mapToLong(d -> d.counter(name)).sum();
        this.nuklei = nuklei;
        this.tasks = tasks;
        this.configure = configure;
        this.runners = runners;
        this.counter = counter;
    }

    /**
     * Looks up a nukleus by type.
     *
     * @param kind the type to search for
     * @param <T>  the type to search for
     * @return the first nukleus that is an instance of {@code kind}, or {@code null} if there is none
     */
    public <T> T nukleus(Class<T> kind) {
        return this.nuklei.stream().filter(kind::isInstance).map(kind::cast).findFirst().orElse(null);
    }

    /**
     * Reads a named counter.
     *
     * @param name the counter name
     * @return the sum of the counter's value over all dispatch agents
     */
    public long counter(String name) {
        return this.counter.applyAsLong(name);
    }

    /**
     * Starts every agent runner on its own new thread and submits the configure task to the
     * common {@link ForkJoinPool}.
     *
     * @return the future of the submitted configure task
     */
    public Future<Void> start() {
        for (AgentRunner runner : this.runners) {
            AgentRunner.startOnThread(runner, Thread::new);
        }
        this.configureRef = ForkJoinPool.commonPool().submit(this.configure);
        return this.configureRef;
    }

    /**
     * Cancels the configure task if one was submitted, closes every agent runner and shuts down
     * the task executor if there is one.
     *
     * <p>Failures from closing runners do not stop the remaining runners from being closed. The
     * first failure is rethrown after cleanup, with the other failures attached as suppressed.
     *
     * @throws Exception the first failure raised while closing a runner
     */
    @Override
    public void close() throws Exception {
        ArrayList<Throwable> errors = new ArrayList<Throwable>();
        // start() may never have been called, in which case there is nothing to cancel.
        Future<Void> pending = this.configureRef;
        if (pending != null) {
            pending.cancel(true);
        }
        for (AgentRunner runner : this.runners) {
            try {
                CloseHelper.close(runner);
            }
            catch (Throwable ex) {
                errors.add(ex);
            }
        }
        if (this.tasks != null) {
            this.tasks.shutdownNow();
        }
        if (!errors.isEmpty()) {
            Throwable first = errors.get(0);
            for (Throwable other : errors) {
                // Identity check: a throwable must never be added as its own suppressed exception.
                if (other != first) {
                    first.addSuppressed(other);
                }
            }
            LangUtil.rethrowUnchecked(first);
        }
    }

    /**
     * Creates a builder for {@code Reaktor} instances.
     *
     * @return a new {@link ReaktorBuilder}
     */
    public static ReaktorBuilder builder() {
        return new ReaktorBuilder();
    }

    /**
     * Thread factory for the task executor: delegates to the default thread factory and names
     * the resulting thread {@code reaktor/task#N} with a per-instance increasing {@code N}.
     */
    private Thread newTaskThread(Runnable r) {
        Thread t = this.factory.newThread(r);
        if (t != null) {
            t.setName(String.format("reaktor/task#%d", this.nextTaskId.getAndIncrement()));
        }
        return t;
    }
}

