/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.test;

import com.google.common.base.Charsets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperTestClient;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultipleThreadReadTest
extends BookKeeperClusterTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(MultipleThreadReadTest.class);
    BookKeeper.DigestType digestType;
    byte[] ledgerPassword = "aaa".getBytes();
    private int entriesPerLedger = 100;
    final SyncObj mainSyncObj = new SyncObj();
    BookKeeperTestClient readBkc;

    public MultipleThreadReadTest() {
        super(6);
        this.digestType = BookKeeper.DigestType.CRC32;
        this.baseClientConf.setAddEntryTimeout(20);
    }

    @Override
    public void setUp() throws Exception {
        super.setUp();
        this.readBkc = new BookKeeperTestClient(this.baseClientConf);
    }

    private Thread getWriterThread(final int tNo, final LedgerHandle lh, final AtomicBoolean resultHolder) {
        Thread t = new Thread(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                SyncObj tSync = new SyncObj();
                for (int j = 0; j < MultipleThreadReadTest.this.entriesPerLedger; ++j) {
                    final byte[] entry = ("Entry-" + tNo + "-" + j).getBytes();
                    lh.asyncAddEntry(entry, new AsyncCallback.AddCallback(){

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        public void addComplete(int rc, LedgerHandle ledgerHandle, long eid, Object o) {
                            SyncObj syncObj;
                            SyncObj syncObj2 = syncObj = (SyncObj)o;
                            synchronized (syncObj2) {
                                if (rc != 0) {
                                    LOG.error("Add entry {} failed : rc = {}", (Object)new String(entry, Charsets.UTF_8), (Object)rc);
                                    syncObj.failed = true;
                                    syncObj.notify();
                                } else {
                                    ++syncObj.counter;
                                    syncObj.notify();
                                }
                            }
                        }
                    }, (Object)tSync);
                }
                SyncObj j = tSync;
                synchronized (j) {
                    while (!tSync.failed && tSync.counter < MultipleThreadReadTest.this.entriesPerLedger) {
                        try {
                            tSync.wait();
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                    resultHolder.set(!tSync.failed);
                }
                try {
                    lh.close();
                }
                catch (InterruptedException ie) {
                    LOG.error("Interrupted on closing ledger handle {} : ", (Object)lh.getId(), (Object)ie);
                    Thread.currentThread().interrupt();
                }
                catch (BKException bke) {
                    LOG.error("Error on closing ledger handle {} : ", (Object)lh.getId(), (Object)bke);
                }
            }
        }, "WriteThread(Lid=" + lh.getId() + ")");
        t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void uncaughtException(Thread thread, Throwable throwable) {
                SyncObj syncObj = MultipleThreadReadTest.this.mainSyncObj;
                synchronized (syncObj) {
                    MultipleThreadReadTest.this.mainSyncObj.failed = true;
                }
            }
        });
        return t;
    }

    private Thread getReaderThread(int tNo, final LedgerHandle lh, final int ledgerNumber, final AtomicBoolean resultHolder) {
        Thread t = new Thread(new Runnable(){

            @Override
            public void run() {
                long startEntryId = 0L;
                long eid = 0L;
                while (startEntryId <= (long)(MultipleThreadReadTest.this.entriesPerLedger - 1)) {
                    long endEntryId = Math.min(startEntryId + 10L - 1L, (long)(MultipleThreadReadTest.this.entriesPerLedger - 1));
                    long numEntries = endEntryId - startEntryId + 1L;
                    boolean success = true;
                    try {
                        Enumeration list = lh.readEntries(startEntryId, endEntryId);
                        int j = 0;
                        while ((long)j < numEntries) {
                            long curEid;
                            LedgerEntry e;
                            try {
                                e = (LedgerEntry)list.nextElement();
                            }
                            catch (NoSuchElementException exception) {
                                success = false;
                                break;
                            }
                            ++eid;
                            if (e.getEntryId() != curEid) {
                                LOG.error("Expected entry id {} for ledger {} but {} found.", new Object[]{curEid, lh.getId(), e.getEntryId()});
                                success = false;
                                break;
                            }
                            byte[] data = e.getEntry();
                            if (!Arrays.equals(("Entry-" + ledgerNumber + "-" + e.getEntryId()).getBytes(), data)) {
                                LOG.error("Expected entry data 'Entry-{}-{}' but {} found for ledger {}.", new Object[]{ledgerNumber, e.getEntryId(), new String(data, Charsets.UTF_8), lh.getId()});
                                success = false;
                                break;
                            }
                            ++j;
                        }
                        if (success) {
                            boolean bl = success = !list.hasMoreElements();
                            if (!success) {
                                LOG.error("Found more entries returned on reading ({}-{}) from ledger {}.", new Object[]{startEntryId, endEntryId, lh.getId()});
                            }
                        }
                    }
                    catch (InterruptedException ie) {
                        LOG.error("Interrupted on reading entries ({} - {}) from ledger {} : ", new Object[]{startEntryId, endEntryId, lh.getId(), ie});
                        Thread.currentThread().interrupt();
                        success = false;
                    }
                    catch (BKException bke) {
                        LOG.error("Failed on reading entries ({} - {}) from ledger {} : ", new Object[]{startEntryId, endEntryId, lh.getId(), bke});
                        success = false;
                    }
                    resultHolder.set(success);
                    if (!success) break;
                    startEntryId = endEntryId + 1L;
                }
            }
        }, "ReadThread(Tid =" + tNo + ", Lid=" + lh.getId() + ")");
        t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void uncaughtException(Thread thread, Throwable throwable) {
                LOG.error("Uncaught exception in thread {} : ", (Object)thread.getName(), (Object)throwable);
                SyncObj syncObj = MultipleThreadReadTest.this.mainSyncObj;
                synchronized (syncObj) {
                    MultipleThreadReadTest.this.mainSyncObj.failed = true;
                }
            }
        });
        return t;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void multiLedgerMultiThreadRead(int numLedgers, int numThreads) throws IOException {
        Assert.assertTrue((numLedgers != 0 && numThreads >= numLedgers && numThreads % numLedgers == 0 ? 1 : 0) != 0);
        try {
            Thread t;
            ArrayList<LedgerHandle> oldLedgerHandles = new ArrayList<LedgerHandle>();
            ArrayList<Long> ledgerIds = new ArrayList<Long>();
            ArrayList<Thread> threadList = new ArrayList<Thread>();
            ArrayList<AtomicBoolean> writeResults = new ArrayList<AtomicBoolean>();
            for (int i = 0; i < numLedgers; ++i) {
                LedgerHandle lh = this.bkc.createLedger(this.digestType, this.ledgerPassword);
                oldLedgerHandles.add(lh);
                ledgerIds.add(lh.getId());
                AtomicBoolean writeResult = new AtomicBoolean(false);
                writeResults.add(writeResult);
                t = this.getWriterThread(i, (LedgerHandle)oldLedgerHandles.get(i), writeResult);
                threadList.add(t);
                t.start();
            }
            for (Thread t2 : threadList) {
                t2.join();
            }
            SyncObj i = this.mainSyncObj;
            synchronized (i) {
                if (this.mainSyncObj.failed) {
                    Assert.fail((String)"Test failed because we encountered uncaught exception on adding entries.");
                }
            }
            for (int i2 = 0; i2 < numLedgers; ++i2) {
                Assert.assertTrue((String)("Failed on adding entries for ledger " + ((LedgerHandle)oldLedgerHandles.get(i2)).getId()), (boolean)((AtomicBoolean)writeResults.get(i2)).get());
            }
            for (LedgerHandle lh : oldLedgerHandles) {
                try {
                    lh.close();
                }
                catch (BKException.BKLedgerClosedException writeResult) {
                }
                catch (Exception e) {
                    Assert.fail((String)"Error while closing handle.");
                }
            }
            this.mainSyncObj.failed = false;
            threadList.clear();
            ArrayList<AtomicBoolean> readResults = new ArrayList<AtomicBoolean>();
            for (int i3 = 0; i3 < numThreads; ++i3) {
                AtomicBoolean readResult = new AtomicBoolean(false);
                t = this.getReaderThread(i3, this.readBkc.openLedger((Long)ledgerIds.get(i3 % numLedgers), this.digestType, this.ledgerPassword), i3 % numLedgers, readResult);
                threadList.add(t);
                readResults.add(readResult);
                t.start();
            }
            for (Thread t3 : threadList) {
                t3.join();
            }
            Iterator iterator = this.mainSyncObj;
            synchronized (iterator) {
                if (this.mainSyncObj.failed) {
                    Assert.fail((String)"Test failed because we encountered uncaught exception on reading entries");
                }
            }
            for (AtomicBoolean readResult : readResults) {
                Assert.assertTrue((String)"Failed on read entries", (boolean)readResult.get());
            }
        }
        catch (BKException e) {
            LOG.error("Test failed", (Throwable)e);
            Assert.fail((String)"Test failed due to BookKeeper exception");
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error("Test failed", (Throwable)e);
            Assert.fail((String)"Test failed due to interruption");
        }
    }

    @Test
    public void test10Ledgers20ThreadsRead() throws IOException {
        this.multiLedgerMultiThreadRead(10, 20);
    }

    @Test
    public void test10Ledgers200ThreadsRead() throws IOException {
        this.multiLedgerMultiThreadRead(10, 200);
    }

    @Test
    public void test1Ledger20ThreadsRead() throws IOException {
        this.multiLedgerMultiThreadRead(1, 20);
    }

    @Override
    public void tearDown() throws Exception {
        this.readBkc.close();
        super.tearDown();
    }

    class SyncObj {
        volatile int counter = 0;
        boolean failed = false;
    }
}

