/*
 * Decompiled with CFR 0.152.
 */
package org.killbill.bus;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.awaitility.Awaitility;
import org.killbill.TestSetup;
import org.killbill.bus.DefaultPersistentBus;
import org.killbill.bus.api.BusEvent;
import org.killbill.bus.api.PersistentBus;
import org.killbill.bus.api.PersistentBusConfig;
import org.killbill.clock.Clock;
import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class TestLoadDefaultPersistentBus
extends TestSetup {
    private static final Logger log = LoggerFactory.getLogger(TestLoadDefaultPersistentBus.class);
    PersistentBus eventBus;

    @Override
    @BeforeClass(groups={"load"}, enabled=false)
    public void beforeClass() throws Exception {
        super.beforeClass();
        Properties properties = new Properties();
        properties.setProperty("org.killbill.persistent.bus.main.inflight.claimed", "500");
        properties.setProperty("org.killbill.persistent.bus.external.inMemory", "false");
        properties.setProperty("org.killbill.persistent.bus.main.nbThreads", "200");
        properties.setProperty("org.killbill.persistent.bus.main.queue.capacity", "20000");
        properties.setProperty("org.killbill.persistent.bus.main.sleep", "0");
        properties.setProperty("org.killbill.persistent.bus.main.sticky", "true");
        properties.setProperty("org.killbill.persistent.bus.main.useInflightQ", "true");
        PersistentBusConfig busConfig = (PersistentBusConfig)new ConfigurationObjectFactory(properties).buildWithReplacements(PersistentBusConfig.class, (Map)ImmutableMap.of((Object)"instanceName", (Object)"main"));
        this.eventBus = new DefaultPersistentBus((IDBI)this.dbi, (Clock)this.clock, busConfig, this.metricRegistry, this.databaseTransactionNotificationApi);
    }

    @Override
    @BeforeMethod(groups={"load"}, enabled=false)
    public void beforeMethod() throws Exception {
        super.beforeMethod();
        this.eventBus.startQueue();
    }

    @AfterMethod(groups={"load"}, enabled=false)
    public void afterMethod() throws Exception {
        this.eventBus.stopQueue();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(groups={"load"}, enabled=false)
    public void testMultiThreadedLoad() throws Exception {
        LoadHandler consumer = new LoadHandler();
        this.eventBus.register((Object)consumer);
        Long targetEventsPerSecond = 700L;
        int testDurationMinutes = 2;
        Long nbEvents = targetEventsPerSecond * 2L * 60L;
        Producer producer = new Producer(nbEvents, targetEventsPerSecond);
        try {
            Thread producerThread = new Thread(producer);
            producerThread.start();
            consumer.waitForCompletion(nbEvents, 120000L);
        }
        finally {
            producer.stop();
        }
    }

    private final class Producer
    implements Runnable {
        private final AtomicBoolean isStarted = new AtomicBoolean(true);
        private final Long nbEvents;
        private final Long targetEventsPerSecond;

        public Producer(Long nbEvents, Long targetEventsPerSecond) {
            this.nbEvents = nbEvents;
            this.targetEventsPerSecond = targetEventsPerSecond;
        }

        public void stop() {
            this.isStarted.set(false);
        }

        @Override
        public void run() {
            int batchLengthSeconds = 10;
            long eventsPerBatch = 10L * this.targetEventsPerSecond;
            Long nbEventsSent = 0L;
            while (this.isStarted.get() && nbEventsSent < this.nbEvents) {
                Long t1 = System.currentTimeMillis();
                int i = 0;
                while ((long)i < eventsPerBatch) {
                    this.postEvent();
                    ++i;
                }
                Long delayMillis = System.currentTimeMillis() - t1;
                int maxDelayMillis = 10000;
                if (delayMillis > 10000L) {
                    log.warn("Generated {} entries in {}s - producer slow", (Object)eventsPerBatch, (Object)((double)delayMillis.longValue() / 1000.0));
                } else {
                    log.info("Generated {} entries in {}s", (Object)eventsPerBatch, (Object)((double)delayMillis.longValue() / 1000.0));
                    try {
                        Thread.sleep(10000L - delayMillis);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                nbEventsSent = nbEventsSent + eventsPerBatch;
            }
            log.info("Producer shutting down - {} events sent", (Object)nbEventsSent);
        }

        private void postEvent() {
            TestLoadDefaultPersistentBus.this.dbi.inTransaction((TransactionCallback)new TransactionCallback<Void>(){

                public Void inTransaction(Handle conn, TransactionStatus status) throws Exception {
                    Assert.assertEquals((int)conn.select("select now();", new Object[0]).size(), (int)1);
                    LoadBusEvent event = new LoadBusEvent();
                    TestLoadDefaultPersistentBus.this.eventBus.postFromTransaction((BusEvent)event, conn.getConnection());
                    return null;
                }
            });
        }
    }

    private static final class LoadBusEvent
    implements BusEvent {
        private final String payload;
        private final Long searchKey1;
        private final Long searchKey2;
        private final UUID userToken;

        public LoadBusEvent() {
            StringBuilder payloadBuilder = new StringBuilder();
            for (int i = 0; i < 15; ++i) {
                payloadBuilder.append(UUID.randomUUID().toString());
            }
            this.payload = payloadBuilder.toString();
            this.searchKey2 = 12L;
            this.searchKey1 = 42L;
            this.userToken = UUID.randomUUID();
        }

        @JsonCreator
        public LoadBusEvent(@JsonProperty(value="payload") String payload, @JsonProperty(value="searchKey1") Long searchKey1, @JsonProperty(value="searchKey2") Long searchKey2, @JsonProperty(value="userToken") UUID userToken) {
            this.payload = payload;
            this.searchKey2 = searchKey2;
            this.searchKey1 = searchKey1;
            this.userToken = userToken;
        }

        public String getPayload() {
            return this.payload;
        }

        public Long getSearchKey1() {
            return this.searchKey1;
        }

        public Long getSearchKey2() {
            return this.searchKey2;
        }

        public UUID getUserToken() {
            return this.userToken;
        }
    }

    public static final class LoadHandler {
        private final AtomicLong nbEvents = new AtomicLong(0L);
        private final AtomicLong nbEventsForLogging = new AtomicLong(0L);
        private final AtomicLong lastLogLineTime = new AtomicLong(System.currentTimeMillis());

        @AllowConcurrentEvents
        @Subscribe
        public void processEvent(LoadBusEvent event) {
            try {
                Thread.sleep(200L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            this.nbEvents.incrementAndGet();
            this.nbEventsForLogging.incrementAndGet();
            long delayMillis = System.currentTimeMillis() - this.lastLogLineTime.get();
            if (delayMillis > 10000L) {
                log.info("Consumer processed {} events in {}s", (Object)this.nbEventsForLogging, (Object)((double)delayMillis / 1000.0));
                this.nbEventsForLogging.set(0L);
                this.lastLogLineTime.set(System.currentTimeMillis());
            }
        }

        public void waitForCompletion(final Long expectedEvents, long timeoutMs) {
            Awaitility.await().atMost(timeoutMs, TimeUnit.MILLISECONDS).until((Callable)new Callable<Boolean>(){

                @Override
                public Boolean call() throws Exception {
                    return nbEvents.get() == expectedEvents.longValue();
                }
            });
        }
    }
}

