/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.server.impl;

import java.util.concurrent.CompletableFuture;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
import org.junit.Assert;
import org.junit.Test;

public abstract class StateMachineShutdownTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
    @Test
    public void testStateMachineShutdownWaitsForApplyTxn() throws Exception {
        RaftProperties prop = this.getProperties();
        prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, StateMachineWithConditionalWait.class, StateMachine.class);
        MiniRaftCluster cluster = this.newCluster(3);
        cluster.start();
        RaftTestUtil.waitForLeader(cluster);
        RaftServer.Division leader = cluster.getLeader();
        RaftPeerId leaderId = leader.getId();
        ((StateMachineWithConditionalWait)leader.getStateMachine()).unBlockApplyTxn();
        ((StateMachineWithConditionalWait)cluster.getFollowers().get(0).getStateMachine()).unBlockApplyTxn();
        cluster.getLeaderAndSendFirstMessage(true);
        try (RaftClient client = cluster.createClient(leaderId);){
            client.io().send((Message)new RaftTestUtil.SimpleMessage("message"));
            RaftClientReply reply = client.io().send((Message)new RaftTestUtil.SimpleMessage("message2"));
            long logIndex = reply.getLogIndex();
            RaftClientReply watchReply = client.io().watch(logIndex, RaftProtos.ReplicationLevel.ALL_COMMITTED);
            watchReply.getCommitInfos().forEach(val -> Assert.assertTrue((val.getCommitIndex() >= logIndex ? 1 : 0) != 0));
            RaftServer.Division secondFollower = cluster.getFollowers().get(1);
            Assert.assertTrue((secondFollower.getInfo().getLastAppliedIndex() < logIndex ? 1 : 0) != 0);
            Thread t = new Thread(() -> ((RaftServer.Division)secondFollower).close());
            t.start();
            Assert.assertTrue((secondFollower.getInfo().getLastAppliedIndex() < logIndex ? 1 : 0) != 0);
            ((StateMachineWithConditionalWait)secondFollower.getStateMachine()).unBlockApplyTxn();
            t.join(5000L);
            Assert.assertEquals((long)logIndex, (long)secondFollower.getInfo().getLastAppliedIndex());
            cluster.shutdown();
        }
    }

    protected static class StateMachineWithConditionalWait
    extends SimpleStateMachine4Testing {
        private final Long objectToWait = 0L;
        volatile boolean blockOnApply = true;

        protected StateMachineWithConditionalWait() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
            if (this.blockOnApply) {
                Long l = this.objectToWait;
                synchronized (l) {
                    try {
                        this.objectToWait.wait();
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException();
                    }
                }
            }
            RaftProtos.LogEntryProto entry = trx.getLogEntry();
            this.updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
            return CompletableFuture.completedFuture(new RaftTestUtil.SimpleMessage("done"));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void unBlockApplyTxn() {
            this.blockOnApply = false;
            Long l = this.objectToWait;
            synchronized (l) {
                this.objectToWait.notifyAll();
            }
        }
    }
}

