/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.impl;

import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

public class ShadowManagedLedgerImplTest
extends MockedBookKeeperTestCase {
    private static final Logger log = LoggerFactory.getLogger(ShadowManagedLedgerImplTest.class);

    private ShadowManagedLedgerImpl openShadowManagedLedger(String name, String sourceName) throws ManagedLedgerException, InterruptedException {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setShadowSourceName(sourceName);
        HashMap<String, String> properties = new HashMap<String, String>();
        properties.put("PULSAR.SHADOW_SOURCE", "source_topic");
        config.setProperties(properties);
        ManagedLedger shadowML = this.factory.open(name, config);
        Assert.assertTrue((boolean)(shadowML instanceof ShadowManagedLedgerImpl));
        return (ShadowManagedLedgerImpl)shadowML;
    }

    @Test
    public void testShadowWrites() throws Exception {
        ManagedLedgerImpl sourceML = (ManagedLedgerImpl)this.factory.open("source_ML", new ManagedLedgerConfig().setMaxEntriesPerLedger(2).setRetentionTime(-1, TimeUnit.DAYS).setRetentionSizeInMB(-1L));
        byte[] data = new byte[10];
        ArrayList<Position> positions = new ArrayList<Position>();
        for (int i = 0; i < 5; ++i) {
            Position pos = sourceML.addEntry(data);
            log.info("pos={}", (Object)pos);
            positions.add(pos);
        }
        log.info("currentLedgerId:{}", (Object)sourceML.currentLedger.getId());
        Assert.assertEquals((int)sourceML.ledgers.size(), (int)3);
        ShadowManagedLedgerImpl shadowML = this.openShadowManagedLedger("shadow_ML", "source_ML");
        Assert.assertEquals((int)shadowML.ledgers.size(), (int)3);
        Assert.assertEquals((long)sourceML.currentLedger.getId(), (long)shadowML.currentLedger.getId());
        Assert.assertEquals((Object)sourceML.lastConfirmedEntry, (Object)shadowML.lastConfirmedEntry);
        Position newPos = sourceML.addEntry(data);
        log.info("Source.LCE={},Shadow.LCE={}", (Object)sourceML.lastConfirmedEntry, (Object)shadowML.lastConfirmedEntry);
        Assert.assertNotEquals((Object)sourceML.lastConfirmedEntry, (Object)shadowML.lastConfirmedEntry);
        newPos = sourceML.addEntry(data);
        Assert.assertEquals((int)sourceML.ledgers.size(), (int)4);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)shadowML.ledgers.size(), (int)4));
        log.info("Source.LCE={},Shadow.LCE={}", (Object)sourceML.lastConfirmedEntry, (Object)shadowML.lastConfirmedEntry);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)sourceML.lastConfirmedEntry, (Object)shadowML.lastConfirmedEntry));
        final CompletableFuture future = new CompletableFuture();
        shadowML.asyncAddEntry(data, new AsyncCallbacks.AddEntryCallback(){

            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                future.complete(position);
            }

            public void addFailed(ManagedLedgerException exception, Object ctx) {
                future.completeExceptionally(exception);
            }
        }, positions.get(2));
        Assert.assertEquals(future.get(), positions.get(2));
        log.info("1.Source.LCE={},Shadow.LCE={}", (Object)sourceML.lastConfirmedEntry, (Object)shadowML.lastConfirmedEntry);
        Assert.assertNotEquals((Object)sourceML.lastConfirmedEntry, (Object)shadowML.lastConfirmedEntry);
        newPos = sourceML.addEntry(data);
        Assert.assertEquals((int)sourceML.ledgers.size(), (int)4);
        Assert.assertNotEquals((Object)sourceML.lastConfirmedEntry, (Object)shadowML.lastConfirmedEntry);
        future = new CompletableFuture();
        shadowML.asyncAddEntry(data, new AsyncCallbacks.AddEntryCallback(){

            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                future.complete(position);
            }

            public void addFailed(ManagedLedgerException exception, Object ctx) {
                future.completeExceptionally(exception);
            }
        }, (Object)newPos);
        Assert.assertEquals(future.get(), (Object)newPos);
        log.info("2.Source.LCE={},Shadow.LCE={}", (Object)sourceML.lastConfirmedEntry, (Object)shadowML.lastConfirmedEntry);
        Assert.assertEquals((Object)sourceML.lastConfirmedEntry, (Object)shadowML.lastConfirmedEntry);
        PositionImpl fakePos = PositionImpl.get((long)(newPos.getLedgerId() + 1L), (long)newPos.getEntryId());
        final CompletableFuture future2 = new CompletableFuture();
        shadowML.asyncAddEntry(data, new AsyncCallbacks.AddEntryCallback(){

            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                future2.complete(position);
            }

            public void addFailed(ManagedLedgerException exception, Object ctx) {
                future2.completeExceptionally(exception);
            }
        }, (Object)fakePos);
        newPos = sourceML.addEntry(data);
        newPos = sourceML.addEntry(data);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)shadowML.ledgers.size(), (int)5));
        Assert.assertEquals(future2.get(), (Object)fakePos);
        log.info("3.Source.LCE={},Shadow.LCE={}", (Object)sourceML.lastConfirmedEntry, (Object)shadowML.lastConfirmedEntry);
        Assert.assertEquals((Object)sourceML.lastConfirmedEntry, (Object)shadowML.lastConfirmedEntry);
    }
}

