/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.jobs.service.repository.impl;

import io.vertx.core.Vertx;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.model.ScheduledJob;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.stream.JobStreams;

/**
 * Base class for {@link ReactiveJobRepository} implementations.
 *
 * <p>It provides a helper to run blocking code through Vert.x, a status filter built on
 * top of {@code findAll()}, and {@code save}/{@code delete} operations that hand the
 * affected job to {@link JobStreams#publishJobStatusChange} once the underlying
 * operation has completed.
 */
public abstract class BaseReactiveJobRepository
implements ReactiveJobRepository {
    private final Vertx vertx;
    private final JobStreams jobStreams;

    /**
     * Creates the repository.
     *
     * @param vertx      Vert.x instance used by {@link #runAsync(Supplier)} to execute blocking code
     * @param jobStreams streams used to publish job status changes after save and delete
     */
    public BaseReactiveJobRepository(Vertx vertx, JobStreams jobStreams) {
        this.vertx = vertx;
        this.jobStreams = jobStreams;
    }

    /**
     * Runs the given supplier via {@code Vertx.executeBlocking} and exposes its outcome as a
     * {@link CompletionStage}.
     *
     * @param function the (possibly blocking) computation to execute
     * @param <T>      type of the computed value
     * @return a stage completed with the supplier's value, or completed exceptionally with
     *         whatever the supplier threw
     */
    public <T> CompletionStage<T> runAsync(Supplier<T> function) {
        CompletableFuture<T> future = new CompletableFuture<>();
        this.vertx.executeBlocking(v -> {
            try {
                future.complete(function.get());
            } catch (RuntimeException | Error e) {
                // Without this the returned stage would never complete when the supplier
                // fails, leaving callers waiting forever. Rethrow so Vert.x still observes
                // the failure exactly as it did before.
                future.completeExceptionally(e);
                throw e;
            }
        }, r -> {});
        return future;
    }

    /**
     * Streams every job returned by {@code findAll()} whose status is non-null and equal to
     * one of the given statuses.
     *
     * @param status the statuses to accept
     * @return a publisher builder emitting only the matching jobs
     */
    @Override
    public PublisherBuilder<ScheduledJob> findByStatus(JobStatus ... status) {
        return this.findAll()
                .filter(job -> Objects.nonNull(job.getStatus()))
                .filter(job -> Arrays.stream(status).anyMatch(job.getStatus()::equals));
    }

    /**
     * Saves the job through {@link #doSave(ScheduledJob)} and then passes the saved job to
     * {@link JobStreams#publishJobStatusChange}.
     *
     * @param job the job to save
     * @return a stage completed with the value returned by {@code publishJobStatusChange}
     */
    @Override
    public CompletionStage<ScheduledJob> save(ScheduledJob job) {
        return this.doSave(job).thenApply(this.jobStreams::publishJobStatusChange);
    }

    /**
     * Performs the actual save; called by {@link #save(ScheduledJob)} before the status
     * change is published.
     *
     * @param job the job to save
     * @return a stage completed with the saved job
     */
    public abstract CompletionStage<ScheduledJob> doSave(ScheduledJob job);

    /**
     * Deletes the job by its id and then passes the job given by the caller (not the result
     * of the delete-by-id call) to {@link JobStreams#publishJobStatusChange}.
     *
     * @param job the job to delete
     * @return a stage completed with the value returned by {@code publishJobStatusChange}
     */
    @Override
    public CompletionStage<ScheduledJob> delete(ScheduledJob job) {
        return this.delete(job.getId()).thenApply(j -> this.jobStreams.publishJobStatusChange(job));
    }
}

