/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.server.eventProcessor.requesthandlers;

import com.google.common.base.Preconditions;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.tracing.TagLogger;
import io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.TaskExceptions;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.State;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.TxnStatus;
import io.pravega.controller.store.stream.records.ActiveTxnRecord;
import io.pravega.controller.store.stream.records.StreamSegmentRecord;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.controller.task.Stream.StreamTransactionMetadataTasks;
import io.pravega.shared.controller.event.ControllerEvent;
import io.pravega.shared.controller.event.SealStreamEvent;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import org.slf4j.LoggerFactory;

public class SealStreamTask
implements StreamTask<SealStreamEvent> {
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(SealStreamTask.class));
    private final StreamMetadataTasks streamMetadataTasks;
    private final StreamTransactionMetadataTasks streamTransactionMetadataTasks;
    private final StreamMetadataStore streamMetadataStore;
    private final ScheduledExecutorService executor;

    public SealStreamTask(StreamMetadataTasks streamMetadataTasks, StreamTransactionMetadataTasks streamTransactionMetadataTasks, StreamMetadataStore streamMetadataStore, ScheduledExecutorService executor) {
        Preconditions.checkNotNull((Object)streamMetadataStore);
        Preconditions.checkNotNull((Object)streamMetadataTasks);
        Preconditions.checkNotNull((Object)streamTransactionMetadataTasks);
        Preconditions.checkNotNull((Object)executor);
        this.streamMetadataTasks = streamMetadataTasks;
        this.streamTransactionMetadataTasks = streamTransactionMetadataTasks;
        this.streamMetadataStore = streamMetadataStore;
        this.executor = executor;
    }

    @Override
    public CompletableFuture<Void> execute(SealStreamEvent request) {
        String scope = request.getScope();
        String stream = request.getStream();
        long requestId = request.getRequestId();
        OperationContext context = this.streamMetadataStore.createContext(scope, stream);
        return ((CompletableFuture)((CompletableFuture)((CompletableFuture)this.streamMetadataStore.getState(scope, stream, true, context, this.executor).thenAccept(state -> {
            if (!state.equals((Object)State.SEALING) && !state.equals((Object)State.SEALED)) {
                throw new TaskExceptions.StartException("Seal stream task not started yet.");
            }
        })).thenCompose(x -> this.abortTransaction(context, scope, stream, requestId).thenAccept(noTransactions -> {
            if (!noTransactions.booleanValue()) {
                log.debug(requestId, "Found open transactions on stream {}/{}. Postponing its sealing.", new Object[]{scope, stream});
                throw StoreException.create(StoreException.Type.OPERATION_NOT_ALLOWED, "Found ongoing transactions. Abort transaction requested.Sealing stream segments should wait until transactions are aborted.");
            }
        }))).thenCompose(x -> this.streamMetadataStore.getActiveSegments(scope, stream, context, this.executor))).thenCompose(activeSegments -> {
            if (activeSegments.isEmpty()) {
                return CompletableFuture.completedFuture(null);
            }
            return this.notifySealed(scope, stream, context, (List<StreamSegmentRecord>)activeSegments, requestId);
        });
    }

    private CompletableFuture<Boolean> abortTransaction(OperationContext context, String scope, String stream, long requestId) {
        return this.streamMetadataStore.getActiveTxns(scope, stream, context, this.executor).thenCompose(activeTxns -> {
            if (activeTxns == null || activeTxns.isEmpty()) {
                return CompletableFuture.completedFuture(true);
            }
            return Futures.allOf((Collection)activeTxns.entrySet().stream().map(txIdPair -> {
                CompletableFuture voidCompletableFuture = ((ActiveTxnRecord)txIdPair.getValue()).getTxnStatus().equals((Object)TxnStatus.OPEN) ? Futures.toVoid((CompletableFuture)this.streamTransactionMetadataTasks.abortTxn(scope, stream, (UUID)txIdPair.getKey(), null, context).exceptionally(e -> {
                    Throwable cause = Exceptions.unwrap((Throwable)e);
                    if (cause instanceof StoreException.IllegalStateException || cause instanceof StoreException.WriteConflictException || cause instanceof StoreException.DataNotFoundException) {
                        log.debug(requestId, "A known exception thrown during seal stream while trying to abort transaction on stream {}/{}", new Object[]{scope, stream, cause});
                    } else {
                        log.warn(requestId, "Exception thrown during seal stream while trying to abort transaction on stream {}/{}", new Object[]{scope, stream, cause});
                    }
                    return null;
                })) : CompletableFuture.completedFuture(null);
                return voidCompletableFuture;
            }).collect(Collectors.toList())).thenApply(v -> false);
        });
    }

    private CompletionStage<Void> notifySealed(String scope, String stream, OperationContext context, List<StreamSegmentRecord> activeSegments, long requestId) {
        List<Long> segmentsToBeSealed = activeSegments.stream().map(StreamSegmentRecord::segmentId).collect(Collectors.toList());
        log.debug(requestId, "Sending notification to segment store to seal segments for stream {}/{}", new Object[]{scope, stream});
        return this.streamMetadataTasks.notifySealedSegments(scope, stream, segmentsToBeSealed, this.streamMetadataTasks.retrieveDelegationToken(), requestId).thenCompose(v -> this.setSealed(scope, stream, context));
    }

    private CompletableFuture<Void> setSealed(String scope, String stream, OperationContext context) {
        return this.streamMetadataStore.setSealed(scope, stream, context, this.executor);
    }

    @Override
    public CompletableFuture<Void> writeBack(SealStreamEvent event) {
        return this.streamMetadataTasks.writeEvent((ControllerEvent)event);
    }

    @Override
    public CompletableFuture<Boolean> hasTaskStarted(SealStreamEvent event) {
        return this.streamMetadataStore.getState(event.getScope(), event.getStream(), true, null, this.executor).thenApply(state -> state.equals((Object)State.SEALING));
    }
}

