/*
 * Decompiled with CFR 0.152.
 */
package org.copperengine.performancetest.main;

import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.copperengine.core.PersistentProcessingEngine;
import org.copperengine.core.WorkflowInstanceDescr;
import org.copperengine.core.common.ProcessorPool;
import org.copperengine.core.persistent.PersistentPriorityProcessorPool;
import org.copperengine.performancetest.main.ConfigParameter;
import org.copperengine.performancetest.main.ConfigParameterGroup;
import org.copperengine.performancetest.main.PerformanceTestContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Throughput performance test: starts a configurable number of
 * {@code WaitNotifyPerfTestWorkflow} instances in batches (optionally from several
 * insert threads and spread over extra processor pools) and measures how long it
 * takes until a shared semaphore has been fully released again.
 */
public class ThroughputPerformanceTest {
    private static final Logger logger = LoggerFactory.getLogger(ThroughputPerformanceTest.class);

    /**
     * Creates a random payload consisting only of the characters '0' and '1'.
     *
     * @param size number of characters to generate
     * @return a string of exactly {@code size} characters
     */
    protected String createTestData(int size) {
        StringBuilder sb = new StringBuilder(size);
        Random r = new Random();
        for (int i = 0; i < size; ++i) {
            sb.append(r.nextInt(2) == 0 ? '0' : '1');
        }
        return sb.toString();
    }

    /**
     * Executes the throughput test. All failures are logged and not rethrown; if the
     * calling thread is interrupted, its interrupt flag is restored before returning.
     */
    public void run() {
        try (PerformanceTestContext context = new PerformanceTestContext();){
            int numberOfExtraProcessorPools = context.getConfigManager().getConfigInt(ConfigParameter.THROUGHPUTTEST_NUMBER_OF_EXTRA_PROC_POOLS);
            int insertThreads = context.getConfigManager().getConfigInt(ConfigParameter.THROUGHPUTTEST_NUMBER_OF_INSERT_THREADS);
            int insertBatchSize = context.getConfigManager().getConfigInt(ConfigParameter.THROUGHPUTTEST_BATCHS_SIZE);
            int dataSize = context.getConfigManager().getConfigInt(ConfigParameter.THROUGHPUTTEST_DATA_SIZE);
            int numbOfWfI = context.getConfigManager().getConfigInt(ConfigParameter.THROUGHPUTTEST_NUMBER_OF_WORKFLOW_INSTANCES);
            String data = this.createTestData(dataSize);
            PersistentProcessingEngine engine = context.getEngine();

            // One permit per workflow instance. NOTE(review): presumably each workflow
            // releases one permit on completion via the registered bean - confirm in the workflow.
            Semaphore semaphore = new Semaphore(numbOfWfI);
            context.registerBean("semaphore", semaphore);

            for (int i = 0; i < numberOfExtraProcessorPools; ++i) {
                int procPoolNumbOfThreads = context.getConfigManager().getConfigInt(ConfigParameter.PROC_POOL_NUMB_OF_THREADS);
                String ppoolId = "P" + i;
                logger.debug("Starting additional processor pool {} with {} threads", (Object)ppoolId, (Object)procPoolNumbOfThreads);
                PersistentPriorityProcessorPool pool = new PersistentPriorityProcessorPool(ppoolId, context.getTransactionController(), procPoolNumbOfThreads);
                pool.setDequeueBulkSize(context.getConfigManager().getConfigInt(ConfigParameter.PROC_DEQUEUE_BULK_SIZE));
                context.getProcessorPoolManager().addProcessorPool((ProcessorPool)pool);
            }

            context.getConfigManager().log(logger, ConfigParameterGroup.throughput, ConfigParameterGroup.common, context.isCassandraTest() ? ConfigParameterGroup.cassandra : ConfigParameterGroup.rdbms);
            logger.debug("number of insert threads is {}", (Object)insertThreads);
            logger.debug("insert batch size is {}", (Object)insertBatchSize);
            logger.debug("numberOfExtraProcessorPools is {}", (Object)numberOfExtraProcessorPools);
            logger.info("Starting throughput performance test with {} workflow instances and data size {} chars ...", (Object)numbOfWfI, (Object)dataSize);

            // Drain all permits so that the second acquire below blocks until they are released again.
            semaphore.acquire(numbOfWfI);
            long startTS = System.currentTimeMillis();

            // With fewer than two insert threads the batches are inserted on the calling thread.
            ExecutorService pool = insertThreads >= 2 ? Executors.newFixedThreadPool(insertThreads) : null;
            try {
                ArrayList<WorkflowInstanceDescr> batch = new ArrayList<WorkflowInstanceDescr>();
                for (int i = 0; i < numbOfWfI; ++i) {
                    String ppoolId = "P#DEFAULT";
                    if (numberOfExtraProcessorPools > 0) {
                        // Distribute the instances round-robin over the extra processor pools.
                        ppoolId = "P" + i % numberOfExtraProcessorPools;
                    }
                    batch.add(new WorkflowInstanceDescr("org.copperengine.performancetest.workflows.WaitNotifyPerfTestWorkflow", (Object)data, engine.createUUID(), Integer.valueOf(1), ppoolId));
                    if (batch.size() != insertBatchSize) {
                        continue;
                    }
                    Runnable insertTask = createInsertTask(engine, batch);
                    if (pool != null) {
                        pool.execute(insertTask);
                    } else {
                        insertTask.run();
                    }
                    // The submitted list is owned by the task now; continue with a fresh one.
                    batch = new ArrayList<WorkflowInstanceDescr>();
                }
                if (!batch.isEmpty()) {
                    // Remaining partial batch is inserted on the calling thread.
                    engine.runBatch(batch);
                }
            }
            finally {
                // Always stop accepting tasks so the insert threads terminate, even if insertion failed.
                if (pool != null) {
                    pool.shutdown();
                }
            }
            if (pool != null && !pool.awaitTermination(10L, TimeUnit.MINUTES)) {
                logger.warn("Insert threads did not terminate within 10 minutes");
            }

            logger.info("Workflow instances started, waiting...");
            // NOTE(review): if a batch insert failed, the permits of that batch are presumably
            // never released and this call blocks indefinitely - confirm and consider a timeout.
            semaphore.acquire(numbOfWfI);
            long et = System.currentTimeMillis() - startTS;
            // Guard against division by zero when the run finishes within the same millisecond.
            // NOTE(review): the factor 10 presumably is the number of wait/notify cycles per workflow - confirm.
            long avgWaitNotifyPerSecond = (long)numbOfWfI * 10L * 1000L / Math.max(1L, et);
            logger.info("Finished performance test with {} workflow instances in {} msec ==> {} wait/notify cycles per second", new Object[]{numbOfWfI, et, avgWaitNotifyPerSecond});
            Thread.sleep(5000L);
            logger.info("statistics:\n{}", (Object)context.getStatisticsCollector().print());
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("performance test failed", (Throwable)e);
        }
        catch (Exception e) {
            logger.error("performance test failed", (Throwable)e);
        }
    }

    /**
     * Creates a task that inserts the given batch through the engine and logs
     * (rather than propagates) any failure.
     */
    private static Runnable createInsertTask(final PersistentProcessingEngine engine, final ArrayList<WorkflowInstanceDescr> batch) {
        return new Runnable(){

            @Override
            public void run() {
                try {
                    engine.runBatch(batch);
                }
                catch (Exception e) {
                    logger.error("batch insert failed", (Throwable)e);
                }
            }
        };
    }
}

