/*
 * Decompiled with CFR 0.152.
 */
package com.telamin.mongoose.dutycycle;

import com.fluxtion.agrona.concurrent.Agent;
import com.fluxtion.agrona.concurrent.DynamicCompositeAgent;
import com.fluxtion.agrona.concurrent.OneToOneConcurrentArrayQueue;
import com.fluxtion.runtime.StaticEventProcessor;
import com.fluxtion.runtime.annotations.feature.Experimental;
import com.fluxtion.runtime.input.EventFeed;
import com.fluxtion.runtime.lifecycle.Lifecycle;
import com.fluxtion.runtime.service.Service;
import com.telamin.mongoose.MongooseServer;
import com.telamin.mongoose.dispatch.EventFlowManager;
import com.telamin.mongoose.dispatch.ProcessorContext;
import com.telamin.mongoose.dutycycle.EventQueueToEventProcessor;
import com.telamin.mongoose.dutycycle.NamedEventProcessor;
import com.telamin.mongoose.internal.CoreAffinity;
import com.telamin.mongoose.service.EventSubscriptionKey;
import com.telamin.mongoose.service.scheduler.DeadWheelScheduler;
import com.telamin.mongoose.service.scheduler.SchedulerService;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.logging.Logger;
import lombok.Generated;

/**
 * A composite agent that hosts a set of named event processors and acts as the
 * {@link EventFeed} those processors subscribe through.
 *
 * <p>Processors are not started or stopped directly by callers. Requests are placed on
 * internal queues by {@link #addNamedEventProcessor(Supplier)} and
 * {@link #removeEventProcessorByName(String)} and are applied when {@link #onStart()} or
 * {@link #doWork()} next runs. Each subscription key is backed by one
 * {@link EventQueueToEventProcessor} obtained from the {@link EventFlowManager}; those
 * readers are added to this composite as sub-agents.
 *
 * <p>NOTE(review): the request queues are {@code OneToOneConcurrentArrayQueue} instances,
 * which are presumably single-producer/single-consumer - confirm that each request method
 * is only ever called from one thread at a time.
 */
@Experimental
public class ComposingEventProcessorAgent
extends DynamicCompositeAgent
implements EventFeed<EventSubscriptionKey<?>> {
    @Generated
    private static final Logger log = Logger.getLogger(ComposingEventProcessorAgent.class.getName());
    /** Capacity of the pending start and pending stop request queues. */
    private static final int PENDING_QUEUE_CAPACITY = 128;

    private final EventFlowManager eventFlowManager;
    /** Services registered with every processor when it is started by this agent. */
    private final ConcurrentHashMap<String, Service<?>> registeredServices;
    /** Processors currently started by this agent, keyed by processor name. */
    private final ConcurrentHashMap<String, NamedEventProcessor> registeredEventProcessors = new ConcurrentHashMap<>();
    /** One queue reader per subscription key that has at least been subscribed once. */
    private final ConcurrentHashMap<EventSubscriptionKey<?>, EventQueueToEventProcessor> queueProcessorMap = new ConcurrentHashMap<>();
    /** Processor factories waiting to be started by the agent. */
    private final OneToOneConcurrentArrayQueue<Supplier<NamedEventProcessor>> toStartList = new OneToOneConcurrentArrayQueue<>(PENDING_QUEUE_CAPACITY);
    /** Names of processors waiting to be stopped by the agent. */
    private final OneToOneConcurrentArrayQueue<String> toStopList = new OneToOneConcurrentArrayQueue<>(PENDING_QUEUE_CAPACITY);
    /** Queue readers created by {@link #subscribe} that have not yet been added to this composite. */
    private final List<EventQueueToEventProcessor> queueReadersToAdd = new ArrayList<>();
    private final MongooseServer mongooseServer;
    private final DeadWheelScheduler scheduler;
    /** The scheduler wrapped as a service; registered with every processor that is started. */
    private final Service<SchedulerService> schedulerService;

    /**
     * Creates the agent with the scheduler as its initial sub-agent.
     *
     * @param roleName           role name passed to the composite agent
     * @param eventFlowManager   source of queue readers and target of subscribe/unsubscribe calls
     * @param mongooseServer     used to resolve an optional core id for this agent; may be {@code null}
     * @param scheduler          scheduler run as a sub-agent and exposed to processors as a service
     * @param registeredServices services registered with each processor when it is started
     */
    public ComposingEventProcessorAgent(String roleName, EventFlowManager eventFlowManager, MongooseServer mongooseServer, DeadWheelScheduler scheduler, ConcurrentHashMap<String, Service<?>> registeredServices) {
        super(roleName, new Agent[]{scheduler});
        this.eventFlowManager = eventFlowManager;
        this.mongooseServer = mongooseServer;
        this.scheduler = scheduler;
        this.registeredServices = registeredServices;
        this.schedulerService = new Service<>(scheduler, SchedulerService.class);
    }

    /**
     * Queues a processor to be created and started on a later {@link #onStart()} or
     * {@link #doWork()} call.
     *
     * @param initFunction factory invoked by the agent to obtain the processor
     */
    public void addNamedEventProcessor(Supplier<NamedEventProcessor> initFunction) {
        this.toStartList.add(initFunction);
    }

    /**
     * Queues a processor to be stopped and removed on a later {@link #doWork()} call.
     *
     * @param name name the processor was registered under
     */
    public void removeEventProcessorByName(String name) {
        this.toStopList.add(name);
    }

    /**
     * Optionally pins the current thread to a core, starts any queued processors and then
     * starts the composite agent.
     */
    @Override
    public void onStart() {
        if (this.mongooseServer != null) {
            Integer coreId = this.mongooseServer.resolveCoreIdForAgentName(this.roleName());
            if (coreId != null) {
                CoreAffinity.pinCurrentThreadToCore(coreId);
            }
        }
        log.info("onStart " + this.roleName());
        this.checkForAdded();
        super.onStart();
    }

    /**
     * Applies pending stop and start requests, then runs the sub-agents.
     *
     * @return the work count reported by the composite agent
     * @throws Exception if a sub-agent fails
     */
    @Override
    public int doWork() throws Exception {
        this.checkForStopped();
        this.checkForAdded();
        return super.doWork();
    }

    /** Logs the close and delegates to the composite agent. */
    @Override
    public void onClose() {
        log.info("onClose " + this.roleName());
        super.onClose();
    }

    /**
     * Logs the registration; no state is recorded for the subscriber here.
     *
     * @param subscriber the processor registering with this feed
     */
    @Override
    public void registerSubscriber(StaticEventProcessor subscriber) {
        log.info("registerSubscriber:" + subscriber + " " + this.roleName());
    }

    /**
     * Registers a processor against a subscription key, creating the queue reader for the
     * key on first use and scheduling that reader to be added to this composite.
     *
     * @param subscriber      processor to receive events for the key
     * @param subscriptionKey key identifying the event queue
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public void subscribe(StaticEventProcessor subscriber, EventSubscriptionKey<?> subscriptionKey) {
        Objects.requireNonNull(subscriber, "subscriber is null");
        Objects.requireNonNull(subscriptionKey, "subscriptionKey is null");
        log.info("subscribe subscriptionKey:" + subscriptionKey + " subscriber:" + subscriber);
        EventQueueToEventProcessor queueReader = this.queueProcessorMap.get(subscriptionKey);
        if (queueReader == null) {
            queueReader = this.eventFlowManager.getMappingAgent(subscriptionKey, this);
            this.queueProcessorMap.put(subscriptionKey, queueReader);
            // The reader only becomes a sub-agent later, see checkForAdded().
            this.queueReadersToAdd.add(queueReader);
            log.info("added new subscribe subscriptionKey:" + subscriptionKey + " subscriber:" + subscriber);
        }
        queueReader.registerProcessor(subscriber);
        this.eventFlowManager.subscribe(subscriptionKey);
    }

    /**
     * Deregisters a processor from a subscription key. When the reader reports no remaining
     * listeners the key is dropped from this agent and unsubscribed from the flow manager.
     * Unknown keys are ignored.
     *
     * @param subscriber      processor to remove
     * @param subscriptionKey key the processor was subscribed with
     */
    @Override
    public void unSubscribe(StaticEventProcessor subscriber, EventSubscriptionKey<?> subscriptionKey) {
        // Single lookup: avoids dereferencing null if the entry vanishes between containsKey and get.
        EventQueueToEventProcessor queueReader = this.queueProcessorMap.get(subscriptionKey);
        if (queueReader == null) {
            return;
        }
        if (queueReader.deregisterProcessor(subscriber) == 0) {
            log.info("EventQueueToEventProcessor listener count = 0, removing subscription:" + subscriptionKey);
            this.queueProcessorMap.remove(subscriptionKey);
            this.eventFlowManager.unSubscribe(subscriptionKey);
        }
    }

    /**
     * Deregisters a processor from every queue reader known to this agent.
     *
     * <p>NOTE(review): unlike {@link #unSubscribe}, readers left without listeners are kept
     * in the map and the flow manager is not told to unsubscribe - confirm this is intended.
     *
     * @param subscriber processor to remove from all subscriptions
     */
    @Override
    public void removeAllSubscriptions(StaticEventProcessor subscriber) {
        log.info("removing all subscriptions for:" + subscriber + " " + this.roleName());
        this.queueProcessorMap.values().forEach(q -> q.deregisterProcessor(subscriber));
    }

    /**
     * @return a live view of the processors currently started by this agent
     */
    public Collection<NamedEventProcessor> registeredEventProcessors() {
        return this.registeredEventProcessors.values();
    }

    /**
     * Starts every queued processor, then tries to hand at most one pending queue reader to
     * the composite. Readers are only offered while the composite is ACTIVE, and a reader
     * stays pending until {@code tryAdd} accepts it.
     */
    private void checkForAdded() {
        if (!this.toStartList.isEmpty()) {
            this.toStartList.drain(this::startProcessor);
        }
        if (!this.queueReadersToAdd.isEmpty()
                && this.status() == DynamicCompositeAgent.Status.ACTIVE
                && this.tryAdd(this.queueReadersToAdd.get(0))) {
            this.queueReadersToAdd.remove(0);
        }
    }

    /**
     * Creates a processor from its factory, records it by name, wires in the scheduler, the
     * registered services and this feed, and runs its lifecycle start if it has one.
     */
    private void startProcessor(Supplier<NamedEventProcessor> init) {
        NamedEventProcessor namedEventProcessor = init.get();
        StaticEventProcessor eventProcessor = namedEventProcessor.eventProcessor();
        this.registeredEventProcessors.put(namedEventProcessor.name(), namedEventProcessor);
        ProcessorContext.setCurrentProcessor(eventProcessor);
        try {
            eventProcessor.registerService(this.schedulerService);
            this.registeredServices.values().forEach(service -> eventProcessor.registerService(service));
            eventProcessor.addEventFeed(this);
            if (eventProcessor instanceof Lifecycle) {
                Lifecycle lifecycle = (Lifecycle) eventProcessor;
                lifecycle.start();
                lifecycle.startComplete();
            }
        } finally {
            // Always clear the context so a failing start cannot leave a stale current processor.
            ProcessorContext.removeCurrentProcessor();
        }
    }

    /** Stops and removes every processor whose name has been queued for removal. */
    private void checkForStopped() {
        if (this.toStopList.isEmpty()) {
            return;
        }
        this.toStopList.drain(this::stopProcessor);
    }

    /** Removes the named processor, if registered, and runs its lifecycle stop if it has one. */
    private void stopProcessor(String name) {
        NamedEventProcessor removed = this.registeredEventProcessors.remove(name);
        if (removed == null) {
            return;
        }
        StaticEventProcessor eventProcessor = removed.eventProcessor();
        if (eventProcessor instanceof Lifecycle) {
            Lifecycle lifecycle = (Lifecycle) eventProcessor;
            lifecycle.stop();
            lifecycle.tearDown();
        }
    }

    /**
     * @param processorName name to look up
     * @return {@code true} if a processor with that name is currently started by this agent
     */
    public boolean isProcessorRegistered(String processorName) {
        return this.registeredEventProcessors.containsKey(processorName);
    }
}

