/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.task.Stream;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.fault.FailoverSweeper;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.controller.util.RetryHelper;
import io.pravega.shared.controller.event.ControllerEvent;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link FailoverSweeper} that looks up the tasks the metadata store still lists as pending
 * for hosts that are no longer running, re-posts each task's {@link ControllerEvent} through
 * {@link StreamMetadataTasks#writeEvent}, and then removes the task from that host's index.
 *
 * <p>All asynchronous continuations are scheduled on the executor supplied at construction.
 */
public class RequestSweeper implements FailoverSweeper {
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(RequestSweeper.class);

    /** Default for the {@code limit} argument passed to the store on each pending-task fetch. */
    public static final int LIMIT = 100;

    private final StreamMetadataStore metadataStore;
    private final ScheduledExecutorService executor;
    private final StreamMetadataTasks streamMetadataTasks;
    private final int limit;

    /**
     * Creates a sweeper that uses the default fetch limit ({@link #LIMIT}).
     *
     * @param metadataStore       store queried for hosts with pending tasks and for those tasks
     * @param executor            executor used for retries and asynchronous continuations
     * @param streamMetadataTasks component used to write each pending task's event
     */
    public RequestSweeper(StreamMetadataStore metadataStore, ScheduledExecutorService executor, StreamMetadataTasks streamMetadataTasks) {
        // Reference the constant rather than repeating its value, so the two cannot drift apart.
        this(metadataStore, executor, streamMetadataTasks, LIMIT);
    }

    /**
     * Creates a sweeper with an explicit fetch limit.
     *
     * @param metadataStore       store queried for hosts with pending tasks and for those tasks
     * @param executor            executor used for retries and asynchronous continuations
     * @param streamMetadataTasks component used to write each pending task's event
     * @param limit               value passed as the limit to
     *                            {@code getPendingsTaskForHost} on every fetch
     */
    @VisibleForTesting
    RequestSweeper(StreamMetadataStore metadataStore, ScheduledExecutorService executor, StreamMetadataTasks streamMetadataTasks, int limit) {
        this.metadataStore = metadataStore;
        this.executor = executor;
        this.streamMetadataTasks = streamMetadataTasks;
        this.limit = limit;
    }

    /**
     * Reports whether this sweeper can be used.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isReady() {
        return true;
    }

    /**
     * Finds every host that the store lists as having pending tasks but that is absent from
     * the set supplied by {@code runningProcesses}, and sweeps each of them via
     * {@link #handleFailedProcess(String)}.
     *
     * @param runningProcesses supplier of the identifiers of the currently running hosts
     * @return a future built by {@code Futures.allOf} over the per-host sweep futures
     */
    @Override
    public CompletableFuture<Void> sweepFailedProcesses(Supplier<Set<String>> runningProcesses) {
        return RetryHelper.withRetriesAsync(this.metadataStore::listHostsWithPendingTask, RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, this.executor)
                .thenComposeAsync(registeredHosts -> {
                    log.info("Hosts {} have ongoing tasks", registeredHosts);
                    Set<String> runningHosts = RetryHelper.withRetries(runningProcesses, RetryHelper.UNCONDITIONAL_PREDICATE, Integer.MAX_VALUE);
                    // Build a fresh set instead of calling removeAll on the store's result:
                    // whether that set is modifiable or shared is not visible from this class.
                    Set<String> failedHosts = registeredHosts.stream()
                            .filter(host -> !runningHosts.contains(host))
                            .collect(Collectors.toSet());
                    log.info("Failed hosts {} have orphaned tasks", failedHosts);
                    List<CompletableFuture<Void>> sweeps = failedHosts.stream()
                            .map(this::handleFailedProcess)
                            .collect(Collectors.toList());
                    return Futures.allOf(sweeps);
                }, this.executor);
    }

    /**
     * Sweeps the pending tasks of a single host by invoking {@link #postRequest(String)} in a
     * {@code Futures.doWhileLoop} whose condition is "the last batch was non-empty"
     * (presumably the loop repeats while that holds; confirm against {@code Futures}).
     * The whole loop is wrapped in {@code RetryHelper.withRetriesAsync}.
     *
     * @param oldHostId identifier of the host whose pending tasks are swept
     * @return the future returned by the retry wrapper around the sweep loop
     */
    @Override
    public CompletableFuture<Void> handleFailedProcess(String oldHostId) {
        log.info("Sweeping orphaned tasks for host {}", oldHostId);
        return RetryHelper.withRetriesAsync(
                () -> Futures.doWhileLoop(() -> this.postRequest(oldHostId), batch -> !batch.isEmpty(), this.executor)
                        .whenCompleteAsync((result, ex) -> {
                            // Only report completion when the loop actually succeeded; a failed
                            // attempt is logged with its cause instead of being called complete.
                            if (ex == null) {
                                log.info("Sweeping orphaned tasks for host {} complete", oldHostId);
                            } else {
                                log.warn("Sweeping orphaned tasks for host {} failed", oldHostId, ex);
                            }
                        }, this.executor),
                RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, this.executor);
    }

    /**
     * Fetches one batch of pending tasks for the host (bounded by the configured limit) and,
     * for each entry, writes its event and then removes it from the host's index.
     *
     * @param oldHostId identifier of the host whose pending tasks are processed
     * @return a future holding the keys of the entries processed in this batch
     */
    @VisibleForTesting
    CompletableFuture<List<String>> postRequest(String oldHostId) {
        return this.metadataStore.getPendingsTaskForHost(oldHostId, this.limit)
                .thenComposeAsync(tasks -> Futures.allOfWithResults(
                        tasks.entrySet().stream()
                                .map(entry -> this.repostTask(oldHostId, entry.getKey(), entry.getValue()))
                                .collect(Collectors.toList())), this.executor);
    }

    /** Writes one pending event, then drops its key from the host's index; yields that key. */
    private CompletableFuture<String> repostTask(String oldHostId, String taskKey, ControllerEvent event) {
        return this.streamMetadataTasks.writeEvent(event)
                .thenCompose(v -> this.metadataStore.removeTaskFromIndex(oldHostId, taskKey))
                .thenApply(v -> taskKey);
    }
}

