/*
 * Decompiled with CFR 0.152.
 */
package org.hawkular.metrics.scheduler.impl;

import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.hawkular.metrics.datetime.DateTimeService;
import org.hawkular.metrics.scheduler.api.JobDetails;
import org.hawkular.metrics.scheduler.api.JobStatus;
import org.hawkular.metrics.scheduler.api.RepeatingTrigger;
import org.hawkular.metrics.scheduler.api.RetryPolicy;
import org.hawkular.metrics.scheduler.api.SingleExecutionTrigger;
import org.hawkular.metrics.scheduler.api.Trigger;
import org.hawkular.metrics.scheduler.impl.JobDetailsImpl;
import org.hawkular.metrics.scheduler.impl.JobsService;
import org.hawkular.metrics.scheduler.impl.Lock;
import org.hawkular.metrics.scheduler.impl.LockManager;
import org.hawkular.metrics.scheduler.impl.UnregisteredJobException;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.jboss.logging.Logger;
import org.joda.time.DateTime;
import org.joda.time.Minutes;
import rx.Completable;
import rx.Observable;
import rx.Scheduler;
import rx.Single;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

public class SchedulerImpl
implements org.hawkular.metrics.scheduler.api.Scheduler {
    private static Logger logger = Logger.getLogger(SchedulerImpl.class);
    private Map<String, Func1<JobDetails, Completable>> jobFactories;
    private Map<String, Func2<JobDetails, Throwable, RetryPolicy>> retryFunctions;
    private ScheduledExecutorService tickExecutor;
    private Scheduler tickScheduler;
    private ExecutorService queryExecutor;
    private Scheduler queryScheduler;
    private RxSession session;
    private PreparedStatement deleteScheduleJob;
    private PreparedStatement deleteScheduledJobs;
    private PreparedStatement findFinishedJobs;
    private PreparedStatement deleteFinishedJobs;
    private PreparedStatement updateJobToFinished;
    private PreparedStatement addActiveTimeSlice;
    private PreparedStatement findActiveTimeSlices;
    private PreparedStatement deleteActiveTimeSlice;
    private LockManager lockManager;
    private JobsService jobsService;
    private boolean running;
    private AtomicInteger ticks = new AtomicInteger();
    private static Func2<JobDetails, Throwable, RetryPolicy> NO_RETRY = (details, throwable) -> RetryPolicy.NONE;
    static final String QUEUE_LOCK_PREFIX = "org.hawkular.metrics.scheduler.queue.";
    static final String SCHEDULING_LOCK = "scheduling";
    static final String TIME_SLICE_EXECUTION_LOCK = "executing";
    static final String JOB_EXECUTION_LOCK = "locked";
    static final int SCHEDULING_LOCK_TIMEOUT_IN_SEC = 5;
    static final int JOB_EXECUTION_LOCK_TIMEOUT_IN_SEC = 1800;
    private Optional<PublishSubject<Date>> finishedTimeSlices;
    private Optional<PublishSubject<JobDetails>> jobFinished;
    private final Object lock = new Object();
    private String hostname;

    public SchedulerImpl(RxSession session, String hostname) {
        this(session, hostname, new JobsService(session));
    }

    public SchedulerImpl(RxSession session, String hostname, JobsService jobsService) {
        this.session = session;
        this.jobFactories = new HashMap<String, Func1<JobDetails, Completable>>();
        this.retryFunctions = new HashMap<String, Func2<JobDetails, Throwable, RetryPolicy>>();
        this.tickExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat("ticker-pool-%d").build(), new ThreadPoolExecutor.DiscardPolicy());
        this.tickScheduler = Schedulers.from(this.tickExecutor);
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("query-thread-pool-%d").build();
        this.queryExecutor = new ThreadPoolExecutor(this.getQueryThreadPoolSize(), this.getQueryThreadPoolSize(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory, new ThreadPoolExecutor.DiscardPolicy());
        this.queryScheduler = Schedulers.from(this.queryExecutor);
        this.lockManager = new LockManager(session);
        this.jobsService = jobsService;
        this.deleteScheduledJobs = this.initQuery("DELETE FROM scheduled_jobs_idx WHERE time_slice = ?");
        this.findFinishedJobs = this.initQuery("SELECT job_id FROM finished_jobs_idx WHERE time_slice = ?");
        this.deleteFinishedJobs = this.initQuery("DELETE FROM finished_jobs_idx WHERE time_slice = ?");
        this.updateJobToFinished = this.initQuery("INSERT INTO finished_jobs_idx (time_slice, job_id) VALUES (?, ?)");
        this.addActiveTimeSlice = this.initQuery("INSERT INTO active_time_slices (time_slice) VALUES (?)");
        this.findActiveTimeSlices = this.initQuery("SELECT DISTINCT time_slice FROM active_time_slices");
        this.deleteActiveTimeSlice = this.initQuery("DELETE FROM active_time_slices WHERE time_slice = ?");
        this.finishedTimeSlices = Optional.empty();
        this.jobFinished = Optional.empty();
        this.hostname = hostname;
    }

    private PreparedStatement initQuery(String cql) {
        return this.session.getSession().prepare(cql).setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
    }

    private int getQueryThreadPoolSize() {
        return Math.max(Runtime.getRuntime().availableProcessors() / 2, 1);
    }

    public void setTickScheduler(Scheduler scheduler) {
        this.tickScheduler = scheduler;
    }

    public void setTimeSlicesSubject(PublishSubject<Date> timeSlicesSubject) {
        this.finishedTimeSlices = Optional.of(timeSlicesSubject);
    }

    public void setJobFinishedSubject(PublishSubject<JobDetails> subject) {
        this.jobFinished = Optional.of(subject);
    }

    @Override
    public void register(String jobType, Func1<JobDetails, Completable> jobProducer) {
        this.jobFactories.put(jobType, jobProducer);
    }

    @Override
    public void register(String jobType, Func1<JobDetails, Completable> jobProducer, Func2<JobDetails, Throwable, RetryPolicy> retryFunction) {
        this.jobFactories.put(jobType, jobProducer);
        this.retryFunctions.put(jobType, retryFunction);
    }

    @Override
    public Single<? extends JobDetails> scheduleJob(String type, String name, Map<String, String> parameters, Trigger trigger) {
        if (DateTimeService.now.get().getMillis() >= trigger.getTriggerTime()) {
            return Single.error(new RuntimeException("Trigger time has already passed"));
        }
        String lockName = QUEUE_LOCK_PREFIX + trigger.getTriggerTime();
        return this.lockManager.acquireLock(lockName, SCHEDULING_LOCK, 5, false).map(lock -> {
            if (lock.isLocked()) {
                UUID jobId = UUID.randomUUID();
                return this.jobsService.createJobDetails(jobId, type, name, parameters, trigger, new Date(trigger.getTriggerTime()));
            }
            throw new RuntimeException("Failed to acquire scheduling lock [" + lockName + "]");
        }).flatMap(details -> this.jobsService.insert(new Date(trigger.getTriggerTime()), (JobDetails)details).map(resultSet -> details)).toSingle().doOnError(t -> logger.warn((Object)("Failed to schedule job " + name), (Throwable)t));
    }

    @Override
    public Completable unscheduleJobById(String jobId) {
        return this.jobsService.deleteJob(UUID.fromString(jobId), this.queryScheduler);
    }

    @Override
    public Completable unscheduleJobByTypeAndName(String jobType, String jobName) {
        return Completable.merge(this.jobsService.findAllScheduledJobs(this.queryScheduler).filter(details -> details.getJobType().equals(jobType) && details.getJobName().equals(jobName)).map(details -> this.jobsService.deleteJob(details.getJobId(), this.queryScheduler)));
    }

    @Override
    public void start() {
        this.running = true;
        ConcurrentSkipListSet activeTimeSlices = new ConcurrentSkipListSet();
        ConcurrentSkipListSet activeJobs = new ConcurrentSkipListSet();
        this.doOnTick(() -> {
            logger.debugf("Activating scheduler for [%s]", (Object)DateTimeService.currentMinute().toDate());
            Observable.just(DateTimeService.currentMinute().toDate()).concatMap(time -> this.jobsService.findActiveTimeSlices((Date)time, this.queryScheduler)).filter(d -> {
                Object object = this.lock;
                synchronized (object) {
                    if (!activeTimeSlices.contains(d)) {
                        activeTimeSlices.add(d);
                        return true;
                    }
                    return false;
                }
            }).concatMap(this::acquireTimeSliceLock).concatMap(timeSliceLock -> this.findScheduledJobs(timeSliceLock.getTimeSlice()).doOnError(t -> logger.warnf("Failed to find schedule jobs for time slice %s", (Object)((TimeSliceLock)timeSliceLock).timeSlice)).doOnNext(jobs -> logger.debugf("[%s] scheduled jobs: %s", (Object)((TimeSliceLock)timeSliceLock).timeSlice, jobs)).flatMap(scheduledJobs -> this.computeRemainingJobs((Set<JobDetailsImpl>)scheduledJobs, timeSliceLock.getTimeSlice(), activeJobs)).doOnNext(jobs -> logger.debugf("[%s] remaining jobs: %s", (Object)((TimeSliceLock)timeSliceLock).timeSlice, jobs)).flatMap(Observable::from).filter(jobDetails -> !activeJobs.contains(jobDetails.getJobId())).flatMap(this::acquireJobLock).filter(jobLock -> jobLock.acquired).map(jobLock -> jobLock.jobDetails).doOnNext(details -> {
                logger.debugf("Acquired job lock for %s in time slice %s", details, (Object)((TimeSliceLock)timeSliceLock).timeSlice);
                activeJobs.add(details.getJobId());
            }).observeOn(Schedulers.io()).flatMap(details -> this.executeJob((JobDetailsImpl)details, ((TimeSliceLock)timeSliceLock).timeSlice, activeJobs).doOnTerminate(() -> activeJobs.remove(details.getJobId())).toObservable().map(o -> timeSliceLock.getTimeSlice())).defaultIfEmpty(timeSliceLock.getTimeSlice())).observeOn(this.queryScheduler).concatMap(time -> {
                Observable<HashSet> scheduled = this.jobsService.findScheduledJobsForTime((Date)time, this.queryScheduler).map(JobDetails::getJobId).collect(HashSet::new, HashSet::add);
                Observable<? extends Set<UUID>> finished = this.findFinishedJobs((Date)time);
                return Observable.sequenceEqual(scheduled, finished).flatMap(allFinished -> {
                    if (allFinished.booleanValue()) {
                        logger.debugf("All jobs for time slice [%s] have finished", time);
                        return this.deleteFinishedJobs((Date)time).mergeWith(this.deleteScheduledJobs((Date)time)).toObservable().reduce(null, (o1, o2) -> o2).map(o -> time);
                    }
                    return Observable.just(time);
                });
            }).subscribe(d -> {
                logger.debugf("Finished post job execution clean up for [%s]", d);
                activeTimeSlices.remove(d);
                this.finishedTimeSlices.ifPresent(subject -> subject.onNext(d));
            }, t -> {
                logger.warn((Object)"Job execution failed", (Throwable)t);
                Object object = this.lock;
                synchronized (object) {
                    activeTimeSlices.clear();
                }
            }, () -> logger.debug("Done!"));
        });
    }

    private Observable<TimeSliceLock> acquireTimeSliceLock(Date timeSlice) {
        String lockName = QUEUE_LOCK_PREFIX + timeSlice.getTime();
        int delay = 5;
        Observable observable = Observable.create(subscriber -> this.lockManager.acquireLock(lockName, TIME_SLICE_EXECUTION_LOCK, 1800, false).map(lock -> {
            if (!lock.isLocked()) {
                logger.debugf("Failed to acquire time slice lock for [%s]. Will attempt to acquire it again in %d seconds", (Object)timeSlice, (Object)delay);
                throw new RuntimeException();
            }
            return new TimeSliceLock(timeSlice, lockName, lock.isLocked());
        }).subscribe(subscriber::onNext, subscriber::onError, subscriber::onCompleted));
        return observable.retryWhen(errors -> errors.flatMap(e -> Observable.timer((long)delay, TimeUnit.SECONDS, this.queryScheduler)));
    }

    private Observable<JobLock> acquireJobLock(JobDetailsImpl jobDetails) {
        String jobLock = "org.hawkular.metrics.scheduler.job." + jobDetails.getJobId();
        int timeout = this.calculateTimeout(jobDetails.getTrigger());
        return this.lockManager.acquireLock(jobLock, this.hostname, timeout, true).map(lock -> new JobLock(jobDetails, (Lock)lock)).doOnNext(lock -> logger.debugf("Acquired lock for %s? %s", (Object)jobDetails.getJobName(), (Object)lock.acquired));
    }

    private int calculateTimeout(Trigger trigger) {
        if (trigger instanceof RepeatingTrigger) {
            int interval = (int)(((RepeatingTrigger)trigger).getInterval() / 1000L);
            return (int)((double)interval + (double)interval * 0.25);
        }
        return 1800;
    }

    private Completable executeJob(final JobDetailsImpl details, Date timeSlice, Set<UUID> activeJobs) {
        Completable job;
        logger.debugf("Starting execution for %s in time slice [%s]", (Object)details, (Object)timeSlice);
        Stopwatch stopwatch = Stopwatch.createStarted();
        Func1<JobDetails, Completable> factory = this.jobFactories.get(details.getJobType());
        if (factory == null) {
            job = Completable.error(new UnregisteredJobException(details, timeSlice));
        } else if (details.getStatus() == JobStatus.FINISHED) {
            Observable<Completable> observable = this.jobsService.findScheduledExecutions(details.getJobId(), this.queryScheduler).filter(scheduledExecution -> scheduledExecution.getJobDetails().getStatus() == JobStatus.NONE && scheduledExecution.getJobDetails().getTrigger().getTriggerTime() > details.getTrigger().getTriggerTime()).isEmpty().map(isEmpty -> {
                if (isEmpty.booleanValue()) {
                    return this.doPostJobExecution(Completable.complete(), details, timeSlice, activeJobs);
                }
                return this.doPostJobExecutionWithoutRescheduling(Completable.complete(), details, timeSlice, activeJobs);
            });
            job = Completable.merge(observable);
        } else {
            this.jobsService.prepareJobDetailsForExecution(details, timeSlice);
            job = factory.call(details).onErrorResumeNext(t -> {
                logger.infof((Throwable)t, "Execution of %s in time slice [%s] failed", (Object)details, (Object)timeSlice);
                final RetryPolicy retryPolicy = this.retryFunctions.getOrDefault(details.getJobType(), NO_RETRY).call(details, (Throwable)t);
                if (retryPolicy == RetryPolicy.NONE) {
                    return Completable.complete();
                }
                if (details.getTrigger().nextTrigger() != null) {
                    logger.warnf("Retry policies cannot be used with jobs that repeat. %s will execute again according to its next trigger", (Object)details);
                    return Completable.complete();
                }
                if (retryPolicy == RetryPolicy.NOW) {
                    return (Completable)factory.call(details);
                }
                Trigger newTrigger = new Trigger(){

                    @Override
                    public long getTriggerTime() {
                        return details.getTrigger().getTriggerTime();
                    }

                    @Override
                    public Trigger nextTrigger() {
                        return new SingleExecutionTrigger.Builder().withDelay(retryPolicy.getDelay(), TimeUnit.MILLISECONDS).build();
                    }
                };
                JobDetailsImpl newDetails = new JobDetailsImpl(details, newTrigger);
                return this.reschedule(new JobExecutionState(newDetails, activeJobs)).toCompletable();
            }).doOnTerminate(() -> this.jobsService.resetJobDetails(details)).doOnCompleted(() -> {
                stopwatch.stop();
                if (logger.isDebugEnabled()) {
                    logger.debugf("Finished executing %s in time slice [%s] in %s ms", (Object)details, (Object)timeSlice, (Object)stopwatch.elapsed(TimeUnit.MILLISECONDS));
                }
            });
            job = this.doPostJobExecution(job, details, timeSlice, activeJobs);
        }
        return job.subscribeOn(Schedulers.io());
    }

    private Completable doPostJobExecution(Completable job, JobDetailsImpl jobDetails, Date timeSlice, Set<UUID> activeJobs) {
        return job.toSingle(() -> new JobExecutionState(jobDetails, timeSlice, null, null, activeJobs)).flatMap(state -> this.jobsService.updateStatusToFinished(timeSlice, state.currentDetails.getJobId()).toSingle().map(resultSet -> state)).flatMap(this::reschedule).flatMap(state -> this.releaseJobExecutionLock((JobExecutionState)state).flatMap(this::setJobFinished)).doOnError(t -> {
            logger.debug((Object)"There was an error during post-job execution", (Throwable)t);
            this.publishJobFinished(jobDetails);
        }).doOnSuccess(states -> this.publishJobFinished(states.currentDetails)).toCompletable();
    }

    private Completable doPostJobExecutionWithoutRescheduling(Completable job, JobDetailsImpl jobDetails, Date timeSlice, Set<UUID> activeJobs) {
        return job.toSingle(() -> new JobExecutionState(jobDetails, activeJobs)).flatMap(this::releaseJobExecutionLock).flatMap(this::setJobFinished).doOnError(t -> {
            logger.debug((Object)"There was an error during post-job execution, but the job has already been rescheduled.", (Throwable)t);
            this.publishJobFinished(jobDetails);
        }).doOnSuccess(states -> this.publishJobFinished(states.currentDetails)).toCompletable();
    }

    private void publishJobFinished(JobDetails details) {
        this.jobFinished.ifPresent(subject -> subject.onNext(details));
    }

    private Single<JobExecutionState> setJobFinished(JobExecutionState state) {
        return this.session.execute(this.updateJobToFinished.bind(state.timeSlice, state.currentDetails.getJobId()), this.queryScheduler).toSingle().map(resultSet -> state).doOnError(t -> logger.warnf((Throwable)t, "There was an error while updated the finished jobs index for %s", (Object)state.currentDetails));
    }

    private Single<JobExecutionState> reschedule(JobExecutionState executionState) {
        Trigger nextTrigger = executionState.currentDetails.getTrigger().nextTrigger();
        if (nextTrigger == null) {
            logger.debugf("No more scheduled executions for %s", (Object)executionState.currentDetails);
            return Single.just(executionState);
        }
        JobDetailsImpl details = executionState.currentDetails;
        JobDetailsImpl newDetails = new JobDetailsImpl(details, nextTrigger);
        if (nextTrigger.getTriggerTime() <= DateTimeService.now.get().getMillis()) {
            logger.infof("%s missed its next execution at %d. It will be rescheduled for immediate execution.", (Object)details, (Object)nextTrigger.getTriggerTime());
            AtomicLong nextTimeSlice = new AtomicLong(DateTimeService.currentMinute().getMillis());
            Observable<Lock> scheduled = Observable.defer(() -> this.lockManager.acquireLock(QUEUE_LOCK_PREFIX + nextTimeSlice.addAndGet(60000L), SCHEDULING_LOCK, 5, false)).map(lock -> {
                if (!lock.isLocked()) {
                    throw new RuntimeException();
                }
                return lock;
            }).retry();
            return scheduled.map(lock -> new JobExecutionState(executionState.currentDetails, executionState.timeSlice, newDetails, new Date(nextTimeSlice.get()), executionState.activeJobs)).flatMap(state -> this.jobsService.insert(state.nextTimeSlice, state.nextDetails).map(updated -> state)).doOnNext(state -> logger.debugf("Rescheduled %s to execute in time slice %s with trigger time of %s", (Object)state.nextDetails.getJobName(), (Object)state.nextTimeSlice, (Object)new Date(state.nextDetails.getTrigger().getTriggerTime()))).toSingle();
        }
        logger.debugf("Scheduling %s for next execution at %s", (Object)newDetails, (Object)new Date(nextTrigger.getTriggerTime()));
        JobExecutionState newState = new JobExecutionState(details, executionState.timeSlice, newDetails, new Date(nextTrigger.getTriggerTime()), executionState.activeJobs);
        return this.jobsService.insert(newState.nextTimeSlice, newState.nextDetails).map(updated -> newState).toSingle();
    }

    private Single<JobExecutionState> releaseJobExecutionLock(JobExecutionState state) {
        String jobLock = "org.hawkular.metrics.scheduler.job." + state.currentDetails.getJobId();
        return this.lockManager.releaseLock(jobLock, this.hostname).map(released -> {
            if (!released.booleanValue()) {
                logger.warnf("Failed to release job lock for %s", (Object)state.currentDetails);
            }
            return state;
        }).toSingle().doOnError(t -> logger.warnf((Throwable)t, "There was an error trying to release job lock [%s] for %s", (Object)jobLock, (Object)state.currentDetails));
    }

    private Completable deleteScheduledJobs(Date timeSlice) {
        return this.session.execute(this.deleteScheduledJobs.bind(timeSlice), this.queryScheduler).doOnCompleted(() -> logger.debugf("Deleted scheduled jobs time slice [%s]", (Object)timeSlice)).toCompletable();
    }

    private Completable deleteFinishedJobs(Date timeSlice) {
        return this.session.execute(this.deleteFinishedJobs.bind(timeSlice), this.queryScheduler).doOnCompleted(() -> logger.debugf("Deleted finished jobs time slice [%s]", (Object)timeSlice)).toCompletable();
    }

    private Completable deleteActiveTimeSlice(Date timeSlice) {
        return this.session.execute(this.deleteActiveTimeSlice.bind(timeSlice), this.queryScheduler).toCompletable();
    }

    @Override
    public void shutdown() {
        try {
            this.running = false;
            this.tickExecutor.shutdown();
            this.tickExecutor.awaitTermination(5L, TimeUnit.SECONDS);
            this.queryExecutor.shutdown();
            this.queryExecutor.awaitTermination(30L, TimeUnit.SECONDS);
            this.lockManager.shutdown();
            logger.info("Shutdown complete");
        }
        catch (InterruptedException e) {
            logger.warn((Object)"Interrupted during shutdown", e);
        }
    }

    private Observable<? extends Set<JobDetailsImpl>> findScheduledJobs(Date timeSlice) {
        logger.debugf("Fetching scheduled jobs for [%s]", (Object)timeSlice);
        return this.jobsService.findScheduledJobsForTime(timeSlice, this.queryScheduler).collect(HashSet::new, HashSet::add);
    }

    private Observable<Set<JobDetailsImpl>> computeRemainingJobs(Set<JobDetailsImpl> scheduledJobs, Date timeSlice, Set<UUID> activeJobs) {
        Observable<? extends Set<UUID>> finished = this.findFinishedJobs(timeSlice);
        return finished.map(finishedJobs -> {
            HashSet active = new HashSet(activeJobs);
            active.removeAll((Collection<?>)finishedJobs);
            Set jobs = scheduledJobs.stream().filter(details -> !finishedJobs.contains(details.getJobId())).collect(Collectors.toSet());
            return jobs.stream().filter(details -> !active.contains(details.getJobId())).collect(Collectors.toSet());
        });
    }

    private Observable<? extends Set<UUID>> findFinishedJobs(Date timeSlice) {
        return this.session.execute(this.findFinishedJobs.bind(timeSlice), this.queryScheduler).flatMap(Observable::from).map(row -> row.getUUID(0)).collect(HashSet::new, HashSet::add);
    }

    private void doOnTick(Action0 action) {
        Action0 wrapper = () -> {
            Date timeSlice = DateTimeService.getTimeSlice(new DateTime(this.tickScheduler.now()), Minutes.minutes(1).toStandardDuration()).toDate();
            action.call();
        };
        AtomicReference previousTimeSliceRef = new AtomicReference();
        Observable.interval(0L, 1L, TimeUnit.MINUTES, this.tickScheduler).filter(tick -> {
            DateTime time = DateTimeService.currentMinute();
            if (previousTimeSliceRef.get() == null) {
                previousTimeSliceRef.set(time);
                return true;
            }
            if (((DateTime)previousTimeSliceRef.get()).equals(time)) {
                return false;
            }
            previousTimeSliceRef.set(time);
            return true;
        }).takeUntil(d -> !this.running).subscribe(tick -> wrapper.call(), t -> logger.warn(t));
    }

    private Observable<Void> updateActiveTimeSlices(Date timeSlice) {
        return this.session.execute(this.addActiveTimeSlice.bind(timeSlice)).map(resultSet -> null);
    }

    private Observable<Date> findTimeSlices() {
        return this.session.execute(this.findActiveTimeSlices.bind(), this.queryScheduler).flatMap(Observable::from).map(row -> row.getTimestamp(0)).toSortedList().flatMap(Observable::from);
    }

    private static class JobExecutionState {
        final JobDetailsImpl currentDetails;
        final JobDetailsImpl nextDetails;
        final Set<UUID> activeJobs;
        final Date timeSlice;
        final Date nextTimeSlice;

        public JobExecutionState(JobDetailsImpl details, Set<UUID> activeJobs) {
            this.currentDetails = details;
            this.activeJobs = activeJobs;
            this.timeSlice = new Date(details.getTrigger().getTriggerTime());
            this.nextDetails = null;
            this.nextTimeSlice = null;
        }

        public JobExecutionState(JobDetailsImpl details, Date timeSlice, JobDetailsImpl nextDetails, Date nextTimeSlice, Set<UUID> activeJobs) {
            this.currentDetails = details;
            this.timeSlice = timeSlice;
            this.nextDetails = nextDetails;
            this.nextTimeSlice = nextTimeSlice;
            this.activeJobs = activeJobs;
        }

        public boolean isRepeating() {
            return this.nextDetails != null && this.nextTimeSlice != null;
        }

        public boolean isBehindSchedule() {
            return this.isRepeating() && this.nextTimeSlice != null && this.nextTimeSlice.getTime() > this.nextDetails.getTrigger().getTriggerTime();
        }
    }

    private static class JobLock {
        final JobDetailsImpl jobDetails;
        final boolean acquired;
        final String name;

        public JobLock(JobDetailsImpl jobDetails, Lock lock) {
            this.jobDetails = jobDetails;
            this.acquired = lock.isLocked();
            this.name = "org.hawkular.metrics.scheduler.job." + jobDetails.getJobId();
        }
    }

    private static class TimeSliceLock {
        private Date timeSlice;
        private String name;
        private boolean acquired;

        public TimeSliceLock(Date timeSlice, String name, boolean acquired) {
            this.timeSlice = timeSlice;
            this.name = name;
            this.acquired = acquired;
        }

        public Date getTimeSlice() {
            return this.timeSlice;
        }

        public String getName() {
            return this.name;
        }

        public boolean isAcquired() {
            return this.acquired;
        }
    }
}

