/*
 * Decompiled with CFR 0.152.
 */
package org.opendaylight.controller.eos.akka;

import akka.actor.ActorSystem;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.Scheduler;
import akka.actor.typed.javadsl.Adapter;
import akka.actor.typed.javadsl.AskPattern;
import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.typed.Cluster;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Serializable;
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.eos.akka.CandidateRegistration;
import org.opendaylight.controller.eos.akka.DataCenterControl;
import org.opendaylight.controller.eos.akka.ListenerRegistration;
import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesRequest;
import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerReply;
import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerRequest;
import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityReply;
import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityRequest;
import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerReply;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.UnregisterListener;
import org.opendaylight.mdsal.binding.api.RpcProviderService;
import org.opendaylight.mdsal.binding.dom.codec.api.BindingCodecTree;
import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
import org.opendaylight.mdsal.eos.common.api.GenericEntity;
import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerOutput;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.Rpc;
import org.opendaylight.yangtools.yang.binding.RpcOutput;
import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@Component(immediate=true, service={DOMEntityOwnershipService.class, DataCenterControl.class})
public class AkkaEntityOwnershipService
implements DOMEntityOwnershipService,
DataCenterControl,
AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class);
    private static final String DATACENTER_PREFIX = "dc";
    private static final Duration DATACENTER_OP_TIMEOUT = Duration.ofSeconds(20L);
    private static final Duration QUERY_TIMEOUT = Duration.ofSeconds(10L);
    private final Set<DOMEntity> registeredEntities = ConcurrentHashMap.newKeySet();
    private final String localCandidate;
    private final Scheduler scheduler;
    private final String datacenter;
    private final ActorRef<BootstrapCommand> bootstrap;
    private final RunningContext runningContext;
    private final ActorRef<CandidateRegistryCommand> candidateRegistry;
    private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
    private final ActorRef<StateCheckerCommand> ownerStateChecker;
    protected final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
    private final BindingInstanceIdentifierCodec iidCodec;
    private Registration reg;

    @VisibleForTesting
    protected AkkaEntityOwnershipService(ActorSystem actorSystem, BindingCodecTree codecTree) throws ExecutionException, InterruptedException {
        akka.actor.typed.ActorSystem typedActorSystem = Adapter.toTyped((ActorSystem)actorSystem);
        this.scheduler = typedActorSystem.scheduler();
        Cluster cluster = Cluster.get((akka.actor.typed.ActorSystem)typedActorSystem);
        this.datacenter = cluster.selfMember().dataCenter();
        this.localCandidate = cluster.selfMember().getRoles().stream().filter(role -> !role.contains(DATACENTER_PREFIX)).findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
        this.iidCodec = codecTree.getInstanceIdentifierCodec();
        this.bootstrap = Adapter.spawn((ActorSystem)actorSystem, (Behavior)Behaviors.setup((akka.japi.function.Function & Serializable)context -> EOSMain.create(this.iidCodec)), (String)"EOSBootstrap");
        CompletionStage ask = AskPattern.ask(this.bootstrap, GetRunningContext::new, (Duration)Duration.ofSeconds(5L), (Scheduler)this.scheduler);
        this.runningContext = (RunningContext)ask.toCompletableFuture().get();
        this.candidateRegistry = this.runningContext.getCandidateRegistry();
        this.listenerRegistry = this.runningContext.getListenerRegistry();
        this.ownerStateChecker = this.runningContext.getOwnerStateChecker();
        this.ownerSupervisor = this.runningContext.getOwnerSupervisor();
    }

    @Inject
    @Activate
    @SuppressFBWarnings(value={"MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR"}, justification="Non-final for testing 'this' reference is expected to be stable at registration time")
    public AkkaEntityOwnershipService(@Reference ActorSystemProvider actorProvider, @Reference RpcProviderService rpcProvider, @Reference BindingCodecTree codecTree) throws ExecutionException, InterruptedException {
        this(actorProvider.getActorSystem(), codecTree);
        this.reg = rpcProvider.registerRpcImplementations(new Rpc[]{this::getEntity, this::getEntities, this::getEntityOwner});
    }

    @Override
    @PreDestroy
    @Deactivate
    public void close() throws InterruptedException, ExecutionException {
        if (this.reg != null) {
            this.reg.close();
            this.reg = null;
        }
        AskPattern.ask(this.bootstrap, Terminate::new, (Duration)Duration.ofSeconds(5L), (Scheduler)this.scheduler).toCompletableFuture().get();
    }

    public Registration registerCandidate(DOMEntity entity) throws CandidateAlreadyRegisteredException {
        if (!this.registeredEntities.add(entity)) {
            throw new CandidateAlreadyRegisteredException((GenericEntity)entity);
        }
        RegisterCandidate msg = new RegisterCandidate(entity, this.localCandidate);
        LOG.debug("Registering candidate with message: {}", (Object)msg);
        this.candidateRegistry.tell((Object)msg);
        return new CandidateRegistration(entity, this);
    }

    public Registration registerListener(String entityType, DOMEntityOwnershipListener listener) {
        LOG.debug("Registering listener {} for type {}", (Object)listener, (Object)entityType);
        this.listenerRegistry.tell((Object)new RegisterListener(entityType, listener));
        return new ListenerRegistration(listener, entityType, this);
    }

    public Optional<EntityOwnershipState> getOwnershipState(DOMEntity entity) {
        GetOwnershipStateReply reply;
        LOG.debug("Retrieving ownership state for {}", (Object)entity);
        CompletionStage result = AskPattern.ask(this.ownerStateChecker, (akka.japi.function.Function & Serializable)replyTo -> new GetOwnershipState(entity, (ActorRef<GetOwnershipStateReply>)replyTo), (Duration)Duration.ofSeconds(5L), (Scheduler)this.scheduler);
        try {
            reply = (GetOwnershipStateReply)result.toCompletableFuture().get();
        }
        catch (InterruptedException | ExecutionException exception) {
            LOG.warn("Failed to retrieve ownership state for {}", (Object)entity, (Object)exception);
            return Optional.empty();
        }
        return Optional.ofNullable(reply.getOwnershipState());
    }

    public boolean isCandidateRegistered(DOMEntity forEntity) {
        return this.registeredEntities.contains(forEntity);
    }

    @Override
    public ListenableFuture<Empty> activateDataCenter() {
        LOG.debug("Activating datacenter: {}", (Object)this.datacenter);
        return AkkaEntityOwnershipService.toListenableFuture("Activate", AskPattern.ask(this.ownerSupervisor, ActivateDataCenter::new, (Duration)DATACENTER_OP_TIMEOUT, (Scheduler)this.scheduler));
    }

    @Override
    public ListenableFuture<Empty> deactivateDataCenter() {
        LOG.debug("Deactivating datacenter: {}", (Object)this.datacenter);
        return AkkaEntityOwnershipService.toListenableFuture("Deactivate", AskPattern.ask(this.ownerSupervisor, DeactivateDataCenter::new, (Duration)DATACENTER_OP_TIMEOUT, (Scheduler)this.scheduler));
    }

    @VisibleForTesting
    final ListenableFuture<RpcResult<GetEntitiesOutput>> getEntities(GetEntitiesInput input) {
        return AkkaEntityOwnershipService.toRpcFuture(AskPattern.ask(this.ownerStateChecker, GetEntitiesRequest::new, (Duration)QUERY_TIMEOUT, (Scheduler)this.scheduler), reply -> reply.toOutput(this.iidCodec));
    }

    @VisibleForTesting
    final ListenableFuture<RpcResult<GetEntityOutput>> getEntity(GetEntityInput input) {
        return AkkaEntityOwnershipService.toRpcFuture(AskPattern.ask(this.ownerStateChecker, (akka.japi.function.Function & Serializable)replyTo -> new GetEntityRequest((ActorRef<GetEntityReply>)replyTo, (EntityId)input), (Duration)QUERY_TIMEOUT, (Scheduler)this.scheduler), GetEntityReply::toOutput);
    }

    @VisibleForTesting
    final ListenableFuture<RpcResult<GetEntityOwnerOutput>> getEntityOwner(GetEntityOwnerInput input) {
        return AkkaEntityOwnershipService.toRpcFuture(AskPattern.ask(this.ownerStateChecker, (akka.japi.function.Function & Serializable)replyTo -> new GetEntityOwnerRequest((ActorRef<GetEntityOwnerReply>)replyTo, (EntityId)input), (Duration)QUERY_TIMEOUT, (Scheduler)this.scheduler), GetEntityOwnerReply::toOutput);
    }

    void unregisterCandidate(DOMEntity entity) {
        LOG.debug("Unregistering candidate for {}", (Object)entity);
        if (this.registeredEntities.remove(entity)) {
            this.candidateRegistry.tell((Object)new UnregisterCandidate(entity, this.localCandidate));
        }
    }

    void unregisterListener(String entityType, DOMEntityOwnershipListener listener) {
        LOG.debug("Unregistering listener {} for type {}", (Object)listener, (Object)entityType);
        this.listenerRegistry.tell((Object)new UnregisterListener(entityType, listener));
    }

    @VisibleForTesting
    RunningContext getRunningContext() {
        return this.runningContext;
    }

    private static <R extends StateCheckerReply, O extends RpcOutput> ListenableFuture<RpcResult<O>> toRpcFuture(CompletionStage<R> stage, Function<R, O> outputFunction) {
        SettableFuture future = SettableFuture.create();
        stage.whenComplete((reply, failure) -> {
            if (failure != null) {
                future.setException(failure);
            } else {
                future.set((Object)RpcResultBuilder.success((Object)((RpcOutput)outputFunction.apply(reply))).build());
            }
        });
        return future;
    }

    private static ListenableFuture<Empty> toListenableFuture(String op, CompletionStage<?> stage) {
        SettableFuture future = SettableFuture.create();
        stage.whenComplete((reply, failure) -> {
            if (failure != null) {
                LOG.warn("{} DataCenter failed", (Object)op, failure);
                future.setException(failure);
            } else {
                LOG.debug("{} DataCenter successful", (Object)op);
                future.set((Object)Empty.value());
            }
        });
        return future;
    }
}

