/*
 * Decompiled with CFR 0.152.
 */
package com.skytix.velocity.scheduler;

import com.skytix.schedulerclient.BaseSchedulerEventHandler;
import com.skytix.velocity.entities.VelocityTask;
import com.skytix.velocity.repository.TaskRepository;
import com.skytix.velocity.scheduler.OfferSubscriber;
import com.skytix.velocity.scheduler.TaskUpdateEvent;
import com.skytix.velocity.scheduler.UpdateSubscriber;
import com.skytix.velocity.scheduler.VelocitySchedulerConfig;
import io.micrometer.core.instrument.MeterRegistry;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import org.apache.mesos.v1.Protos;
import org.apache.mesos.v1.scheduler.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Scheduler event handler that fans incoming Mesos scheduler events out to
 * {@link SubmissionPublisher} queues.
 *
 * <p>Offers and task status updates are pushed onto bounded publisher buffers whose sizes come from
 * {@link VelocitySchedulerConfig}; an {@link OfferSubscriber} and an {@link UpdateSubscriber} are
 * attached to those publishers when the framework subscribes. Heartbeat events are timestamped so
 * that callers can query {@link #getLastHeartbeat()}.
 *
 * <p>NOTE(review): {@code mLastHeartbeat} and {@code mHeartbeatInterval} are plain fields. If they
 * are read from a thread other than the one delivering events, visibility is not guaranteed -
 * confirm against the callers.
 */
public abstract class VelocitySchedulerHandler extends BaseSchedulerEventHandler {
    private static final Logger log = LoggerFactory.getLogger(VelocitySchedulerHandler.class);

    /** Buffer capacity of the publisher that carries {@link TaskUpdateEvent}s. */
    private static final int TASK_UPDATE_QUEUE_SIZE = 1000;

    /** How long, in seconds, to wait for buffer space before the drop handler is invoked. */
    private static final long QUEUE_OFFER_TIMEOUT_SECONDS = 2L;

    private final SubmissionPublisher<Protos.Offer> mOfferPublisher;
    private final SubmissionPublisher<Protos.Event.Update> mUpdatePublisher;
    private final SubmissionPublisher<TaskUpdateEvent> mTaskUpdatePublisher;
    private final TaskRepository<VelocityTask> mTaskRepository;
    private final MeterRegistry mMeterRegistry;
    private final VelocitySchedulerConfig mSchedulerConfig;
    private LocalDateTime mLastHeartbeat = null;
    private int mHeartbeatInterval = 0;

    /**
     * Creates the handler and its three publishers.
     *
     * @param aTaskRepository    repository handed to the subscribers and used to build the
     *                           reconciliation request
     * @param aMeterRegistry     metrics registry handed to the subscribers
     * @param aConfig            scheduler configuration; supplies the offer/update queue sizes, the
     *                           default task event handler and the optional heartbeat listener
     * @param aMainThreadPool    executor used by the offer and update publishers
     * @param aGeneralThreadPool executor used by the task-update publisher
     * @throws IllegalArgumentException if either configured queue size is not greater than zero
     */
    public VelocitySchedulerHandler(TaskRepository<VelocityTask> aTaskRepository, MeterRegistry aMeterRegistry, VelocitySchedulerConfig aConfig, Executor aMainThreadPool, Executor aGeneralThreadPool) {
        this.mTaskRepository = aTaskRepository;
        this.mMeterRegistry = aMeterRegistry;
        this.mSchedulerConfig = aConfig;

        final int maxOfferQueueSize = aConfig.getMaxOfferQueueSize();
        final int maxUpdateQueueSize = aConfig.getMaxUpdateQueueSize();

        if (maxOfferQueueSize <= 0) {
            throw new IllegalArgumentException("maxOfferQueueSize must be greater than zero");
        }
        if (maxUpdateQueueSize <= 0) {
            throw new IllegalArgumentException("maxUpdateQueueSize must be greater than zero");
        }

        this.mOfferPublisher = new SubmissionPublisher<>(aMainThreadPool, maxOfferQueueSize);
        this.mUpdatePublisher = new SubmissionPublisher<>(aMainThreadPool, maxUpdateQueueSize);
        this.mTaskUpdatePublisher = new SubmissionPublisher<>(aGeneralThreadPool, TASK_UPDATE_QUEUE_SIZE);
    }

    /**
     * Attaches the offer and update subscribers, requests reconciliation of all active tasks and
     * records the heartbeat interval announced in the subscription event.
     *
     * <p>NOTE(review): every call adds a fresh pair of subscribers to the publishers; verify that
     * this is invoked only once per handler instance.
     *
     * @param aSubscribeEvent the SUBSCRIBED event payload
     */
    public void onSubscribe(Protos.Event.Subscribed aSubscribeEvent) {
        this.mOfferPublisher.subscribe(
                new OfferSubscriber(
                        this.mTaskRepository,
                        () -> this.getSchedulerRemote(),
                        this.mMeterRegistry
                )
        );
        this.mUpdatePublisher.subscribe(
                new UpdateSubscriber(
                        this.mTaskRepository,
                        this.mTaskUpdatePublisher,
                        () -> this.getSchedulerRemote(),
                        this.mSchedulerConfig.getDefaultTaskEventHandler(),
                        this.mMeterRegistry
                )
        );

        this.getSchedulerRemote().reconcile(this.buildFromRunningTasks());
        // The interval is reported as a floating point number of seconds; only whole seconds are kept.
        this.mHeartbeatInterval = (int) aSubscribeEvent.getHeartbeatIntervalSeconds();
    }

    /**
     * Dispatches a scheduler event by type.
     *
     * <ul>
     *   <li>{@code INVERSE_OFFERS}: every inverse offer is declined.</li>
     *   <li>{@code OFFERS}: every offer is queued on the offer publisher; an offer that cannot be
     *       queued within the timeout is logged, declined and dropped.</li>
     *   <li>{@code RESCIND}: delegated to {@link #handleRescind}.</li>
     *   <li>{@code UPDATE}: queued on the update publisher; on timeout the drop handler asks the
     *       publisher to retry.</li>
     *   <li>{@code HEARTBEAT}: records the current time and notifies the configured heartbeat
     *       listener, if any.</li>
     * </ul>
     * All other event types are ignored.
     *
     * @param aEvent the event received from the scheduler API
     */
    public void handleEvent(Protos.Event aEvent) {
        switch (aEvent.getType()) {
            case INVERSE_OFFERS: {
                Protos.Event.InverseOffers inverseOffers = aEvent.getInverseOffers();
                for (int i = 0; i < inverseOffers.getInverseOffersCount(); ++i) {
                    Protos.InverseOffer inverseOffer = inverseOffers.getInverseOffers(i);
                    this.getSchedulerRemote().decline(Collections.singletonList(inverseOffer.getId()));
                }
                // Previously fell through into OFFERS; an inverse-offers event carries no offers to queue.
                break;
            }
            case OFFERS: {
                Protos.Event.Offers offers = aEvent.getOffers();
                for (int i = 0; i < offers.getOffersCount(); ++i) {
                    this.mOfferPublisher.offer(offers.getOffers(i), QUEUE_OFFER_TIMEOUT_SECONDS, TimeUnit.SECONDS, (subscriber, offer) -> {
                        log.error("Timeout adding offer '{}' to queue.  Queue full.  Declining offer.", offer.getId().getValue());
                        this.getSchedulerRemote().decline(Collections.singletonList(offer.getId()));
                        // false: do not retry, the offer has been handed back.
                        return false;
                    });
                }
                break;
            }
            case RESCIND: {
                this.handleRescind(aEvent.getRescind());
                break;
            }
            case UPDATE: {
                this.mUpdatePublisher.offer(aEvent.getUpdate(), QUEUE_OFFER_TIMEOUT_SECONDS, TimeUnit.SECONDS, (subscriber, update) -> {
                    log.error("Timeout adding update for task '{}' to queue.  Queue full.  Retrying...", update.getStatus().getTaskId().getValue());
                    // true: ask the publisher to re-attempt delivery of this update.
                    return true;
                });
                break;
            }
            case HEARTBEAT: {
                this.mLastHeartbeat = LocalDateTime.now();
                if (this.mSchedulerConfig.getHeartbeatListener() != null) {
                    this.mSchedulerConfig.getHeartbeatListener().beat();
                }
                break;
            }
            default: {
                // Remaining event types are intentionally not handled here.
                break;
            }
        }
    }

    /**
     * @return the local time at which the most recent HEARTBEAT event was handled, or {@code null}
     *         if none has been handled yet
     */
    public LocalDateTime getLastHeartbeat() {
        return this.mLastHeartbeat;
    }

    /**
     * @return the heartbeat interval in whole seconds taken from the last subscription event, or
     *         {@code 0} before the first subscription
     */
    public int getHeartbeatInterval() {
        return this.mHeartbeatInterval;
    }

    /**
     * Handles a RESCIND event. Currently a no-op: rescinded offers are not removed from the queue.
     *
     * @param aRescind the rescind payload (unused)
     */
    private void handleRescind(Protos.Event.Rescind aRescind) {
    }

    /**
     * Builds one reconcile entry (task id + agent id) for every task the repository reports as
     * active.
     *
     * @return a new mutable list of reconcile entries; empty when there are no active tasks
     */
    private List<Protos.Call.Reconcile.Task> buildFromRunningTasks() {
        List<VelocityTask> activeTasks = this.mTaskRepository.getActiveTasks();
        List<Protos.Call.Reconcile.Task> results = new ArrayList<>(activeTasks.size());

        for (VelocityTask task : activeTasks) {
            results.add(
                    Protos.Call.Reconcile.Task.newBuilder()
                            .setTaskId(task.getTaskInfo().getTaskId())
                            .setAgentId(task.getTaskInfo().getAgentId())
                            .build()
            );
        }

        return results;
    }

    /**
     * Treats termination the same as a disconnect and closes all publishers.
     *
     * @param aException the cause of termination (not inspected)
     */
    public void onTerminate(Exception aException) {
        this.onDisconnect();
    }

    /** Callback for subclasses to react to a missed heartbeat; not invoked within this class. */
    public abstract void onHeartbeatFail();

    /** Closes the offer, update and task-update publishers. */
    public void onDisconnect() {
        this.mOfferPublisher.close();
        this.mUpdatePublisher.close();
        this.mTaskUpdatePublisher.close();
    }
}

