/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopicTest;
import org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBufferProvider;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleState;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.Watcher;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@PrepareForTest(value={ZooKeeperDataCache.class, BrokerService.class})
@PowerMockIgnore(value={"org.apache.logging.log4j.*"})
@Test(groups={"broker"})
public class PersistentSubscriptionTest {
    private PulsarService pulsarMock;
    private BrokerService brokerMock;
    private ManagedLedgerFactory mlFactoryMock;
    private ManagedLedger ledgerMock;
    private ManagedCursorImpl cursorMock;
    private ConfigurationCacheService configCacheServiceMock;
    private PersistentTopic topic;
    private PersistentSubscription persistentSubscription;
    private Consumer consumerMock;
    private ManagedLedgerConfig managedLedgerConfigMock;
    final String successTopicName = "persistent://prop/use/ns-abc/successTopic";
    final String subName = "subscriptionName";
    final TxnID txnID1 = new TxnID(1L, 1L);
    final TxnID txnID2 = new TxnID(1L, 2L);
    private static final Logger log = LoggerFactory.getLogger(PersistentTopicTest.class);
    private OrderedExecutor executor;
    private EventLoopGroup eventLoopGroup;

    @BeforeMethod
    public void setup() throws Exception {
        this.executor = OrderedExecutor.newBuilder().numThreads(1).name("persistent-subscription-test").build();
        this.eventLoopGroup = new NioEventLoopGroup();
        ServiceConfiguration svcConfig = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        svcConfig.setBrokerShutdownTimeoutMs(0L);
        svcConfig.setTransactionCoordinatorEnabled(true);
        this.pulsarMock = (PulsarService)Mockito.spy((Object)new PulsarService(svcConfig));
        ((PulsarService)Mockito.doReturn((Object)new InMemTransactionBufferProvider()).when((Object)this.pulsarMock)).getTransactionBufferProvider();
        ((PulsarService)Mockito.doReturn((Object)new TransactionPendingAckStoreProvider(){

            public CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscription subscription) {
                return CompletableFuture.completedFuture(new PendingAckStore(){

                    public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService executorService) {
                        try {
                            Field field = PendingAckHandleState.class.getDeclaredField("state");
                            field.setAccessible(true);
                            field.set(pendingAckHandle, PendingAckHandleState.State.Ready);
                        }
                        catch (IllegalAccessException | NoSuchFieldException e) {
                            Assert.fail();
                        }
                    }

                    public CompletableFuture<Void> closeAsync() {
                        return CompletableFuture.completedFuture(null);
                    }

                    public CompletableFuture<Void> appendIndividualAck(TxnID txnID, List<MutablePair<PositionImpl, Integer>> positions) {
                        return CompletableFuture.completedFuture(null);
                    }

                    public CompletableFuture<Void> appendCumulativeAck(TxnID txnID, PositionImpl position) {
                        return CompletableFuture.completedFuture(null);
                    }

                    public CompletableFuture<Void> appendCommitMark(TxnID txnID, CommandAck.AckType ackType) {
                        return CompletableFuture.completedFuture(null);
                    }

                    public CompletableFuture<Void> appendAbortMark(TxnID txnID, CommandAck.AckType ackType) {
                        return CompletableFuture.completedFuture(null);
                    }
                });
            }
        }).when((Object)this.pulsarMock)).getTransactionPendingAckStoreProvider();
        ((PulsarService)Mockito.doReturn((Object)svcConfig).when((Object)this.pulsarMock)).getConfiguration();
        ((PulsarService)Mockito.doReturn((Object)Mockito.mock(Compactor.class)).when((Object)this.pulsarMock)).getCompactor();
        this.mlFactoryMock = (ManagedLedgerFactory)Mockito.mock(ManagedLedgerFactory.class);
        ((PulsarService)Mockito.doReturn((Object)this.mlFactoryMock).when((Object)this.pulsarMock)).getManagedLedgerFactory();
        MockZooKeeper zkMock = MockedPulsarServiceBaseTest.createMockZooKeeper();
        ((PulsarService)Mockito.doReturn((Object)zkMock).when((Object)this.pulsarMock)).getZkClient();
        ((PulsarService)Mockito.doReturn((Object)((Object)MockedPulsarServiceBaseTest.createMockBookKeeper(this.executor))).when((Object)this.pulsarMock)).getBookKeeperClient();
        ZooKeeperCache cache = (ZooKeeperCache)Mockito.mock(ZooKeeperCache.class);
        ((ZooKeeperCache)Mockito.doReturn((Object)30).when((Object)cache)).getZkOperationTimeoutSeconds();
        CompletableFuture getDataFuture = new CompletableFuture();
        getDataFuture.complete(Optional.empty());
        ((ZooKeeperCache)Mockito.doReturn(getDataFuture).when((Object)cache)).getDataAsync(ArgumentMatchers.anyString(), (Watcher)ArgumentMatchers.any(), (ZooKeeperCache.Deserializer)ArgumentMatchers.any());
        ((PulsarService)Mockito.doReturn((Object)cache).when((Object)this.pulsarMock)).getLocalZkCache();
        this.configCacheServiceMock = (ConfigurationCacheService)Mockito.mock(ConfigurationCacheService.class);
        ZooKeeperDataCache zkPoliciesDataCacheMock = (ZooKeeperDataCache)Mockito.mock(ZooKeeperDataCache.class);
        ((ConfigurationCacheService)Mockito.doReturn((Object)zkPoliciesDataCacheMock).when((Object)this.configCacheServiceMock)).policiesCache();
        ((PulsarService)Mockito.doReturn((Object)this.configCacheServiceMock).when((Object)this.pulsarMock)).getConfigurationCache();
        ((ZooKeeperDataCache)Mockito.doReturn(Optional.empty()).when((Object)zkPoliciesDataCacheMock)).get(ArgumentMatchers.anyString());
        LocalZooKeeperCacheService zkCacheMock = (LocalZooKeeperCacheService)Mockito.mock(LocalZooKeeperCacheService.class);
        ((ZooKeeperDataCache)Mockito.doReturn(CompletableFuture.completedFuture(Optional.empty())).when((Object)zkPoliciesDataCacheMock)).getAsync((String)ArgumentMatchers.any());
        ((LocalZooKeeperCacheService)Mockito.doReturn((Object)zkPoliciesDataCacheMock).when((Object)zkCacheMock)).policiesCache();
        ((PulsarService)Mockito.doReturn((Object)zkCacheMock).when((Object)this.pulsarMock)).getLocalZkCacheService();
        this.brokerMock = (BrokerService)Mockito.spy((Object)new BrokerService(this.pulsarMock, this.eventLoopGroup));
        ((BrokerService)Mockito.doNothing().when((Object)this.brokerMock)).unloadNamespaceBundlesGracefully();
        ((PulsarService)Mockito.doReturn((Object)this.brokerMock).when((Object)this.pulsarMock)).getBrokerService();
        this.ledgerMock = (ManagedLedger)Mockito.mock(ManagedLedgerImpl.class);
        this.cursorMock = (ManagedCursorImpl)Mockito.mock(ManagedCursorImpl.class);
        this.managedLedgerConfigMock = (ManagedLedgerConfig)Mockito.mock(ManagedLedgerConfig.class);
        ((ManagedLedger)Mockito.doReturn((Object)new ManagedCursorContainer()).when((Object)this.ledgerMock)).getCursors();
        ((ManagedCursorImpl)Mockito.doReturn((Object)"mockCursor").when((Object)this.cursorMock)).getName();
        ((ManagedCursorImpl)Mockito.doReturn((Object)new PositionImpl(1L, 50L)).when((Object)this.cursorMock)).getMarkDeletedPosition();
        ((ManagedCursorImpl)Mockito.doReturn((Object)this.ledgerMock).when((Object)this.cursorMock)).getManagedLedger();
        ((ManagedLedger)Mockito.doReturn((Object)this.managedLedgerConfigMock).when((Object)this.ledgerMock)).getConfig();
        ((ManagedLedgerConfig)Mockito.doReturn((Object)false).when((Object)this.managedLedgerConfigMock)).isAutoSkipNonRecoverableData();
        this.topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerMock);
        this.consumerMock = (Consumer)Mockito.mock(Consumer.class);
        this.persistentSubscription = new PersistentSubscription(this.topic, "subscriptionName", (ManagedCursor)this.cursorMock, false);
    }

    @AfterMethod(alwaysRun=true)
    public void teardown() throws Exception {
        this.brokerMock.close();
        try {
            this.pulsarMock.close();
        }
        catch (Exception e) {
            log.warn("Failed to close pulsar service", (Throwable)e);
            throw e;
        }
        this.executor.shutdownNow();
        this.eventLoopGroup.shutdownGracefully().get();
    }

    @Test
    public void testCanAcknowledgeAndCommitForTransaction() throws ExecutionException, InterruptedException {
        ((ManagedCursorImpl)Mockito.doAnswer(invocationOnMock -> {
            ((AsyncCallbacks.DeleteCallback)invocationOnMock.getArguments()[1]).deleteComplete(invocationOnMock.getArguments()[2]);
            return null;
        }).when((Object)this.cursorMock)).asyncDelete((Iterable)ArgumentMatchers.any(List.class), (AsyncCallbacks.DeleteCallback)ArgumentMatchers.any(AsyncCallbacks.DeleteCallback.class), ArgumentMatchers.any());
        ArrayList<MutablePair> positionsPair = new ArrayList<MutablePair>();
        positionsPair.add(new MutablePair((Object)new PositionImpl(1L, 1L), (Object)0));
        positionsPair.add(new MutablePair((Object)new PositionImpl(1L, 3L), (Object)0));
        positionsPair.add(new MutablePair((Object)new PositionImpl(1L, 5L), (Object)0));
        ((ManagedCursorImpl)Mockito.doAnswer(invocationOnMock -> {
            Assert.assertTrue((boolean)Arrays.deepEquals(((List)invocationOnMock.getArguments()[0]).toArray(), positionsPair.toArray()));
            ((AsyncCallbacks.MarkDeleteCallback)invocationOnMock.getArguments()[2]).markDeleteComplete(invocationOnMock.getArguments()[3]);
            return null;
        }).when((Object)this.cursorMock)).asyncMarkDelete((Position)ArgumentMatchers.any(), (Map)ArgumentMatchers.any(), (AsyncCallbacks.MarkDeleteCallback)ArgumentMatchers.any(AsyncCallbacks.MarkDeleteCallback.class), ArgumentMatchers.any());
        this.persistentSubscription.transactionIndividualAcknowledge(this.txnID1, positionsPair);
        this.persistentSubscription.endTxn(this.txnID1.getMostSigBits(), this.txnID1.getLeastSigBits(), 0, -1L).get();
        ArrayList<PositionImpl> positions = new ArrayList<PositionImpl>();
        positions.add(new PositionImpl(3L, 100L));
        this.persistentSubscription.transactionCumulativeAcknowledge(this.txnID1, positions);
        ((ManagedCursorImpl)Mockito.doAnswer(invocationOnMock -> {
            Assert.assertEquals((int)((PositionImpl)invocationOnMock.getArguments()[0]).compareTo(new PositionImpl(3L, 100L)), (int)0);
            ((AsyncCallbacks.MarkDeleteCallback)invocationOnMock.getArguments()[2]).markDeleteComplete(invocationOnMock.getArguments()[3]);
            return null;
        }).when((Object)this.cursorMock)).asyncMarkDelete((Position)ArgumentMatchers.any(), (Map)ArgumentMatchers.any(), (AsyncCallbacks.MarkDeleteCallback)ArgumentMatchers.any(AsyncCallbacks.MarkDeleteCallback.class), ArgumentMatchers.any());
        this.persistentSubscription.endTxn(this.txnID1.getMostSigBits(), this.txnID1.getLeastSigBits(), 0, -1L).get();
    }

    @Test
    public void testCanAcknowledgeAndAbortForTransaction() throws BrokerServiceException, InterruptedException {
        ArrayList<MutablePair> positionsPair = new ArrayList<MutablePair>();
        positionsPair.add(new MutablePair((Object)new PositionImpl(2L, 1L), (Object)0));
        positionsPair.add(new MutablePair((Object)new PositionImpl(2L, 3L), (Object)0));
        positionsPair.add(new MutablePair((Object)new PositionImpl(2L, 5L), (Object)0));
        ((ManagedCursorImpl)Mockito.doAnswer(invocationOnMock -> {
            ((AsyncCallbacks.DeleteCallback)invocationOnMock.getArguments()[1]).deleteComplete(invocationOnMock.getArguments()[2]);
            return null;
        }).when((Object)this.cursorMock)).asyncDelete((Iterable)ArgumentMatchers.any(List.class), (AsyncCallbacks.DeleteCallback)ArgumentMatchers.any(AsyncCallbacks.DeleteCallback.class), ArgumentMatchers.any());
        ((Consumer)Mockito.doReturn((Object)CommandSubscribe.SubType.Exclusive).when((Object)this.consumerMock)).subType();
        Awaitility.await().until(() -> {
            try {
                this.persistentSubscription.addConsumer(this.consumerMock);
                return true;
            }
            catch (Exception e) {
                return false;
            }
        });
        this.persistentSubscription.transactionIndividualAcknowledge(this.txnID1, positionsPair);
        ArrayList<PositionImpl> positions = new ArrayList<PositionImpl>();
        positions.add(new PositionImpl(1L, 100L));
        this.persistentSubscription.transactionCumulativeAcknowledge(this.txnID1, positions);
        positions.clear();
        positions.add(new PositionImpl(2L, 1L));
        try {
            this.persistentSubscription.transactionIndividualAcknowledge(this.txnID2, positionsPair).get();
            Assert.fail((String)"Single acknowledge for transaction2 should fail. ");
        }
        catch (ExecutionException e) {
            Assert.assertEquals((String)e.getCause().getMessage(), (String)"[persistent://prop/use/ns-abc/successTopic][subscriptionName] Transaction:(1,2) try to ack message:2:1 in pending ack status.");
        }
        positions.clear();
        positions.add(new PositionImpl(2L, 50L));
        try {
            this.persistentSubscription.transactionCumulativeAcknowledge(this.txnID2, positions).get();
            Assert.fail((String)"Cumulative acknowledge for transaction2 should fail. ");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof TransactionConflictException));
            Assert.assertEquals((String)e.getCause().getMessage(), (String)"[persistent://prop/use/ns-abc/successTopic][subscriptionName] Transaction:(1,2) try to cumulative batch ack position: 2:50 within range of current currentPosition: 1:100");
        }
        ArrayList<PositionImpl> positionList = new ArrayList<PositionImpl>();
        positionList.add(new PositionImpl(1L, 1L));
        positionList.add(new PositionImpl(1L, 3L));
        positionList.add(new PositionImpl(1L, 5L));
        positionList.add(new PositionImpl(3L, 1L));
        positionList.add(new PositionImpl(3L, 3L));
        positionList.add(new PositionImpl(3L, 5L));
        this.persistentSubscription.acknowledgeMessage(positionList, CommandAck.AckType.Individual, Collections.emptyMap());
        this.persistentSubscription.endTxn(this.txnID1.getMostSigBits(), this.txnID2.getLeastSigBits(), 1, -1L);
        positions.clear();
        positions.add(new PositionImpl(2L, 50L));
        this.persistentSubscription.transactionCumulativeAcknowledge(this.txnID2, positions);
        positionsPair.clear();
        positionsPair.add(new MutablePair((Object)new PositionImpl(2L, 1L), (Object)0));
        this.persistentSubscription.transactionIndividualAcknowledge(this.txnID2, positionsPair);
    }
}

