/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.segmentstore.server.logs;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.Service;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.ObjectClosedException;
import io.pravega.common.TimeoutTimer;
import io.pravega.common.Timer;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.concurrent.Services;
import io.pravega.common.util.Retry;
import io.pravega.common.util.SequencedItemList;
import io.pravega.segmentstore.contracts.StreamingException;
import io.pravega.segmentstore.server.ContainerOfflineException;
import io.pravega.segmentstore.server.DataCorruptionException;
import io.pravega.segmentstore.server.IllegalContainerStateException;
import io.pravega.segmentstore.server.OperationLog;
import io.pravega.segmentstore.server.ReadIndex;
import io.pravega.segmentstore.server.UpdateableContainerMetadata;
import io.pravega.segmentstore.server.logs.DurableLogConfig;
import io.pravega.segmentstore.server.logs.MemoryStateUpdater;
import io.pravega.segmentstore.server.logs.MetadataCheckpointPolicy;
import io.pravega.segmentstore.server.logs.OperationProcessor;
import io.pravega.segmentstore.server.logs.RecoveryProcessor;
import io.pravega.segmentstore.server.logs.operations.MetadataCheckpointOperation;
import io.pravega.segmentstore.server.logs.operations.Operation;
import io.pravega.segmentstore.server.logs.operations.StorageMetadataCheckpointOperation;
import io.pravega.segmentstore.storage.DataLogDisabledException;
import io.pravega.segmentstore.storage.DurableDataLog;
import io.pravega.segmentstore.storage.DurableDataLogFactory;
import io.pravega.segmentstore.storage.LogAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class DurableLog
extends AbstractService
implements OperationLog {
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(DurableLog.class);
    private static final Duration RECOVERY_TIMEOUT = Duration.ofSeconds(30L);
    private final String traceObjectId;
    private final SequencedItemList<Operation> inMemoryOperationLog;
    private final DurableDataLog durableDataLog;
    private final MemoryStateUpdater memoryStateUpdater;
    private final OperationProcessor operationProcessor;
    private final UpdateableContainerMetadata metadata;
    @GuardedBy(value="tailReads")
    private final Set<TailRead> tailReads;
    private final ScheduledExecutorService executor;
    private final AtomicReference<Throwable> stopException = new AtomicReference();
    private final AtomicBoolean closed;
    private final CompletableFuture<Void> delayedStart;
    private final Retry.RetryAndThrowConditionally delayedStartRetry;

    public DurableLog(DurableLogConfig config, UpdateableContainerMetadata metadata, DurableDataLogFactory dataFrameLogFactory, ReadIndex readIndex, ScheduledExecutorService executor) {
        Preconditions.checkNotNull((Object)config, (Object)"config");
        this.metadata = (UpdateableContainerMetadata)Preconditions.checkNotNull((Object)metadata, (Object)"metadata");
        Preconditions.checkNotNull((Object)dataFrameLogFactory, (Object)"dataFrameLogFactory");
        Preconditions.checkNotNull((Object)readIndex, (Object)"readIndex");
        this.executor = (ScheduledExecutorService)Preconditions.checkNotNull((Object)executor, (Object)"executor");
        this.durableDataLog = dataFrameLogFactory.createDurableDataLog(metadata.getContainerId());
        assert (this.durableDataLog != null) : "dataFrameLogFactory created null durableDataLog.";
        this.traceObjectId = String.format("DurableLog[%s]", metadata.getContainerId());
        this.inMemoryOperationLog = this.createInMemoryLog();
        this.memoryStateUpdater = new MemoryStateUpdater(this.inMemoryOperationLog, readIndex, this::triggerTailReads);
        MetadataCheckpointPolicy checkpointPolicy = new MetadataCheckpointPolicy(config, this::queueMetadataCheckpoint, this.executor);
        this.operationProcessor = new OperationProcessor(this.metadata, this.memoryStateUpdater, this.durableDataLog, checkpointPolicy, executor);
        Services.onStop((Service)this.operationProcessor, this::queueStoppedHandler, this::queueFailedHandler, (Executor)this.executor);
        this.tailReads = new HashSet<TailRead>();
        this.closed = new AtomicBoolean();
        this.delayedStart = new CompletableFuture();
        this.delayedStartRetry = Retry.withExpBackoff((long)config.getStartRetryDelay().toMillis(), (int)1, (int)Integer.MAX_VALUE).retryWhen(ex -> Exceptions.unwrap((Throwable)ex) instanceof DataLogDisabledException);
    }

    @VisibleForTesting
    protected SequencedItemList<Operation> createInMemoryLog() {
        return new SequencedItemList();
    }

    @Override
    public void close() {
        if (!this.closed.get()) {
            Futures.await((CompletableFuture)Services.stopAsync((Service)this, (Executor)this.executor));
            this.operationProcessor.close();
            this.durableDataLog.close();
            log.info("{}: Closed.", (Object)this.traceObjectId);
            this.closed.set(true);
        }
    }

    protected void doStart() {
        log.info("{}: Starting.", (Object)this.traceObjectId);
        this.delayedStartRetry.runAsync(() -> this.tryStartOnce().whenComplete((v, ex) -> {
            if (ex != null) {
                if (Exceptions.unwrap((Throwable)ex) instanceof DataLogDisabledException) {
                    this.notifyStartComplete(null);
                }
                throw new CompletionException((Throwable)ex);
            }
            this.notifyDelayedStartComplete(null);
        }), this.executor).exceptionally(this::notifyDelayedStartComplete);
    }

    private Void notifyDelayedStartComplete(Throwable failureCause) {
        if (failureCause == null) {
            this.delayedStart.complete(null);
        } else {
            this.delayedStart.completeExceptionally(failureCause);
        }
        this.notifyStartComplete(failureCause);
        return null;
    }

    private void notifyStartComplete(Throwable failureCause) {
        if (failureCause == null && this.state() == Service.State.STARTING) {
            log.info("{}: Started ({}).", (Object)this.traceObjectId, (Object)(this.isOffline() ? "OFFLINE" : "Online"));
            this.notifyStarted();
        }
        if (failureCause != null) {
            failureCause = Exceptions.unwrap((Throwable)failureCause);
            this.stopException.set(failureCause);
            if (this.state() == Service.State.STARTING) {
                this.notifyFailed(failureCause);
                this.operationProcessor.stopAsync();
            } else {
                this.doStop();
            }
        }
    }

    private CompletableFuture<Void> tryStartOnce() {
        return CompletableFuture.supplyAsync(this::performRecovery, this.executor).thenCompose(anyItemsRecovered -> Services.startAsync((Service)this.operationProcessor, (Executor)this.executor).thenComposeAsync(v -> anyItemsRecovered != false ? CompletableFuture.completedFuture(null) : this.queueMetadataCheckpoint(), (Executor)this.executor));
    }

    private boolean performRecovery() {
        Preconditions.checkState((this.state() == Service.State.STARTING || this.state() == Service.State.RUNNING && this.isOffline() ? 1 : 0) != 0, (Object)"Invalid State for recovery.");
        this.operationProcessor.getMetrics().operationLogInit();
        Timer timer = new Timer();
        try {
            this.durableDataLog.initialize(RECOVERY_TIMEOUT);
            RecoveryProcessor p = new RecoveryProcessor(this.metadata, this.durableDataLog, this.memoryStateUpdater);
            int recoveredItemCount = p.performRecovery();
            this.operationProcessor.getMetrics().operationsCompleted(recoveredItemCount, timer.getElapsed());
            Preconditions.checkState((!this.metadata.isRecoveryMode() ? 1 : 0) != 0, (Object)"Recovery completed but Metadata is still in Recovery Mode.");
            return recoveredItemCount > 0;
        }
        catch (Exception ex) {
            log.error("{} Recovery FAILED.", (Object)this.traceObjectId, (Object)ex);
            if (Exceptions.unwrap((Throwable)ex) instanceof DataCorruptionException) {
                try {
                    this.durableDataLog.disable();
                    log.info("{} Log disabled due to DataCorruptionException during recovery.", (Object)this.traceObjectId);
                }
                catch (Exception disableEx) {
                    log.warn("{}: Unable to disable log after DataCorruptionException during recovery.", (Object)this.traceObjectId, (Object)disableEx);
                    ex.addSuppressed(disableEx);
                }
            }
            throw ex;
        }
    }

    protected void doStop() {
        long traceId = LoggerHelpers.traceEnterWithContext((Logger)log, (String)this.traceObjectId, (String)"doStop", (Object[])new Object[0]);
        log.info("{}: Stopping.", (Object)this.traceObjectId);
        ((CompletableFuture)Services.stopAsync((Service)this.operationProcessor, (Executor)this.executor).whenCompleteAsync((r, ex) -> {
            this.cancelTailReads();
            this.durableDataLog.close();
            Throwable cause = this.stopException.get();
            if (cause == null && this.operationProcessor.state() == Service.State.FAILED) {
                cause = this.operationProcessor.failureCause();
            }
            this.delayedStart.completeExceptionally((Throwable)(cause == null ? new ObjectClosedException((Object)this) : cause));
            if (cause == null) {
                this.notifyStopped();
            } else {
                this.notifyFailed(cause);
            }
            log.info("{}: Stopped.", (Object)this.traceObjectId);
            LoggerHelpers.traceLeave((Logger)log, (String)this.traceObjectId, (String)"doStop", (long)traceId, (Object[])new Object[0]);
        }, (Executor)this.executor)).exceptionally(ex -> {
            this.notifyFailed((Throwable)ex);
            return null;
        });
    }

    @Override
    public int getId() {
        return this.metadata.getContainerId();
    }

    @Override
    public boolean isOffline() {
        return !this.delayedStart.isDone();
    }

    @Override
    public CompletableFuture<Void> add(Operation operation, Duration timeout) {
        this.ensureRunning();
        return this.operationProcessor.process(operation);
    }

    @Override
    public CompletableFuture<Void> truncate(long upToSequenceNumber, Duration timeout) {
        this.ensureRunning();
        Preconditions.checkArgument((boolean)this.metadata.isValidTruncationPoint(upToSequenceNumber), (Object)"Invalid Truncation Point. Must refer to a MetadataCheckpointOperation.");
        long actualTruncationSequenceNumber = upToSequenceNumber - 1L;
        LogAddress truncationFrameAddress = this.metadata.getClosestTruncationMarker(actualTruncationSequenceNumber);
        if (truncationFrameAddress == null) {
            return CompletableFuture.completedFuture(null);
        }
        TimeoutTimer timer = new TimeoutTimer(timeout);
        log.info("{}: Truncate (OperationSequenceNumber = {}, DataFrameAddress = {}).", new Object[]{this.traceObjectId, upToSequenceNumber, truncationFrameAddress});
        return ((CompletableFuture)this.add(new StorageMetadataCheckpointOperation(), timer.getRemaining()).thenComposeAsync(v -> this.durableDataLog.truncate(truncationFrameAddress, timer.getRemaining()), (Executor)this.executor)).thenRunAsync(() -> {
            int count = this.inMemoryOperationLog.truncate(actualTruncationSequenceNumber);
            this.metadata.removeTruncationMarkers(actualTruncationSequenceNumber);
            this.operationProcessor.getMetrics().operationLogTruncate(count);
        }, this.executor);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<Iterator<Operation>> read(long afterSequenceNumber, int maxCount, Duration timeout) {
        Operation lastOp;
        this.ensureRunning();
        log.debug("{}: Read (AfterSequenceNumber = {}, MaxCount = {}).", new Object[]{this.traceObjectId, afterSequenceNumber, maxCount});
        Iterator logReadResult = this.inMemoryOperationLog.read(afterSequenceNumber, maxCount);
        if (logReadResult.hasNext()) {
            return CompletableFuture.completedFuture(logReadResult);
        }
        CompletableFuture<Iterator<Operation>> result = null;
        Set<TailRead> set = this.tailReads;
        synchronized (set) {
            lastOp = (Operation)this.inMemoryOperationLog.getLast();
            if (lastOp == null || lastOp.getSequenceNumber() <= afterSequenceNumber) {
                TailRead tailRead = new TailRead(afterSequenceNumber, maxCount, timeout, this.executor);
                result = tailRead.future;
                this.tailReads.add(tailRead);
                result.whenComplete((r, ex) -> this.unregisterTailRead(tailRead));
            }
        }
        if (result == null) {
            logReadResult = this.inMemoryOperationLog.read(afterSequenceNumber, maxCount);
            assert (logReadResult.hasNext()) : String.format("Unable to read anything after SeqNo %d, even though last operation SeqNo == %d", afterSequenceNumber, lastOp == null ? -1L : lastOp.getSequenceNumber());
            result = CompletableFuture.completedFuture(logReadResult);
        }
        return result;
    }

    @Override
    public CompletableFuture<Void> awaitOnline() {
        Exceptions.checkNotClosed((boolean)this.closed.get(), (Object)this);
        if (this.state() != Service.State.RUNNING) {
            throw new IllegalContainerStateException(this.getId(), this.state(), Service.State.RUNNING);
        }
        return this.delayedStart;
    }

    private void ensureRunning() {
        Exceptions.checkNotClosed((boolean)this.closed.get(), (Object)this);
        if (this.state() != Service.State.RUNNING) {
            throw new IllegalContainerStateException(this.getId(), this.state(), Service.State.RUNNING);
        }
        if (this.isOffline()) {
            throw new ContainerOfflineException(this.getId());
        }
    }

    private void queueFailedHandler(Throwable cause) {
        log.warn("{}: QueueProcessor failed with exception {}", (Object)this.traceObjectId, (Object)cause);
        this.stopException.set(cause);
        this.stopAsync();
    }

    private void queueStoppedHandler() {
        if (this.state() != Service.State.STOPPING && this.state() != Service.State.FAILED) {
            log.warn("{}: OperationProcessor stopped unexpectedly (no error) but DurableLog was not currently stopping. Shutting down DurableLog.", (Object)this.traceObjectId);
            this.stopException.set((Throwable)new StreamingException("OperationProcessor stopped unexpectedly (no error) but DurableLog was not currently stopping."));
            this.stopAsync();
        }
    }

    private CompletableFuture<Void> queueMetadataCheckpoint() {
        log.debug("{}: Queuing MetadataCheckpointOperation.", (Object)this.traceObjectId);
        return this.operationProcessor.process(new MetadataCheckpointOperation()).thenAccept(seqNo -> log.info("{}: MetadataCheckpointOperation durably stored.", (Object)this.traceObjectId));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void unregisterTailRead(TailRead tailRead) {
        Set<TailRead> set = this.tailReads;
        synchronized (set) {
            this.tailReads.remove(tailRead);
        }
        if (tailRead.future != null && !tailRead.future.isDone()) {
            tailRead.future.cancel(true);
        }
    }

    private void triggerTailReads() {
        this.executor.execute(() -> {
            List<TailRead> toTrigger;
            Set<TailRead> set = this.tailReads;
            synchronized (set) {
                Operation lastOp = (Operation)this.inMemoryOperationLog.getLast();
                if (lastOp != null) {
                    long seqNo = lastOp.getSequenceNumber();
                    toTrigger = this.tailReads.stream().filter(e -> e.afterSequenceNumber < seqNo).collect(Collectors.toList());
                } else {
                    toTrigger = Collections.emptyList();
                }
            }
            for (TailRead tr : toTrigger) {
                tr.future.complete((Iterator<Operation>)Futures.runOrFail(() -> this.inMemoryOperationLog.read(tr.afterSequenceNumber, tr.maxCount), tr.future));
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void cancelTailReads() {
        ArrayList<TailRead> reads;
        Set<TailRead> set = this.tailReads;
        synchronized (set) {
            reads = new ArrayList<TailRead>(this.tailReads);
        }
        reads.forEach(this::unregisterTailRead);
    }

    private static class TailRead {
        final long afterSequenceNumber;
        final int maxCount;
        final CompletableFuture<Iterator<Operation>> future;

        TailRead(long afterSequenceNumber, int maxCount, Duration timeout, ScheduledExecutorService executor) {
            this.afterSequenceNumber = afterSequenceNumber;
            this.maxCount = maxCount;
            this.future = Futures.futureWithTimeout((Duration)timeout, (ScheduledExecutorService)executor);
        }

        public String toString() {
            return String.format("SeqNo = %d, Count = %d", this.afterSequenceNumber, this.maxCount);
        }
    }
}

