/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.segmentstore.server.logs;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Service;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.ObjectClosedException;
import io.pravega.common.Timer;
import io.pravega.common.concurrent.AbstractThreadPoolService;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.function.Callbacks;
import io.pravega.common.util.BlockingDrainingQueue;
import io.pravega.segmentstore.server.DataCorruptionException;
import io.pravega.segmentstore.server.IllegalContainerStateException;
import io.pravega.segmentstore.server.SegmentStoreMetrics;
import io.pravega.segmentstore.server.UpdateableContainerMetadata;
import io.pravega.segmentstore.server.logs.DataFrameBuilder;
import io.pravega.segmentstore.server.logs.MemoryStateUpdater;
import io.pravega.segmentstore.server.logs.MetadataCheckpointPolicy;
import io.pravega.segmentstore.server.logs.OperationMetadataUpdater;
import io.pravega.segmentstore.server.logs.Throttler;
import io.pravega.segmentstore.server.logs.ThrottlerCalculator;
import io.pravega.segmentstore.server.logs.operations.CompletableOperation;
import io.pravega.segmentstore.server.logs.operations.Operation;
import io.pravega.segmentstore.server.logs.operations.OperationSerializer;
import io.pravega.segmentstore.storage.DataLogWriterNotPrimaryException;
import io.pravega.segmentstore.storage.DurableDataLog;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class OperationProcessor
extends AbstractThreadPoolService
implements AutoCloseable {
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(OperationProcessor.class);
    private static final Duration SHUTDOWN_TIMEOUT = Duration.ofSeconds(10L);
    private static final int MAX_READ_AT_ONCE = 1000;
    private static final int MAX_COMMIT_QUEUE_SIZE = 50;
    private final UpdateableContainerMetadata metadata;
    private final MemoryStateUpdater stateUpdater;
    @GuardedBy(value="stateLock")
    private final OperationMetadataUpdater metadataUpdater;
    private final BlockingDrainingQueue<CompletableOperation> operationQueue;
    private final BlockingDrainingQueue<List<CompletableOperation>> commitQueue;
    private final Object stateLock = new Object();
    private final QueueProcessingState state;
    @GuardedBy(value="stateLock")
    private final DataFrameBuilder<Operation> dataFrameBuilder;
    private final SegmentStoreMetrics.OperationProcessor metrics;
    private final Throttler throttler;

    OperationProcessor(UpdateableContainerMetadata metadata, MemoryStateUpdater stateUpdater, DurableDataLog durableDataLog, MetadataCheckpointPolicy checkpointPolicy, ScheduledExecutorService executor) {
        super(String.format("OperationProcessor[%d]", metadata.getContainerId()), executor);
        Preconditions.checkNotNull((Object)durableDataLog, (Object)"durableDataLog");
        this.metadata = metadata;
        this.stateUpdater = (MemoryStateUpdater)Preconditions.checkNotNull((Object)stateUpdater, (Object)"stateUpdater");
        this.metadataUpdater = new OperationMetadataUpdater(this.metadata);
        this.operationQueue = new BlockingDrainingQueue();
        this.commitQueue = new BlockingDrainingQueue();
        this.state = new QueueProcessingState(checkpointPolicy);
        DataFrameBuilder.Args args = new DataFrameBuilder.Args(this.state::frameSealed, this.state::commit, this.state::fail, this.executor);
        this.dataFrameBuilder = new DataFrameBuilder<Operation>(durableDataLog, OperationSerializer.DEFAULT, args);
        this.metrics = new SegmentStoreMetrics.OperationProcessor(this.metadata.getContainerId());
        ThrottlerCalculator throttlerCalculator = ThrottlerCalculator.builder().cacheThrottler(stateUpdater::getCacheUtilization, stateUpdater.getCacheTargetUtilization(), stateUpdater.getCacheMaxUtilization()).commitBacklogThrottler(() -> this.commitQueue.size()).batchingThrottler(() -> ((DurableDataLog)durableDataLog).getQueueStatistics()).build();
        this.throttler = new Throttler(this.metadata.getContainerId(), throttlerCalculator, executor, this.metrics);
        this.stateUpdater.registerCleanupListener(this.throttler);
    }

    protected Duration getShutdownTimeout() {
        return SHUTDOWN_TIMEOUT;
    }

    protected CompletableFuture<Void> doRun() {
        CompletableFuture queueProcessor = Futures.loop(() -> ((OperationProcessor)this).isRunning(), () -> ((CompletableFuture)this.throttler.throttle().thenComposeAsync(v -> this.operationQueue.take(1000), (Executor)this.executor)).thenAcceptAsync(this::processOperations, (Executor)this.executor), (Executor)this.executor);
        CompletionStage commitProcessor = Futures.loop(() -> this.isRunning() || this.commitQueue.size() > 0, () -> this.commitQueue.take(50).thenAcceptAsync(this::processCommits, (Executor)this.executor), (Executor)this.executor).whenComplete((r, ex) -> {
            this.commitQueue.close();
            if (ex != null) {
                throw new CompletionException((Throwable)ex);
            }
        });
        return CompletableFuture.allOf(new CompletableFuture[]{queueProcessor, commitProcessor}).exceptionally(this::iterationErrorHandler);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void doStop() {
        CancellationException ex = new CancellationException("OperationProcessor is shutting down.");
        this.closeQueue(ex);
        Object object = this.stateLock;
        synchronized (object) {
            this.dataFrameBuilder.close();
        }
        this.state.fail(ex, null);
        this.throttler.close();
        this.metrics.close();
        super.doStop();
    }

    protected void errorHandler(Throwable ex) {
        ex = Exceptions.unwrap((Throwable)ex);
        this.closeQueue(ex);
        if (!this.isShutdownException(ex)) {
            super.errorHandler(ex);
            this.stopAsync();
        }
    }

    private Void iterationErrorHandler(Throwable ex) {
        boolean isExpected;
        ex = Exceptions.unwrap((Throwable)ex);
        Service.State s = this.state();
        boolean bl = isExpected = this.isShutdownException(ex) && (s == Service.State.STOPPING || s == Service.State.TERMINATED || s == Service.State.FAILED);
        if (!isExpected) {
            throw ex;
        }
        return null;
    }

    private boolean isShutdownException(Throwable ex) {
        return ex instanceof ObjectClosedException || ex instanceof CancellationException;
    }

    public CompletableFuture<Void> process(Operation operation) {
        CompletableFuture<Void> result = new CompletableFuture<Void>();
        if (!this.isRunning()) {
            result.completeExceptionally(new IllegalContainerStateException("OperationProcessor is not running."));
        } else {
            log.debug("{}: process {}.", (Object)this.traceObjectId, (Object)operation);
            try {
                this.operationQueue.add((Object)new CompletableOperation(operation, result));
            }
            catch (Throwable e) {
                if (Exceptions.mustRethrow((Throwable)e)) {
                    throw e;
                }
                result.completeExceptionally(e);
            }
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void processOperations(Queue<CompletableOperation> operations) {
        OperationProcessor.log.debug("{}: processOperations (OperationCount = {}).", (Object)this.traceObjectId, (Object)operations.size());
        processTimer = new Timer();
        count = 0;
        while (true) {
            if (operations.isEmpty()) {
                return;
            }
            try {
                while (!operations.isEmpty()) {
                    o = (CompletableOperation)operations.poll();
                    this.metrics.operationQueueWaitTime(o.getTimer().getElapsedMillis());
                    try {
                        this.processOperation((CompletableOperation)o);
                        this.state.addPending((CompletableOperation)o);
                        ++count;
                    }
                    catch (Throwable ex) {
                        ex = Exceptions.unwrap((Throwable)ex);
                        this.state.failOperation((CompletableOperation)o, ex);
                        if (!OperationProcessor.isFatalException(ex)) continue;
                        throw ex;
                    }
                }
                if (!operations.isEmpty()) continue;
                this.metrics.currentState(this.operationQueue.size() + count, this.state.getPendingCount());
                this.metrics.processOperations(count, processTimer.getElapsedMillis());
                processTimer = new Timer();
                count = 0;
                if (!this.throttler.isThrottlingRequired()) {
                    operations = this.operationQueue.poll(1000);
                }
                if (operations.isEmpty()) {
                    OperationProcessor.log.debug("{}: processOperations (Flush).", (Object)this.traceObjectId);
                    o = this.stateLock;
                    synchronized (o) {
                        this.dataFrameBuilder.flush();
                        continue;
                    }
                }
                OperationProcessor.log.debug("{}: processOperations (Add OperationCount = {}).", (Object)this.traceObjectId, (Object)operations.size());
                continue;
            }
            catch (Throwable ex) {
                ex = Exceptions.unwrap((Throwable)ex);
                this.state.fail(ex, null);
                if (OperationProcessor.isFatalException(ex)) ** break;
                continue;
                this.cancelIncompleteOperations(operations, ex);
                throw Exceptions.sneakyThrow((Throwable)ex);
            }
            break;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processOperation(CompletableOperation operation) throws Exception {
        Preconditions.checkState((!operation.isDone() ? 1 : 0) != 0, (Object)"The Operation has already been processed.");
        Operation entry = operation.getOperation();
        Object object = this.stateLock;
        synchronized (object) {
            this.metadataUpdater.preProcessOperation(entry);
            entry.setSequenceNumber(this.metadataUpdater.nextOperationSequenceNumber());
            this.dataFrameBuilder.append(entry);
            this.metadataUpdater.acceptOperation(entry);
        }
        log.trace("{}: DataFrameBuilder.Append {}.", (Object)this.traceObjectId, (Object)entry);
    }

    private void closeQueue(Throwable causingException) {
        Queue remainingOperations = this.operationQueue.close();
        if (remainingOperations != null && remainingOperations.size() > 0) {
            Throwable failException = causingException != null ? causingException : new CancellationException();
            this.cancelIncompleteOperations(remainingOperations, failException);
        }
        this.commitQueue.cancelPendingTake();
    }

    private void cancelIncompleteOperations(Iterable<CompletableOperation> operations, Throwable failException) {
        assert (failException != null) : "no exception to set";
        int cancelCount = 0;
        for (CompletableOperation o : operations) {
            if (o.isDone()) continue;
            this.state.failOperation(o, failException);
            ++cancelCount;
        }
        log.warn("{}: Cancelling {} operations with exception: {}.", new Object[]{this.traceObjectId, cancelCount, failException.toString()});
    }

    private static boolean isFatalException(Throwable ex) {
        return ex instanceof DataCorruptionException || ex instanceof DataLogWriterNotPrimaryException || ex instanceof ObjectClosedException;
    }

    private void processCommits(Collection<List<CompletableOperation>> items) {
        block3: {
            try {
                do {
                    Timer memoryCommitTimer = new Timer();
                    this.stateUpdater.process(items.stream().flatMap(Collection::stream).map(CompletableOperation::getOperation).iterator());
                    this.metrics.memoryCommit(items.size(), memoryCommitTimer.getElapsed());
                } while (!(items = this.commitQueue.poll(50)).isEmpty());
            }
            catch (Throwable ex) {
                log.error("{}: MemoryStateUpdater.process failure.", (Object)this.traceObjectId, (Object)ex);
                if (!OperationProcessor.isFatalException(ex)) break block3;
                Callbacks.invokeSafely(this::errorHandler, (Object)ex, null);
            }
        }
    }

    @SuppressFBWarnings(justification="generated code")
    public SegmentStoreMetrics.OperationProcessor getMetrics() {
        return this.metrics;
    }

    @ThreadSafe
    private class QueueProcessingState {
        @GuardedBy(value="stateLock")
        private ArrayList<CompletableOperation> nextFrameOperations;
        @GuardedBy(value="stateLock")
        private int pendingOperationCount;
        private final MetadataCheckpointPolicy checkpointPolicy;
        @GuardedBy(value="stateLock")
        private final ArrayDeque<DataFrameBuilder.CommitArgs> metadataTransactions;
        @GuardedBy(value="stateLock")
        private long highestCommittedDataFrame;

        private QueueProcessingState(MetadataCheckpointPolicy checkpointPolicy) {
            this.checkpointPolicy = (MetadataCheckpointPolicy)Preconditions.checkNotNull((Object)checkpointPolicy, (Object)"checkpointPolicy");
            this.nextFrameOperations = new ArrayList();
            this.metadataTransactions = new ArrayDeque();
            this.highestCommittedDataFrame = -1L;
            this.pendingOperationCount = 0;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void addPending(CompletableOperation operation) {
            Object object = OperationProcessor.this.stateLock;
            synchronized (object) {
                this.nextFrameOperations.add(operation);
                ++this.pendingOperationCount;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        int getPendingCount() {
            Object object = OperationProcessor.this.stateLock;
            synchronized (object) {
                return this.pendingOperationCount;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void frameSealed(DataFrameBuilder.CommitArgs commitArgs) {
            Object object = OperationProcessor.this.stateLock;
            synchronized (object) {
                commitArgs.setMetadataTransactionId(OperationProcessor.this.metadataUpdater.sealTransaction());
                commitArgs.setOperations(Collections.unmodifiableList(this.nextFrameOperations));
                this.nextFrameOperations = new ArrayList();
                this.metadataTransactions.addLast(commitArgs);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         * Converted monitor instructions to comments
         * Lifted jumps to return sites
         */
        void commit(DataFrameBuilder.CommitArgs commitArgs) {
            long addressSequence;
            List<List<CompletableOperation>> toAck;
            Timer timer;
            block13: {
                assert (commitArgs.getMetadataTransactionId() >= 0L) : "DataFrameBuilder.CommitArgs does not have a key set";
                log.debug("{}: CommitSuccess ({}).", (Object)OperationProcessor.this.traceObjectId, (Object)commitArgs);
                timer = new Timer();
                toAck = null;
                try {
                    OperationProcessor.this.metadata.recordTruncationMarker(commitArgs.getLastStartedSequenceNumber(), commitArgs.getLogAddress());
                    addressSequence = commitArgs.getLogAddress().getSequence();
                    Object object = OperationProcessor.this.stateLock;
                    // MONITORENTER : object
                    if (addressSequence > this.highestCommittedDataFrame) break block13;
                    log.debug("{}: CommitRejected ({}, HighestCommittedDataFrame = {}).", new Object[]{OperationProcessor.this.traceObjectId, commitArgs, this.highestCommittedDataFrame});
                    // MONITOREXIT : object
                    if (toAck != null) {
                        toAck.stream().flatMap(Collection::stream).forEach(CompletableOperation::complete);
                        OperationProcessor.this.metrics.operationsCompleted(toAck, timer.getElapsed());
                    }
                    this.checkpointPolicy.recordCommit(commitArgs.getDataFrameLength());
                    return;
                }
                catch (Throwable throwable) {
                    if (toAck != null) {
                        toAck.stream().flatMap(Collection::stream).forEach(CompletableOperation::complete);
                        OperationProcessor.this.metrics.operationsCompleted((Collection<List<CompletableOperation>>)toAck, timer.getElapsed());
                    }
                    this.checkpointPolicy.recordCommit(commitArgs.getDataFrameLength());
                    throw throwable;
                }
            }
            if (OperationProcessor.this.state() != Service.State.RUNNING) {
                log.debug("{}: CommitRejected ({}, Not Running, State = {}).", new Object[]{OperationProcessor.this.traceObjectId, commitArgs, OperationProcessor.this.state()});
                // MONITOREXIT : object
                if (toAck != null) {
                    toAck.stream().flatMap(Collection::stream).forEach(CompletableOperation::complete);
                    OperationProcessor.this.metrics.operationsCompleted(toAck, timer.getElapsed());
                }
                this.checkpointPolicy.recordCommit(commitArgs.getDataFrameLength());
                return;
            }
            toAck = this.collectCompletionCandidates(commitArgs);
            OperationProcessor.this.metadataUpdater.commit(commitArgs.getMetadataTransactionId());
            toAck.forEach(arg_0 -> ((BlockingDrainingQueue)OperationProcessor.this.commitQueue).add(arg_0));
            this.highestCommittedDataFrame = addressSequence;
            // MONITOREXIT : object
            if (toAck != null) {
                toAck.stream().flatMap(Collection::stream).forEach(CompletableOperation::complete);
                OperationProcessor.this.metrics.operationsCompleted(toAck, timer.getElapsed());
            }
            this.checkpointPolicy.recordCommit(commitArgs.getDataFrameLength());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void fail(Throwable ex, DataFrameBuilder.CommitArgs commitArgs) {
            List<CompletableOperation> toFail = null;
            try {
                Object object = OperationProcessor.this.stateLock;
                synchronized (object) {
                    toFail = this.collectFailureCandidates(commitArgs);
                    this.pendingOperationCount -= toFail.size();
                }
            }
            finally {
                if (toFail != null) {
                    toFail.forEach(o -> this.failOperation((CompletableOperation)o, ex));
                    OperationProcessor.this.metrics.operationsFailed(toFail);
                }
            }
            Callbacks.invokeSafely(OperationProcessor.this::errorHandler, (Object)ex, null);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void failOperation(CompletableOperation operation, Throwable failureCause) {
            Object object = OperationProcessor.this.stateLock;
            synchronized (object) {
                Throwable stopException = OperationProcessor.this.getStopException();
                if (stopException != null) {
                    failureCause = stopException;
                }
            }
            operation.fail(failureCause);
        }

        @GuardedBy(value="stateLock")
        private List<List<CompletableOperation>> collectCompletionCandidates(DataFrameBuilder.CommitArgs commitArgs) {
            ArrayList<List<CompletableOperation>> toAck = new ArrayList<List<CompletableOperation>>();
            long transactionId = commitArgs.getMetadataTransactionId();
            boolean checkpointExists = false;
            while (!this.metadataTransactions.isEmpty() && this.metadataTransactions.peekFirst().getMetadataTransactionId() <= transactionId) {
                DataFrameBuilder.CommitArgs t = this.metadataTransactions.pollFirst();
                checkpointExists |= t.getMetadataTransactionId() == transactionId;
                if (t.getOperations().size() <= 0) continue;
                toAck.add(t.getOperations());
                this.pendingOperationCount -= t.getOperations().size();
            }
            if (!checkpointExists) {
                log.warn("{}: No Metadata UpdateTransaction found for '{}' (Count={}). This is expected after a critical failure or when OperationProcessor is shutting down.", new Object[]{OperationProcessor.this.traceObjectId, this.metadataTransactions.size(), commitArgs});
                assert (this.metadataTransactions.isEmpty()) : "No Metadata UpdateTransaction found for given CommitArgs, but there are still entries in metadataTransaction.";
            }
            return toAck;
        }

        @GuardedBy(value="stateLock")
        private List<CompletableOperation> collectFailureCandidates(DataFrameBuilder.CommitArgs commitArgs) {
            ArrayList<CompletableOperation> candidates = new ArrayList<CompletableOperation>();
            if (commitArgs != null) {
                OperationProcessor.this.metadataUpdater.rollback(commitArgs.getMetadataTransactionId());
                while (!this.metadataTransactions.isEmpty() && this.metadataTransactions.peekLast().getMetadataTransactionId() >= commitArgs.getMetadataTransactionId()) {
                    DataFrameBuilder.CommitArgs t2 = this.metadataTransactions.pollLast();
                    candidates.addAll(t2.getOperations());
                }
            } else {
                this.metadataTransactions.forEach(t -> candidates.addAll(t.getOperations()));
                this.metadataTransactions.clear();
                OperationProcessor.this.metadataUpdater.rollback(0L);
            }
            candidates.addAll(this.nextFrameOperations);
            this.nextFrameOperations.clear();
            return candidates;
        }
    }
}

