/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.task.Stream;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.fault.FailoverSweeper;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.VersionedTransactionData;
import io.pravega.controller.store.task.TxnResource;
import io.pravega.controller.task.Stream.StreamTransactionMetadataTasks;
import io.pravega.controller.util.Config;
import io.pravega.controller.util.RetryHelper;
import io.pravega.shared.controller.event.AbortEvent;
import io.pravega.shared.controller.event.CommitEvent;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.Collection;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TxnSweeper
implements FailoverSweeper {
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(TxnSweeper.class);
    private final StreamMetadataStore streamMetadataStore;
    private final StreamTransactionMetadataTasks transactionMetadataTasks;
    private final long maxTxnTimeoutMillis;
    private final ScheduledExecutorService executor;

    public TxnSweeper(StreamMetadataStore streamMetadataStore, StreamTransactionMetadataTasks transactionMetadataTasks, long maxTxnTimeoutMillis, ScheduledExecutorService executor) {
        Preconditions.checkNotNull((Object)streamMetadataStore, (Object)"streamMetadataStore");
        Preconditions.checkNotNull((Object)transactionMetadataTasks, (Object)"transactionMetadataTasks");
        Preconditions.checkNotNull((Object)executor, (Object)"executor");
        Preconditions.checkArgument((maxTxnTimeoutMillis > 0L ? 1 : 0) != 0, (Object)"maxTxnTimeoutMillis should be a positive number");
        this.streamMetadataStore = streamMetadataStore;
        this.transactionMetadataTasks = transactionMetadataTasks;
        this.maxTxnTimeoutMillis = maxTxnTimeoutMillis;
        this.executor = executor;
    }

    public void awaitInitialization() throws InterruptedException {
        this.transactionMetadataTasks.awaitInitialization();
    }

    @Override
    public boolean isReady() {
        return this.transactionMetadataTasks.isReady();
    }

    @Override
    public CompletableFuture<Void> sweepFailedProcesses(Supplier<Set<String>> activeHosts) {
        if (!this.transactionMetadataTasks.isReady()) {
            return Futures.failedFuture((Throwable)new IllegalStateException(this.getClass().getName() + " not yet ready"));
        }
        CompletableFuture hostsOwningTxns = RetryHelper.withRetriesAsync(this.streamMetadataStore::listHostsOwningTxn, RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, this.executor);
        return hostsOwningTxns.thenComposeAsync(index -> {
            index.removeAll((Collection)activeHosts.get());
            log.info("Failed hosts {} have orphaned tasks", index);
            return Futures.allOf((Collection)index.stream().map(this::handleFailedProcess).collect(Collectors.toList()));
        }, (Executor)this.executor);
    }

    @Override
    public CompletableFuture<Void> handleFailedProcess(String failedHost) {
        if (!this.transactionMetadataTasks.isReady()) {
            return Futures.failedFuture((Throwable)new IllegalStateException(this.getClass().getName() + " not yet ready"));
        }
        log.info("Host={}, sweeping orphaned transactions", (Object)failedHost);
        CompletableFuture delay = Futures.delayedFuture((Duration)Duration.ofMillis(2L * this.maxTxnTimeoutMillis), (ScheduledExecutorService)this.executor);
        return delay.thenComposeAsync(x -> RetryHelper.withRetriesAsync(() -> this.sweepOrphanedTxnsWithoutDelay(failedHost), RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, this.executor));
    }

    private CompletableFuture<Void> sweepOrphanedTxnsWithoutDelay(String failedHost) {
        CompletableFuture failOverTxns = Futures.doWhileLoop(() -> this.failOverTxns(failedHost), x -> x != null, (Executor)this.executor);
        return failOverTxns.whenCompleteAsync((v, e) -> {
            if (e != null) {
                log.warn("Host={}, Caught exception sweeping orphaned transactions", (Object)failedHost, e);
            } else {
                log.debug("Host={}, sweeping orphaned transactions complete", (Object)failedHost);
            }
        }, (Executor)this.executor);
    }

    private CompletableFuture<Result> failOverTxns(String failedHost) {
        return this.streamMetadataStore.getRandomTxnFromIndex(failedHost).thenComposeAsync(resourceOpt -> {
            if (resourceOpt.isPresent()) {
                TxnResource resource = (TxnResource)resourceOpt.get();
                return this.failOverTxn(failedHost, resource);
            }
            return this.streamMetadataStore.removeHostFromIndex(failedHost).thenApplyAsync(x -> null, (Executor)this.executor);
        }, (Executor)this.executor);
    }

    private CompletableFuture<Result> failOverTxn(String failedHost, TxnResource txn) {
        String scope = txn.getScope();
        String stream = txn.getStream();
        UUID txnId = txn.getTxnId();
        log.debug("Host = {}, processing transaction {}/{}/{}", new Object[]{failedHost, scope, stream, txnId});
        return ((CompletableFuture)((CompletableFuture)this.streamMetadataStore.getTransactionData(scope, stream, txnId, null, this.executor).handle((r, e) -> {
            if (e != null) {
                if (Exceptions.unwrap((Throwable)e) instanceof StoreException.DataNotFoundException) {
                    return VersionedTransactionData.EMPTY;
                }
                throw new CompletionException((Throwable)e);
            }
            return r;
        })).thenComposeAsync(txData -> {
            int epoch = txData.getEpoch();
            switch (txData.getStatus()) {
                case OPEN: {
                    return this.failOverOpenTxn(failedHost, txn).handleAsync((v, e) -> new Result(txn, v, (Throwable)e), (Executor)this.executor);
                }
                case ABORTING: {
                    return this.failOverAbortingTxn(failedHost, epoch, txn).handleAsync((v, e) -> new Result(txn, v, (Throwable)e), (Executor)this.executor);
                }
                case COMMITTING: {
                    return this.failOverCommittingTxn(failedHost, epoch, txn).handleAsync((v, e) -> new Result(txn, v, (Throwable)e), (Executor)this.executor);
                }
            }
            return this.streamMetadataStore.removeTxnFromIndex(failedHost, txn, true).thenApply(x -> new Result(txn, null, null));
        }, (Executor)this.executor)).whenComplete((v, e) -> log.debug("Host = {}, processing transaction {}/{}/{} complete", new Object[]{failedHost, scope, stream, txnId}));
    }

    private CompletableFuture<Void> failOverCommittingTxn(String failedHost, int epoch, TxnResource txn) {
        String scope = txn.getScope();
        String stream = txn.getStream();
        UUID txnId = txn.getTxnId();
        log.debug("Host = {}, failing over committing transaction {}/{}/{}", new Object[]{failedHost, scope, stream, txnId});
        return this.transactionMetadataTasks.writeCommitEvent(new CommitEvent(scope, stream, epoch)).thenComposeAsync(status -> this.streamMetadataStore.removeTxnFromIndex(failedHost, txn, true), (Executor)this.executor);
    }

    private CompletableFuture<Void> failOverAbortingTxn(String failedHost, int epoch, TxnResource txn) {
        String scope = txn.getScope();
        String stream = txn.getStream();
        UUID txnId = txn.getTxnId();
        log.debug("Host = {}, failing over aborting transaction {}/{}/{}", new Object[]{failedHost, scope, stream, txnId});
        return this.transactionMetadataTasks.writeAbortEvent(new AbortEvent(scope, stream, epoch, txnId)).thenComposeAsync(status -> this.streamMetadataStore.removeTxnFromIndex(failedHost, txn, true), (Executor)this.executor);
    }

    private CompletableFuture<Void> failOverOpenTxn(String failedHost, TxnResource txn) {
        String scope = txn.getScope();
        String stream = txn.getStream();
        UUID txnId = txn.getTxnId();
        log.debug("Host = {}, failing over open transaction {}/{}/{}", new Object[]{failedHost, scope, stream, txnId});
        return ((CompletableFuture)this.streamMetadataStore.getTransactionData(scope, stream, txnId, null, this.executor).thenCompose(txnData -> this.transactionMetadataTasks.pingTxn(scope, stream, txn.getTxnId(), Config.MAX_LEASE_VALUE, null))).thenComposeAsync(status -> this.streamMetadataStore.removeTxnFromIndex(failedHost, txn, true), (Executor)this.executor);
    }

    private static class Result {
        private final TxnResource txnResource;
        private final Object value;
        private final Throwable error;

        @ConstructorProperties(value={"txnResource", "value", "error"})
        @SuppressFBWarnings(justification="generated code")
        public Result(TxnResource txnResource, Object value, Throwable error) {
            this.txnResource = txnResource;
            this.value = value;
            this.error = error;
        }

        @SuppressFBWarnings(justification="generated code")
        public TxnResource getTxnResource() {
            return this.txnResource;
        }

        @SuppressFBWarnings(justification="generated code")
        public Object getValue() {
            return this.value;
        }

        @SuppressFBWarnings(justification="generated code")
        public Throwable getError() {
            return this.error;
        }

        @SuppressFBWarnings(justification="generated code")
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof Result)) {
                return false;
            }
            Result other = (Result)o;
            if (!other.canEqual(this)) {
                return false;
            }
            TxnResource this$txnResource = this.getTxnResource();
            TxnResource other$txnResource = other.getTxnResource();
            if (this$txnResource == null ? other$txnResource != null : !((Object)this$txnResource).equals(other$txnResource)) {
                return false;
            }
            Object this$value = this.getValue();
            Object other$value = other.getValue();
            if (this$value == null ? other$value != null : !this$value.equals(other$value)) {
                return false;
            }
            Throwable this$error = this.getError();
            Throwable other$error = other.getError();
            return !(this$error == null ? other$error != null : !this$error.equals(other$error));
        }

        @SuppressFBWarnings(justification="generated code")
        protected boolean canEqual(Object other) {
            return other instanceof Result;
        }

        @SuppressFBWarnings(justification="generated code")
        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            TxnResource $txnResource = this.getTxnResource();
            result = result * 59 + ($txnResource == null ? 43 : ((Object)$txnResource).hashCode());
            Object $value = this.getValue();
            result = result * 59 + ($value == null ? 43 : $value.hashCode());
            Throwable $error = this.getError();
            result = result * 59 + ($error == null ? 43 : $error.hashCode());
            return result;
        }

        @SuppressFBWarnings(justification="generated code")
        public String toString() {
            return "TxnSweeper.Result(txnResource=" + this.getTxnResource() + ", value=" + this.getValue() + ", error=" + this.getError() + ")";
        }
    }
}

