/*
 * Decompiled with CFR 0.152.
 */
package org.reaktivity.reaktor.internal.agent;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.broadcast.BroadcastTransmitter;
import org.agrona.concurrent.ringbuffer.ManyToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.reaktivity.nukleus.AgentBuilder;
import org.reaktivity.nukleus.Nukleus;
import org.reaktivity.nukleus.function.CommandHandler;
import org.reaktivity.nukleus.function.MessagePredicate;
import org.reaktivity.nukleus.route.RouteKind;
import org.reaktivity.reaktor.ReaktorConfiguration;
import org.reaktivity.reaktor.internal.LabelManager;
import org.reaktivity.reaktor.internal.agent.ElektronAgent;
import org.reaktivity.reaktor.internal.layouts.ControlLayout;
import org.reaktivity.reaktor.internal.router.Router;
import org.reaktivity.reaktor.internal.types.OctetsFW;
import org.reaktivity.reaktor.internal.types.control.CommandFW;
import org.reaktivity.reaktor.internal.types.control.ErrorFW;
import org.reaktivity.reaktor.internal.types.control.FreezeFW;
import org.reaktivity.reaktor.internal.types.control.FrozenFW;
import org.reaktivity.reaktor.internal.types.control.Role;
import org.reaktivity.reaktor.internal.types.control.RouteFW;
import org.reaktivity.reaktor.internal.types.control.RoutedFW;
import org.reaktivity.reaktor.internal.types.control.UnrouteFW;
import org.reaktivity.reaktor.internal.types.control.UnroutedFW;

public class NukleusAgent
implements Agent {
    private final CommandFW commandRO = new CommandFW();
    private final RouteFW routeRO = new RouteFW();
    private final UnrouteFW unrouteRO = new UnrouteFW();
    private final FreezeFW freezeRO = new FreezeFW();
    private final ErrorFW.Builder errorRW = new ErrorFW.Builder();
    private final RoutedFW.Builder routedRW = new RoutedFW.Builder();
    private final UnroutedFW.Builder unroutedRW = new UnroutedFW.Builder();
    private final FrozenFW.Builder frozenRW = new FrozenFW.Builder();
    private final ReaktorConfiguration config;
    private final Supplier<AgentBuilder> supplyAgentBuilder;
    private final LabelManager labels;
    private final List<ElektronAgent> elektronAgents;
    private final Map<String, Nukleus> nukleiByName;
    private final Router router;
    private final ControlLayout control;
    private final RingBuffer commandBuffer;
    private final BroadcastTransmitter responseBuffer;
    private final MutableDirectBuffer sendBuffer;
    private final MessageHandler commandHandler;

    public NukleusAgent(ReaktorConfiguration config, Supplier<AgentBuilder> supplyAgentBuilder) {
        this.config = config;
        this.supplyAgentBuilder = supplyAgentBuilder;
        this.labels = new LabelManager(config.directory());
        this.elektronAgents = new ArrayList<ElektronAgent>();
        this.nukleiByName = new HashMap<String, Nukleus>();
        this.control = new ControlLayout.Builder().controlPath(config.directory().resolve("control")).commandBufferCapacity(config.commandBufferCapacity()).responseBufferCapacity(config.responseBufferCapacity()).readonly(false).build();
        this.commandBuffer = new ManyToOneRingBuffer(this.control.commandBuffer());
        this.responseBuffer = new BroadcastTransmitter(this.control.responseBuffer());
        this.sendBuffer = new UnsafeBuffer(ByteBuffer.allocateDirect(this.responseBuffer.maxMsgLength()));
        this.router = new Router(config, this.labels, this.commandBuffer.maxMsgLength());
        this.commandHandler = this::handleCommand;
    }

    public String roleName() {
        return "reaktor/control";
    }

    public int doWork() throws Exception {
        return this.commandBuffer.read(this.commandHandler);
    }

    public void onClose() {
        CloseHelper.quietClose((AutoCloseable)this.control);
    }

    public Nukleus nukleus(String name) {
        return this.nukleiByName.get(name);
    }

    public <T extends Nukleus> T nukleus(Class<T> kind) {
        return (T)((Nukleus)this.nukleiByName.values().stream().filter(kind::isInstance).map(kind::cast).findFirst().orElse(null));
    }

    public LabelManager labels() {
        return this.labels;
    }

    public ElektronAgent supplyElektronAgent(int index, int count, ExecutorService executor, Function<String, BitSet> affinityMask) {
        ElektronAgent newElektronAgent = new ElektronAgent(index, count, this.config, this.labels, executor, affinityMask, this.router::readonlyRoutesBuffer, this.supplyAgentBuilder);
        this.elektronAgents.add(newElektronAgent);
        return newElektronAgent;
    }

    public void assign(Nukleus nukleus) {
        this.nukleiByName.putIfAbsent(nukleus.name(), nukleus);
    }

    public void unassign(Nukleus nukleus) {
        this.nukleiByName.remove(nukleus.name());
    }

    public boolean isEmpty() {
        return this.nukleiByName.isEmpty();
    }

    Router router() {
        return this.router;
    }

    public void onRouteable(long routeId, Nukleus nukleus) {
        this.elektronAgents.forEach(a -> a.onRouteable(routeId, nukleus));
    }

    public void onRouted(Nukleus nukleus, RouteKind routeKind, long routeId, OctetsFW extension) {
        this.elektronAgents.forEach(a -> a.onRouted(nukleus, routeKind, routeId, extension));
    }

    void onUnrouted(Nukleus nukleus, RouteKind routeKind, long routeId) {
        this.elektronAgents.forEach(a -> a.onUnrouted(nukleus, routeKind, routeId));
    }

    private void doError(long correlationId) {
        ErrorFW error = this.errorRW.wrap(this.sendBuffer, 0, this.sendBuffer.capacity()).correlationId(correlationId).build();
        this.responseBuffer.transmit(error.typeId(), error.buffer(), error.offset(), error.sizeof());
    }

    private void doRouted(long correlationId, long routeId) {
        RoutedFW routed = this.routedRW.wrap(this.sendBuffer, 0, this.sendBuffer.capacity()).correlationId(correlationId).routeId(routeId).build();
        this.responseBuffer.transmit(routed.typeId(), routed.buffer(), routed.offset(), routed.sizeof());
    }

    private void doUnrouted(long correlationId) {
        UnroutedFW unrouted = this.unroutedRW.wrap(this.sendBuffer, 0, this.sendBuffer.capacity()).correlationId(correlationId).build();
        this.responseBuffer.transmit(unrouted.typeId(), unrouted.buffer(), unrouted.offset(), unrouted.sizeof());
    }

    private void doFrozen(long correlationId) {
        FrozenFW frozen = this.frozenRW.wrap(this.sendBuffer, 0, this.sendBuffer.capacity()).correlationId(correlationId).build();
        this.responseBuffer.transmit(frozen.typeId(), frozen.buffer(), frozen.offset(), frozen.sizeof());
    }

    private void handleCommand(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        switch (msgTypeId) {
            case 1: {
                RouteFW route = this.routeRO.wrap((DirectBuffer)buffer, index, index + length);
                this.onRoute(route);
                break;
            }
            case 2: {
                UnrouteFW unroute = this.unrouteRO.wrap((DirectBuffer)buffer, index, index + length);
                this.onUnroute(unroute);
                break;
            }
            case 3: {
                FreezeFW freeze = this.freezeRO.wrap((DirectBuffer)buffer, index, index + length);
                this.onFreeze(freeze);
                break;
            }
            default: {
                this.onUnrecognized(msgTypeId, (DirectBuffer)buffer, index, length);
            }
        }
    }

    private void onRoute(RouteFW route) {
        long correlationId = route.correlationId();
        String nukleusName = route.nukleus().asString();
        Nukleus nukleus = this.nukleus(nukleusName);
        try {
            Role role = route.role().get();
            RouteKind routeKind = RouteKind.valueOf((int)role.ordinal());
            MessagePredicate routeHandler = nukleus.routeHandler(routeKind);
            if (routeHandler == null) {
                routeHandler = (t, b, i, l) -> true;
            }
            RouteFW newRoute = this.router.generateRouteId(route);
            long newRouteId = newRoute.correlationId();
            this.onRouteable(newRouteId, nukleus);
            if (this.router.doRoute(newRoute, routeHandler)) {
                this.onRouted(nukleus, routeKind, newRouteId, route.extension());
                Thread.sleep(this.config.routedDelayMillis());
                this.doRouted(correlationId, newRouteId);
            } else {
                this.doError(correlationId);
            }
        }
        catch (Exception ex) {
            this.doError(correlationId);
            LangUtil.rethrowUnchecked((Throwable)ex);
        }
    }

    private void onUnroute(UnrouteFW unroute) {
        long correlationId = unroute.correlationId();
        String nukleusName = unroute.nukleus().asString();
        Nukleus nukleus = this.nukleus(nukleusName);
        try {
            long routeId = unroute.routeId();
            RouteKind routeKind = this.replyRouteKind(routeId);
            MessagePredicate routeHandler = nukleus.routeHandler(routeKind);
            if (routeHandler == null) {
                routeHandler = (t, b, i, l) -> true;
            }
            if (this.router.doUnroute(unroute, routeHandler)) {
                this.onUnrouted(nukleus, routeKind, routeId);
                this.doUnrouted(correlationId);
            } else {
                this.doError(correlationId);
            }
        }
        catch (Exception ex) {
            this.doError(correlationId);
            LangUtil.rethrowUnchecked((Throwable)ex);
        }
    }

    private void onUnrecognized(int msgTypeId, DirectBuffer buffer, int index, int length) {
        CommandFW command = this.commandRO.wrap(buffer, index, index + length);
        String nukleusName = command.nukleus().asString();
        Nukleus nukleus = this.nukleus(nukleusName);
        CommandHandler handler = nukleus.commandHandler(msgTypeId);
        if (handler != null) {
            handler.handle(buffer, index, length, (arg_0, arg_1, arg_2, arg_3) -> ((BroadcastTransmitter)this.responseBuffer).transmit(arg_0, arg_1, arg_2, arg_3), this.sendBuffer);
        } else {
            this.doError(command.correlationId());
        }
    }

    private void onFreeze(FreezeFW freeze) {
        long correlationId = freeze.correlationId();
        String nukleusName = freeze.nukleus().asString();
        Nukleus nukleus = this.nukleus(nukleusName);
        this.unassign(nukleus);
        this.doFrozen(correlationId);
    }

    private RouteKind replyRouteKind(long routeId) {
        return RouteKind.valueOf((int)((int)(routeId >> 28) & 0xF));
    }
}

