/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.server.impl;

import com.codahale.metrics.Timer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.LongStream;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerState;
import org.apache.ratis.server.impl.StateMachineMetrics;
import org.apache.ratis.server.impl.StateMachineUpdater;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.raftlog.RaftLogIndex;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.AwaitForSignal;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class StateMachineUpdater
implements Runnable {
    static final Logger LOG = LoggerFactory.getLogger(StateMachineUpdater.class);
    private final Consumer<Object> infoIndexChange;
    private final Consumer<Object> debugIndexChange;
    private final String name;
    private final StateMachine stateMachine;
    private final RaftServerImpl server;
    private final RaftLog raftLog;
    private final Long autoSnapshotThreshold;
    private final boolean purgeUptoSnapshotIndex;
    private final Thread updater;
    private final AwaitForSignal awaitForSignal;
    private final RaftLogIndex appliedIndex;
    private final RaftLogIndex snapshotIndex;
    private final AtomicReference<Long> stopIndex = new AtomicReference();
    private volatile State state = State.RUNNING;
    private final SnapshotRetentionPolicy snapshotRetentionPolicy;
    private final MemoizedSupplier<StateMachineMetrics> stateMachineMetrics;
    private final Consumer<Long> appliedIndexConsumer;

    StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server, ServerState serverState, long lastAppliedIndex, RaftProperties properties, Consumer<Long> appliedIndexConsumer) {
        this.name = serverState.getMemberId() + "-" + JavaUtils.getClassSimpleName(this.getClass());
        this.appliedIndexConsumer = appliedIndexConsumer;
        this.infoIndexChange = s -> LOG.info("{}: {}", (Object)this.name, s);
        this.debugIndexChange = s -> LOG.debug("{}: {}", (Object)this.name, s);
        this.stateMachine = stateMachine;
        this.server = server;
        this.raftLog = serverState.getLog();
        this.appliedIndex = new RaftLogIndex((Object)"appliedIndex", lastAppliedIndex);
        this.snapshotIndex = new RaftLogIndex((Object)"snapshotIndex", lastAppliedIndex);
        boolean autoSnapshot = RaftServerConfigKeys.Snapshot.autoTriggerEnabled((RaftProperties)properties);
        this.autoSnapshotThreshold = autoSnapshot ? Long.valueOf(RaftServerConfigKeys.Snapshot.autoTriggerThreshold((RaftProperties)properties)) : null;
        int numSnapshotFilesRetained = RaftServerConfigKeys.Snapshot.retentionFileNum((RaftProperties)properties);
        this.snapshotRetentionPolicy = new /* Unavailable Anonymous Inner Class!! */;
        this.purgeUptoSnapshotIndex = RaftServerConfigKeys.Log.purgeUptoSnapshotIndex((RaftProperties)properties);
        this.updater = Daemon.newBuilder().setName(this.name).setRunnable((Runnable)this).setThreadGroup(server.getThreadGroup()).build();
        this.awaitForSignal = new AwaitForSignal((Object)this.name);
        this.stateMachineMetrics = MemoizedSupplier.valueOf(() -> StateMachineMetrics.getStateMachineMetrics((RaftServerImpl)server, (RaftLogIndex)this.appliedIndex, (StateMachine)stateMachine));
    }

    void start() {
        this.stateMachineMetrics.get();
        this.updater.start();
        this.notifyAppliedIndex(this.appliedIndex.get());
    }

    private void stop() {
        this.state = State.STOP;
        try {
            this.stateMachine.close();
            if (this.stateMachineMetrics.isInitialized()) {
                ((StateMachineMetrics)this.stateMachineMetrics.get()).unregister();
            }
        }
        catch (Throwable t) {
            LOG.warn(this.name + ": Failed to close " + JavaUtils.getClassSimpleName(this.stateMachine.getClass()) + " " + this.stateMachine, t);
        }
    }

    void stopAndJoin() throws InterruptedException {
        if (this.state == State.EXCEPTION) {
            this.stop();
            return;
        }
        if (this.stopIndex.compareAndSet(null, this.raftLog.getLastCommittedIndex())) {
            this.notifyUpdater();
            LOG.info("{}: set stopIndex = {}", (Object)this, (Object)this.stopIndex);
        }
        this.updater.join();
    }

    void reloadStateMachine() {
        this.state = State.RELOAD;
        this.notifyUpdater();
    }

    void notifyUpdater() {
        this.awaitForSignal.signal();
    }

    public String toString() {
        return this.name;
    }

    @Override
    public void run() {
        while (this.state != State.STOP) {
            try {
                this.waitForCommit();
                if (this.state == State.RELOAD) {
                    this.reload();
                }
                MemoizedSupplier futures = this.applyLog();
                this.checkAndTakeSnapshot(futures);
                if (!this.shouldStop()) continue;
                this.checkAndTakeSnapshot(futures);
                this.stop();
            }
            catch (Throwable t) {
                if (t instanceof InterruptedException && this.state == State.STOP) {
                    LOG.info("{} was interrupted.  Exiting ...", (Object)this);
                    continue;
                }
                this.state = State.EXCEPTION;
                LOG.error(this + " caught a Throwable.", t);
                this.server.close();
            }
        }
    }

    private void waitForCommit() throws InterruptedException {
        long applied = this.getLastAppliedIndex();
        while (applied >= this.raftLog.getLastCommittedIndex() && this.state == State.RUNNING && !this.shouldStop()) {
            if (!this.awaitForSignal.await(100L, TimeUnit.MILLISECONDS)) continue;
            return;
        }
    }

    private void reload() throws IOException {
        Preconditions.assertTrue((this.stateMachine.getLifeCycleState() == LifeCycle.State.PAUSED ? 1 : 0) != 0);
        this.stateMachine.reinitialize();
        SnapshotInfo snapshot = this.stateMachine.getLatestSnapshot();
        Objects.requireNonNull(snapshot, "snapshot == null");
        long i = snapshot.getIndex();
        this.snapshotIndex.setUnconditionally(i, this.infoIndexChange);
        this.appliedIndex.setUnconditionally(i, this.infoIndexChange);
        this.notifyAppliedIndex(i);
        this.state = State.RUNNING;
    }

    private MemoizedSupplier<List<CompletableFuture<Message>>> applyLog() throws RaftLogIOException {
        long applied;
        MemoizedSupplier futures = MemoizedSupplier.valueOf(ArrayList::new);
        long committed = this.raftLog.getLastCommittedIndex();
        while ((applied = this.getLastAppliedIndex()) < committed && this.state == State.RUNNING && !this.shouldStop()) {
            long nextIndex = applied + 1L;
            RaftProtos.LogEntryProto next = this.raftLog.get(nextIndex);
            if (next != null) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("{}: applying nextIndex={}, nextLog={}", new Object[]{this, nextIndex, LogProtoUtils.toLogEntryString((RaftProtos.LogEntryProto)next)});
                } else {
                    LOG.debug("{}: applying nextIndex={}", (Object)this, (Object)nextIndex);
                }
                CompletableFuture f = this.server.applyLogToStateMachine(next);
                long incremented = this.appliedIndex.incrementAndGet(this.debugIndexChange);
                Preconditions.assertTrue((incremented == nextIndex ? 1 : 0) != 0);
                if (f == null) continue;
                ((List)futures.get()).add(f);
                f.thenAccept(m -> this.notifyAppliedIndex(incremented));
                continue;
            }
            LOG.debug("{}: logEntry {} is null. There may be snapshot to load. state:{}", new Object[]{this, nextIndex, this.state});
            break;
        }
        return futures;
    }

    private void checkAndTakeSnapshot(MemoizedSupplier<List<CompletableFuture<Message>>> futures) throws ExecutionException, InterruptedException {
        if (this.shouldTakeSnapshot()) {
            if (futures.isInitialized()) {
                JavaUtils.allOf((Collection)((Collection)futures.get())).get();
            }
            this.takeSnapshot();
        }
    }

    private void takeSnapshot() {
        long i;
        try {
            Timer.Context takeSnapshotTimerContext = ((StateMachineMetrics)this.stateMachineMetrics.get()).getTakeSnapshotTimer().time();
            i = this.stateMachine.takeSnapshot();
            takeSnapshotTimerContext.stop();
            this.server.getSnapshotRequestHandler().completeTakingSnapshot(i);
            long lastAppliedIndex = this.getLastAppliedIndex();
            if (i > lastAppliedIndex) {
                throw new StateMachineException("Bug in StateMachine: snapshot index = " + i + " > appliedIndex = " + lastAppliedIndex + "; StateMachine class=" + this.stateMachine.getClass().getName() + ", stateMachine=" + this.stateMachine);
            }
            this.stateMachine.getStateMachineStorage().cleanupOldSnapshots(this.snapshotRetentionPolicy);
        }
        catch (IOException e) {
            LOG.error(this.name + ": Failed to take snapshot", (Throwable)e);
            return;
        }
        if (i >= 0L) {
            long purgeIndex;
            LOG.info("{}: Took a snapshot at index {}", (Object)this.name, (Object)i);
            this.snapshotIndex.updateIncreasingly(i, this.infoIndexChange);
            if (this.purgeUptoSnapshotIndex) {
                purgeIndex = i;
            } else {
                LongStream commitIndexStream = this.server.getCommitInfos().stream().mapToLong(RaftProtos.CommitInfoProto::getCommitIndex);
                purgeIndex = LongStream.concat(LongStream.of(i), commitIndexStream).min().orElse(i);
            }
            this.raftLog.purge(purgeIndex);
        }
    }

    private boolean shouldStop() {
        return Optional.ofNullable(this.stopIndex.get()).filter(i -> i <= this.getLastAppliedIndex()).isPresent();
    }

    private boolean shouldTakeSnapshot() {
        if (this.state == State.RUNNING && this.server.getSnapshotRequestHandler().shouldTriggerTakingSnapshot()) {
            return true;
        }
        if (this.autoSnapshotThreshold == null) {
            return false;
        }
        if (this.shouldStop()) {
            return this.getLastAppliedIndex() - this.snapshotIndex.get() > 0L;
        }
        return this.state == State.RUNNING && this.getStateMachineLastAppliedIndex() - this.snapshotIndex.get() >= this.autoSnapshotThreshold;
    }

    private long getLastAppliedIndex() {
        return this.appliedIndex.get();
    }

    private void notifyAppliedIndex(long index) {
        this.appliedIndexConsumer.accept(index);
    }

    long getStateMachineLastAppliedIndex() {
        return Optional.ofNullable(this.stateMachine.getLastAppliedTermIndex()).map(TermIndex::getIndex).orElse(-1L);
    }
}

