/*
 * Decompiled with CFR 0.152.
 */
package org.killbill.queue;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.awaitility.Awaitility;
import org.joda.time.DateTime;
import org.killbill.CreatorName;
import org.killbill.TestSetup;
import org.killbill.bus.DefaultPersistentBus;
import org.killbill.bus.api.BusEvent;
import org.killbill.bus.api.PersistentBus;
import org.killbill.bus.api.PersistentBusConfig;
import org.killbill.bus.dao.BusEventModelDao;
import org.killbill.bus.dao.PersistentBusSqlDao;
import org.killbill.clock.Clock;
import org.killbill.commons.eventbus.AllowConcurrentEvents;
import org.killbill.commons.eventbus.Subscribe;
import org.killbill.queue.QueueObjectMapper;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.killbill.queue.dao.EventEntryModelDao;
import org.skife.config.TimeSpan;
import org.skife.jdbi.v2.IDBI;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class TestReaperIntegration
extends TestSetup {
    private PersistentBusConfig config;
    private DefaultPersistentBus bus;
    private PersistentBusSqlDao sqlDao;
    private DummyHandler handler;

    /**
     * Runs the parent class-level setup, then asks the DBI for an on-demand
     * {@link PersistentBusSqlDao}, which the tests use to insert bus entries directly.
     *
     * @throws Exception propagated from the parent setup or from the DAO lookup
     */
    @Override
    @BeforeClass(groups={"slow"})
    public void beforeClass() throws Exception {
        super.beforeClass();
        final Object onDemandDao = getDBI().onDemand(PersistentBusSqlDao.class);
        sqlDao = (PersistentBusSqlDao) onDemandDao;
    }

    /**
     * Per-test setup: runs the parent setup, builds a fresh configuration and bus, starts the
     * bus queue and then registers a new {@link DummyHandler} on it.
     *
     * @throws Exception propagated from the parent setup, the queue start or the registration
     */
    @Override
    @BeforeMethod(groups={"slow"})
    public void beforeMethod() throws Exception {
        super.beforeMethod();

        final PersistentBusConfig busConfig = createConfig();
        config = busConfig;

        final DefaultPersistentBus persistentBus = new DefaultPersistentBus((IDBI) dbi, (Clock) clock, busConfig, metricRegistry, databaseTransactionNotificationApi);
        bus = persistentBus;
        persistentBus.startQueue();

        // The handler is registered only after the queue has been started
        final DummyHandler dummyHandler = new DummyHandler();
        handler = dummyHandler;
        persistentBus.register(dummyHandler);
    }

    /**
     * Per-test teardown: stops the queue of the bus created in {@link #beforeMethod()}.
     *
     * @throws Exception declared for the test framework; propagated from the bus if stopping fails
     */
    @AfterMethod(groups={"slow"})
    public void afterMethod() throws Exception {
        bus.stopQueue();
    }

    /**
     * Scenario: an entry in state {@code IN_PROCESSING}, created and owned by
     * {@code "thatOtherNode"}, is inserted straight into the bus table.
     * <p>
     * The handler must not see it while regular posts keep flowing, and must see it once the
     * clock has been moved forward by the configured reap threshold.
     *
     * @throws PersistentBus.EventBusException if posting an event to the bus fails
     * @throws JsonProcessingException         if the inserted event cannot be serialized to JSON
     * @throws InterruptedException            if interrupted while the handler helper sleeps
     */
    @Test(groups={"slow"})
    public void testWithStuckEntryProcessedByAnotherNode() throws PersistentBus.EventBusException, JsonProcessingException, InterruptedException {
        final DateTime start = clock.getUTCNow();
        final String otherNode = "thatOtherNode";
        final String busTable = config.getTableName();

        // A regular post is delivered to the handler
        final DummyEvent delivered = new DummyEvent();
        bus.post(delivered);
        handler.waitFor(delivered);
        handler.assertSeenEvents(1);

        // Insert, behind the bus' back, an entry created and claimed by the other node
        final DummyEvent stuck = new DummyEvent();
        sqlDao.insertEntry(createEntry(otherNode, otherNode, start, stuck, PersistentQueueEntryLifecycleState.IN_PROCESSING), busTable);
        handler.ensureNotSeen(stuck);
        handler.assertSeenEvents(1);

        // Regular posts still go through, and the inserted entry stays unseen
        final DummyEvent followUp = new DummyEvent();
        bus.post(followUp);
        handler.waitFor(followUp);
        handler.ensureNotSeen(stuck);
        handler.assertSeenEvents(2);

        // Once the clock is past the reap threshold the inserted entry must reach the handler
        // (presumably re-dispatched by the reaper; that logic is not visible in this file)
        final long reapThresholdMillis = config.getReapThreshold().getMillis();
        clock.addDeltaFromReality(reapThresholdMillis);
        handler.waitFor(stuck);
        handler.assertSeenEvents(3);
    }

    /**
     * Scenario: an {@code AVAILABLE} entry whose creating owner is
     * {@code "aws-compute-123456.internal"} and which has no processing owner is inserted
     * straight into the bus table.
     * <p>
     * The handler must not see it while regular posts keep flowing, and must see it once the
     * clock has been moved forward by the configured reap threshold.
     *
     * @throws PersistentBus.EventBusException if posting an event to the bus fails
     * @throws JsonProcessingException         if the inserted event cannot be serialized to JSON
     * @throws InterruptedException            if interrupted while the handler helper sleeps
     */
    @Test(groups={"slow"})
    public void testWithStuckEntryNonExistingNode() throws PersistentBus.EventBusException, JsonProcessingException, InterruptedException {
        final DateTime start = clock.getUTCNow();
        final String busTable = config.getTableName();

        // A regular post is delivered to the handler
        final DummyEvent delivered = new DummyEvent();
        bus.post(delivered);
        handler.waitFor(delivered);
        handler.assertSeenEvents(1);

        // Insert an unclaimed entry whose creator is a node name different from the ones used elsewhere in this class
        final DummyEvent orphan = new DummyEvent();
        sqlDao.insertEntry(createEntry("aws-compute-123456.internal", null, start, orphan, PersistentQueueEntryLifecycleState.AVAILABLE), busTable);
        handler.ensureNotSeen(orphan);
        handler.assertSeenEvents(1);

        // Regular posts still go through, and the inserted entry stays unseen
        final DummyEvent followUp = new DummyEvent();
        bus.post(followUp);
        handler.waitFor(followUp);
        handler.ensureNotSeen(orphan);
        handler.assertSeenEvents(2);

        // Once the clock is past the reap threshold the inserted entry must reach the handler
        final long reapThresholdMillis = config.getReapThreshold().getMillis();
        clock.addDeltaFromReality(reapThresholdMillis);
        handler.waitFor(orphan);
        handler.assertSeenEvents(3);
    }

    /**
     * Scenario: an entry in state {@code IN_PROCESSING}, created and owned by this node
     * ({@link CreatorName#get()}), is inserted straight into the bus table.
     * <p>
     * The handler must not see it while regular posts keep flowing, must see it after the clock
     * has been moved past the reap threshold, and the bus history for {@code SEARCH_KEY_2} must
     * then contain exactly four entries.
     *
     * @throws PersistentBus.EventBusException if posting an event to the bus fails
     * @throws JsonProcessingException         if the inserted event cannot be serialized to JSON
     * @throws InterruptedException            if interrupted while sleeping
     */
    @Test(groups={"slow"})
    public void testWithStuckEntryProcessedByThisNode() throws PersistentBus.EventBusException, JsonProcessingException, InterruptedException {
        final DateTime now = this.clock.getUTCNow();

        // A regular post is delivered to the handler
        final DummyEvent event1 = new DummyEvent();
        this.bus.post(event1);
        this.handler.waitFor(event1);
        this.handler.assertSeenEvents(1);

        // Insert, behind the bus' back, an entry created and claimed by this very node
        final DummyEvent event2 = new DummyEvent();
        this.sqlDao.insertEntry(this.createEntry(CreatorName.get(), CreatorName.get(), now, event2, PersistentQueueEntryLifecycleState.IN_PROCESSING), this.config.getTableName());
        this.handler.ensureNotSeen(event2);
        this.handler.assertSeenEvents(1);

        // Regular posts still go through, and the inserted entry stays unseen
        final DummyEvent event3 = new DummyEvent();
        this.bus.post(event3);
        this.handler.waitFor(event3);
        this.handler.ensureNotSeen(event2);
        this.handler.assertSeenEvents(2);

        // Move past the reap threshold, pause briefly, then push the clock one more second
        // NOTE(review): the pause and the extra second presumably let the reaper run and the
        // re-inserted entry become available; confirm against the reaper implementation
        this.clock.addDeltaFromReality(this.config.getReapThreshold().getMillis());
        Thread.sleep(500L);
        this.clock.addDeltaFromReality(1000L);
        this.handler.waitFor(event2);
        this.handler.assertSeenEvents(3);

        // Count by iterating: Spliterator#getExactSizeIfKnown() returns -1 for any Iterable
        // whose spliterator is not SIZED (e.g. the default Iterable#spliterator()), which would
        // make the assertion depend on the concrete type returned by the bus.
        final Iterable<?> history = this.bus.getHistoricalBusEventsForSearchKey2(now, SEARCH_KEY_2);
        long nbItems = 0L;
        for (final Object ignored : history) {
            nbItems++;
        }
        // NOTE(review): four is presumably event1, event3 and two rows for event2; confirm
        Assert.assertEquals(nbItems, 4L);
    }

    /**
     * Scenario: an {@code IN_PROCESSING} entry created two hours ago by this node
     * ({@link CreatorName#get()}), with neither a processing owner nor a processing-available
     * date, is inserted straight into the bus table.
     * <p>
     * Unlike the "stuck entry" scenarios, the handler must still not have seen it after the
     * clock has been moved forward by the configured reap threshold.
     *
     * @throws PersistentBus.EventBusException if posting an event to the bus fails
     * @throws JsonProcessingException         if the inserted event cannot be serialized to JSON
     * @throws InterruptedException            if interrupted while the handler helper sleeps
     */
    @Test(groups={"slow"})
    public void testWithLateBusOnThisNode() throws PersistentBus.EventBusException, JsonProcessingException, InterruptedException {
        final DateTime start = clock.getUTCNow();
        final String busTable = config.getTableName();

        // A regular post is delivered to the handler
        final DummyEvent delivered = new DummyEvent();
        bus.post(delivered);
        handler.waitFor(delivered);
        handler.assertSeenEvents(1);

        // Entry created by this node two hours ago; no processing owner, no available date
        final DummyEvent late = new DummyEvent();
        final DateTime twoHoursAgo = start.minusHours(2);
        sqlDao.insertEntry(createEntry(CreatorName.get(), null, twoHoursAgo, null, late, PersistentQueueEntryLifecycleState.IN_PROCESSING), busTable);
        handler.ensureNotSeen(late);
        handler.assertSeenEvents(1);

        // Regular posts still go through, and the inserted entry stays unseen
        final DummyEvent followUp = new DummyEvent();
        bus.post(followUp);
        handler.waitFor(followUp);
        handler.ensureNotSeen(late);
        handler.assertSeenEvents(2);

        // Even past the reap threshold, the entry must remain unseen
        final long reapThresholdMillis = config.getReapThreshold().getMillis();
        clock.addDeltaFromReality(reapThresholdMillis);
        handler.ensureNotSeen(late);
        handler.assertSeenEvents(2);
    }

    /**
     * Scenario: an {@code IN_PROCESSING} entry created two hours ago and owned by
     * {@code "thatOtherNode"}, with a processing-available date of "now plus the claimed time",
     * is inserted straight into the bus table.
     * <p>
     * The handler must not see it while regular posts keep flowing, and must see it once the
     * clock has been moved forward by the configured reap threshold.
     *
     * @throws PersistentBus.EventBusException if posting an event to the bus fails
     * @throws JsonProcessingException         if the inserted event cannot be serialized to JSON
     * @throws InterruptedException            if interrupted while the handler helper sleeps
     */
    @Test(groups={"slow"})
    public void testWithLateBusOnAnotherNode() throws PersistentBus.EventBusException, JsonProcessingException, InterruptedException {
        final DateTime start = clock.getUTCNow();
        final String otherNode = "thatOtherNode";
        final String busTable = config.getTableName();

        // A regular post is delivered to the handler
        final DummyEvent delivered = new DummyEvent();
        bus.post(delivered);
        handler.waitFor(delivered);
        handler.assertSeenEvents(1);

        // Entry created two hours ago by the other node, claimed until now + claimed time
        final DummyEvent late = new DummyEvent();
        final DateTime twoHoursAgo = start.minusHours(2);
        final DateTime claimedUntil = start.plus(config.getClaimedTime().getMillis());
        sqlDao.insertEntry(createEntry(otherNode, otherNode, twoHoursAgo, claimedUntil, late, PersistentQueueEntryLifecycleState.IN_PROCESSING), busTable);
        handler.ensureNotSeen(late);
        handler.assertSeenEvents(1);

        // Regular posts still go through, and the inserted entry stays unseen
        final DummyEvent followUp = new DummyEvent();
        bus.post(followUp);
        handler.waitFor(followUp);
        handler.ensureNotSeen(late);
        handler.assertSeenEvents(2);

        // Once the clock is past the reap threshold the inserted entry must reach the handler
        final long reapThresholdMillis = config.getReapThreshold().getMillis();
        clock.addDeltaFromReality(reapThresholdMillis);
        handler.waitFor(late);
        handler.assertSeenEvents(3);
    }

    /**
     * Builds a bus entry whose processing-available date is {@code createdDate} plus the
     * configured claimed time, delegating to the six-argument overload.
     *
     * @param creatingOwner   creating owner stored on the entry
     * @param processingOwner processing owner stored on the entry (callers may pass {@code null})
     * @param createdDate     creation date; also the base for the processing-available date
     * @param event           event to serialize into the entry
     * @param state           lifecycle state stored on the entry
     * @return the new entry
     * @throws JsonProcessingException if the event cannot be serialized to JSON
     */
    private BusEventModelDao createEntry(String creatingOwner, String processingOwner, DateTime createdDate, BusEvent event, PersistentQueueEntryLifecycleState state) throws JsonProcessingException {
        final long claimedTimeMillis = config.getClaimedTime().getMillis();
        return createEntry(creatingOwner, processingOwner, createdDate, createdDate.plus(claimedTimeMillis), event, state);
    }

    /**
     * Builds a bus entry for the given event: the event's class name and its JSON form (from
     * {@link QueueObjectMapper}) are stored together with its user token and search keys.
     * The first constructor argument is {@code null} and the numeric argument after the JSON is 0.
     *
     * @param creatingOwner           creating owner stored on the entry
     * @param processingOwner         processing owner stored on the entry (callers may pass {@code null})
     * @param createdDate             creation date stored on the entry
     * @param processingAvailableDate processing-available date (callers may pass {@code null})
     * @param event                   event to serialize into the entry
     * @param state                   lifecycle state stored on the entry
     * @return the new entry
     * @throws JsonProcessingException if the event cannot be serialized to JSON
     */
    private BusEventModelDao createEntry(String creatingOwner, String processingOwner, DateTime createdDate, DateTime processingAvailableDate, BusEvent event, PersistentQueueEntryLifecycleState state) throws JsonProcessingException {
        final String eventClassName = event.getClass().getName();
        final String eventJson = QueueObjectMapper.get().writeValueAsString(event);
        return new BusEventModelDao(null,
                                    creatingOwner,
                                    processingOwner,
                                    createdDate,
                                    processingAvailableDate,
                                    state,
                                    eventClassName,
                                    eventJson,
                                    Long.valueOf(0L),
                                    event.getUserToken(),
                                    event.getSearchKey1(),
                                    event.getSearchKey2());
    }

    /**
     * Creates the fixed bus configuration used by every test in this class: a non in-memory,
     * sticky-polling bus on {@code bus_events} / {@code bus_events_history}, with a 1 minute
     * claimed time, a 5 minute reap threshold and a 1 second reap schedule.
     *
     * @return a new anonymous {@link PersistentBusConfig} returning constant values
     */
    private PersistentBusConfig createConfig() {
        final PersistentBusConfig testConfig = new PersistentBusConfig() {

            // --- Tables ---

            public String getTableName() {
                return "bus_events";
            }

            public String getHistoryTableName() {
                return "bus_events_history";
            }

            // --- Queue mode and switches ---

            public PersistentQueueConfig.PersistentQueueMode getPersistentQueueMode() {
                return PersistentQueueConfig.PersistentQueueMode.STICKY_POLLING;
            }

            public boolean isInMemory() {
                return false;
            }

            public boolean isProcessingOff() {
                return false;
            }

            // --- Claiming and polling ---

            public TimeSpan getClaimedTime() {
                return new TimeSpan("1m");
            }

            public int getMaxEntriesClaimed() {
                return 10;
            }

            public long getPollingSleepTimeMs() {
                return 1L;
            }

            public int getMinInFlightEntries() {
                return 0;
            }

            public int getMaxInFlightEntries() {
                return 0;
            }

            public int getMaxFailureRetries() {
                return 0;
            }

            // --- Threads and capacity ---

            public int geMaxDispatchThreads() {
                return 1;
            }

            public int geNbLifecycleDispatchThreads() {
                return 1;
            }

            public int geNbLifecycleCompleteThreads() {
                return 1;
            }

            public int getEventQueueCapacity() {
                return 1;
            }

            // --- Reaper ---

            public TimeSpan getReapThreshold() {
                return new TimeSpan(5L, TimeUnit.MINUTES);
            }

            public TimeSpan getReapSchedule() {
                return new TimeSpan(1L, TimeUnit.SECONDS);
            }

            public int getMaxReDispatchCount() {
                return 10;
            }

            // --- Shutdown ---

            public TimeSpan getShutdownTimeout() {
                return new TimeSpan(5L, TimeUnit.SECONDS);
            }
        };
        return testConfig;
    }

    /**
     * Test subscriber registered on the bus. It remembers the user token of every event passed
     * to {@link #processEvent(BusEvent)} so tests can check what was, or was not, delivered.
     */
    public static class DummyHandler {
        // processEvent() is annotated @AllowConcurrentEvents and is invoked through the bus
        // (presumably on a dispatch thread), while the test thread reads this set in waitFor(),
        // ensureNotSeen() and assertSeenEvents(). A bare HashSet offers no cross-thread
        // visibility or structural safety, so it is wrapped in a synchronized view, which,
        // like HashSet, still accepts a null token.
        private final Set<UUID> receivedEvents = Collections.synchronizedSet(new HashSet<UUID>());

        DummyHandler() {
        }

        /**
         * Subscriber callback: records the user token of the received event.
         *
         * @param event event handed over by the bus
         */
        @AllowConcurrentEvents
        @Subscribe
        public void processEvent(BusEvent event) {
            this.receivedEvents.add(event.getUserToken());
        }

        /**
         * Polls for up to 15 seconds until the user token of the given event has been recorded.
         *
         * @param event event whose delivery is awaited
         */
        void waitFor(final BusEvent event) {
            Awaitility.await().atMost(15L, TimeUnit.SECONDS).until(new Callable<Boolean>() {

                @Override
                public Boolean call() throws Exception {
                    return receivedEvents.contains(event.getUserToken());
                }
            });
        }

        /**
         * Sleeps for one second, then asserts that the given event's user token has not been recorded.
         *
         * @param event event that must not have been delivered
         * @throws InterruptedException if interrupted during the one second sleep
         */
        void ensureNotSeen(BusEvent event) throws InterruptedException {
            Thread.sleep(1000L);
            Assert.assertFalse(this.receivedEvents.contains(event.getUserToken()));
        }

        /**
         * Asserts the number of distinct user tokens recorded so far.
         *
         * @param expected expected number of recorded tokens
         */
        void assertSeenEvents(int expected) {
            Assert.assertEquals(this.receivedEvents.size(), expected);
        }
    }

    /**
     * Simple immutable {@link BusEvent} used by the tests: a name, two search keys and a user token.
     * It can be built from JSON properties or, through the no-argument constructor, with a
     * random name, the {@code SEARCH_KEY_1} / {@code SEARCH_KEY_2} constants and a random token.
     */
    public static class DummyEvent
    implements BusEvent {
        private final String name;
        private final Long searchKey1;
        private final Long searchKey2;
        private final UUID userToken;

        /**
         * JSON creator: stores the four properties as given.
         *
         * @param name       value of the {@code name} property
         * @param searchKey1 value of the {@code searchKey1} property
         * @param searchKey2 value of the {@code searchKey2} property
         * @param userToken  value of the {@code userToken} property
         */
        @JsonCreator
        public DummyEvent(@JsonProperty(value="name") String name, @JsonProperty(value="searchKey1") Long searchKey1, @JsonProperty(value="searchKey2") Long searchKey2, @JsonProperty(value="userToken") UUID userToken) {
            this.name = name;
            this.searchKey1 = searchKey1;
            this.searchKey2 = searchKey2;
            this.userToken = userToken;
        }

        /**
         * Creates an event with a random UUID string as name, the {@code SEARCH_KEY_1} and
         * {@code SEARCH_KEY_2} constants as search keys, and a random user token.
         */
        public DummyEvent() {
            this(UUID.randomUUID().toString(), SEARCH_KEY_1, SEARCH_KEY_2, UUID.randomUUID());
        }

        /** @return the event name */
        public String getName() {
            return name;
        }

        /** @return the first search key */
        public Long getSearchKey1() {
            return searchKey1;
        }

        /** @return the second search key */
        public Long getSearchKey2() {
            return searchKey2;
        }

        /** @return the user token identifying this event */
        public UUID getUserToken() {
            return userToken;
        }

        /**
         * @return a string of the form
         *         {@code DummyEvent{name='...', searchKey1=..., searchKey2=..., userToken=...}}
         */
        @Override
        public String toString() {
            return "DummyEvent{"
                   + "name='" + name + '\''
                   + ", searchKey1=" + searchKey1
                   + ", searchKey2=" + searchKey2
                   + ", userToken=" + userToken
                   + '}';
        }
    }
}

