/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.perf;

import com.rabbitmq.client.Address;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import com.rabbitmq.perf.Consumer;
import com.rabbitmq.perf.ExpectedMetrics;
import com.rabbitmq.perf.FixedValueIndicator;
import com.rabbitmq.perf.MulticastParams;
import com.rabbitmq.perf.NamedThreadFactory;
import com.rabbitmq.perf.PerfTest;
import com.rabbitmq.perf.Producer;
import com.rabbitmq.perf.ShutdownService;
import com.rabbitmq.perf.Utils;
import com.rabbitmq.perf.ValueIndicator;
import com.rabbitmq.perf.VariableValueIndicator;
import com.rabbitmq.perf.metrics.PerformanceMetrics;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MulticastSet {
    public static final int DEFAULT_CONSUMER_WORK_SERVICE_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
    private static final Logger LOGGER = LoggerFactory.getLogger(MulticastSet.class);
    private static final String PRODUCER_THREAD_PREFIX = "perf-test-producer-";
    static final String STOP_REASON_REACHED_TIME_LIMIT = "Reached time limit";
    private final PerformanceMetrics performanceMetrics;
    private final ConnectionFactory factory;
    private final MulticastParams params;
    private final String testID;
    private final List<String> uris;
    private final CompletionHandler completionHandler;
    private final ShutdownService shutdownService;
    private ThreadingHandler threadingHandler = new DefaultThreadingHandler();
    private final ValueIndicator<Float> rateIndicator;
    private final ValueIndicator<Integer> messageSizeIndicator;
    private final ValueIndicator<Long> consumerLatencyIndicator;
    private final ConnectionCreator connectionCreator;
    private final ExpectedMetrics expectedMetrics;

    public MulticastSet(PerformanceMetrics performanceMetrics, ConnectionFactory factory, MulticastParams params, List<String> uris, CompletionHandler completionHandler) {
        this(performanceMetrics, factory, params, "perftest", uris, completionHandler);
    }

    public MulticastSet(PerformanceMetrics performanceMetrics, ConnectionFactory factory, MulticastParams params, String testID, List<String> uris, CompletionHandler completionHandler) {
        this(performanceMetrics, factory, params, testID, uris, completionHandler, new ShutdownService(), new ExpectedMetrics(params, (MeterRegistry)new SimpleMeterRegistry(), "perftest_", Collections.emptyMap()));
    }

    public MulticastSet(PerformanceMetrics performanceMetrics, ConnectionFactory factory, MulticastParams params, String testID, List<String> uris, CompletionHandler completionHandler, ShutdownService shutdownService, ExpectedMetrics expectedMetrics) {
        ScheduledExecutorService scheduledExecutorService;
        this.performanceMetrics = performanceMetrics;
        this.factory = factory;
        this.params = params;
        this.testID = testID;
        this.uris = uris == null || uris.isEmpty() ? null : new CopyOnWriteArrayList<String>(uris);
        this.completionHandler = completionHandler;
        this.shutdownService = shutdownService;
        this.params.init();
        if (this.params.getPublishingRates() == null || this.params.getPublishingRates().isEmpty()) {
            this.rateIndicator = new FixedValueIndicator<Float>(Float.valueOf(params.getProducerRateLimit()));
        } else {
            scheduledExecutorService = this.threadingHandler.scheduledExecutorService("perf-test-variable-rate-scheduler", 1);
            this.rateIndicator = new VariableValueIndicator<Float>(params.getPublishingRates(), scheduledExecutorService, input -> Float.valueOf(input));
        }
        if (this.params.getMessageSizes() == null || this.params.getMessageSizes().isEmpty()) {
            this.messageSizeIndicator = new FixedValueIndicator<Integer>(params.getMinMsgSize());
        } else {
            scheduledExecutorService = this.threadingHandler.scheduledExecutorService("perf-test-variable-message-size-scheduler", 1);
            this.messageSizeIndicator = new VariableValueIndicator<Integer>(params.getMessageSizes(), scheduledExecutorService, input -> Integer.valueOf(input));
        }
        if (this.params.getConsumerLatencies() == null || this.params.getConsumerLatencies().isEmpty()) {
            this.consumerLatencyIndicator = new FixedValueIndicator<Long>(params.getConsumerLatencyInMicroseconds());
        } else {
            scheduledExecutorService = this.threadingHandler.scheduledExecutorService("perf-test-variable-consumer-latency-scheduler", 1);
            this.consumerLatencyIndicator = new VariableValueIndicator<Long>(params.getConsumerLatencies(), scheduledExecutorService, input -> Long.valueOf(input));
        }
        this.connectionCreator = new ConnectionCreator(this.factory, this.uris);
        this.expectedMetrics = expectedMetrics;
    }

    protected static int nbThreadsForConsumer(MulticastParams params) {
        return Math.min(params.getConsumerChannelCount(), DEFAULT_CONSUMER_WORK_SERVICE_THREAD_POOL_SIZE);
    }

    protected static int nbThreadsForProducerScheduledExecutorService(MulticastParams params) {
        int producerExecutorServiceNbThreads = params.getProducerSchedulerThreadCount();
        if (producerExecutorServiceNbThreads <= 0) {
            int producerThreadCount = params.getProducerThreadCount();
            Duration publishingInterval = params.getPublishingInterval() == null ? Duration.ofSeconds(1L) : params.getPublishingInterval();
            long publishingIntervalMs = publishingInterval.toMillis();
            double publishingIntervalSeconds = (double)publishingIntervalMs / 1000.0;
            double rate = (double)producerThreadCount / publishingIntervalSeconds;
            int threadCount = (int)(rate / 100.0) + 1;
            LOGGER.debug("Using {} thread(s) to schedule {} publisher(s) publishing every {} ms", new Object[]{threadCount, producerThreadCount, publishingInterval.toMillis()});
            return threadCount;
        }
        return producerExecutorServiceNbThreads;
    }

    public void run() throws IOException, InterruptedException, TimeoutException, NoSuchAlgorithmException, KeyManagementException, URISyntaxException, ExecutionException {
        this.run(false);
    }

    public void run(boolean announceStartup) throws IOException, InterruptedException, TimeoutException, NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
        int n = this.params.getServersStartUpTimeout();
        int n2 = this.params.getServersUpLimit() == -1 ? (this.uris == null ? 0 : this.uris.size()) : this.params.getServersUpLimit();
        if (MulticastSet.waitUntilBrokerAvailableIfNecessary(n, n2, this.uris, this.factory)) {
            int shutdownTimeout;
            ScheduledExecutorService heartbeatSenderExecutorService = this.threadingHandler.scheduledExecutorService("perf-test-heartbeat-sender-", this.params.getHeartbeatSenderThreads());
            this.factory.setHeartbeatExecutor(heartbeatSenderExecutorService);
            ExecutorService executorServiceConfigurationConnection = this.threadingHandler.executorService("perf-test-configuration-", 1);
            this.factory.setSharedExecutor(executorServiceConfigurationConnection);
            List<Connection> configurationConnections = this.createConfigurationConnections();
            List<MulticastParams.TopologyHandlerResult> topologyHandlerResults = this.params.configureAllQueues(configurationConnections);
            this.enableTopologyRecoveryIfNecessary(topologyHandlerResults);
            ScheduledExecutorService topologyRecordingScheduledExecutorService = null;
            if (!configurationConnections.isEmpty() && Utils.isRecoverable(configurationConnections.get(0))) {
                topologyRecordingScheduledExecutorService = this.threadingHandler.scheduledExecutorService("perf-test-topology-recovery-", 1);
            }
            this.params.resetTopologyHandler();
            Runnable[] consumerRunnables = new Consumer[this.params.getConsumerThreadCount()];
            Connection[] consumerConnections = new Connection[this.params.getConsumerCount()];
            Function<Integer, ExecutorService> consumersExecutorsFactory = this.createConsumersExecutorsFactory();
            this.createConsumers(announceStartup, consumerRunnables, consumerConnections, consumersExecutorsFactory, topologyRecordingScheduledExecutorService);
            this.params.resetTopologyHandler();
            AgentState[] producerStates = new AgentState[this.params.getProducerThreadCount()];
            Connection[] producerConnections = new Connection[this.params.getProducerCount()];
            ExecutorService executorServiceForProducersConsumers = this.threadingHandler.executorService("perf-test-producers-worker-", 0);
            this.factory.setSharedExecutor(executorServiceForProducersConsumers);
            this.createProducers(announceStartup, producerStates, producerConnections);
            this.startConsumers(consumerRunnables);
            this.startProducers(producerStates);
            if (this.params.getExitWhen() == PerfTest.EXIT_WHEN.EMPTY || this.params.getExitWhen() == PerfTest.EXIT_WHEN.IDLE) {
                ScheduledExecutorService scheduledExecutorService = this.threadingHandler.scheduledExecutorService("perf-test-queue-empty-consumer-idle-scheduler", 1);
                scheduledExecutorService.scheduleAtFixedRate(() -> MulticastSet.lambda$run$3((Consumer[])consumerRunnables), 2L, 1L, TimeUnit.SECONDS);
            }
            AutoCloseable shutdownSequence = (shutdownTimeout = this.params.getShutdownTimeout()) > 0 ? this.shutdownService.wrap(() -> {
                CountDownLatch latch = new CountDownLatch(1);
                Thread shutdownThread = new Thread(() -> {
                    if (this.params.isPolling()) {
                        Connection connection = null;
                        try {
                            connection = this.createConnection("perf-test-queue-deletion");
                            this.params.deleteAutoDeleteQueuesIfNecessary(connection);
                        }
                        catch (Exception e) {
                            LOGGER.warn("Error while trying to delete auto-delete queues");
                        }
                        finally {
                            if (connection != null) {
                                MulticastSet.dispose(connection);
                            }
                        }
                    }
                    if (Thread.interrupted()) {
                        return;
                    }
                    try {
                        this.shutdown(configurationConnections, consumerConnections, producerStates, producerConnections);
                    }
                    finally {
                        latch.countDown();
                    }
                });
                shutdownThread.start();
                boolean done = latch.await(shutdownTimeout, TimeUnit.SECONDS);
                if (!done) {
                    LOGGER.debug("Shutdown not completed in {} second(s), aborting.", (Object)shutdownTimeout);
                    shutdownThread.interrupt();
                }
            }) : () -> {};
            this.performanceMetrics.start();
            this.completionHandler.waitForCompletion();
            try {
                shutdownSequence.close();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        } else {
            System.out.println("Could not connect to broker(s) in " + this.params.getServersStartUpTimeout() + " second(s), exiting.");
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    static boolean waitUntilBrokerAvailableIfNecessary(int startUpTimeoutInSeconds, int serversUpLimit, Collection<String> uris, ConnectionFactory factory) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException, InterruptedException {
        if (startUpTimeoutInSeconds <= 0) return true;
        if (uris == null) return true;
        if (uris.isEmpty()) {
            return true;
        }
        ArrayList<String> tested = new ArrayList<String>(uris);
        ArrayList<String> connected = new ArrayList<String>();
        long started = System.nanoTime();
        while ((System.nanoTime() - started) / 1000000000L < (long)startUpTimeoutInSeconds) {
            Iterator iterator = tested.iterator();
            while (iterator.hasNext()) {
                String uri = (String)iterator.next();
                factory.setUri(uri);
                try {
                    Connection ignored = factory.newConnection("perf-test-test");
                    Throwable throwable = null;
                    try {
                        connected.add(uri);
                        if (connected.size() == serversUpLimit) {
                            uris.clear();
                            uris.addAll(connected);
                            boolean bl = true;
                            return bl;
                        }
                        iterator.remove();
                    }
                    catch (Throwable throwable2) {
                        throwable = throwable2;
                        throw throwable2;
                    }
                    finally {
                        if (ignored == null) continue;
                        if (throwable != null) {
                            try {
                                ignored.close();
                            }
                            catch (Throwable throwable3) {
                                throwable.addSuppressed(throwable3);
                            }
                            continue;
                        }
                        ignored.close();
                    }
                }
                catch (Exception e) {
                    LOGGER.info("Could not connect to broker " + factory.getHost() + ":" + factory.getPort());
                }
            }
            Thread.sleep(1000L);
        }
        return false;
    }

    Connection createConnection(String name) throws IOException, TimeoutException {
        return this.connectionCreator.createConnection(name);
    }

    List<Connection> createConfigurationConnections() throws IOException, TimeoutException {
        return this.connectionCreator.createConfigurationConnections();
    }

    private Function<Integer, ExecutorService> createConsumersExecutorsFactory() {
        Function<Integer, ExecutorService> consumersExecutorsFactory = this.params.isPolling() ? consumerNumber -> this.threadingHandler.executorService(String.format("perf-test-synchronous-consumer-%d-worker-", consumerNumber), this.params.getConsumerChannelCount() + 1) : (this.params.getConsumersThreadPools() > 0 ? new CacheConsumersExecutorsFactory(this.threadingHandler, this.params, this.params.getConsumersThreadPools()) : new NoCacheConsumersExecutorsFactory(this.threadingHandler, this.params));
        return consumersExecutorsFactory;
    }

    private void createConsumers(boolean announceStartup, Runnable[] consumerRunnables, Connection[] consumerConnections, Function<Integer, ExecutorService> consumersExecutorsFactory, ScheduledExecutorService topologyRecordingScheduledExecutorService) throws IOException, TimeoutException {
        for (int i = 0; i < consumerConnections.length; ++i) {
            Connection consumerConnection;
            if (announceStartup) {
                System.out.println("id: " + this.testID + ", starting consumer #" + i);
            }
            ExecutorService executorService = consumersExecutorsFactory.apply(i);
            this.factory.setSharedExecutor(executorService);
            consumerConnections[i] = consumerConnection = this.createConnection("perf-test-consumer-" + i);
            for (int j = 0; j < this.params.getConsumerChannelCount(); ++j) {
                if (announceStartup) {
                    System.out.println("id: " + this.testID + ", starting consumer #" + i + ", channel #" + j);
                }
                Consumer consumer = this.params.createConsumer(consumerConnection, this.performanceMetrics, this.consumerLatencyIndicator, this.completionHandler, executorService, topologyRecordingScheduledExecutorService);
                consumerRunnables[i * this.params.getConsumerChannelCount() + j] = consumer;
            }
        }
    }

    private void createProducers(boolean announceStartup, AgentState[] producerStates, Connection[] producerConnections) throws IOException, TimeoutException {
        for (int i = 0; i < producerConnections.length; ++i) {
            Connection producerConnection;
            if (announceStartup) {
                System.out.println("id: " + this.testID + ", starting producer #" + i);
            }
            producerConnections[i] = producerConnection = this.createConnection(PRODUCER_THREAD_PREFIX + i);
            for (int j = 0; j < this.params.getProducerChannelCount(); ++j) {
                if (announceStartup) {
                    System.out.println("id: " + this.testID + ", starting producer #" + i + ", channel #" + j);
                }
                AgentState agentState = new AgentState();
                agentState.runnable = this.params.createProducer(producerConnection, this.performanceMetrics, this.completionHandler, this.rateIndicator, this.messageSizeIndicator);
                producerStates[i * this.params.getProducerChannelCount() + j] = agentState;
            }
        }
    }

    private void startConsumers(Runnable[] consumerRunnables) throws InterruptedException {
        if (this.params.getConsumerStartDelay().getSeconds() <= 0L) {
            this.consumerLatencyIndicator.start();
            for (Runnable runnable : consumerRunnables) {
                runnable.run();
                if (!this.params.getConsumerSlowStart()) continue;
                System.out.println("Delaying start by 1 second because -S/--slow-start was requested");
                Thread.sleep(1000L);
            }
        } else {
            this.threadingHandler.scheduledExecutorService("consumer-start-delay-", 0).schedule(() -> {
                this.consumerLatencyIndicator.start();
                for (Runnable runnable : consumerRunnables) {
                    runnable.run();
                    if (!this.params.getConsumerSlowStart()) continue;
                    System.out.println("Delaying start by 1 second because -S/--slow-start was requested");
                    try {
                        Thread.sleep(1000L);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }, this.params.getConsumerStartDelay().getSeconds(), TimeUnit.SECONDS);
        }
    }

    private void startProducers(AgentState[] producerStates) {
        Object producersExecutorService;
        this.messageSizeIndicator.start();
        Float rateIndication = this.rateIndicator.getValue();
        if (!this.rateIndicator.isVariable() && rateIndication.floatValue() >= 1.0f && rateIndication.floatValue() <= 10.0f) {
            Duration calculatedPublishingInterval = MulticastSet.rateToPublishingInterval(rateIndication.floatValue());
            LOGGER.debug("Rate between 1 and 10 messages / second, falling back to scheduling with {} ms as publishing interval", (Object)calculatedPublishingInterval.toMillis());
            this.params.setPublishingInterval(calculatedPublishingInterval);
        }
        this.expectedMetrics.register(this.rateIndicator, this.params.getPublishingInterval());
        if (this.params.getPublishingInterval() != null) {
            Supplier<Duration> startDelaySupplier;
            producersExecutorService = this.threadingHandler.scheduledExecutorService(PRODUCER_THREAD_PREFIX, MulticastSet.nbThreadsForProducerScheduledExecutorService(this.params));
            if (this.params.getProducerRandomStartDelayInSeconds() > 0) {
                LOGGER.debug("Using random start-up delay for producers, from 1 ms to {} s", (Object)this.params.getProducerRandomStartDelayInSeconds());
                Random random = new Random();
                int bound = this.params.getProducerRandomStartDelayInSeconds() * 1000;
                startDelaySupplier = () -> Duration.ofMillis(random.nextInt(bound) + 1);
            } else {
                LOGGER.debug("No start-up delay for producers, they are starting ASAP");
                startDelaySupplier = () -> Duration.ZERO;
            }
            Duration publishingInterval = this.params.getPublishingInterval();
            for (int i = 0; i < producerStates.length; ++i) {
                AgentState producerState = producerStates[i];
                Duration delay = startDelaySupplier.get();
                producerState.task = producersExecutorService.scheduleAtFixedRate(producerState.runnable.createRunnableForScheduling(), delay.toMillis(), publishingInterval.toMillis(), TimeUnit.MILLISECONDS);
            }
        } else if (!this.rateIndicator.isVariable() && this.rateIndicator.getValue().floatValue() == 0.0f) {
            for (AgentState producerState : producerStates) {
                producerState.task = Utils.NO_OP_FUTURE;
            }
        } else {
            this.rateIndicator.start();
            producersExecutorService = this.threadingHandler.executorService(PRODUCER_THREAD_PREFIX, producerStates.length);
            for (AgentState producerState : producerStates) {
                producerState.task = producersExecutorService.submit(producerState.runnable);
            }
        }
    }

    static Duration rateToPublishingInterval(double rate) {
        return Duration.ofMillis((long)(1.0 / rate * 1000.0));
    }

    private void shutdown(List<Connection> configurationConnections, Connection[] consumerConnections, AgentState[] producerStates, Connection[] producerConnections) {
        try {
            LOGGER.debug("Starting test shutdown");
            for (AgentState producerState : producerStates) {
                if (Thread.interrupted()) {
                    return;
                }
                boolean cancelled = producerState.task.cancel(true);
                LOGGER.debug("Producer has been correctly cancelled: {}", (Object)cancelled);
            }
            for (AgentState producerState : producerStates) {
                if (producerState.task.isDone()) continue;
                try {
                    if (Thread.interrupted()) {
                        return;
                    }
                    producerState.task.get(10L, TimeUnit.SECONDS);
                }
                catch (Exception e) {
                    LOGGER.debug("Error while waiting for producer to stop: {}. Moving on.", (Object)e.getMessage());
                }
            }
            if (Thread.interrupted()) {
                return;
            }
            if (!MulticastSet.closeConnections(configurationConnections.toArray(new Connection[0]))) {
                return;
            }
            if (!MulticastSet.closeConnections(producerConnections)) {
                return;
            }
            if (!MulticastSet.closeConnections(consumerConnections)) {
                return;
            }
            if (Thread.interrupted()) {
                return;
            }
            LOGGER.debug("Shutting down threading handler");
            this.threadingHandler.shutdown();
            LOGGER.debug("Threading handler shut down");
        }
        catch (Exception e) {
            LOGGER.warn("Error during test shutdown", (Throwable)e);
        }
    }

    private static boolean closeConnections(Connection[] connections) {
        for (Connection connection : connections) {
            if (Thread.interrupted()) {
                return false;
            }
            MulticastSet.dispose(connection);
        }
        return true;
    }

    private void enableTopologyRecoveryIfNecessary(List<MulticastParams.TopologyHandlerResult> topologyHandlerResults) throws IOException {
        for (final MulticastParams.TopologyHandlerResult topologyHandlerResult : topologyHandlerResults) {
            final Connection connection = topologyHandlerResult.connection;
            if (Utils.isRecoverable(topologyHandlerResult.connection)) {
                final String connectionName = connection.getClientProvidedName();
                ((AutorecoveringConnection)connection).addRecoveryListener(new RecoveryListener(){

                    public void handleRecoveryStarted(Recoverable recoverable) {
                        LOGGER.debug("Connection recovery started for connection {}", (Object)connectionName);
                    }

                    public void handleRecovery(Recoverable recoverable) {
                        LOGGER.debug("Starting topology recovery for connection {}", (Object)connectionName);
                        topologyHandlerResult.topologyRecording.recover(connection);
                        LOGGER.debug("Topology recovery done for connection {}", (Object)connectionName);
                    }
                });
                continue;
            }
            connection.close();
        }
    }

    private static void dispose(Connection connection) {
        try {
            LOGGER.debug("Closing connection {}", (Object)connection.getClientProvidedName());
            connection.close(200, "Closed by PerfTest", 3000);
            LOGGER.debug("Connection {} has been closed", (Object)connection.getClientProvidedName());
        }
        catch (AlreadyClosedException e) {
            LOGGER.debug("Connection {} already closed", (Object)connection.getClientProvidedName());
        }
        catch (Exception e) {
            LOGGER.debug("Error while closing connection {}: {}", (Object)connection.getClientProvidedName(), (Object)e.getMessage());
        }
    }

    public void setThreadingHandler(ThreadingHandler threadingHandler) {
        this.threadingHandler = threadingHandler;
    }

    private static void recordReason(Map<String, Integer> reasons, String reason) {
        reasons.compute(reason, (keyReason, count) -> {
            int n;
            if (count == null) {
                n = 1;
            } else {
                count = count + 1;
                n = count;
            }
            return n;
        });
    }

    private static /* synthetic */ void lambda$run$3(Consumer[] consumerRunnables) {
        for (Consumer consumer : consumerRunnables) {
            try {
                consumer.maybeStopIfNoActivityOrQueueEmpty();
            }
            catch (Exception e) {
                LOGGER.info("Error while checking exit-when for consumer {}: {}", (Object)consumer, (Object)e.getMessage());
            }
        }
    }

    private static class ConnectionCreator {
        private final ConnectionFactory cf;
        private final List<Address> addresses;

        private ConnectionCreator(ConnectionFactory cf, List<String> uris) {
            this.cf = cf;
            if (uris == null || uris.isEmpty()) {
                this.addresses = Collections.emptyList();
            } else {
                ArrayList<Address> addresses = new ArrayList<Address>(uris.size());
                for (String uri : uris) {
                    try {
                        addresses.add(Utils.extract(uri));
                    }
                    catch (Exception e) {
                        throw new IllegalArgumentException("Could not parse URI: " + uri);
                    }
                }
                this.addresses = Collections.unmodifiableList(addresses);
            }
        }

        Connection createConnection(String name) throws IOException, TimeoutException {
            if (this.addresses.isEmpty()) {
                return this.cf.newConnection(name);
            }
            ArrayList<Address> addrs = new ArrayList<Address>(this.addresses);
            if (this.addresses.size() > 1) {
                Collections.shuffle(addrs);
            }
            return this.cf.newConnection(addrs, name);
        }

        List<Connection> createConfigurationConnections() throws IOException, TimeoutException {
            if (this.addresses.isEmpty()) {
                return Collections.singletonList(this.createConnection("perf-test-configuration-0"));
            }
            ArrayList<Connection> connections = new ArrayList<Connection>(this.addresses.size());
            for (int i = 0; i < this.addresses.size(); ++i) {
                connections.add(this.cf.newConnection(Collections.singletonList(this.addresses.get(i)), "perf-test-configuration-" + i));
            }
            return Collections.unmodifiableList(connections);
        }
    }

    static class CacheConsumersExecutorsFactory
    implements Function<Integer, ExecutorService> {
        private final ThreadingHandler threadingHandler;
        private final MulticastParams params;
        private final int modulo;
        private final List<ExecutorService> cache;

        CacheConsumersExecutorsFactory(ThreadingHandler threadingHandler, MulticastParams params, int modulo) {
            this.threadingHandler = threadingHandler;
            this.params = params;
            this.modulo = modulo;
            this.cache = new ArrayList<ExecutorService>(modulo);
            IntStream.range(0, modulo).forEach(i -> this.cache.add(null));
        }

        @Override
        public ExecutorService apply(Integer consumerNumber) {
            int remaining = consumerNumber % this.modulo;
            ExecutorService executorService = this.cache.get(remaining);
            if (executorService == null) {
                executorService = this.threadingHandler.executorService(String.format("perf-test-shared-consumer-worker-%d-", remaining), MulticastSet.nbThreadsForConsumer(this.params));
                this.cache.set(remaining, executorService);
            }
            return executorService;
        }
    }

    static class NoCacheConsumersExecutorsFactory
    implements Function<Integer, ExecutorService> {
        private final ThreadingHandler threadingHandler;
        private final MulticastParams params;

        NoCacheConsumersExecutorsFactory(ThreadingHandler threadingHandler, MulticastParams params) {
            this.threadingHandler = threadingHandler;
            this.params = params;
        }

        @Override
        public ExecutorService apply(Integer consumerNumber) {
            ExecutorService executorService = this.threadingHandler.executorService(String.format("perf-test-consumer-%d-worker-", consumerNumber), MulticastSet.nbThreadsForConsumer(this.params));
            return executorService;
        }
    }

    static class NoLimitCompletionHandler
    implements CompletionHandler {
        private final CountDownLatch latch = new CountDownLatch(1);
        private final ConcurrentMap<String, Integer> reasons;

        NoLimitCompletionHandler(ConcurrentMap<String, Integer> reasons) {
            this.reasons = reasons;
        }

        @Override
        public void waitForCompletion() throws InterruptedException {
            this.latch.await();
        }

        @Override
        public void countDown(String reason) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Counting down ({})", (Object)reason);
            }
            MulticastSet.recordReason(this.reasons, reason);
            this.latch.countDown();
        }
    }

    static class DefaultCompletionHandler
    implements CompletionHandler {
        private final int timeLimit;
        private final CountDownLatch latch;
        private final ConcurrentMap<String, Integer> reasons;
        private final AtomicBoolean completed = new AtomicBoolean(false);

        DefaultCompletionHandler(int timeLimit, int countLimit, ConcurrentMap<String, Integer> reasons) {
            this.timeLimit = timeLimit;
            this.latch = new CountDownLatch(countLimit <= 0 ? 1 : countLimit);
            this.reasons = reasons;
        }

        @Override
        public void waitForCompletion() throws InterruptedException {
            if (this.timeLimit <= 0) {
                this.latch.await();
                this.completed.set(true);
            } else {
                boolean countedDown = this.latch.await(this.timeLimit, TimeUnit.SECONDS);
                this.completed.set(true);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Completed, counted down? {}", (Object)countedDown);
                }
                if (!countedDown) {
                    MulticastSet.recordReason(this.reasons, MulticastSet.STOP_REASON_REACHED_TIME_LIMIT);
                }
            }
        }

        @Override
        public void countDown(String reason) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Counting down ({})", (Object)reason);
            }
            if (!this.completed.get()) {
                MulticastSet.recordReason(this.reasons, reason);
                this.latch.countDown();
            }
        }
    }

    private static class AgentState {
        private Producer runnable;
        private Future<?> task;

        private AgentState() {
        }
    }

    static class DefaultThreadingHandler
    implements ThreadingHandler {
        private final Collection<ExecutorService> executorServices = new ArrayList<ExecutorService>();
        private final AtomicBoolean closing = new AtomicBoolean(false);
        private final String prefix;

        DefaultThreadingHandler(String prefix) {
            this.prefix = prefix;
        }

        DefaultThreadingHandler() {
            this("");
        }

        @Override
        public ExecutorService executorService(String name, int nbThreads) {
            if (nbThreads <= 0) {
                return this.create(() -> Executors.newSingleThreadExecutor(new NamedThreadFactory(this.prefix + name)));
            }
            return this.create(() -> Executors.newFixedThreadPool(nbThreads, new NamedThreadFactory(this.prefix + name)));
        }

        @Override
        public ScheduledExecutorService scheduledExecutorService(String name, int nbThreads) {
            return (ScheduledExecutorService)this.create(() -> Executors.newScheduledThreadPool(nbThreads, new NamedThreadFactory(name)));
        }

        private ExecutorService create(Supplier<ExecutorService> s) {
            ExecutorService executorService = s.get();
            this.executorServices.add(executorService);
            return executorService;
        }

        @Override
        public void shutdown() {
            if (this.closing.compareAndSet(false, true)) {
                for (ExecutorService executorService : this.executorServices) {
                    executorService.shutdownNow();
                    try {
                        boolean terminated = executorService.awaitTermination(10L, TimeUnit.SECONDS);
                        if (terminated) continue;
                        LoggerFactory.getLogger(DefaultThreadingHandler.class).warn("Some PerfTest tasks (producer, consumer, rate scheduler) didn't finish");
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                }
            }
        }
    }

    public static interface CompletionHandler {
        public void waitForCompletion() throws InterruptedException;

        public void countDown(String var1);
    }

    static interface ThreadingHandler {
        public ExecutorService executorService(String var1, int var2);

        public ScheduledExecutorService scheduledExecutorService(String var1, int var2);

        public void shutdown();
    }
}

