/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.server.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.ratis.BaseTest;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.api.AsyncApi;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.examples.arithmetic.ArithmeticStateMachine;
import org.apache.ratis.examples.arithmetic.expression.BinaryExpression;
import org.apache.ratis.examples.arithmetic.expression.DoubleValue;
import org.apache.ratis.examples.arithmetic.expression.Expression;
import org.apache.ratis.examples.arithmetic.expression.Variable;
import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.Slf4jUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.event.Level;

public class TestReadAfterWrite
extends BaseTest
implements MiniRaftClusterWithGrpc.FactoryGet {
    @Before
    public void setup() {
        Slf4jUtils.setLogLevel((Logger)ArithmeticStateMachine.LOG, (Level)Level.DEBUG);
        Slf4jUtils.setLogLevel((Logger)CodeInjectionForTesting.LOG, (Level)Level.DEBUG);
        Slf4jUtils.setLogLevel((Logger)RaftServer.Division.LOG, (Level)Level.DEBUG);
        RaftServerTestUtil.setStateMachineUpdaterLogLevel((Level)Level.DEBUG);
        RaftProperties p = this.getProperties();
        p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, ArithmeticStateMachine.class, StateMachine.class);
        RaftServerConfigKeys.Read.setOption((RaftProperties)p, (RaftServerConfigKeys.Read.Option)RaftServerConfigKeys.Read.Option.LINEARIZABLE);
    }

    @Test
    public void testReadAfterWriteSingleServer() throws Exception {
        this.runWithNewCluster(1, cluster -> {
            try (RaftClient client = cluster.createClient();){
                this.runTestReadAfterWrite(client);
            }
        });
    }

    @Test
    public void testReadAfterWrite() throws Exception {
        this.runWithNewCluster(3, cluster -> {
            try (RaftClient client = cluster.createClient();){
                this.runTestReadAfterWrite(client);
            }
        });
    }

    void runTestReadAfterWrite(RaftClient client) throws Exception {
        Variable a = new Variable("a");
        BinaryExpression a_plus_2 = BinaryExpression.Op.ADD.apply((Expression)a, (Expression)new DoubleValue(2.0));
        AsyncApi async = client.async();
        int initialValue = 10;
        RaftClientReply assign = (RaftClientReply)async.send((Message)a.assign((Expression)new DoubleValue(10.0))).join();
        Assert.assertTrue((boolean)assign.isSuccess());
        Message query = Expression.Utils.toMessage((Expression)a);
        this.assertReply(async.sendReadOnly(query), 10);
        BlockingCode blockingCode = new BlockingCode();
        CodeInjectionForTesting.put((String)RaftServerImpl.APPEND_TRANSACTION, (CodeInjectionForTesting.Code)blockingCode);
        CompletableFuture plus2 = async.send((Message)a.assign((Expression)a_plus_2));
        CompletableFuture readOnlyUnordered = async.sendReadOnlyUnordered(query);
        CompletableFuture readAfterWrite = async.sendReadAfterWrite(query);
        Thread.sleep(1000L);
        this.assertReply(readOnlyUnordered, 10);
        this.LOG.info("readAfterWrite.get");
        try {
            RaftClientReply reply = (RaftClientReply)readAfterWrite.get(100L, TimeUnit.MILLISECONDS);
            DoubleValue result = (DoubleValue)Expression.Utils.bytes2Expression((byte[])reply.getMessage().getContent().toByteArray(), (int)0);
            Assert.fail((String)("result=" + result + ", reply=" + reply));
        }
        catch (TimeoutException e) {
            this.LOG.info("Good", (Throwable)e);
        }
        Assert.assertFalse((boolean)plus2.isDone());
        Assert.assertFalse((boolean)readAfterWrite.isDone());
        blockingCode.complete();
        this.assertReply(readAfterWrite, 12);
    }

    void assertReply(CompletableFuture<RaftClientReply> future, int expected) {
        this.LOG.info("assertReply, expected {}", (Object)expected);
        RaftClientReply reply = future.join();
        Assert.assertTrue((boolean)reply.isSuccess());
        this.LOG.info("reply {}", (Object)reply);
        DoubleValue result = (DoubleValue)Expression.Utils.bytes2Expression((byte[])reply.getMessage().getContent().toByteArray(), (int)0);
        Assert.assertEquals((long)expected, (long)((int)result.evaluate(null).doubleValue()));
    }

    static class BlockingCode
    implements CodeInjectionForTesting.Code {
        private final CompletableFuture<Void> future = new CompletableFuture();

        BlockingCode() {
        }

        void complete() {
            this.future.complete(null);
        }

        public boolean execute(Object localId, Object remoteId, Object ... args) {
            boolean blocked;
            boolean bl = blocked = !this.future.isDone();
            if (blocked) {
                LOG.info("Server {} blocks client {}: {}", new Object[]{localId, remoteId, args[0]});
            }
            this.future.join();
            if (blocked) {
                LOG.info("Server {} unblocks client {}", localId, remoteId);
            }
            return true;
        }
    }
}

