/*
 * Decompiled with CFR 0.152.
 */
package org.reaktivity.reaktor;

import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.ServiceLoader;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import org.agrona.CloseHelper;
import org.agrona.ErrorHandler;
import org.agrona.LangUtil;
import org.agrona.concurrent.AgentRunner;
import org.reaktivity.reaktor.ReaktorAffinity;
import org.reaktivity.reaktor.ReaktorBuilder;
import org.reaktivity.reaktor.ReaktorConfiguration;
import org.reaktivity.reaktor.ReaktorLoad;
import org.reaktivity.reaktor.ext.ReaktorExtContext;
import org.reaktivity.reaktor.ext.ReaktorExtSpi;
import org.reaktivity.reaktor.internal.Info;
import org.reaktivity.reaktor.internal.LabelManager;
import org.reaktivity.reaktor.internal.Tuning;
import org.reaktivity.reaktor.internal.context.ConfigureTask;
import org.reaktivity.reaktor.internal.context.DispatchAgent;
import org.reaktivity.reaktor.internal.stream.NamespacedId;
import org.reaktivity.reaktor.nukleus.Configuration;
import org.reaktivity.reaktor.nukleus.Nukleus;

/**
 * Runtime entry point that owns one {@link DispatchAgent} per core, an optional
 * fixed-size task executor, and the {@link ConfigureTask} that is submitted on
 * {@link #start()}.
 *
 * <p>Instances are created by {@link ReaktorBuilder} (see {@link #builder()}) and
 * released via {@link #close()}.
 */
public final class Reaktor
implements AutoCloseable {
    private final Collection<Nukleus> nuklei;
    private final ExecutorService tasks;
    private final Callable<Void> configure;
    private final Collection<AgentRunner> runners;
    private final ToLongFunction<String> counter;
    private final Tuning tuning;
    private final List<ReaktorExtSpi> extensions;
    private final ReaktorExtContext context;
    private final AtomicInteger nextTaskId = new AtomicInteger();
    private final ThreadFactory factory = Executors.defaultThreadFactory();
    private final ToIntFunction<String> supplyLabelId;
    // Assigned by start(); remains null until then, so close() must tolerate null.
    private Future<Void> configureRef;

    /**
     * Wires together the dispatch agents, tuning, extensions and configure task.
     *
     * @param config       configuration providing directory, task parallelism and verbosity
     * @param nuklei       nuklei handed to every dispatch agent and searchable via {@link #nukleus(Class)}
     * @param errorHandler handler passed to the dispatch agents, configure task and extension context
     * @param configURL    location passed to the dispatch agents and the configure task
     * @param coreCount    number of dispatch agents to create
     * @param affinities   namespace/binding affinity masks applied to the tuning before agents are created
     */
    Reaktor(ReaktorConfiguration config, Collection<Nukleus> nuklei, ErrorHandler errorHandler, URL configURL, int coreCount, Collection<ReaktorAffinity> affinities) {
        // A task executor is only created when task parallelism is enabled; otherwise it stays null.
        ExecutorService tasks = null;
        if (config.taskParallelism() > 0) {
            tasks = Executors.newFixedThreadPool(config.taskParallelism(), this::newTaskThread);
        }

        Info info = new Info(config.directory(), coreCount);
        info.reset();

        LabelManager labels = new LabelManager(config.directory());

        Tuning tuning = new Tuning(config.directory(), coreCount);
        tuning.reset();
        for (ReaktorAffinity affinity : affinities) {
            int namespaceId = labels.supplyLabelId(affinity.namespace);
            int bindingId = labels.supplyLabelId(affinity.binding);
            long routeId = NamespacedId.id(namespaceId, bindingId);
            tuning.affinity(routeId, affinity.mask);
        }
        this.tuning = tuning;

        // Insertion-ordered so agents are visited in core index order.
        LinkedHashSet<DispatchAgent> dispatchers = new LinkedHashSet<>();
        for (int coreIndex = 0; coreIndex < coreCount; ++coreIndex) {
            DispatchAgent agent = new DispatchAgent(config, configURL, tasks, labels, errorHandler, tuning::affinity, nuklei, coreIndex);
            dispatchers.add(agent);
        }

        Consumer<String> logger = config.verbose() ? System.out::print : m -> {};
        List<ReaktorExtSpi> extensions = ServiceLoader.load(ReaktorExtSpi.class).stream().map(ServiceLoader.Provider::get).collect(Collectors.toList());
        ContextImpl context = new ContextImpl(config, errorHandler, dispatchers);
        ConfigureTask configure = new ConfigureTask(configURL, labels::supplyLabelId, tuning, dispatchers, errorHandler, logger, context, extensions);

        ArrayList<AgentRunner> runners = new ArrayList<>(dispatchers.size());
        dispatchers.forEach(d -> runners.add(d.runner()));

        // Named counters are reported as the sum over all dispatch agents.
        ToLongFunction<String> counter = name -> dispatchers.stream().mapToLong(d -> d.counter(name)).sum();

        this.supplyLabelId = labels::supplyLabelId;
        this.nuklei = nuklei;
        this.tasks = tasks;
        this.configure = configure;
        this.extensions = extensions;
        this.context = context;
        this.runners = runners;
        this.counter = counter;
    }

    /**
     * Finds the first registered nukleus that is an instance of the given type.
     *
     * @param kind the type to look for
     * @param <T>  the requested type
     * @return the first matching nukleus cast to {@code T}, or {@code null} if none matches
     */
    public <T> T nukleus(Class<T> kind) {
        return this.nuklei.stream().filter(kind::isInstance).map(kind::cast).findFirst().orElse(null);
    }

    /**
     * Returns a load view for the given namespace and binding, delegating to the extension context.
     *
     * @param namespace namespace label
     * @param binding   binding label
     * @return a {@link ReaktorLoad} for the namespaced binding
     */
    public ReaktorLoad load(String namespace, String binding) {
        return this.context.load(namespace, binding);
    }

    /**
     * Returns the value of a named counter summed across all dispatch agents.
     *
     * @param name counter name
     * @return the summed counter value
     */
    public long counter(String name) {
        return this.counter.applyAsLong(name);
    }

    /**
     * Starts every agent runner on its own new thread and submits the configure task
     * to the common {@link ForkJoinPool}.
     *
     * @return the future of the submitted configure task
     */
    public Future<Void> start() {
        for (AgentRunner runner : this.runners) {
            AgentRunner.startOnThread(runner, Thread::new);
        }
        this.configureRef = ForkJoinPool.commonPool().submit(this.configure);
        return this.configureRef;
    }

    /**
     * Cancels the configure task (if {@link #start()} was called), closes every agent runner,
     * shuts down the task executor when present, closes the tuning and notifies extensions.
     *
     * <p>Errors raised while closing agent runners are collected; after the remaining steps
     * complete, the first is rethrown with the others attached as suppressed.
     *
     * @throws Exception if closing the tuning fails, or (rethrown unchecked) if closing a runner fails
     */
    @Override
    public void close() throws Exception {
        List<Throwable> errors = new ArrayList<>();

        // configureRef is only set by start(); guard so closing a never-started
        // instance still releases runners, tasks and tuning instead of throwing NPE.
        Future<Void> configuring = this.configureRef;
        if (configuring != null) {
            configuring.cancel(true);
        }

        for (AgentRunner runner : this.runners) {
            try {
                CloseHelper.close(runner);
            }
            catch (Throwable ex) {
                errors.add(ex);
            }
        }

        if (this.tasks != null) {
            this.tasks.shutdownNow();
        }

        this.tuning.close();
        this.extensions.forEach(e -> e.onClosed(this.context));

        if (!errors.isEmpty()) {
            Throwable first = errors.get(0);
            errors.stream().filter(x -> x != first).forEach(first::addSuppressed);
            LangUtil.rethrowUnchecked(first);
        }
    }

    /**
     * Creates a new builder for {@link Reaktor} instances.
     *
     * @return a fresh {@link ReaktorBuilder}
     */
    public static ReaktorBuilder builder() {
        return new ReaktorBuilder();
    }

    /**
     * Thread factory for the task executor: delegates to the default factory and
     * names the thread {@code reaktor/task#N} using a per-instance sequence.
     */
    private Thread newTaskThread(Runnable r) {
        Thread t = this.factory.newThread(r);
        if (t != null) {
            t.setName(String.format("reaktor/task#%d", this.nextTaskId.getAndIncrement()));
        }
        return t;
    }

    /**
     * Extension context exposing configuration, error reporting and load lookups
     * backed by the enclosing instance's label supplier and dispatch agents.
     */
    private final class ContextImpl
    implements ReaktorExtContext {
        private final Configuration config;
        private final ErrorHandler errorHandler;
        private final Collection<DispatchAgent> dispatchers;

        private ContextImpl(Configuration config, ErrorHandler errorHandler, Collection<DispatchAgent> dispatchers) {
            this.config = config;
            this.errorHandler = errorHandler;
            this.dispatchers = dispatchers;
        }

        @Override
        public Configuration config() {
            return this.config;
        }

        /**
         * Resolves both labels to ids, combines them into a namespaced id and
         * returns a load view keyed by that id.
         */
        @Override
        public ReaktorLoad load(String namespace, String binding) {
            int namespaceId = Reaktor.this.supplyLabelId.applyAsInt(namespace);
            int bindingId = Reaktor.this.supplyLabelId.applyAsInt(binding);
            long namespacedId = NamespacedId.id(namespaceId, bindingId);
            return new LoadImpl(namespacedId);
        }

        @Override
        public void onError(Exception error) {
            this.errorHandler.onError(error);
        }

        /**
         * Load view for a single namespaced id; every metric is the sum of the
         * corresponding per-agent value across all dispatch agents.
         */
        private final class LoadImpl
        implements ReaktorLoad {
            private final ToLongFunction<? super DispatchAgent> initialOpens;
            private final ToLongFunction<? super DispatchAgent> initialCloses;
            private final ToLongFunction<? super DispatchAgent> initialErrors;
            private final ToLongFunction<? super DispatchAgent> initialBytes;
            private final ToLongFunction<? super DispatchAgent> replyOpens;
            private final ToLongFunction<? super DispatchAgent> replyCloses;
            private final ToLongFunction<? super DispatchAgent> replyErrors;
            private final ToLongFunction<? super DispatchAgent> replyBytes;

            private LoadImpl(long id) {
                // The accessors capture the constructor parameter, so they must be
                // created here rather than in field initializers where id is not in scope.
                this.initialOpens = d -> d.initialOpens(id);
                this.initialCloses = d -> d.initialCloses(id);
                this.initialErrors = d -> d.initialErrors(id);
                this.initialBytes = d -> d.initialBytes(id);
                this.replyOpens = d -> d.replyOpens(id);
                this.replyCloses = d -> d.replyCloses(id);
                this.replyErrors = d -> d.replyErrors(id);
                this.replyBytes = d -> d.replyBytes(id);
            }

            @Override
            public long initialOpens() {
                return ContextImpl.this.dispatchers.stream().mapToLong(this.initialOpens).sum();
            }

            @Override
            public long initialCloses() {
                return ContextImpl.this.dispatchers.stream().mapToLong(this.initialCloses).sum();
            }

            @Override
            public long initialBytes() {
                return ContextImpl.this.dispatchers.stream().mapToLong(this.initialBytes).sum();
            }

            @Override
            public long initialErrors() {
                return ContextImpl.this.dispatchers.stream().mapToLong(this.initialErrors).sum();
            }

            @Override
            public long replyOpens() {
                return ContextImpl.this.dispatchers.stream().mapToLong(this.replyOpens).sum();
            }

            @Override
            public long replyCloses() {
                return ContextImpl.this.dispatchers.stream().mapToLong(this.replyCloses).sum();
            }

            @Override
            public long replyBytes() {
                return ContextImpl.this.dispatchers.stream().mapToLong(this.replyBytes).sum();
            }

            @Override
            public long replyErrors() {
                return ContextImpl.this.dispatchers.stream().mapToLong(this.replyErrors).sum();
            }
        }
    }
}

