/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.jobs.service.management;

import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.TimeoutStream;
import io.vertx.mutiny.core.Vertx;
import java.time.OffsetDateTime;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Event;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.kie.kogito.jobs.service.management.MessagingChangeEvent;
import org.kie.kogito.jobs.service.management.ReleaseLeaderEvent;
import org.kie.kogito.jobs.service.messaging.MessagingHandler;
import org.kie.kogito.jobs.service.model.JobServiceManagementInfo;
import org.kie.kogito.jobs.service.repository.JobServiceManagementRepository;
import org.kie.kogito.jobs.service.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Application-scoped bean that decides whether this Jobs Service instance acts as the leader.
 *
 * <p>On startup it builds a {@link JobServiceManagementInfo} carrying a random token, creates two
 * paused Vert.x periodic streams (leader check and heartbeat) and performs an initial leader check.
 * While the instance is leader the heartbeat stream runs and the messaging handlers are resumed;
 * while it is not, the leader-check stream runs and the messaging handlers are paused.
 *
 * <p>On {@link ShutdownEvent} or {@link ReleaseLeaderEvent} the management record is reset through
 * the repository and, once that succeeds, both periodic streams are cancelled.
 */
@ApplicationScoped
public class JobServiceInstanceManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobServiceInstanceManager.class);

    /** Period, in seconds, of the heartbeat stream (the field keeps its original spelling). */
    @ConfigProperty(name = "kogito.jobs-service.management.heartbeat.interval-in-seconds", defaultValue = "1")
    long heardBeatIntervalInSeconds;

    /** Period, in seconds, of the leader-check stream. */
    @ConfigProperty(name = "kogito.jobs-service.management.leader-check.interval-in-seconds", defaultValue = "1")
    long leaderCheckIntervalInSeconds;

    /** Age, in seconds, after which a stored heartbeat is treated as expired by {@link #tryBecomeLeader}. */
    @ConfigProperty(name = "kogito.jobs-service.management.heartbeat.expiration-in-seconds", defaultValue = "10")
    long heartbeatExpirationInSeconds;

    /** Identifier of the management record used for the leader check. */
    @ConfigProperty(name = "kogito.jobs-service.management.heartbeat.management-id", defaultValue = "kogito-jobs-service-leader")
    String leaderManagementId;

    @Inject
    Instance<MessagingHandler> messagingHandlerInstance;

    @Inject
    Event<MessagingChangeEvent> messagingChangeEventEvent;

    @Inject
    Vertx vertx;

    @Inject
    JobServiceManagementRepository repository;

    /** Periodic leader check; created paused in {@link #startup}, {@code null} before that. */
    private TimeoutStream checkLeader;

    /** Periodic heartbeat; created paused in {@link #startup}, {@code null} before that. */
    private TimeoutStream heartbeat;

    /** Identity (id, token, last heartbeat) of this instance; empty until {@link #startup} runs. */
    private final AtomicReference<JobServiceManagementInfo> currentInfo = new AtomicReference<>();

    /** In-memory flag telling whether this instance currently considers itself the leader. */
    private final AtomicBoolean leader = new AtomicBoolean(false);

    /**
     * Initializes the instance identity and both periodic streams, then runs the first leader check.
     *
     * @param startupEvent the Quarkus startup event (unused)
     */
    void startup(@Observes StartupEvent startupEvent) {
        buildAndSetInstanceInfo();

        // Both streams are created paused; tryBecomeLeader() resumes whichever one applies.
        // The lambdas read the fields lazily, so they see the streams assigned below.
        checkLeader = vertx.periodicStream(TimeUnit.SECONDS.toMillis(leaderCheckIntervalInSeconds))
                .handler(id -> tryBecomeLeader(currentInfo.get(), checkLeader, heartbeat)
                        .subscribe().with(
                                i -> LOGGER.trace("Leader check completed"),
                                ex -> LOGGER.error("Error checking Leader", ex)))
                .pause();

        heartbeat = vertx.periodicStream(TimeUnit.SECONDS.toMillis(heardBeatIntervalInSeconds))
                .handler(t -> heartbeat(currentInfo.get())
                        .subscribe().with(
                                i -> LOGGER.trace("Heartbeat completed {}", currentInfo.get()),
                                ex -> LOGGER.error("Error on heartbeat {}", currentInfo.get(), ex)))
                .pause();

        // Initial leader check.
        tryBecomeLeader(currentInfo.get(), checkLeader, heartbeat)
                .subscribe().with(
                        i -> LOGGER.info("Initial leader check completed"),
                        ex -> LOGGER.error("Error on initial check leader", ex));
    }

    /** Pauses every messaging handler and fires a {@code MessagingChangeEvent(false)}. */
    private void disableCommunication() {
        messagingHandlerInstance.stream().forEach(MessagingHandler::pause);
        messagingChangeEventEvent.fire(new MessagingChangeEvent(false));
        LOGGER.warn("Disabled communication not leader instance");
    }

    /** Resumes every messaging handler and fires a {@code MessagingChangeEvent(true)}. */
    private void enableCommunication() {
        messagingHandlerInstance.stream().forEach(MessagingHandler::resume);
        messagingChangeEventEvent.fire(new MessagingChangeEvent(true));
        LOGGER.info("Enabled communication for leader instance");
    }

    /**
     * Releases leadership when the application shuts down.
     *
     * @param event the Quarkus shutdown event (unused)
     */
    void onShutdown(@Observes ShutdownEvent event) {
        shutdown();
    }

    /**
     * Releases leadership when a {@link ReleaseLeaderEvent} is observed.
     *
     * @param event the release request (unused)
     */
    void onReleaseLeader(@Observes ReleaseLeaderEvent event) {
        shutdown();
    }

    /**
     * Releases the management record and, when that succeeds, cancels both periodic streams.
     *
     * <p>If {@link #startup} has not populated the instance info yet there is nothing to release,
     * so the method returns instead of failing with a {@link NullPointerException}.
     */
    private void shutdown() {
        JobServiceManagementInfo info = currentInfo.get();
        if (info == null) {
            LOGGER.debug("No instance info available, nothing to release");
            return;
        }
        // The streams are cancelled only on the item path: a failed release leaves them untouched.
        release(info)
                .onItem().invoke(i -> cancelTimers())
                .subscribe().with(
                        i -> LOGGER.info("Shutting down leader instance check"),
                        ex -> LOGGER.error("Shutdown error", ex));
    }

    /** Cancels the leader-check stream and then the heartbeat stream, skipping any not yet created. */
    private void cancelTimers() {
        if (checkLeader != null) {
            checkLeader.cancel();
        }
        if (heartbeat != null) {
            heartbeat.cancel();
        }
    }

    /**
     * @return {@code true} when this instance currently considers itself the leader
     */
    protected boolean isLeader() {
        return leader.get();
    }

    /**
     * Attempts to claim leadership through {@code repository.getAndUpdate}.
     *
     * <p>The update callback claims leadership when the stored record is {@code null}, has no token,
     * carries this instance's token, has no last heartbeat, or has a last heartbeat older than
     * {@code heartbeatExpirationInSeconds}. On a claim it stamps {@code info} with the current time,
     * marks this instance as leader, enables communication, resumes {@code heartbeat}, pauses
     * {@code checkLeader} and returns {@code info}. Otherwise it clears the leader flag and disables
     * communication (only if this instance was leader), pauses {@code heartbeat}, resumes
     * {@code checkLeader} and returns {@code null}.
     *
     * @param info identity of this instance; its id selects the record to update
     * @param checkLeader leader-check stream to pause or resume
     * @param heartbeat heartbeat stream to pause or resume
     * @return the {@link Uni} produced by the repository; how it treats a {@code null} callback
     *         result is defined by the repository implementation and is not visible here
     */
    protected Uni<JobServiceManagementInfo> tryBecomeLeader(JobServiceManagementInfo info, TimeoutStream checkLeader, TimeoutStream heartbeat) {
        LOGGER.debug("Try to become Leader");
        return repository.getAndUpdate(info.getId(), c -> {
            OffsetDateTime currentTime = DateUtil.now().toOffsetDateTime();
            if (Objects.isNull(c)
                    || Objects.isNull(c.getToken())
                    || Objects.equals(c.getToken(), info.getToken())
                    || Objects.isNull(c.getLastHeartbeat())
                    || c.getLastHeartbeat().isBefore(currentTime.minusSeconds(heartbeatExpirationInSeconds))) {
                info.setLastHeartbeat(currentTime);
                LOGGER.info("SET Leader {}", info);
                leader.set(true);
                enableCommunication();
                heartbeat.resume();
                checkLeader.pause();
                return info;
            }
            if (isLeader()) {
                LOGGER.info("Not Leader");
                leader.set(false);
                disableCommunication();
            }
            heartbeat.pause();
            checkLeader.resume();
            return null;
        });
    }

    /**
     * Resets the management record (same id, {@code null} token and heartbeat) and, once the
     * repository call succeeds, disables communication and clears the leader flag.
     *
     * @param info identity of this instance; only its id is used
     * @return a {@link Uni} completing when the release finished; a repository failure is logged
     *         with its cause and still propagated to the subscriber
     */
    protected Uni<Void> release(JobServiceManagementInfo info) {
        return repository.set(new JobServiceManagementInfo(info.getId(), null, null))
                .onItem().invoke(this::disableCommunication)
                .onItem().invoke(i -> leader.set(false))
                .onItem().invoke(i -> LOGGER.info("Leader instance released"))
                // Pass the throwable so the cause of the failed release is not lost.
                .onFailure().invoke(ex -> LOGGER.error("Error releasing leader", ex))
                .replaceWithVoid();
    }

    /**
     * Delegates to {@code repository.heartbeat(info)} while this instance is leader.
     *
     * @param info identity of this instance
     * @return the repository's {@link Uni} when leader, otherwise a {@link Uni} emitting {@code null}
     */
    protected Uni<JobServiceManagementInfo> heartbeat(JobServiceManagementInfo info) {
        if (isLeader()) {
            return repository.heartbeat(info);
        }
        return Uni.createFrom().nullItem();
    }

    /** Creates this instance's info (configured id, fresh random token, current time) and stores it. */
    private void buildAndSetInstanceInfo() {
        currentInfo.set(new JobServiceManagementInfo(leaderManagementId, generateToken(), DateUtil.now().toOffsetDateTime()));
        LOGGER.info("Current Job Service Instance {}", currentInfo.get());
    }

    /**
     * @return a random UUID string used as this instance's token
     */
    private String generateToken() {
        return UUID.randomUUID().toString();
    }

    /**
     * @return the info describing this instance, or {@code null} before {@link #startup} ran
     */
    protected JobServiceManagementInfo getCurrentInfo() {
        return currentInfo.get();
    }

    /**
     * @return the leader-check stream, or {@code null} before {@link #startup} ran
     */
    protected TimeoutStream getCheckLeader() {
        return checkLeader;
    }

    /**
     * @return the heartbeat stream, or {@code null} before {@link #startup} ran
     */
    protected TimeoutStream getHeartbeat() {
        return heartbeat;
    }
}

