/*
 * Decompiled with CFR 0.152.
 */
package org.opendaylight.controller.eos.akka.registry.listener.type;

import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.ddata.ORMap;
import akka.cluster.ddata.ORSet;
import akka.cluster.ddata.typed.javadsl.DistributedData;
import akka.cluster.ddata.typed.javadsl.Replicator;
import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
import akka.japi.function.Function;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
import org.opendaylight.controller.eos.akka.registry.listener.owner.SingleEntityListenerActor;
import org.opendaylight.controller.eos.akka.registry.listener.owner.command.ListenerCommand;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.CandidatesChanged;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.TerminateListener;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerCommand;
import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Typed actor behavior that tracks the candidate registry for a single entity type.
 *
 * <p>On construction the actor subscribes to {@link CandidateRegistry#KEY} through the distributed-data
 * replicator. Every time the replicated candidate map changes, the actor keeps exactly one child
 * {@link SingleEntityListenerActor} per entity of the configured type: children are spawned for newly seen
 * entities and stopped for entities that are no longer present in the map.
 *
 * <p>{@link EntityOwnerChanged} messages received by this actor are forwarded to the user supplied
 * {@link DOMEntityOwnershipListener}. Each child is handed a reference to this actor when it is spawned, so
 * these messages are presumably sent by the children (confirm against {@link SingleEntityListenerActor}).
 */
public class EntityTypeListenerActor extends AbstractBehavior<TypeListenerCommand> {
    private static final Logger LOG = LoggerFactory.getLogger(EntityTypeListenerActor.class);

    /** Child listener actors currently running, keyed by the entity each of them was spawned for. */
    private final Map<DOMEntity, ActorRef<ListenerCommand>> activeListeners = new HashMap<>();
    private final String localMember;
    private final String entityType;
    private final DOMEntityOwnershipListener listener;

    /**
     * Creates the behavior and subscribes it to changes of the replicated candidate registry.
     *
     * @param context actor context this behavior runs in
     * @param localMember name of the local member; passed on to child listeners and used in log output
     * @param entityType only entities whose type equals this value are tracked
     * @param listener listener notified about ownership changes
     */
    public EntityTypeListenerActor(final ActorContext<TypeListenerCommand> context, final String localMember,
            final String entityType, final DOMEntityOwnershipListener listener) {
        super(context);
        this.localMember = localMember;
        this.entityType = entityType;
        this.listener = listener;

        // The adapter itself is not retained: subscribing registers this actor with the replicator and
        // wraps every subscription response into a CandidatesChanged message delivered to this actor.
        new ReplicatorMessageAdapter<TypeListenerCommand, ORMap<DOMEntity, ORSet<String>>>(context,
            DistributedData.get(context.getSystem()).replicator(), Duration.ofSeconds(5))
                .subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
    }

    /**
     * Creates a deferred behavior which instantiates this actor once it is started.
     *
     * @param localMember name of the local member
     * @param entityType entity type to listen on
     * @param listener listener notified about ownership changes
     * @return behavior to be spawned by the caller
     */
    public static Behavior<TypeListenerCommand> create(final String localMember, final String entityType,
            final DOMEntityOwnershipListener listener) {
        return Behaviors.setup(ctx -> new EntityTypeListenerActor(ctx, localMember, entityType, listener));
    }

    @Override
    public Receive<TypeListenerCommand> createReceive() {
        return newReceiveBuilder()
            .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
            .onMessage(EntityOwnerChanged.class, this::onOwnerChanged)
            .onMessage(TerminateListener.class, this::onTerminate)
            .build();
    }

    /**
     * Handles a replicator subscription response. Only {@link Replicator.Changed} responses are processed,
     * anything else is logged as a warning and otherwise ignored.
     */
    private Behavior<TypeListenerCommand> onCandidatesChanged(final CandidatesChanged notification) {
        final Replicator.SubscribeResponse<ORMap<DOMEntity, ORSet<String>>> response = notification.getResponse();
        if (response instanceof Replicator.Changed) {
            final Replicator.Changed<ORMap<DOMEntity, ORSet<String>>> changed =
                (Replicator.Changed<ORMap<DOMEntity, ORSet<String>>>) response;
            processCandidates(changed.get(response.key()).getEntries());
        } else {
            LOG.warn("Unexpected notification from replicator: {}", response);
        }
        return this;
    }

    /**
     * Reconciles the set of running child listeners with the current content of the candidate registry.
     *
     * @param entries complete candidate map as reported by the replicator, covering all entity types
     */
    private void processCandidates(final Map<DOMEntity, ORSet<String>> entries) {
        final Map<DOMEntity, ORSet<String>> filteredCandidates = entries.entrySet().stream()
            .filter(entry -> entry.getKey().getType().equals(entityType))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        LOG.debug("Entity-type: {} current candidates: {}", entityType, filteredCandidates);

        // Sets.difference() returns a live view backed by activeListeners; take a snapshot first so that
        // removing from the map while iterating is safe.
        final ImmutableSet<DOMEntity> removed =
            ImmutableSet.copyOf(Sets.difference(activeListeners.keySet(), filteredCandidates.keySet()));
        if (!removed.isEmpty()) {
            LOG.debug("Stopping listeners for {}", removed);
            removed.forEach(removedEntity -> getContext().stop(activeListeners.remove(removedEntity)));
        }

        // Spawn a listener for every entity that does not have one yet; existing listeners are left untouched.
        for (final DOMEntity entity : filteredCandidates.keySet()) {
            activeListeners.computeIfAbsent(entity, key -> {
                LOG.debug("Starting listener for {}", key);
                return getContext().spawn(SingleEntityListenerActor.create(localMember, key, getContext().getSelf()),
                    "SingleEntityListener-" + encodeEntityToActorName(key));
            });
        }
    }

    /**
     * Forwards an ownership change to the registered listener.
     */
    private Behavior<TypeListenerCommand> onOwnerChanged(final EntityOwnerChanged rsp) {
        LOG.debug("{} : Entity-type: {} listener, owner change: {}", localMember, entityType, rsp);
        // NOTE(review): the third argument is always false; its meaning is defined by
        // DOMEntityOwnershipListener#ownershipChanged - confirm against that API.
        listener.ownershipChanged(rsp.entity(), rsp.change(), false);
        return this;
    }

    /**
     * Stops this actor in response to a {@link TerminateListener} command.
     */
    private Behavior<TypeListenerCommand> onTerminate(final TerminateListener command) {
        LOG.debug("Terminating listener for type: {}, listener: {}", entityType, listener);
        return Behaviors.stopped();
    }

    /**
     * Builds the name suffix used for a child listener actor from the entity type and the local name of the
     * last path argument of the entity identifier. A random UUID is appended, so every call yields a
     * different name even for the same entity.
     */
    private static String encodeEntityToActorName(final DOMEntity entity) {
        return "type=" + entity.getType() + ",entity="
            + entity.getIdentifier().getLastPathArgument().getNodeType().getLocalName() + "-" + UUID.randomUUID();
    }
}

