/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.ratis.BaseTest;
import org.apache.ratis.OutputStreamBaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.impl.RaftOutputStream;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.StringUtils;
import org.junit.Assert;
import org.junit.Test;

/*
 * Exception performing whole class analysis ignored.
 */
/**
 * Base class for tests that write to a Raft group through a {@link RaftOutputStream}
 * and then inspect the leader's {@link RaftLog} to verify what was written.
 *
 * <p>Each {@code testXxx} method starts a fresh cluster of {@link #NUM_SERVERS} servers via
 * {@code runWithNewCluster} and delegates to the matching {@code runTestXxx} method.
 *
 * <p>NOTE(review): {@code ByteValue} is referenced below but is not defined in this view of the
 * file; it is presumably a nested helper class -- confirm it is present in the full source.
 *
 * @param <CLUSTER> the concrete {@link MiniRaftCluster} type the tests run against
 */
public abstract class OutputStreamBaseTest<CLUSTER extends MiniRaftCluster>
        extends BaseTest
        implements MiniRaftCluster.Factory.Get<CLUSTER> {
    /** Number of servers in every cluster started by these tests. */
    private static final int NUM_SERVERS = 3;

    /**
     * Creates the stream under test.
     *
     * @param cluster    cluster used to create the client backing the stream
     * @param bufferSize buffer size, in bytes, passed to the {@link RaftOutputStream}
     * @return a new {@link RaftOutputStream} for the given cluster
     */
    public OutputStream newOutputStream(CLUSTER cluster, int bufferSize) {
        return new RaftOutputStream(() -> cluster.createClient(), SizeInBytes.valueOf(bufferSize));
    }

    /**
     * Encodes {@code i} as four bytes, most significant byte first.
     *
     * <p>A fresh array is returned on every call, so a result stays valid after later calls
     * and the method can safely be used from the writer thread in {@link #runTestKillLeader}.
     *
     * @param i the value to encode
     * @return a new 4-byte array holding {@code i} in big-endian order
     */
    private static byte[] toBytes(int i) {
        return new byte[] {
            (byte) (i >>> 24),
            (byte) (i >>> 16),
            (byte) (i >>> 8),
            (byte) i
        };
    }

    @Test
    public void testSimpleWrite() throws Exception {
        runWithNewCluster(NUM_SERVERS, this::runTestSimpleWrite);
    }

    /**
     * Writes 5000 four-byte integers through a stream whose buffer is also four bytes,
     * then checks that the leader's log holds the same values in order.
     */
    private void runTestSimpleWrite(CLUSTER cluster) throws Exception {
        final int numRequests = 5000;
        final int bufferSize = 4;
        RaftTestUtil.waitForLeader(cluster);
        try (OutputStream out = newOutputStream(cluster, bufferSize)) {
            for (int i = 0; i < numRequests; i++) {
                out.write(toBytes(i));
            }
        }

        final RaftLog raftLog = cluster.getLeader().getRaftLog();
        final AtomicInteger next = new AtomicInteger();
        checkLog(raftLog, numRequests, () -> toBytes(next.getAndIncrement()));
    }

    /**
     * Verifies the state machine entries of {@code raftLog} against expected payloads.
     *
     * <p>Entries without a state machine log entry are skipped. For every remaining entry the
     * next value from {@code s} must equal the entry's log data.
     *
     * @param raftLog                the log to inspect
     * @param expectedCommittedIndex lower bound for the log's last committed index, and the
     *                               exact number of state machine entries expected
     * @param s                      supplies the expected payloads, one per state machine entry,
     *                               in log order
     * @throws IOException if reading an entry from the log fails
     */
    private void checkLog(RaftLog raftLog, long expectedCommittedIndex, Supplier<byte[]> s) throws IOException {
        final long committedIndex = raftLog.getLastCommittedIndex();
        Assert.assertTrue("committedIndex=" + committedIndex + " < expected " + expectedCommittedIndex,
                committedIndex >= expectedCommittedIndex);

        int count = 0;
        for (LogEntryHeader entry : raftLog.getEntries(0L, Long.MAX_VALUE)) {
            final RaftProtos.LogEntryProto log = raftLog.get(entry.getIndex());
            if (!log.hasStateMachineLogEntry()) {
                continue;
            }
            final byte[] logData = log.getStateMachineLogEntry().getLogData().toByteArray();
            final byte[] expected = s.get();
            final String message = "log " + entry + " " + log.getLogEntryBodyCase()
                    + " " + StringUtils.bytes2HexString(logData)
                    + ", expected=" + StringUtils.bytes2HexString(expected);
            Assert.assertArrayEquals(message, expected, logData);
            count++;
        }
        Assert.assertEquals(expectedCommittedIndex, count);
    }

    @Test
    public void testWriteAndFlush() throws Exception {
        runWithNewCluster(NUM_SERVERS, this::runTestWriteAndFlush);
    }

    /**
     * Writes payloads of several lengths around the 1024-byte buffer size, flushing after each
     * one, and checks the leader's log after every flush. Finally verifies that writing to the
     * closed stream fails with an {@link IOException}.
     */
    private void runTestWriteAndFlush(CLUSTER cluster) throws Exception {
        final int bufferSize = 1024;
        final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);

        final int[] lengths = {1, 500, 1023, 1024, 1025, 2048, 3000, 3072};
        final ByteValue[] values = new ByteValue[lengths.length];
        for (int i = 0; i < values.length; i++) {
            // The explicit byte cast compiles whether the (unseen) parameter is byte or int.
            values[i] = new ByteValue(lengths[i], (byte) 9);
        }

        final ArrayList<byte[]> expectedTxs = new ArrayList<>();
        final OutputStream out = newOutputStream(cluster, bufferSize);
        // The resource variable aliases 'out' so the stream is closed even when an assertion
        // fails, while 'out' stays in scope for the write-after-close check below.
        try (OutputStream stream = out) {
            for (ByteValue v : values) {
                final byte[] data = v.genData();
                expectedTxs.addAll(v.getTransactions());
                stream.write(data);
                stream.flush();
                assertRaftLog(expectedTxs.size(), leader);
            }
        }

        try {
            out.write(0);
            Assert.fail("The OutputStream has been closed");
        } catch (IOException expected) {
            // Expected: the stream was closed above, so the write must be rejected.
        }

        LOG.info("Start to check leader's log");
        final AtomicInteger index = new AtomicInteger(0);
        checkLog(leader.getRaftLog(), expectedTxs.size(), () -> expectedTxs.get(index.getAndIncrement()));
    }

    /**
     * Asserts that {@code server}'s log holds exactly {@code expectedEntries} state machine
     * entries and that the last of them has been committed and applied.
     *
     * @param expectedEntries expected number of state machine log entries
     * @param server          the server whose log is checked
     * @return the server's log, for further inspection by the caller
     */
    private RaftLog assertRaftLog(int expectedEntries, RaftServer.Division server) throws Exception {
        final RaftLog raftLog = server.getRaftLog();
        final EnumMap<RaftProtos.LogEntryProto.LogEntryBodyCase, AtomicLong> counts =
                RaftTestUtil.countEntries(raftLog);
        Assert.assertEquals(expectedEntries,
                counts.get(RaftProtos.LogEntryProto.LogEntryBodyCase.STATEMACHINELOGENTRY).get());

        final RaftProtos.LogEntryProto last = RaftTestUtil.getLastEntry(
                RaftProtos.LogEntryProto.LogEntryBodyCase.STATEMACHINELOGENTRY, raftLog);
        Assert.assertNotNull(last);
        Assert.assertTrue(raftLog.getLastCommittedIndex() >= last.getIndex());
        Assert.assertTrue(server.getInfo().getLastAppliedIndex() >= last.getIndex());
        return raftLog;
    }

    @Test
    public void testWriteWithOffset() throws Exception {
        runWithNewCluster(NUM_SERVERS, this::runTestWriteWithOffset);
    }

    /**
     * Writes four arrays (512, 1024, 2560 and 4096 bytes) in randomly sized slices using
     * {@link OutputStream#write(byte[], int, int)}. The test then expects the leader's log to
     * contain the same 8192 bytes split into entries of exactly 1024 bytes each.
     */
    private void runTestWriteWithOffset(CLUSTER cluster) throws Exception {
        final int bufferSize = 1024;
        final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);

        // Each chunk is filled with its own marker value so misplaced data is detectable.
        final byte[] b1 = new byte[bufferSize / 2];
        Arrays.fill(b1, (byte) 1);
        final byte[] b2 = new byte[bufferSize];
        Arrays.fill(b2, (byte) 2);
        final byte[] b3 = new byte[bufferSize * 2 + bufferSize / 2];
        Arrays.fill(b3, (byte) 3);
        final byte[] b4 = new byte[bufferSize * 4];
        // Fills b4; the previous code filled b3 a second time and left b4 all zeros.
        Arrays.fill(b4, (byte) 4);

        final byte[][] data = {b1, b2, b3, b4};
        final int totalBytes = b1.length + b2.length + b3.length + b4.length;
        final byte[] expected = new byte[totalBytes];
        final Random random = new Random();

        try (OutputStream out = newOutputStream(cluster, bufferSize)) {
            int offset = 0;
            for (byte[] b : data) {
                System.arraycopy(b, 0, expected, offset, b.length);
                offset += b.length;

                int written = 0;
                while (written < b.length) {
                    // Random slice length in [1, remaining].
                    final int toWrite = random.nextInt(b.length - written) + 1;
                    LOG.info("write {} bytes", toWrite);
                    out.write(b, written, toWrite);
                    written += toWrite;
                }
            }
        }

        final int expectedEntries = totalBytes / bufferSize;
        final RaftLog raftLog = assertRaftLog(expectedEntries, leader);

        final byte[] actual = new byte[totalBytes];
        int copied = 0;
        for (LogEntryHeader header : raftLog.getEntries(1L, Long.MAX_VALUE)) {
            final RaftProtos.LogEntryProto e = raftLog.get(header.getIndex());
            if (!e.hasStateMachineLogEntry()) {
                continue;
            }
            final byte[] eValue = e.getStateMachineLogEntry().getLogData().toByteArray();
            Assert.assertEquals(bufferSize, eValue.length);
            System.arraycopy(eValue, 0, actual, copied, eValue.length);
            copied += eValue.length;
        }
        Assert.assertArrayEquals(expected, actual);
    }

    @Test
    public void testKillLeader() throws Exception {
        runWithNewCluster(NUM_SERVERS, this::runTestKillLeader);
    }

    /**
     * Keeps a background thread writing through the stream while the leader is killed, then
     * checks that the writer finished without an exception and that the new leader has applied
     * at least as many entries as were written.
     */
    private void runTestKillLeader(CLUSTER cluster) throws Exception {
        final int bufferSize = 4;
        final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);

        final AtomicBoolean running = new AtomicBoolean(true);
        final AtomicReference<Boolean> success = new AtomicReference<>();
        final AtomicInteger result = new AtomicInteger(0);
        final CountDownLatch latch = new CountDownLatch(1);

        new Thread(() -> {
            LOG.info("Writer thread starts");
            int count = 0;
            try (OutputStream out = newOutputStream(cluster, bufferSize)) {
                while (running.get()) {
                    out.write(toBytes(count++));
                    Thread.sleep(10L);
                }
                success.set(true);
                result.set(count);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Restore the interrupt flag instead of silently consuming it.
                    Thread.currentThread().interrupt();
                }
                LOG.info("Got exception when writing", e);
                success.set(false);
            } finally {
                latch.countDown();
            }
        }).start();

        // Let the writer make some progress before forcing a leader change.
        Thread.sleep(500L);
        RaftTestUtil.waitAndKillLeader(cluster);
        final RaftServer.Division newLeader = RaftTestUtil.waitForLeader(cluster);
        Assert.assertNotEquals(leader.getId(), newLeader.getId());

        // Let the writer continue against the new leader, then ask it to stop.
        Thread.sleep(500L);
        running.set(false);
        // Fail with a clear message on timeout; otherwise 'success' would still be null below.
        Assert.assertTrue("Writer thread did not finish within 5 seconds",
                latch.await(5L, TimeUnit.SECONDS));

        LOG.info("Writer success? " + success.get());
        Assert.assertEquals("Writer thread failed", Boolean.TRUE, success.get());

        LOG.info("last applied index: {}. total number of requests: {}",
                newLeader.getInfo().getLastAppliedIndex(), result.get());
        Assert.assertTrue(newLeader.getInfo().getLastAppliedIndex() >= result.get() + 1);
    }
}

