/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.perf;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ReturnListener;
import com.rabbitmq.perf.Consumer;
import com.rabbitmq.perf.ConsumerParameters;
import com.rabbitmq.perf.LocalFilesMessageBodySource;
import com.rabbitmq.perf.MessageBodySource;
import com.rabbitmq.perf.MulticastSet;
import com.rabbitmq.perf.PerfTest;
import com.rabbitmq.perf.Producer;
import com.rabbitmq.perf.ProducerParameters;
import com.rabbitmq.perf.RandomJsonMessageBodySource;
import com.rabbitmq.perf.Recovery;
import com.rabbitmq.perf.StartListener;
import com.rabbitmq.perf.TimeSequenceMessageBodySource;
import com.rabbitmq.perf.TimestampProvider;
import com.rabbitmq.perf.TopologyRecording;
import com.rabbitmq.perf.Utils;
import com.rabbitmq.perf.ValueIndicator;
import com.rabbitmq.perf.metrics.PerformanceMetrics;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

public class MulticastParams {
    private long confirm = -1L;
    private int confirmTimeout = 30;
    private int consumerCount = 1;
    private int producerCount = 1;
    private int consumerChannelCount = 1;
    private int producerChannelCount = 1;
    private int consumerTxSize = 0;
    private int producerTxSize = 0;
    private int channelPrefetch = 0;
    private int consumerPrefetch = 0;
    private int minMsgSize = 0;
    private int timeLimit = 0;
    private float producerRateLimit = -1.0f;
    private float consumerRateLimit = -1.0f;
    private int producerMsgCount = 0;
    private int consumerMsgCount = 0;
    private boolean consumerSlowStart = false;
    private String exchangeName = "direct";
    private String exchangeType = "direct";
    private List<String> queueNames = new ArrayList<String>();
    private boolean queuesInSequence = false;
    private String routingKey = null;
    private boolean randomRoutingKey = false;
    private boolean skipBindingQueues = false;
    private List<String> flags = new ArrayList<String>();
    private int multiAckEvery = 0;
    private boolean autoAck = false;
    private boolean autoDelete = true;
    private List<String> bodyFiles = new ArrayList<String>();
    private String bodyContentType = null;
    private boolean predeclared = false;
    private boolean useMillis = false;
    private Map<String, Object> queueArguments = null;
    private String queuePattern = null;
    private int queueSequenceFrom = -1;
    private int queueSequenceTo = -1;
    private Map<String, Object> messageProperties = null;
    private TopologyHandler topologyHandler;
    private TopologyRecording topologyRecording;
    private int heartbeatSenderThreads = -1;
    private int routingKeyCacheSize = 0;
    private boolean exclusive = false;
    private Duration publishingInterval = null;
    private int producerRandomStartDelayInSeconds;
    private int producerSchedulerThreadCount = -1;
    private int consumersThreadPools = -1;
    private int shutdownTimeout = 5;
    private int serversStartUpTimeout = -1;
    private int serversUpLimit = -1;
    private List<String> publishingRates = new ArrayList<String>();
    private List<String> messageSizes = new ArrayList<String>();
    private long consumerLatencyInMicroseconds;
    private List<String> consumerLatencies = new ArrayList<String>();
    private boolean polling = false;
    private int pollingInterval = -1;
    private boolean nack = false;
    private boolean requeue = true;
    private boolean jsonBody = false;
    private int bodyFieldCount = 1000;
    private int bodyCount = 100;
    private Map<String, Object> consumerArguments = null;
    private PerfTest.EXIT_WHEN exitWhen = PerfTest.EXIT_WHEN.NEVER;
    private Duration consumerStartDelay = Duration.ofSeconds(-1L);
    private Map<String, Number> exposedMetrics = Collections.emptyMap();
    private AtomicReference<MessageBodySource> messageBodySourceReference = new AtomicReference();
    private boolean cluster = false;
    private StartListener startListener;

    public void setExchangeType(String exchangeType) {
        this.exchangeType = exchangeType;
    }

    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }

    public void setQueueNames(List<String> queueNames) {
        this.queueNames = queueNames == null ? new ArrayList<String>() : new ArrayList<String>(queueNames);
    }

    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    public void setRandomRoutingKey(boolean randomRoutingKey) {
        this.randomRoutingKey = randomRoutingKey;
    }

    public void setSkipBindingQueues(boolean skipBindingQueues) {
        this.skipBindingQueues = skipBindingQueues;
    }

    public void setProducerRateLimit(float producerRateLimit) {
        this.producerRateLimit = producerRateLimit;
    }

    public void setProducerCount(int producerCount) {
        this.producerCount = producerCount;
    }

    public void setProducerChannelCount(int producerChannelCount) {
        this.producerChannelCount = producerChannelCount;
    }

    public void setConsumerRateLimit(float consumerRateLimit) {
        this.consumerRateLimit = consumerRateLimit;
    }

    public void setConsumerCount(int consumerCount) {
        this.consumerCount = consumerCount;
    }

    public void setConsumerChannelCount(int consumerChannelCount) {
        this.consumerChannelCount = consumerChannelCount;
    }

    public void setConsumerSlowStart(boolean slowStart) {
        this.consumerSlowStart = slowStart;
    }

    public void setProducerTxSize(int producerTxSize) {
        this.producerTxSize = producerTxSize;
    }

    public void setConsumerTxSize(int consumerTxSize) {
        this.consumerTxSize = consumerTxSize;
    }

    public void setConfirm(long confirm) {
        this.confirm = confirm;
    }

    long getConfirm() {
        return this.confirm;
    }

    public void setConfirmTimeout(int confirmTimeout) {
        this.confirmTimeout = confirmTimeout;
    }

    public void setAutoAck(boolean autoAck) {
        this.autoAck = autoAck;
    }

    public void setMultiAckEvery(int multiAckEvery) {
        this.multiAckEvery = multiAckEvery;
    }

    public void setChannelPrefetch(int channelPrefetch) {
        this.channelPrefetch = channelPrefetch;
    }

    public void setConsumerPrefetch(int consumerPrefetch) {
        this.consumerPrefetch = consumerPrefetch;
    }

    public void setMinMsgSize(int minMsgSize) {
        this.minMsgSize = minMsgSize;
    }

    public void setTimeLimit(int timeLimit) {
        this.timeLimit = timeLimit;
    }

    public void setUseMillis(boolean useMillis) {
        this.useMillis = useMillis;
    }

    public void setProducerMsgCount(int producerMsgCount) {
        this.producerMsgCount = producerMsgCount;
    }

    public void setConsumerMsgCount(int consumerMsgCount) {
        this.consumerMsgCount = consumerMsgCount;
    }

    public void setMsgCount(int msgCount) {
        this.setProducerMsgCount(msgCount);
        this.setConsumerMsgCount(msgCount);
    }

    public void setFlags(List<String> flags) {
        this.flags = flags;
    }

    List<String> getFlags() {
        return this.flags;
    }

    public void setAutoDelete(boolean autoDelete) {
        this.autoDelete = autoDelete;
    }

    boolean isAutoDelete() {
        return this.autoDelete;
    }

    public void setPredeclared(boolean predeclared) {
        this.predeclared = predeclared;
    }

    public void setQueueArguments(Map<String, Object> queueArguments) {
        this.queueArguments = queueArguments;
    }

    public void setMessageProperties(Map<String, Object> messageProperties) {
        this.messageProperties = messageProperties;
    }

    public void setConsumersThreadPools(int consumersThreadPools) {
        this.consumersThreadPools = consumersThreadPools;
    }

    public void setShutdownTimeout(int shutdownTimeout) {
        this.shutdownTimeout = shutdownTimeout;
    }

    public void setServersStartUpTimeout(int serversStartUpTimeout) {
        this.serversStartUpTimeout = serversStartUpTimeout;
    }

    public void setServersUpLimit(int serversUpLimit) {
        this.serversUpLimit = serversUpLimit;
    }

    public void setPublishingRates(List<String> publishingRates) {
        this.publishingRates = publishingRates;
    }

    public void setConsumerArguments(Map<String, Object> consumerArguments) {
        this.consumerArguments = consumerArguments;
    }

    public void setExitWhen(PerfTest.EXIT_WHEN exitWhen) {
        this.exitWhen = exitWhen;
    }

    void setCluster(boolean cluster) {
        this.cluster = cluster;
    }

    void setConsumerStartDelay(Duration csd) {
        this.consumerStartDelay = csd;
    }

    public int getConsumerCount() {
        return this.consumerCount;
    }

    public int getConsumerChannelCount() {
        return this.consumerChannelCount;
    }

    public boolean getConsumerSlowStart() {
        return this.consumerSlowStart;
    }

    public int getConsumerThreadCount() {
        return this.consumerCount * this.consumerChannelCount;
    }

    public int getProducerCount() {
        return this.producerCount;
    }

    public int getProducerChannelCount() {
        return this.producerChannelCount;
    }

    public int getProducerThreadCount() {
        return this.producerCount * this.producerChannelCount;
    }

    public int getMinMsgSize() {
        return this.minMsgSize;
    }

    public float getProducerRateLimit() {
        return this.producerRateLimit;
    }

    public void setBodyFiles(List<String> bodyFiles) {
        this.bodyFiles = bodyFiles == null ? new ArrayList<String>() : new ArrayList<String>(bodyFiles);
    }

    List<String> getBodyFiles() {
        return Collections.unmodifiableList(this.bodyFiles);
    }

    Map<String, Object> getQueueArguments() {
        return this.queueArguments;
    }

    public void setBodyContentType(String bodyContentType) {
        this.bodyContentType = bodyContentType;
    }

    String getBodyContentType() {
        return this.bodyContentType;
    }

    public void setQueuePattern(String queuePattern) {
        this.queuePattern = queuePattern;
    }

    public void setQueueSequenceFrom(int queueSequenceFrom) {
        this.queueSequenceFrom = queueSequenceFrom;
    }

    public void setQueueSequenceTo(int queueSequenceTo) {
        this.queueSequenceTo = queueSequenceTo;
    }

    public void setHeartbeatSenderThreads(int heartbeatSenderThreads) {
        this.heartbeatSenderThreads = heartbeatSenderThreads;
    }

    public void setMessageSizes(List<String> messageSizes) {
        this.messageSizes = messageSizes;
    }

    public void setConsumerLatencyInMicroseconds(long consumerLatencyInMicroseconds) {
        this.consumerLatencyInMicroseconds = consumerLatencyInMicroseconds;
    }

    public void setConsumerLatencies(List<String> consumerLatencies) {
        this.consumerLatencies = consumerLatencies;
    }

    public int getHeartbeatSenderThreads() {
        return this.heartbeatSenderThreads <= 0 ? this.producerCount + this.consumerCount : this.heartbeatSenderThreads;
    }

    public int getTimeLimit() {
        return this.timeLimit;
    }

    public float getConsumerRateLimit() {
        return this.consumerRateLimit;
    }

    public int getProducerMsgCount() {
        return this.producerMsgCount;
    }

    public int getConsumerMsgCount() {
        return this.consumerMsgCount;
    }

    public void setRoutingKeyCacheSize(int routingKeyCacheSize) {
        this.routingKeyCacheSize = routingKeyCacheSize;
    }

    public int getConsumersThreadPools() {
        return this.consumersThreadPools;
    }

    public int getShutdownTimeout() {
        return this.shutdownTimeout;
    }

    public int getServersStartUpTimeout() {
        return this.serversStartUpTimeout;
    }

    public int getServersUpLimit() {
        return this.serversUpLimit;
    }

    public List<String> getPublishingRates() {
        return this.publishingRates;
    }

    public List<String> getMessageSizes() {
        return this.messageSizes;
    }

    public long getConsumerLatencyInMicroseconds() {
        return this.consumerLatencyInMicroseconds;
    }

    public List<String> getConsumerLatencies() {
        return this.consumerLatencies;
    }

    public PerfTest.EXIT_WHEN getExitWhen() {
        return this.exitWhen;
    }

    public Duration getConsumerStartDelay() {
        return this.consumerStartDelay;
    }

    public void setPolling(boolean polling) {
        this.polling = polling;
    }

    public boolean isPolling() {
        return this.polling;
    }

    public void setPollingInterval(int pollingInterval) {
        this.pollingInterval = pollingInterval;
    }

    public void setNack(boolean nack) {
        this.nack = nack;
    }

    public void setRequeue(boolean requeue) {
        this.requeue = requeue;
    }

    public void setJsonBody(boolean jsonBody) {
        this.jsonBody = jsonBody;
    }

    public void setBodyFieldCount(int bodyFieldCount) {
        this.bodyFieldCount = bodyFieldCount;
    }

    public void setBodyCount(int bodyCount) {
        this.bodyCount = bodyCount;
    }

    public void setQueuesInSequence(boolean queuesInSequence) {
        this.queuesInSequence = queuesInSequence;
    }

    public void setStartListener(StartListener startListener) {
        this.startListener = startListener;
    }

    public Producer createProducer(Connection connection, PerformanceMetrics performanceMetrics, MulticastSet.CompletionHandler completionHandler, ValueIndicator<Float> rateIndicator, ValueIndicator<Integer> messageSizeIndicator) throws IOException {
        MessageBodySource messageBodySource;
        TimestampProvider tsp;
        Channel channel = connection.createChannel();
        if (this.producerTxSize > 0) {
            channel.txSelect();
        }
        if (this.confirm >= 0L) {
            channel.confirmSelect();
        }
        TopologyRecording topologyRecording = new TopologyRecording(this.isPolling(), this.cluster);
        if (!this.predeclared || !MulticastParams.exchangeExists(connection, this.exchangeName)) {
            Utils.exchangeDeclare(channel, this.exchangeName, this.exchangeType);
            topologyRecording.recordExchange(this.exchangeName, this.exchangeType);
        }
        if (this.bodyFiles.size() > 0) {
            tsp = new TimestampProvider(this.useMillis, true);
            messageBodySource = new LocalFilesMessageBodySource(this.bodyFiles, this.bodyContentType);
        } else if (this.jsonBody) {
            tsp = new TimestampProvider(this.useMillis, true);
            if (this.messageBodySourceReference.get() == null) {
                this.messageBodySourceReference.set(new RandomJsonMessageBodySource(this.minMsgSize, this.bodyFieldCount, this.bodyCount));
            }
            messageBodySource = this.messageBodySourceReference.get();
        } else {
            tsp = new TimestampProvider(this.useMillis, false);
            messageBodySource = new TimeSequenceMessageBodySource(tsp, messageSizeIndicator);
        }
        Recovery.RecoveryProcess recoveryProcess = Recovery.setupRecoveryProcess(connection, topologyRecording);
        Producer producer = new Producer(new ProducerParameters().setChannel(channel).setExchangeName(this.exchangeName).setId(this.topologyHandler.getRoutingKey()).setRandomRoutingKey(this.randomRoutingKey).setFlags(this.flags).setTxSize(this.producerTxSize).setMsgLimit(this.producerMsgCount).setConfirm(this.confirm).setConfirmTimeout(this.confirmTimeout).setMessageBodySource(messageBodySource).setTsp(tsp).setPerformanceMetrics(performanceMetrics).setMessageProperties(this.messageProperties).setCompletionHandler(completionHandler).setRoutingKeyCacheSize(this.routingKeyCacheSize).setRandomStartDelayInSeconds(this.producerRandomStartDelayInSeconds).setRecoveryProcess(recoveryProcess).setRateIndicator(rateIndicator).setStartListener(this.startListener));
        channel.addReturnListener((ReturnListener)producer);
        channel.addConfirmListener((ConfirmListener)producer);
        this.topologyHandler.next();
        return producer;
    }

    public Consumer createConsumer(Connection connection, PerformanceMetrics performanceMetrics, ValueIndicator<Long> consumerLatenciesIndicator, MulticastSet.CompletionHandler completionHandler, ExecutorService executorService, ScheduledExecutorService topologyRecordingScheduledExecutorService) throws IOException {
        TopologyHandlerResult topologyHandlerResult = this.topologyHandler.configureQueuesForClient(connection);
        connection = topologyHandlerResult.connection;
        Channel channel = connection.createChannel();
        if (this.consumerTxSize > 0) {
            channel.txSelect();
        }
        if (this.consumerPrefetch > 0) {
            channel.basicQos(this.consumerPrefetch);
        }
        if (this.channelPrefetch > 0) {
            channel.basicQos(this.channelPrefetch, true);
        }
        boolean timestampInHeader = this.bodyFiles.size() > 0 || this.jsonBody;
        TimestampProvider tsp = new TimestampProvider(this.useMillis, timestampInHeader);
        Recovery.RecoveryProcess recoveryProcess = Recovery.setupRecoveryProcess(connection, topologyHandlerResult.topologyRecording);
        Consumer consumer = new Consumer(new ConsumerParameters().setChannel(channel).setId(this.topologyHandler.getRoutingKey()).setQueueNames(topologyHandlerResult.configuredQueues).setTxSize(this.consumerTxSize).setAutoAck(this.autoAck).setMultiAckEvery(this.multiAckEvery).setPerformanceMetrics(performanceMetrics).setRateLimit(this.consumerRateLimit).setMsgLimit(this.consumerMsgCount).setConsumerLatencyIndicator(consumerLatenciesIndicator).setTimestampProvider(tsp).setCompletionHandler(completionHandler).setRecoveryProcess(recoveryProcess).setExecutorService(executorService).setPolling(this.polling).setPollingInterval(this.pollingInterval).setNack(this.nack).setRequeue(this.requeue).setConsumerArguments(this.consumerArguments).setExitWhen(this.exitWhen).setTopologyRecoveryScheduledExecutorService(topologyRecordingScheduledExecutorService).setStartListener(this.startListener));
        this.topologyHandler.next();
        return consumer;
    }

    public List<TopologyHandlerResult> configureAllQueues(List<Connection> connections) throws IOException {
        return this.topologyHandler.configureAllQueues(connections);
    }

    public void init() {
        this.topologyRecording = new TopologyRecording(this.isPolling(), this.cluster);
        this.topologyHandler = this.queuePattern == null && !this.queuesInSequence ? new FixedQueuesTopologyHandler(this, this.routingKey, this.queueNames, this.topologyRecording) : (this.queuePattern == null && this.queuesInSequence ? new SequenceTopologyHandler(this, this.queueNames, this.topologyRecording, this.routingKey) : new SequenceTopologyHandler(this, this.queueSequenceFrom, this.queueSequenceTo, this.queuePattern, this.topologyRecording, this.routingKey));
    }

    public void resetTopologyHandler() {
        this.topologyHandler.reset();
    }

    public void deleteAutoDeleteQueuesIfNecessary(Connection connection) throws IOException, TimeoutException {
        if (this.polling) {
            try (Channel channel = connection.createChannel();){
                for (TopologyRecording.RecordedQueue queue : this.topologyRecording.queues()) {
                    if (!queue.isAutoDelete() || queue.isExclusive()) continue;
                    if (Thread.interrupted()) {
                        return;
                    }
                    channel.queueDelete(queue.name());
                }
            }
        }
    }

    private static boolean exchangeExists(Connection connection, String exchangeName) throws IOException {
        if ("".equals(exchangeName) || exchangeName.startsWith("amq.")) {
            return true;
        }
        return Utils.exists(connection, ch -> ch.exchangeDeclarePassive(exchangeName));
    }

    private static boolean queueExists(Connection connection, String queueName) throws IOException {
        return queueName != null && Utils.exists(connection, ch -> ch.queueDeclarePassive(queueName));
    }

    public boolean hasLimit() {
        return this.timeLimit > 0 || this.consumerMsgCount > 0 || this.producerMsgCount > 0 || this.exitWhen == PerfTest.EXIT_WHEN.EMPTY || this.exitWhen == PerfTest.EXIT_WHEN.IDLE;
    }

    public void setExclusive(boolean exclusive) {
        this.exclusive = exclusive;
    }

    public boolean isExclusive() {
        return this.exclusive;
    }

    public void setPublishingInterval(Duration publishingInterval) {
        this.publishingInterval = publishingInterval;
    }

    public Duration getPublishingInterval() {
        return this.publishingInterval;
    }

    public void setProducerRandomStartDelayInSeconds(int producerRandomStartDelayInSeconds) {
        this.producerRandomStartDelayInSeconds = producerRandomStartDelayInSeconds;
    }

    public int getProducerRandomStartDelayInSeconds() {
        return this.producerRandomStartDelayInSeconds;
    }

    public int getProducerSchedulerThreadCount() {
        return this.producerSchedulerThreadCount;
    }

    public void setProducerSchedulerThreadCount(int producerSchedulerThreadCount) {
        this.producerSchedulerThreadCount = producerSchedulerThreadCount;
    }

    static class SequenceTopologyHandler
    extends TopologyHandlerSupport
    implements TopologyHandler {
        final List<String> queues;
        int index = 0;
        private final TopologyRecording topologyRecording;
        private final String routingKey;

        public SequenceTopologyHandler(MulticastParams params, int from, int to, String queuePattern, TopologyRecording topologyRecording, String routingKey) {
            super(params);
            this.queues = new ArrayList<String>(to - from + 1);
            for (int i = from; i <= to; ++i) {
                this.queues.add(String.format(queuePattern, i));
            }
            this.topologyRecording = topologyRecording;
            this.routingKey = routingKey;
        }

        public SequenceTopologyHandler(MulticastParams params, List<String> queues, TopologyRecording topologyRecording, String routingKey) {
            super(params);
            this.queues = new ArrayList<String>(queues);
            this.topologyRecording = topologyRecording;
            this.routingKey = routingKey;
        }

        @Override
        public String getRoutingKey() {
            return this.routingKey == null ? this.getQueueNamesForClient().get(0) : this.routingKey;
        }

        @Override
        public TopologyHandlerResult configureQueuesForClient(Connection connection) throws IOException {
            if (this.params.isExclusive()) {
                Connection connectionToUse = this.maybeUseCachedConnection(this.getQueueNamesForClient(), connection);
                return this.configureQueues(connectionToUse, this.getQueueNamesForClient(), this.topologyRecording, () -> {});
            }
            List<String> queues = this.getQueueNamesForClient();
            List<String> queuesForSubRecording = this.params.predeclared ? Collections.emptyList() : queues;
            TopologyRecording clientTopologyRecording = this.topologyRecording.subRecording(queuesForSubRecording);
            return new TopologyHandlerResult(connection, queues, clientTopologyRecording);
        }

        @Override
        public List<TopologyHandlerResult> configureAllQueues(List<Connection> connections) throws IOException {
            if (this.params.isExclusive()) {
                return connections.stream().map(connection -> new TopologyHandlerResult((Connection)connection, (List<String>)new ArrayList<String>(), this.topologyRecording.child())).collect(Collectors.toList());
            }
            return this.configureQueues(connections, this.getQueueNames(), this.topologyRecording, () -> this.next());
        }

        protected List<String> getQueueNames() {
            return Collections.unmodifiableList(this.queues);
        }

        protected List<String> getQueueNamesForClient() {
            return Collections.singletonList(this.queues.get(this.index % this.queues.size()));
        }

        @Override
        public void next() {
            ++this.index;
        }

        @Override
        public void reset() {
            this.index = 0;
        }
    }

    static class FixedQueuesTopologyHandler
    extends TopologyHandlerSupport
    implements TopologyHandler {
        final String routingKey;
        final List<String> queueNames;
        final TopologyRecording topologyRecording;

        FixedQueuesTopologyHandler(MulticastParams params, String routingKey, List<String> queueNames, TopologyRecording topologyRecording) {
            super(params);
            this.routingKey = routingKey == null ? UUID.randomUUID().toString() : routingKey;
            this.queueNames = queueNames == null ? new ArrayList() : queueNames;
            this.topologyRecording = topologyRecording;
        }

        @Override
        public String getRoutingKey() {
            return this.routingKey;
        }

        @Override
        public TopologyHandlerResult configureQueuesForClient(Connection connection) throws IOException {
            Connection connectionToUse = this.params.isExclusive() ? this.maybeUseCachedConnection(this.queueNames, connection) : connection;
            return this.configureQueues(connectionToUse, this.queueNames, this.topologyRecording, () -> {});
        }

        @Override
        public List<TopologyHandlerResult> configureAllQueues(List<Connection> connections) throws IOException {
            if (this.shouldConfigureQueues() && !this.params.isExclusive()) {
                return this.configureQueues(connections, this.queueNames, this.topologyRecording, () -> {});
            }
            return connections.stream().map(connection -> new TopologyHandlerResult((Connection)connection, (List<String>)new ArrayList<String>(), this.topologyRecording.child())).collect(Collectors.toList());
        }

        public boolean shouldConfigureQueues() {
            return this.params.consumerCount == 0 && this.queueNames.size() != 0;
        }

        @Override
        public void next() {
        }

        @Override
        public void reset() {
        }
    }

    static abstract class TopologyHandlerSupport {
        protected final MulticastParams params;
        private final ConcurrentMap<String, Connection> connectionCache = new ConcurrentHashMap<String, Connection>();

        protected TopologyHandlerSupport(MulticastParams params) {
            this.params = params;
        }

        protected Connection maybeUseCachedConnection(List<String> queues, Connection connection) throws IOException {
            Connection connectionToUse = this.connectionCache.putIfAbsent(queues.toString(), connection);
            if (connectionToUse == null) {
                connectionToUse = connection;
            } else if (connection != connectionToUse) {
                connection.close(200, "Connection not used", -1);
            }
            return connectionToUse;
        }

        protected TopologyHandlerResult configureQueues(Connection connection, List<String> queues, TopologyRecording topologyRecording, Runnable afterQueueConfigurationCallback) throws IOException {
            return this.configureQueues(Collections.singletonList(connection), queues, topologyRecording, afterQueueConfigurationCallback).get(0);
        }

        protected List<TopologyHandlerResult> configureQueues(List<Connection> connections, List<String> queues, TopologyRecording parentTopologyRecording, Runnable afterQueueConfigurationCallback) throws IOException {
            class State {
                final Connection c;
                final Channel ch;
                final TopologyRecording topologyRecording;
                final List<String> generatedQueueNames = new ArrayList<String>();

                State(Connection c, Channel ch, TopologyRecording topologyRecording) {
                    this.c = c;
                    this.ch = ch;
                    this.topologyRecording = topologyRecording;
                }
            }
            ArrayList<State> states = new ArrayList<State>(connections.size());
            for (Connection connection : connections) {
                states.add(new State(connection, connection.createChannel(), parentTopologyRecording.child()));
            }
            State firstState = (State)states.get(0);
            if (!this.params.predeclared || !MulticastParams.exchangeExists(firstState.c, this.params.exchangeName)) {
                Utils.exchangeDeclare(firstState.ch, this.params.exchangeName, this.params.exchangeType);
                firstState.topologyRecording.recordExchange(this.params.exchangeName, this.params.exchangeType);
            }
            if (!this.params.predeclared && queues.isEmpty()) {
                queues = Collections.singletonList("");
            }
            for (int i = 0; i < queues.size(); ++i) {
                String qName = queues.get(i);
                State state = (State)states.get(i % states.size());
                Connection connection = state.c;
                List<String> generatedQueueNames = state.generatedQueueNames;
                Channel channel = state.ch;
                TopologyRecording topologyRecording = state.topologyRecording;
                if (!this.params.predeclared || !MulticastParams.queueExists(connection, qName)) {
                    boolean serverNamed = qName == null || "".equals(qName);
                    qName = channel.queueDeclare(qName, this.params.flags.contains("persistent"), this.params.isExclusive(), this.params.autoDelete, this.params.queueArguments).getQueue();
                    topologyRecording.recordQueue(qName, this.params.flags.contains("persistent"), this.params.isExclusive(), this.params.autoDelete, this.params.queueArguments, serverNamed);
                }
                generatedQueueNames.add(qName);
                if (!("".equals(this.params.exchangeName) || "amq.default".equals(this.params.exchangeName) || this.params.skipBindingQueues)) {
                    String routingKey = this.params.topologyHandler.getRoutingKey();
                    channel.queueBind(qName, this.params.exchangeName, routingKey);
                    topologyRecording.recordBinding(qName, this.params.exchangeName, routingKey);
                }
                afterQueueConfigurationCallback.run();
            }
            ArrayList<TopologyHandlerResult> topologyHandlerResults = new ArrayList<TopologyHandlerResult>(connections.size());
            for (State state : states) {
                try {
                    state.ch.close();
                    topologyHandlerResults.add(new TopologyHandlerResult(state.c, state.generatedQueueNames, state.topologyRecording));
                }
                catch (TimeoutException e) {
                    throw new IOException(e);
                }
            }
            return topologyHandlerResults;
        }
    }

    static class TopologyHandlerResult {
        final Connection connection;
        final TopologyRecording topologyRecording;
        final List<String> configuredQueues;

        TopologyHandlerResult(Connection connection, List<String> configuredQueues, TopologyRecording topologyRecording) {
            this.connection = connection;
            this.configuredQueues = configuredQueues;
            this.topologyRecording = topologyRecording;
        }
    }

    static interface TopologyHandler {
        public String getRoutingKey();

        public TopologyHandlerResult configureQueuesForClient(Connection var1) throws IOException;

        public List<TopologyHandlerResult> configureAllQueues(List<Connection> var1) throws IOException;

        public void next();

        public void reset();
    }
}

