/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.server.eventProcessor.requesthandlers;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.eventProcessor.impl.SerializedRequestHandler;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.records.StreamSegmentRecord;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.shared.controller.event.AbortEvent;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Request handler that processes {@link AbortEvent}s by aborting the referenced transaction.
 *
 * <p>For each event the handler looks up the segments of the event's epoch, passes their ids to
 * {@link StreamMetadataTasks#notifyTxnAbort}, and then calls
 * {@link StreamMetadataStore#abortTransaction} for the transaction.
 *
 * <p>How events are queued and ordered is determined by the {@link SerializedRequestHandler}
 * base class, which is not visible in this file.
 */
public class AbortRequestHandler
extends SerializedRequestHandler<AbortEvent> {
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(AbortRequestHandler.class);
    private final StreamMetadataStore streamMetadataStore;
    private final StreamMetadataTasks streamMetadataTasks;
    private final ScheduledExecutorService executor;
    /** Optional sink that receives every successfully processed event; {@code null} when unused. */
    private final BlockingQueue<AbortEvent> processedEvents;

    /**
     * Creates a handler that additionally offers every successfully processed event to
     * {@code queue}.
     *
     * @param streamMetadataStore store used to resolve epoch segments and to abort the transaction
     * @param streamMetadataTasks tasks object used to notify the segments about the abort
     * @param executor            executor handed to the base class and to the store calls
     * @param queue               queue that receives successfully processed events; may be
     *                            {@code null}, in which case nothing is recorded
     */
    @VisibleForTesting
    public AbortRequestHandler(StreamMetadataStore streamMetadataStore, StreamMetadataTasks streamMetadataTasks, ScheduledExecutorService executor, BlockingQueue<AbortEvent> queue) {
        super(executor);
        this.streamMetadataStore = streamMetadataStore;
        this.streamMetadataTasks = streamMetadataTasks;
        this.executor = executor;
        this.processedEvents = queue;
    }

    /**
     * Creates a handler that does not record processed events.
     *
     * @param streamMetadataStore store used to resolve epoch segments and to abort the transaction
     * @param streamMetadataTasks tasks object used to notify the segments about the abort
     * @param executor            executor handed to the base class and to the store calls
     */
    public AbortRequestHandler(StreamMetadataStore streamMetadataStore, StreamMetadataTasks streamMetadataTasks, ScheduledExecutorService executor) {
        // Same initialisation as the four-argument constructor, with no processed-events queue.
        this(streamMetadataStore, streamMetadataTasks, executor, null);
    }

    /**
     * Aborts the transaction identified by {@code event}.
     *
     * <p>Steps, each started only after the previous one completed normally:
     * <ol>
     *   <li>fetch the segment records of the event's epoch and map them to segment ids;</li>
     *   <li>call {@code notifyTxnAbort} with those ids;</li>
     *   <li>call {@code abortTransaction} on the metadata store.</li>
     * </ol>
     *
     * <p>On success the event is offered to the processed-events queue, if one was supplied.
     * On failure the error is logged together with its cause; the {@code whenComplete} stage
     * does not swallow it, so the failure still reaches {@code Futures.toVoid}.
     *
     * @param event abort request carrying scope, stream, epoch and transaction id
     * @return future produced by {@code Futures.toVoid} over the abort chain
     */
    @Override
    public CompletableFuture<Void> processEvent(AbortEvent event) {
        String scope = event.getScope();
        String stream = event.getStream();
        int epoch = event.getEpoch();
        UUID txId = event.getTxid();
        OperationContext context = this.streamMetadataStore.createContext(scope, stream);
        log.debug("Aborting transaction {} on stream {}/{}", txId, scope, stream);

        return Futures.toVoid(this.streamMetadataStore.getSegmentsInEpoch(scope, stream, epoch, context, this.executor)
                .thenApply(segments -> segments.stream()
                        .map(StreamSegmentRecord::segmentId)
                        .collect(Collectors.toList()))
                .thenCompose(segmentIds -> this.streamMetadataTasks.notifyTxnAbort(scope, stream, segmentIds, txId))
                .thenCompose(x -> this.streamMetadataStore.abortTransaction(scope, stream, txId, context, this.executor))
                .whenComplete((result, error) -> {
                    if (error != null) {
                        // The trailing Throwable is not bound to a placeholder; SLF4J logs it as
                        // the exception, so the cause and stack trace are preserved.
                        log.error("Failed aborting transaction {} on stream {}/{}", txId, scope, stream, error);
                    } else {
                        log.debug("Successfully aborted transaction {} on stream {}/{}", txId, scope, stream);
                        if (this.processedEvents != null) {
                            // Non-blocking, best effort: offer() returns false when the queue is
                            // full and that result is deliberately ignored.
                            this.processedEvents.offer(event);
                        }
                    }
                }));
    }
}

