/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.server.eventProcessor.requesthandlers;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.server.eventProcessor.requesthandlers.AbstractRequestProcessor;
import io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask;
import io.pravega.controller.store.stream.BucketStore;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.State;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.VersionedMetadata;
import io.pravega.controller.store.stream.records.CommittingTransactionsRecord;
import io.pravega.controller.store.stream.records.EpochRecord;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.controller.task.Stream.StreamTransactionMetadataTasks;
import io.pravega.shared.controller.event.CommitEvent;
import io.pravega.shared.segment.StreamSegmentNameUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Request handler for {@link CommitEvent}s.
 *
 * <p>For the stream named by the event it asks the metadata store to start committing
 * transactions, commits every transaction listed in the resulting
 * {@link CommittingTransactionsRecord} (rolling the transactions onto duplicate epochs when the
 * transaction epoch and the active epoch do not line up), marks the commit as complete in the
 * store and finally moves the stream state from {@code COMMITTING_TXN} back to {@code ACTIVE}
 * when this handler was the one that set it.
 */
public class CommitRequestHandler
extends AbstractRequestProcessor<CommitEvent>
implements StreamTask<CommitEvent> {
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(CommitRequestHandler.class);
    private final StreamMetadataTasks streamMetadataTasks;
    private final StreamTransactionMetadataTasks streamTransactionMetadataTasks;
    private final BucketStore bucketStore;
    private final ScheduledExecutorService executor;
    /** Optional sink that receives every successfully processed event; {@code null} outside tests. */
    private final BlockingQueue<CommitEvent> processedEvents;

    /**
     * Creates a handler that does not record processed events.
     *
     * @param streamMetadataStore            stream metadata store; must not be null
     * @param streamMetadataTasks            tasks used to talk to segment stores; must not be null
     * @param streamTransactionMetadataTasks tasks used by {@link #writeBack(CommitEvent)}
     * @param bucketStore                    bucket store the stream is registered with after a commit
     * @param executor                       executor for asynchronous stages; must not be null
     */
    public CommitRequestHandler(StreamMetadataStore streamMetadataStore, StreamMetadataTasks streamMetadataTasks, StreamTransactionMetadataTasks streamTransactionMetadataTasks, BucketStore bucketStore, ScheduledExecutorService executor) {
        this(streamMetadataStore, streamMetadataTasks, streamTransactionMetadataTasks, bucketStore, executor, null);
    }

    /**
     * Creates a handler that additionally offers each successfully processed event to {@code queue}.
     *
     * @param streamMetadataStore            stream metadata store; must not be null
     * @param streamMetadataTasks            tasks used to talk to segment stores; must not be null
     * @param streamTransactionMetadataTasks tasks used by {@link #writeBack(CommitEvent)}
     * @param bucketStore                    bucket store the stream is registered with after a commit
     * @param executor                       executor for asynchronous stages; must not be null
     * @param queue                          queue receiving processed events, or {@code null} to disable
     */
    @VisibleForTesting
    public CommitRequestHandler(StreamMetadataStore streamMetadataStore, StreamMetadataTasks streamMetadataTasks, StreamTransactionMetadataTasks streamTransactionMetadataTasks, BucketStore bucketStore, ScheduledExecutorService executor, BlockingQueue<CommitEvent> queue) {
        super(streamMetadataStore, executor);
        // Validate before touching any field so a bad argument never leaves a half-initialised handler.
        Preconditions.checkNotNull(streamMetadataStore, "streamMetadataStore");
        Preconditions.checkNotNull(streamMetadataTasks, "streamMetadataTasks");
        Preconditions.checkNotNull(executor, "executor");
        this.streamMetadataTasks = streamMetadataTasks;
        this.streamTransactionMetadataTasks = streamTransactionMetadataTasks;
        this.bucketStore = bucketStore;
        this.executor = executor;
        this.processedEvents = queue;
    }

    /**
     * Entry point for a commit event; delegates to the inherited {@code withCompletion} helper
     * with this handler as the task and {@code OPERATION_NOT_ALLOWED_PREDICATE} as the predicate.
     * NOTE(review): both are defined in the superclass, which is not visible here - presumably the
     * predicate decides which failures cause the event to be written back; confirm.
     *
     * @param event the commit event to process
     * @return future completing when processing of the event has finished
     */
    @Override
    public CompletableFuture<Void> processCommitTxnRequest(CommitEvent event) {
        return this.withCompletion(this, event, event.getScope(), event.getStream(), OPERATION_NOT_ALLOWED_PREDICATE);
    }

    /**
     * Attempts to commit all transactions currently selected for commit on the event's stream.
     *
     * @param event the commit event identifying scope and stream
     * @return future that completes normally once the commit attempt succeeded (including the case
     *         where nothing needed committing) and exceptionally, with the unwrapped cause, otherwise
     */
    @Override
    public CompletableFuture<Void> execute(CommitEvent event) {
        String scope = event.getScope();
        String stream = event.getStream();
        OperationContext context = this.streamMetadataStore.createContext(scope, stream);
        log.debug("Attempting to commit available transactions on stream {}/{}", scope, stream);
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.tryCommitTransactions(scope, stream, context).whenComplete((r, e) -> {
            if (e != null) {
                Throwable cause = Exceptions.unwrap(e);
                if (cause instanceof StoreException.OperationNotAllowedException) {
                    log.debug("Cannot commit transaction on stream {}/{}. Postponing", scope, stream);
                } else {
                    // Trailing throwable is logged with its stack trace by SLF4J.
                    log.error("Exception while attempting to commit transaction on stream {}/{}", scope, stream, e);
                }
                future.completeExceptionally(cause);
            } else {
                if (r >= 0) {
                    log.info("Successfully committed transactions on epoch {} on stream {}/{}", r, scope, stream);
                } else {
                    // Only scope and stream are passed: the message has exactly two placeholders.
                    log.info("No transactions found in committing state on stream {}/{}", scope, stream);
                }
                if (this.processedEvents != null) {
                    try {
                        this.processedEvents.offer(event);
                    }
                    catch (RuntimeException ex) {
                        // Recording the event is best-effort and must not fail the commit,
                        // but the failure should not vanish silently either.
                        log.warn("Unable to record processed commit event for stream {}/{}", scope, stream, ex);
                    }
                }
                future.complete(null);
            }
        });
        return future;
    }

    /**
     * Runs one full commit cycle for the stream.
     *
     * @return future holding the epoch of the committing-transactions record that was processed
     *         ({@link #execute(CommitEvent)} treats a negative value as "nothing to commit")
     */
    private CompletableFuture<Integer> tryCommitTransactions(String scope, String stream, OperationContext context) {
        return this.streamMetadataStore.getVersionedState(scope, stream, context, this.executor).thenComposeAsync(state -> {
            // Tracks the latest versioned state so the conditional reset below uses the version
            // produced by our own update rather than the stale one read above.
            AtomicReference<VersionedMetadata<State>> stateRecord = new AtomicReference<>(state);
            CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> commitFuture = this.streamMetadataStore.startCommitTransactions(scope, stream, context, this.executor).thenComposeAsync(versionedMetadata -> {
                if (versionedMetadata.getObject().equals(CommittingTransactionsRecord.EMPTY)) {
                    // Nothing to commit; fall through to completion and the conditional state reset.
                    return CompletableFuture.completedFuture(versionedMetadata);
                }
                int txnEpoch = versionedMetadata.getObject().getEpoch();
                List<UUID> txnList = versionedMetadata.getObject().getTransactionsToCommit();
                log.info("Committing {} transactions on epoch {} on stream {}/{}", txnList, txnEpoch, scope, stream);
                CompletableFuture<Void> future;
                if (state.getObject().equals(State.SEALING)) {
                    // A sealing stream keeps its state; the commit proceeds without a state change.
                    future = CompletableFuture.completedFuture(null);
                } else {
                    future = this.streamMetadataStore.updateVersionedState(scope, stream, State.COMMITTING_TXN, state, context, this.executor).thenAccept(stateRecord::set);
                }
                return future.thenCompose(v -> this.getEpochRecords(scope, stream, txnEpoch, context).thenCompose(records -> {
                    EpochRecord txnEpochRecord = records.get(0);
                    EpochRecord activeEpochRecord = records.get(1);
                    if (activeEpochRecord.getEpoch() == txnEpoch || activeEpochRecord.getReferenceEpoch() == txnEpochRecord.getReferenceEpoch()) {
                        // Transaction epoch matches the active epoch (directly or by reference):
                        // commit straight into the active epoch's segments.
                        return this.commitTransactions(scope, stream, new ArrayList<Long>(activeEpochRecord.getSegmentIds()), txnList, context).thenApply(x -> versionedMetadata);
                    }
                    return this.rollTransactions(scope, stream, txnEpochRecord, activeEpochRecord, versionedMetadata, context);
                }));
            }, this.executor);
            return commitFuture.thenCompose(versionedMetadata -> this.streamMetadataStore.completeCommitTransactions(scope, stream, versionedMetadata, context, this.executor)
                    .thenCompose(v -> this.resetStateConditionally(scope, stream, stateRecord.get(), context))
                    .thenApply(v -> versionedMetadata.getObject().getEpoch()));
        }, this.executor);
    }

    /**
     * Commits transactions through the rolling-transaction path.
     *
     * <p>If {@code existing} is not yet a rolling-transaction record, the store is first asked to
     * start one against the active epoch. The rolling work itself is skipped when the active epoch
     * is greater than the record's current epoch.
     * NOTE(review): presumably that condition means an earlier run already rolled the epochs - confirm.
     *
     * @return future holding the (possibly updated) committing-transactions record
     */
    private CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> rollTransactions(String scope, String stream, EpochRecord txnEpoch, EpochRecord activeEpoch, VersionedMetadata<CommittingTransactionsRecord> existing, OperationContext context) {
        CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> future = CompletableFuture.completedFuture(existing);
        if (!existing.getObject().isRollingTxnRecord()) {
            future = future.thenCompose(x -> this.streamMetadataStore.startRollingTxn(scope, stream, activeEpoch.getEpoch(), existing, context, this.executor));
        }
        return future.thenCompose(record -> {
            if (activeEpoch.getEpoch() > record.getObject().getCurrentEpoch()) {
                return CompletableFuture.completedFuture(record);
            }
            return this.runRollingTxn(scope, stream, txnEpoch, activeEpoch, record, context).thenApply(v -> record);
        });
    }

    /**
     * Performs the rolling transaction, in order:
     * <ol>
     *   <li>create duplicates of the transaction epoch's segments, commit the transactions into
     *       them and seal them;</li>
     *   <li>create duplicates of the active epoch's segments;</li>
     *   <li>record the duplicate epochs in the store together with the sealed sizes of the
     *       transaction-epoch duplicates;</li>
     *   <li>seal the original active-epoch segments and complete the rolling transaction in the
     *       store with their sealed sizes.</li>
     * </ol>
     */
    private CompletableFuture<Void> runRollingTxn(String scope, String stream, EpochRecord txnEpoch, EpochRecord activeEpoch, VersionedMetadata<CommittingTransactionsRecord> existing, OperationContext context) {
        String delegationToken = this.streamMetadataTasks.retrieveDelegationToken();
        // Captured once up front so every duplicate epoch is recorded with the same timestamp.
        long timestamp = System.currentTimeMillis();
        int newTxnEpoch = existing.getObject().getNewTxnEpoch();
        int newActiveEpoch = existing.getObject().getNewActiveEpoch();
        // Duplicate segment ids keep the original segment number but carry the new epoch.
        List<Long> txnEpochDuplicate = txnEpoch.getSegments().stream()
                .map(segment -> StreamSegmentNameUtils.computeSegmentId(segment.getSegmentNumber(), newTxnEpoch))
                .collect(Collectors.toList());
        List<Long> activeEpochSegmentIds = new ArrayList<Long>(activeEpoch.getSegmentIds());
        List<Long> activeEpochDuplicate = activeEpoch.getSegments().stream()
                .map(segment -> StreamSegmentNameUtils.computeSegmentId(segment.getSegmentNumber(), newActiveEpoch))
                .collect(Collectors.toList());
        List<UUID> transactionsToCommit = existing.getObject().getTransactionsToCommit();
        return this.copyTxnEpochSegmentsAndCommitTxns(scope, stream, transactionsToCommit, txnEpochDuplicate, context)
                .thenCompose(v -> this.streamMetadataTasks.notifyNewSegments(scope, stream, activeEpochDuplicate, context, delegationToken))
                .thenCompose(v -> this.streamMetadataTasks.getSealedSegmentsSize(scope, stream, txnEpochDuplicate, delegationToken))
                .thenCompose(sealedSegmentsMap -> {
                    log.info("Rolling transaction, created duplicate of active epoch {} for stream {}/{}", activeEpoch, scope, stream);
                    return this.streamMetadataStore.rollingTxnCreateDuplicateEpochs(scope, stream, sealedSegmentsMap, timestamp, existing, context, this.executor);
                })
                .thenCompose(v -> this.streamMetadataTasks.notifySealedSegments(scope, stream, activeEpochSegmentIds, delegationToken)
                        .thenCompose(x -> this.streamMetadataTasks.getSealedSegmentsSize(scope, stream, activeEpochSegmentIds, delegationToken))
                        .thenCompose(sealedSegmentsMap -> {
                            log.info("Rolling transaction, sealed active epoch {} for stream {}/{}", activeEpoch, scope, stream);
                            return this.streamMetadataStore.completeRollingTxn(scope, stream, sealedSegmentsMap, existing, context, this.executor);
                        }));
    }

    /**
     * Creates the given segments (each with a fixed scaling policy of one segment), commits the
     * transactions into them and then seals them.
     *
     * @param segmentIds ids of the duplicate transaction-epoch segments to create, fill and seal
     */
    private CompletableFuture<Void> copyTxnEpochSegmentsAndCommitTxns(String scope, String stream, List<UUID> transactionsToCommit, List<Long> segmentIds, OperationContext context) {
        String delegationToken = this.streamMetadataTasks.retrieveDelegationToken();
        CompletableFuture<Void> createSegmentsFuture = Futures.allOf(segmentIds.stream()
                .map(segment -> this.streamMetadataTasks.notifyNewSegment(scope, stream, segment, ScalingPolicy.fixed(1), delegationToken))
                .collect(Collectors.toList()));
        return createSegmentsFuture.thenCompose(v -> {
            log.info("Rolling transaction, successfully created duplicate txn epoch {} for stream {}/{}", segmentIds, scope, stream);
            return this.commitTransactions(scope, stream, segmentIds, transactionsToCommit, context);
        }).thenCompose(v -> this.streamMetadataTasks.notifySealedSegments(scope, stream, segmentIds, delegationToken));
    }

    /**
     * Commits the transactions one after another, in list order, into the given segments.
     *
     * <p>For each transaction the commit notification is sent, the current segment sizes are read
     * and recorded in the store as that transaction's commit offsets. Each transaction's steps are
     * chained onto the previous transaction's future, so a transaction starts only after the one
     * before it has finished. After the last one, the stream is added to the bucket store for the
     * watermarking service.
     */
    private CompletableFuture<Void> commitTransactions(String scope, String stream, List<Long> segments, List<UUID> transactionsToCommit, OperationContext context) {
        CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
        for (UUID txnId : transactionsToCommit) {
            // Logged while the chain is being built, i.e. before the commit actually runs.
            log.info("Committing transaction {} on stream {}/{}", txnId, scope, stream);
            future = future.thenCompose(v -> this.streamMetadataTasks.notifyTxnCommit(scope, stream, segments, txnId))
                    .thenCompose(v -> this.streamMetadataTasks.getCurrentSegmentSizes(scope, stream, segments))
                    .thenCompose(map -> this.streamMetadataStore.recordCommitOffsets(scope, stream, txnId, map, context, this.executor));
        }
        return future.thenCompose(v -> this.bucketStore.addStreamToBucketStore(BucketStore.ServiceType.WatermarkingService, scope, stream, this.executor));
    }

    /**
     * Fetches the record of the given epoch and the record of the active epoch.
     * NOTE(review): the {@code true} passed to {@code getActiveEpoch} presumably bypasses a cache - confirm.
     *
     * @return future holding a two-element list: index 0 is the requested epoch, index 1 the active epoch
     */
    private CompletableFuture<List<EpochRecord>> getEpochRecords(String scope, String stream, int epoch, OperationContext context) {
        List<CompletableFuture<EpochRecord>> list = new ArrayList<>();
        list.add(this.streamMetadataStore.getEpoch(scope, stream, epoch, context, this.executor));
        list.add(this.streamMetadataStore.getActiveEpoch(scope, stream, context, true, this.executor));
        return Futures.allOfWithResults(list);
    }

    /**
     * Re-posts the event through {@code StreamTransactionMetadataTasks.writeCommitEvent}.
     *
     * @param event the commit event to write back
     * @return future completing when the event has been written
     */
    @Override
    public CompletableFuture<Void> writeBack(CommitEvent event) {
        return this.streamTransactionMetadataTasks.writeCommitEvent(event);
    }

    /**
     * Moves the stream state to {@code ACTIVE} only when the supplied state is
     * {@code COMMITTING_TXN}; any other state is left untouched.
     */
    private CompletableFuture<Void> resetStateConditionally(String scope, String stream, VersionedMetadata<State> state, OperationContext context) {
        if (state.getObject().equals(State.COMMITTING_TXN)) {
            return Futures.toVoid(this.streamMetadataStore.updateVersionedState(scope, stream, State.ACTIVE, state, context, this.executor));
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Reports whether commit processing is considered started for the event's stream, which this
     * handler defines as the stream state being {@code COMMITTING_TXN} or {@code SEALING}.
     * NOTE(review): the state is read with a {@code true} flag and a null context - presumably an
     * uncached read; confirm against the store API.
     *
     * @param event the commit event identifying scope and stream
     * @return future holding {@code true} when the state is {@code COMMITTING_TXN} or {@code SEALING}
     */
    @Override
    public CompletableFuture<Boolean> hasTaskStarted(CommitEvent event) {
        return this.streamMetadataStore.getState(event.getScope(), event.getStream(), true, null, this.executor)
                .thenApply(state -> state.equals(State.COMMITTING_TXN) || state.equals(State.SEALING));
    }
}

