/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.server;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.ChecksumException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogFormat;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils;
import org.apache.ratis.server.raftlog.segmented.TestSegmentedRaftLog;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.StringUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.event.Level;

public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
    static final int NUM_SERVERS = 3;

    /**
     * Prepares the logging level and the properties returned by {@code getProperties()}:
     * sets the {@link RaftLog} logger to DEBUG, registers {@link SimpleStateMachine4Testing}
     * as the state machine class, and sets the maximum log segment size to "8KB".
     */
    public ServerRestartTests() {
        Slf4jUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);

        final RaftProperties properties = getProperties();
        properties.setClass(
            MiniRaftCluster.STATEMACHINE_CLASS_KEY,
            SimpleStateMachine4Testing.class,
            StateMachine.class);

        final SizeInBytes segmentSizeMax = SizeInBytes.valueOf("8KB");
        RaftServerConfigKeys.Log.setSegmentSizeMax(properties, segmentSizeMax);
    }

    /**
     * Runs {@link #runTestRestartFollower(MiniRaftCluster)} through {@code runWithNewCluster}
     * with {@link #NUM_SERVERS} servers.
     */
    @Test
    public void testRestartFollower() throws Exception {
        runWithNewCluster(NUM_SERVERS, this::runTestRestartFollower);
    }

    /**
     * Exercises follower restart and log truncation on the given cluster.
     *
     * <ol>
     *   <li>Writes a batch of messages, restarts the first follower, then writes another batch.</li>
     *   <li>Waits until the follower's last applied index reaches the leader's last log index,
     *       restarts the follower again and checks that its last log index equals the leader's.</li>
     *   <li>Closes all divisions (followers first, then the leader), chops one byte off the open
     *       log file of the follower and of the leader, and checks via
     *       {@link #assertTruncatedLog} that each comes back with one entry fewer.</li>
     *   <li>Restarts the whole cluster, writes again, restarts again, and finally queries every
     *       message written so far except the one at {@code truncatedMessageIndex}.</li>
     * </ol>
     *
     * @param cluster the cluster under test
     * @throws Exception if any step fails
     */
    void runTestRestartFollower(MiniRaftCluster cluster) throws Exception {
        RaftTestUtil.waitForLeader(cluster);
        final RaftPeerId leaderId = cluster.getLeader().getId();

        // Messages are named "m0", "m1", ... so that they can be re-created for the final queries.
        final AtomicInteger messageCount = new AtomicInteger();
        final Supplier<Message> newMessage =
            () -> new RaftTestUtil.SimpleMessage("m" + messageCount.getAndIncrement());
        writeSomething(newMessage, cluster);

        // Restart the first follower, then keep writing.
        final RaftPeerId followerId = cluster.getFollowers().get(0).getId();
        LOG.info("Restart follower {}", followerId);
        cluster.restartServer(followerId, false);
        writeSomething(newMessage, cluster);

        // Index of the most recently written message; it is skipped by the final queries.
        final int truncatedMessageIndex = messageCount.get() - 1;

        // Wait for the restarted follower to apply everything the leader has logged so far.
        final long leaderLastIndex = cluster.getLeader().getRaftLog().getLastEntryTermIndex().getIndex();
        final RaftServer.Division followerState = cluster.getDivision(followerId);
        JavaUtils.attemptRepeatedly(() -> {
            Assertions.assertTrue(followerState.getInfo().getLastAppliedIndex() >= leaderLastIndex);
            return null;
        }, 10, ONE_SECOND, "follower catchup", LOG);

        // After another restart the follower's log must end where the leader's log ends.
        final RaftServer.Division follower = cluster.restartServer(followerId, false);
        final RaftLog followerLog = follower.getRaftLog();
        final long followerLastIndex = followerLog.getLastEntryTermIndex().getIndex();
        Assertions.assertTrue(followerLastIndex >= leaderLastIndex);
        final long leaderFinalIndex = cluster.getLeader().getRaftLog().getLastEntryTermIndex().getIndex();
        Assertions.assertEquals(leaderFinalIndex, followerLastIndex);

        // Remember the open segment files before shutting the divisions down.
        final File followerOpenLogFile = getOpenLogFile(follower);
        final File leaderOpenLogFile = getOpenLogFile(cluster.getDivision(leaderId));

        // Close the followers first and the leader last.
        for (RaftServer.Division division : cluster.getFollowers()) {
            division.close();
        }
        cluster.getDivision(leaderId).close();

        // Truncate each open log file by one byte and verify one entry is lost on restart.
        assertTruncatedLog(followerId, followerOpenLogFile, followerLastIndex, cluster);
        assertTruncatedLog(leaderId, leaderOpenLogFile, leaderFinalIndex, cluster);

        // Restart everything, write more, restart again.
        cluster.restart(false);
        writeSomething(newMessage, cluster);
        cluster.restart(false);

        // Every message except the truncated one must still be readable.
        try (RaftClient client = cluster.createClient()) {
            final int total = messageCount.get();
            for (int index = 0; index < total; index++) {
                if (index != truncatedMessageIndex) {
                    final RaftTestUtil.SimpleMessage query = new RaftTestUtil.SimpleMessage("m" + index);
                    final RaftClientReply reply = client.io().sendReadOnly(query);
                    Assertions.assertTrue(reply.isSuccess());
                    LOG.info("query {}: {} {}", query, reply,
                        RaftProtos.LogEntryProto.parseFrom(reply.getMessage().getContent()));
                }
            }
        }
    }

    /**
     * Sends ten messages obtained from {@code newMessage} through a freshly created client and
     * asserts that every reply reports success.
     *
     * @param newMessage source of the messages to send; invoked once per send
     * @param cluster    the cluster used to create the client
     * @throws Exception if creating the client, sending, or closing the client fails
     */
    static void writeSomething(Supplier<Message> newMessage, MiniRaftCluster cluster) throws Exception {
        try (RaftClient client = cluster.createClient()) {
            int sent = 0;
            while (sent < 10) {
                final RaftClientReply reply = client.io().send(newMessage.get());
                Assertions.assertTrue(reply.isSuccess());
                sent++;
            }
        }
    }

    /**
     * Removes the last byte of {@code openLogFile} (when the file is non-empty), restarts the
     * server {@code id}, and asserts that the last index of its log is {@code lastIndex - 1}.
     * The restarted server is closed before returning.
     *
     * @param id          the peer to restart
     * @param openLogFile the open log segment file to shorten by one byte
     * @param lastIndex   the last log index before the truncation
     * @param cluster     the cluster owning the peer
     * @throws Exception if the truncation, restart, or close fails
     */
    static void assertTruncatedLog(RaftPeerId id, File openLogFile, long lastIndex, MiniRaftCluster cluster) throws Exception {
        final long length = openLogFile.length();
        if (length > 0L) {
            FileUtils.truncateFile(openLogFile, length - 1L);
        }

        final RaftServer.Division restarted = cluster.restartServer(id, false);
        final long expectedLastIndex = lastIndex - 1L;
        final long actualLastIndex = restarted.getRaftLog().getLastEntryTermIndex().getIndex();
        Assertions.assertEquals(expectedLastIndex, actualLastIndex);
        restarted.getRaftServer().close();
    }

    /**
     * Returns the single open log file of the given division.
     *
     * @param server the division whose open log files are looked up
     * @return the one open log file, converted to a {@link File}
     * @throws Exception if the lookup fails; an assertion error is raised when the number of
     *                   open log files is not exactly one
     */
    static File getOpenLogFile(RaftServer.Division server) throws Exception {
        // Parameterized instead of a raw List, so no unchecked cast is needed on the element.
        final List<Path> openLogs = SegmentedRaftLogTestUtils.getOpenLogFiles(server);
        // The message supplier is only evaluated on failure and shows which files were found.
        Assertions.assertEquals(1, openLogs.size(), () -> "open log files: " + openLogs);
        return openLogs.get(0).toFile();
    }

    /**
     * Runs {@link #runTestRestartWithCorruptedLogHeader(MiniRaftCluster)} through
     * {@code runWithNewCluster} with {@link #NUM_SERVERS} servers.
     */
    @Test
    public void testRestartWithCorruptedLogHeader() throws Exception {
        runWithNewCluster(NUM_SERVERS, this::runTestRestartWithCorruptedLogHeader);
    }

    /**
     * Checks restart behavior when the header of the open log file is only partially written.
     *
     * <p>After a leader is elected, waits until every division has exactly one open log file,
     * then closes all servers. For each division, and for every partial header length from 0 up
     * to (but excluding) the full header length, the open log file is overwritten by
     * {@link #assertCorruptedLogHeader} and the division is expected to have no open log files
     * afterwards.
     *
     * @param cluster the cluster under test
     * @throws Exception if any step fails
     */
    void runTestRestartWithCorruptedLogHeader(MiniRaftCluster cluster) throws Exception {
        RaftTestUtil.waitForLeader(cluster);

        // Wait for each division to create its open log file.
        for (RaftServer.Division division : cluster.iterateDivisions()) {
            final String description = division.getId() + ": wait for log file creation";
            JavaUtils.attemptRepeatedly(
                () -> getOpenLogFile(division),
                10,
                TimeDuration.valueOf(100L, TimeUnit.MILLISECONDS),
                description,
                LOG);
        }

        // Shut down all servers before touching the files.
        for (RaftServer server : cluster.getServers()) {
            server.close();
        }

        for (RaftServer.Division division : cluster.iterateDivisions()) {
            final RaftPeerId peerId = division.getId();
            final File openLogFile = JavaUtils.attemptRepeatedly(
                () -> getOpenLogFile(division), 10, HUNDRED_MILLIS, peerId + "-getOpenLogFile", LOG);

            // Try every strictly-partial header length.
            int partialLength = 0;
            while (partialLength < SegmentedRaftLogFormat.getHeaderLength()) {
                assertCorruptedLogHeader(division.getId(), openLogFile, partialLength, cluster, LOG);
                Assertions.assertTrue(SegmentedRaftLogTestUtils.getOpenLogFiles(division).isEmpty());
                partialLength++;
            }
        }
    }

    /**
     * Overwrites the beginning of {@code openLogFile} with a header-sized byte array that holds
     * only the first {@code partialLength} bytes of the real header (the remaining bytes are
     * zero), then restarts the server {@code id} and closes it again.
     *
     * @param id            the peer to restart
     * @param openLogFile   the log file whose leading bytes are overwritten
     * @param partialLength number of genuine header bytes to keep; must be less than the header length
     * @param cluster       the cluster owning the peer
     * @param log           logger used to print the real and the corrupted header in hex
     * @throws Exception if the file cannot be written or the restart/close fails
     */
    static void assertCorruptedLogHeader(RaftPeerId id, File openLogFile, int partialLength, MiniRaftCluster cluster, Logger log) throws Exception {
        Preconditions.assertTrue(partialLength < SegmentedRaftLogFormat.getHeaderLength());

        try (RandomAccessFile file = new RandomAccessFile(openLogFile, "rw")) {
            final ByteBuffer header = SegmentedRaftLogFormat.getHeaderBytebuffer();
            log.info("header    = {}", StringUtils.bytes2HexString(header));

            // A fresh array is zero-filled, so only the first partialLength bytes match the header.
            final byte[] corrupted = new byte[header.remaining()];
            header.get(corrupted, 0, partialLength);
            log.info("corrupted = {}", StringUtils.bytes2HexString(corrupted));

            file.write(corrupted);
        }

        final RaftServer.Division restarted = cluster.restartServer(id, false);
        restarted.getRaftServer().close();
    }

    /**
     * Runs {@link #runTestRestartCommitIndex(MiniRaftCluster)} through {@code runWithNewCluster}
     * with {@link #NUM_SERVERS} servers.
     */
    @Test
    public void testRestartCommitIndex() throws Exception {
        runWithNewCluster(NUM_SERVERS, this::runTestRestartCommitIndex);
    }

    /**
     * Checks that after a restart each server's commit index and last applied index reach the
     * commit index recorded in the leader's last (metadata) log entry.
     *
     * <ol>
     *   <li>Sends ten messages concurrently, one thread and one client per message, and waits
     *       for all of them.</li>
     *   <li>Waits until {@link #assertLastLogEntry} passes on the leader, then reads the commit
     *       index stored in the leader's last log entry ({@code loggedCommitIndex}).</li>
     *   <li>Waits until every other division has the same log as the leader.</li>
     *   <li>Takes a snapshot on the leader state machine, calls {@code truncate(lastIndex)} on
     *       the leader log, and kills all servers.</li>
     *   <li>Restarts the servers one at a time (leader first) and, for each, waits until both
     *       the commit index and the last applied index are at least {@code loggedCommitIndex};
     *       the server is killed again before the next one is restarted.</li>
     * </ol>
     *
     * @param cluster the cluster under test
     * @throws Exception if any step fails, including a failure in any of the sender threads
     */
    void runTestRestartCommitIndex(MiniRaftCluster cluster) throws Exception {
        final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(10);
        final List<CompletableFuture<Void>> futures = new ArrayList<>(messages.length);
        for (final RaftTestUtil.SimpleMessage m : messages) {
            final CompletableFuture<Void> f = new CompletableFuture<>();
            futures.add(f);
            new Thread(() -> {
                try (RaftClient client = cluster.createClient()) {
                    Assertions.assertTrue(client.io().send(m).isSuccess());
                } catch (Throwable t) {
                    // Report the failure through the future. Without this, a failed send (or a
                    // failed assertion) would leave the future incomplete and the get() below
                    // would block forever instead of failing the test.
                    f.completeExceptionally(new IllegalStateException("Failed to send " + m, t));
                    return;
                }
                f.complete(null);
            }).start();
        }
        JavaUtils.allOf(futures).get();
        LOG.info("sent {} messages.", messages.length);

        // The leader goes first in the restart order.
        final List<RaftPeerId> ids = new ArrayList<>();
        final RaftServer.Division leader = cluster.getLeader();
        final RaftLog leaderLog = leader.getRaftLog();
        final RaftPeerId leaderId = leader.getId();
        ids.add(leaderId);

        RaftTestUtil.getStateMachineLogEntries(leaderLog, LOG::info);

        // Wait until the leader's last entry is a metadata entry consistent with its state machine.
        JavaUtils.attempt(() -> assertLastLogEntry(leader), 20, HUNDRED_MILLIS, "leader last metadata entry", LOG);
        final long lastIndex = leaderLog.getLastEntryTermIndex().getIndex();
        LOG.info("{}: leader lastIndex={}", leaderId, lastIndex);
        final RaftProtos.LogEntryProto lastEntry = leaderLog.get(lastIndex);
        LOG.info("{}: leader lastEntry entry[{}] = {}", leaderId, lastIndex, LogProtoUtils.toLogEntryString(lastEntry));
        final long loggedCommitIndex = lastEntry.getMetadataEntry().getCommitIndex();
        final RaftProtos.LogEntryProto lastCommittedEntry = leaderLog.get(loggedCommitIndex);
        LOG.info("{}: leader lastCommittedEntry = entry[{}] = {}",
            leaderId, loggedCommitIndex, LogProtoUtils.toLogEntryString(lastCommittedEntry));

        final SimpleStateMachine4Testing leaderStateMachine = SimpleStateMachine4Testing.get(leader);
        final TermIndex lastAppliedTermIndex = leaderStateMachine.getLastAppliedTermIndex();
        LOG.info("{}: leader lastAppliedTermIndex = {}", leaderId, lastAppliedTermIndex);

        // Every non-leader division must end up with the same log as the leader.
        for (RaftServer.Division s : cluster.iterateDivisions()) {
            if (s.getId().equals(leaderId)) {
                continue;
            }
            ids.add(s.getId());
            JavaUtils.attempt(() -> RaftTestUtil.assertSameLog(leaderLog, s.getRaftLog()),
                10, HUNDRED_MILLIS, "assertRaftLog-" + s.getId(), LOG);
        }

        // NOTE(review): truncate(lastIndex) presumably drops the trailing metadata entry from the
        // leader log; confirm against RaftLog.truncate.
        leaderStateMachine.takeSnapshot();
        leaderLog.truncate(lastIndex);

        // Kill all servers, then bring them back one at a time.
        ids.forEach(cluster::killServer);
        for (RaftPeerId id : ids) {
            cluster.restartServer(id, false);
            final RaftServer.Division server = cluster.getDivision(id);
            final RaftLog raftLog = server.getRaftLog();
            JavaUtils.attemptRepeatedly(() -> {
                Assertions.assertTrue(raftLog.getLastCommittedIndex() >= loggedCommitIndex);
                return null;
            }, 10, HUNDRED_MILLIS, id + "(commitIndex >= loggedCommitIndex)", LOG);
            JavaUtils.attemptRepeatedly(() -> {
                Assertions.assertTrue(server.getInfo().getLastAppliedIndex() >= loggedCommitIndex);
                return null;
            }, 10, HUNDRED_MILLIS, id + "(lastAppliedIndex >= loggedCommitIndex)", LOG);
            LOG.info("{}: commitIndex={}, lastAppliedIndex={}",
                id, raftLog.getLastCommittedIndex(), server.getInfo().getLastAppliedIndex());
            cluster.killServer(id);
        }
    }

    /**
     * Asserts that the last entry of the server's log is a metadata entry, that the entry at the
     * commit index recorded in that metadata is a state machine log entry, and that this entry
     * has the same term as, and an index not greater than, the state machine's last applied
     * term-index.
     *
     * @param server the division whose log and state machine are inspected
     * @throws RaftLogIOException if a log entry cannot be read
     */
    static void assertLastLogEntry(RaftServer.Division server) throws RaftLogIOException {
        final RaftLog log = server.getRaftLog();

        // The last entry must be a metadata entry.
        final long tailIndex = log.getLastEntryTermIndex().getIndex();
        final RaftProtos.LogEntryProto tail = log.get(tailIndex);
        Assertions.assertTrue(tail.hasMetadataEntry());

        // The entry at the commit index it records must be a state machine entry.
        final long commitIndex = tail.getMetadataEntry().getCommitIndex();
        final RaftProtos.LogEntryProto committed = log.get(commitIndex);
        Assertions.assertTrue(committed.hasStateMachineLogEntry());

        // Compare that entry with what the state machine has applied.
        final SimpleStateMachine4Testing stateMachine = SimpleStateMachine4Testing.get(server);
        final TermIndex applied = stateMachine.getLastAppliedTermIndex();
        Assertions.assertEquals(committed.getTerm(), applied.getTerm());
        Assertions.assertTrue(committed.getIndex() <= applied.getIndex());
    }

    /**
     * Runs {@link #runTestRestartWithCorruptedLogEntry} on a single-server cluster with the log
     * corruption policy set to {@code WARN_AND_RETURN}.
     *
     * <p>The previously configured policy is restored in a {@code finally} block so that it is
     * put back even when the test body throws.
     */
    @Test
    public void testRestartWithCorruptedLogEntryWithWarnAndReturn() throws Exception {
        final RaftProperties p = getProperties();
        final RaftServerConfigKeys.Log.CorruptionPolicy policy = RaftServerConfigKeys.Log.corruptionPolicy(p);
        RaftServerConfigKeys.Log.setCorruptionPolicy(p, RaftServerConfigKeys.Log.CorruptionPolicy.WARN_AND_RETURN);
        try {
            runWithNewCluster(1, this::runTestRestartWithCorruptedLogEntry);
        } finally {
            // Always restore the original policy on the properties object.
            RaftServerConfigKeys.Log.setCorruptionPolicy(p, policy);
        }
    }

    /**
     * Runs {@link #runTestRestartWithCorruptedLogEntry} on a single-server cluster with the log
     * corruption policy set to {@code EXCEPTION} and expects the run to fail with a
     * {@link CompletionException} whose cause chain is checked against
     * {@link IllegalStateException} and {@link ChecksumException}.
     *
     * <p>The previously configured policy is restored in a {@code finally} block so that it is
     * put back even when the expectation is not met.
     */
    @Test
    public void testRestartWithCorruptedLogEntryWithException() throws Exception {
        final RaftProperties p = getProperties();
        final RaftServerConfigKeys.Log.CorruptionPolicy policy = RaftServerConfigKeys.Log.corruptionPolicy(p);
        RaftServerConfigKeys.Log.setCorruptionPolicy(p, RaftServerConfigKeys.Log.CorruptionPolicy.EXCEPTION);
        try {
            testFailureCase("restart-fail-ChecksumException",
                () -> runWithNewCluster(1, this::runTestRestartWithCorruptedLogEntry),
                CompletionException.class, IllegalStateException.class, ChecksumException.class);
        } finally {
            // Always restore the original policy on the properties object.
            RaftServerConfigKeys.Log.setCorruptionPolicy(p, policy);
        }
    }

    /**
     * Corrupts the second half of the leader's open log segment and checks that the last message
     * can no longer be read after restart.
     *
     * <p>Sends 100 messages and confirms that the last one is readable. The leader's server is
     * then closed and every byte of its open log file from {@code size / 2} up to {@code size}
     * (the open segment size read before closing) is overwritten with {@code 0xFF}. After the
     * server is restarted, a read-only request for the last message is expected to fail with a
     * {@link StateMachineException} caused by an {@link IndexOutOfBoundsException}.
     *
     * @param cluster the cluster under test
     * @throws Exception if any step fails
     */
    private void runTestRestartWithCorruptedLogEntry(CLUSTER cluster) throws Exception {
        final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
        final RaftPeerId leaderId = leader.getId();

        // Write the messages and make sure the last one can be read back.
        final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(100);
        final RaftTestUtil.SimpleMessage lastMessage = messages[messages.length - 1];
        try (RaftClient writer = cluster.createClient()) {
            for (RaftTestUtil.SimpleMessage message : messages) {
                Assertions.assertTrue(writer.io().send(message).isSuccess());
            }
            Assertions.assertTrue(writer.io().sendReadOnly(lastMessage).isSuccess());
        }

        // Read the open segment size before the server goes down.
        final long size = TestSegmentedRaftLog.getOpenSegmentSize(leader.getRaftLog());
        leader.getRaftServer().close();

        // Overwrite the second half of the open segment, one 0xFF byte at a time.
        final File openLogFile = JavaUtils.attemptRepeatedly(
            () -> getOpenLogFile(leader), 10, HUNDRED_MILLIS, leaderId + "-getOpenLogFile", LOG);
        try (RandomAccessFile file = new RandomAccessFile(openLogFile, "rw")) {
            final long mid = size / 2L;
            file.seek(mid);
            long position = mid;
            while (position < size) {
                file.write(-1);
                position++;
            }
        }

        // After the restart, reading the last message is expected to fail.
        cluster.restartServer(leaderId, false);
        testFailureCase("last-entry-not-found", () -> {
            try (RaftClient reader = cluster.createClient()) {
                reader.io().sendReadOnly(lastMessage);
            }
        }, StateMachineException.class, IndexOutOfBoundsException.class);
    }
}

