/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.test;

import java.io.File;
import java.util.Enumeration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests that write entries through a single-bookie cluster configured with a
 * 1 MB maximum journal size and 2 backup journals, restart the bookie, and
 * then read the entries back to verify them.
 */
public class BookieJournalRollingTest extends BookKeeperClusterTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(BookieJournalRollingTest.class);

    /** Ledger password used for both creating and opening ledgers (empty). */
    private static final byte[] PASSWORD = new byte[0];
    /** Number of ledgers written by the rolling tests. */
    private static final int NUM_LEDGERS = 4;
    /** Length of the {@code 'a'} padding appended to every entry payload. */
    private static final int MSG_SIZE = 1024;
    /** Number of entries written per ledger by the rolling tests. */
    private static final int NUM_MSGS = 1024;
    /** Number of entries requested per {@code readEntries} call during validation. */
    private static final int READ_BATCH_SIZE = 10;
    /** Flush interval used by tests that restart the bookie with a very large flush interval. */
    private static final int HUGE_FLUSH_INTERVAL = 999999999;
    /** Upper bound on {@code .txn} files per journal directory asserted by {@link #testJournalRolling()}. */
    private static final int MAX_JOURNAL_FILES = 2;

    private final BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;

    /** Creates the test case with one bookie and ephemeral ports disabled. */
    public BookieJournalRollingTest() {
        super(1);
        this.baseConf.setAllowEphemeralPorts(false);
    }

    /** Applies the journal size / backup-journal settings before the cluster is started. */
    @Override
    @Before
    public void setUp() throws Exception {
        this.baseConf.setMaxJournalSizeMB(1L);
        this.baseConf.setMaxBackupJournals(2);
        super.setUp();
    }

    @Override
    @After
    public void tearDown() throws Exception {
        super.tearDown();
    }

    /**
     * Creates {@code numLedgers} ledgers (ensemble size 1, quorum size 1) and
     * writes {@code numMsgs} entries to each of them.
     *
     * @param numLedgers number of ledgers to create
     * @param msgSize    length of the padding appended to every entry payload
     * @param numMsgs    number of entries to add to each ledger
     * @return the handles of the created ledgers, still open
     * @throws Exception if a ledger cannot be created or any add fails
     */
    protected LedgerHandle[] writeLedgerEntries(int numLedgers, int msgSize, int numMsgs) throws Exception {
        LedgerHandle[] lhs = new LedgerHandle[numLedgers];
        for (int i = 0; i < numLedgers; ++i) {
            lhs[i] = this.bkc.createLedger(1, 1, this.digestType, PASSWORD);
        }
        this.writeLedgerEntries(lhs, msgSize, numMsgs);
        return lhs;
    }

    /**
     * Asynchronously adds {@code numMsgs} entries to every given ledger and
     * blocks until all add callbacks have fired.
     *
     * @param lhs     ledgers to write to
     * @param msgSize length of the padding appended to every entry payload
     * @param numMsgs number of entries to add to each ledger
     * @throws Exception a {@link BKException} built from the first non-zero
     *                   return code reported by a callback, or an
     *                   {@link InterruptedException} while waiting
     */
    protected void writeLedgerEntries(LedgerHandle[] lhs, int msgSize, int numMsgs) throws Exception {
        String msg = buildMessage(msgSize);
        final CountDownLatch completeLatch = new CountDownLatch(numMsgs * lhs.length);
        final AtomicInteger rc = new AtomicInteger(0);
        // The callback only touches the shared latch and result code, so one
        // instance serves every add.
        AsyncCallback.AddCallback callback = new AsyncCallback.AddCallback() {
            @Override
            public void addComplete(int rc2, LedgerHandle lh, long entryId, Object ctx) {
                // Keep only the first failure code; later results never overwrite it.
                rc.compareAndSet(0, rc2);
                completeLatch.countDown();
            }
        };
        for (int i = 0; i < numMsgs; ++i) {
            for (LedgerHandle lh : lhs) {
                lh.asyncAddEntry(buildEntry(lh.getId(), i, msg), callback, null);
            }
        }
        completeLatch.await();
        if (rc.get() != 0) {
            throw BKException.create(rc.get());
        }
    }

    /**
     * Opens every ledger, reads its first {@code numMsgs} entries in batches
     * and asserts that entry ids are consecutive from 0 and that each payload
     * matches what {@link #writeLedgerEntries(LedgerHandle[], int, int)} wrote.
     * All opened handles are closed before returning, even when validation fails.
     *
     * @param ledgerIds ids of the ledgers to validate
     * @param msgSize   padding length used when the entries were written
     * @param numMsgs   number of entries expected in each ledger
     * @throws Exception if a ledger cannot be opened, read or closed
     */
    protected void validLedgerEntries(long[] ledgerIds, int msgSize, int numMsgs) throws Exception {
        LedgerHandle[] lhs = new LedgerHandle[ledgerIds.length];
        try {
            for (int i = 0; i < lhs.length; ++i) {
                lhs[i] = this.bkc.openLedger(ledgerIds[i], this.digestType, PASSWORD);
            }
            String msg = buildMessage(msgSize);
            for (int j = 0; j < lhs.length; ++j) {
                LOG.debug("Validating Entries of Ledger {}", ledgerIds[j]);
                int entryId = 0;
                int start = 0;
                while (start < numMsgs) {
                    // Inclusive id of the last entry in this batch; the final batch may be short.
                    int end = Math.min(start + READ_BATCH_SIZE, numMsgs) - 1;
                    Enumeration<LedgerEntry> seq = lhs[j].readEntries(start, end);
                    Assert.assertTrue("Enumeration of ledger entries has no element", seq.hasMoreElements());
                    while (seq.hasMoreElements()) {
                        LedgerEntry e = seq.nextElement();
                        Assert.assertEquals(entryId, e.getEntryId());
                        Assert.assertArrayEquals(buildEntry(ledgerIds[j], entryId, msg), e.getEntry());
                        ++entryId;
                    }
                    Assert.assertEquals("Last entry id read from ledger " + ledgerIds[j], end, entryId - 1);
                    start = end + 1;
                }
            }
        } finally {
            // Slots stay null if opening failed part-way through.
            for (LedgerHandle lh : lhs) {
                if (lh != null) {
                    lh.close();
                }
            }
        }
    }

    /**
     * Writes entries to several ledgers, waits until every journal directory
     * holds at most {@value #MAX_JOURNAL_FILES} {@code .txn} files, restarts
     * the bookie and validates the entries.
     */
    @Test
    public void testJournalRolling() throws Exception {
        LOG.debug("Testing Journal Rolling");
        long[] ledgerIds = closeAndCollectIds(this.writeLedgerEntries(NUM_LEDGERS, MSG_SIZE, NUM_MSGS));
        Awaitility.await().untilAsserted(() -> {
            for (File journalDir : this.bookieJournalDirs()) {
                File[] journals = journalDir.listFiles();
                // listFiles() returns null when the directory cannot be listed; fail with an
                // assertion (which the await loop can retry) rather than a bare NPE.
                Assert.assertNotNull("Cannot list journal directory " + journalDir, journals);
                int numJournals = 0;
                for (File f : journals) {
                    if (f.getName().endsWith(".txn")) {
                        ++numJournals;
                    }
                }
                Assert.assertTrue("Found " + numJournals + " journal files in " + journalDir,
                        numJournals <= MAX_JOURNAL_FILES);
            }
        });
        this.restartBookies();
        this.validLedgerEntries(ledgerIds, MSG_SIZE, NUM_MSGS);
    }

    /**
     * Restarts the bookie with a very large flush interval, writes entries,
     * restarts again and validates that every entry can still be read.
     */
    @Test
    public void testJournalRollingWithoutSyncup() throws Exception {
        LOG.debug("Testing Journal Rolling without sync up");
        this.restartBookies(c -> {
            c.setFlushInterval(HUGE_FLUSH_INTERVAL);
            c.setAllowEphemeralPorts(false);
            return c;
        });
        long[] ledgerIds = closeAndCollectIds(this.writeLedgerEntries(NUM_LEDGERS, MSG_SIZE, NUM_MSGS));
        this.restartBookies();
        this.validLedgerEntries(ledgerIds, MSG_SIZE, NUM_MSGS);
    }

    /**
     * Writes entries, restarts the bookie with a very large flush interval,
     * writes more entries to the same ledger, deletes the ledger and restarts
     * once more. The test passes if the final restart completes without throwing.
     */
    @Test
    public void testReplayDeletedLedgerJournalEntries() throws Exception {
        LOG.debug("Testing replaying journal entries whose ledger has been removed.");
        LedgerHandle[] lhs = this.writeLedgerEntries(1, MSG_SIZE, 10);
        // Give the bookie three flush intervals before restarting it.
        Thread.sleep(3L * this.baseConf.getFlushInterval());
        this.restartBookies(c -> {
            c.setFlushInterval(HUGE_FLUSH_INTERVAL);
            c.setAllowEphemeralPorts(false);
            return c;
        });
        this.writeLedgerEntries(lhs, MSG_SIZE, 10);
        for (LedgerHandle lh : lhs) {
            this.bkc.deleteLedger(lh.getId());
        }
        // Wait two GC periods after the deletion before the final restart.
        Thread.sleep(2L * this.confByIndex(0).getGcWaitTime());
        this.restartBookies();
    }

    /** Returns a string made of {@code msgSize} {@code 'a'} characters. */
    private static String buildMessage(int msgSize) {
        StringBuilder sb = new StringBuilder(Math.max(msgSize, 0));
        for (int i = 0; i < msgSize; ++i) {
            sb.append('a');
        }
        return sb.toString();
    }

    /**
     * Builds the payload {@code <ledgerId>-<entryIndex>-<msg>}. Writer and
     * validator both use this method so the expected bytes cannot drift apart.
     */
    private static byte[] buildEntry(long ledgerId, int entryIndex, String msg) {
        return (ledgerId + "-" + entryIndex + "-" + msg).getBytes();
    }

    /** Records the id of every handle, closing each handle as it goes. */
    private static long[] closeAndCollectIds(LedgerHandle[] lhs) throws Exception {
        long[] ledgerIds = new long[lhs.length];
        for (int i = 0; i < lhs.length; ++i) {
            ledgerIds[i] = lhs[i].getId();
            lhs[i].close();
        }
        return ledgerIds;
    }
}

