/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.perf;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.perf.AgentBase;
import com.rabbitmq.perf.ConsumerParameters;
import com.rabbitmq.perf.MulticastSet;
import com.rabbitmq.perf.PerfTest;
import com.rabbitmq.perf.RateLimiterUtils;
import com.rabbitmq.perf.Recovery;
import com.rabbitmq.perf.StartListener;
import com.rabbitmq.perf.TimestampProvider;
import com.rabbitmq.perf.TopologyRecording;
import com.rabbitmq.perf.Utils;
import com.rabbitmq.perf.ValueIndicator;
import com.rabbitmq.perf.metrics.PerformanceMetrics;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Consumer
extends AgentBase
implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
    private static final AckNackOperation ACK_OPERATION = (ch, envelope, multiple, requeue) -> ch.basicAck(envelope.getDeliveryTag(), multiple);
    private static final AckNackOperation NACK_OPERATION = (ch, envelope, multiple, requeue) -> ch.basicNack(envelope.getDeliveryTag(), multiple, requeue);
    static final String STOP_REASON_CONSUMER_REACHED_MESSAGE_LIMIT = "Consumer reached message limit";
    static final String STOP_REASON_CONSUMER_IDLE = "Consumer is idle for more than 1 second";
    static final String STOP_REASON_CONSUMER_QUEUE_EMPTY = "Consumer queue(s) empty";
    private volatile ConsumerImpl q;
    private final Channel channel;
    private final String id;
    private final int txSize;
    private final boolean autoAck;
    private final int multiAckEvery;
    private final boolean requeue;
    private final PerformanceMetrics performanceMetrics;
    private final int msgLimit;
    private final Map<String, String> consumerTagBranchMap = Collections.synchronizedMap(new HashMap());
    private final ConsumerLatency consumerLatency;
    private final BiFunction<AMQP.BasicProperties, byte[], Long> timestampExtractor;
    private final TimestampProvider timestampProvider;
    private final MulticastSet.CompletionHandler completionHandler;
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final AtomicReference<List<String>> queueNames = new AtomicReference();
    private final AtomicLong queueNamesVersion = new AtomicLong(0L);
    private final List<String> initialQueueNames;
    private final ConsumerState state;
    private final Recovery.RecoveryProcess recoveryProcess;
    private final ExecutorService executorService;
    private final boolean polling;
    private final int pollingInterval;
    private final AckNackOperation ackNackOperation;
    private final Map<String, Object> consumerArguments;
    private final PerfTest.EXIT_WHEN exitWhen;
    private volatile long lastDeliveryTag;
    private volatile long lastAckedDeliveryTag;
    private final ScheduledExecutorService topologyRecoveryScheduledExecutorService;
    private final AtomicLong epochMessageCount = new AtomicLong(0L);
    private final Runnable rateLimiterCallback;
    private final boolean rateLimitation;

    public Consumer(ConsumerParameters parameters) {
        super(parameters.getStartListener());
        long consumerLatencyInMicroSeconds;
        this.channel = parameters.getChannel();
        this.id = parameters.getId();
        this.txSize = parameters.getTxSize();
        this.autoAck = parameters.isAutoAck();
        this.multiAckEvery = parameters.getMultiAckEvery();
        this.requeue = parameters.isRequeue();
        this.performanceMetrics = parameters.getPerformanceMetrics();
        this.msgLimit = parameters.getMsgLimit();
        this.timestampProvider = parameters.getTimestampProvider();
        this.completionHandler = parameters.getCompletionHandler();
        this.executorService = parameters.getExecutorService();
        this.polling = parameters.isPolling();
        this.pollingInterval = parameters.getPollingInterval();
        this.consumerArguments = parameters.getConsumerArguments();
        this.exitWhen = parameters.getExitWhen();
        this.topologyRecoveryScheduledExecutorService = parameters.getTopologyRecoveryScheduledExecutorService();
        this.queueNames.set(new ArrayList<String>(parameters.getQueueNames()));
        this.initialQueueNames = new ArrayList<String>(parameters.getQueueNames());
        this.consumerLatency = parameters.getConsumerLatenciesIndicator().isVariable() ? new VariableConsumerLatency(parameters.getConsumerLatenciesIndicator()) : ((consumerLatencyInMicroSeconds = parameters.getConsumerLatenciesIndicator().getValue().longValue()) <= 0L ? new NoWaitConsumerLatency() : (consumerLatencyInMicroSeconds >= 1000L ? new ThreadSleepConsumerLatency(parameters.getConsumerLatenciesIndicator()) : new BusyWaitConsumerLatency(parameters.getConsumerLatenciesIndicator())));
        this.timestampExtractor = this.timestampProvider.isTimestampInHeader() ? (properties, body) -> {
            Object timestamp = properties.getHeaders().get("timestamp");
            return timestamp == null ? Long.MAX_VALUE : (Long)timestamp;
        } : (properties, body) -> {
            DataInputStream d = new DataInputStream(new ByteArrayInputStream((byte[])body));
            try {
                d.readInt();
                return d.readLong();
            }
            catch (IOException e) {
                throw new RuntimeException("Error while extracting timestamp from body");
            }
        };
        this.ackNackOperation = parameters.isNack() ? NACK_OPERATION : ACK_OPERATION;
        boolean bl = this.rateLimitation = parameters.getRateLimit() > 0.0f;
        if (this.rateLimitation) {
            RateLimiterUtils.RateLimiter rateLimiter = RateLimiterUtils.RateLimiter.create(parameters.getRateLimit());
            this.rateLimiterCallback = () -> rateLimiter.acquire(1);
        } else {
            this.rateLimiterCallback = () -> {};
        }
        this.state = new ConsumerState(this.timestampProvider);
        this.recoveryProcess = parameters.getRecoveryProcess();
        this.recoveryProcess.init(this);
    }

    @Override
    protected StartListener.Type type() {
        return StartListener.Type.CONSUMER;
    }

    @Override
    public void run() {
        this.epochMessageCount.set(0L);
        if (this.polling) {
            this.startBasicGetConsumer();
        } else {
            this.registerAsynchronousConsumer();
        }
    }

    private void startBasicGetConsumer() {
        this.executorService.execute(() -> {
            ConsumerImpl delegate = new ConsumerImpl(this.channel);
            boolean shouldPause = this.pollingInterval > 0;
            long queueNamesVersion = this.queueNamesVersion.get();
            List<String> queues = this.queueNames.get();
            Channel ch = this.channel;
            Connection connection = this.channel.getConnection();
            this.started();
            while (!this.completed.get() && !Thread.interrupted()) {
                if (queueNamesVersion != this.queueNamesVersion.get()) {
                    queues = this.queueNames.get();
                    queueNamesVersion = this.queueNamesVersion.get();
                }
                for (String queue : queues) {
                    if (!this.recoveryProcess.isRecoverying()) {
                        try {
                            GetResponse response = ch.basicGet(queue, this.autoAck);
                            if (response != null) {
                                delegate.handleMessage(response.getEnvelope(), response.getProps(), response.getBody(), ch);
                            }
                        }
                        catch (IOException e) {
                            LOGGER.debug("Basic.get error on queue {}: {}", (Object)queue, (Object)e.getMessage());
                            try {
                                ch = connection.createChannel();
                            }
                            catch (Exception ex) {
                                LOGGER.debug("Error while trying to create a channel: {}", (Object)queue, (Object)e.getMessage());
                            }
                        }
                        catch (AlreadyClosedException e) {
                            LOGGER.debug("Tried to basic.get from a closed connection");
                        }
                        if (!shouldPause) continue;
                        try {
                            Thread.sleep(this.pollingInterval);
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        continue;
                    }
                    try {
                        LOGGER.debug("Recovery in progress, sleeping for a sec");
                        Thread.sleep(1000L);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        });
    }

    private void registerAsynchronousConsumer() {
        this.started();
        try {
            this.q = new ConsumerImpl(this.channel);
            for (String qName : this.queueNames.get()) {
                String tag = this.channel.basicConsume(qName, this.autoAck, this.consumerArguments, (com.rabbitmq.client.Consumer)this.q);
                this.consumerTagBranchMap.put(tag, qName);
            }
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        catch (ShutdownSignalException e) {
            throw new RuntimeException(e);
        }
    }

    private boolean ackEnabled() {
        return !this.autoAck;
    }

    private boolean transactionEnabled() {
        return this.txSize != 0;
    }

    private void countDown(String reason) {
        if (this.completed.compareAndSet(false, true)) {
            this.completionHandler.countDown(reason);
        }
    }

    @Override
    public void recover(TopologyRecording topologyRecording) {
        this.epochMessageCount.set(0L);
        if (this.polling) {
            ArrayList<String> queues = new ArrayList<String>(this.initialQueueNames.size());
            for (String queue : this.initialQueueNames) {
                queues.add(Consumer.queueName(topologyRecording, queue));
            }
            this.queueNames.set(queues);
            this.queueNamesVersion.incrementAndGet();
        } else {
            for (Map.Entry<String, String> entry : this.consumerTagBranchMap.entrySet()) {
                String queueName = Consumer.queueName(topologyRecording, entry.getValue());
                String consumerTag = entry.getKey();
                LOGGER.debug("Recovering consumer, starting consuming on {}", (Object)queueName);
                try {
                    TopologyRecording.RecordedQueue queueRecord = topologyRecording.queue(entry.getValue());
                    this.consumeOrScheduleConsume(queueRecord, topologyRecording, consumerTag, queueName);
                }
                catch (Exception e) {
                    LOGGER.warn("Error while recovering consumer {} on queue {} on connection {}", new Object[]{entry.getKey(), queueName, this.channel.getConnection().getClientProvidedName(), e});
                }
            }
        }
    }

    private void consumeOrScheduleConsume(TopologyRecording.RecordedQueue queueRecord, TopologyRecording topologyRecording, String consumerTag, String queueName) throws IOException {
        LOGGER.debug("Checking if queue {} exists before subscribing", (Object)queueName);
        boolean queueExists = Utils.exists(this.channel.getConnection(), ch -> ch.queueDeclarePassive(queueName));
        if (Consumer.queueMayBeDown(queueRecord, topologyRecording)) {
            if (queueExists) {
                LOGGER.debug("Queue {} does exist, subscribing", (Object)queueName);
                this.channel.basicConsume(queueName, this.autoAck, consumerTag, false, false, this.consumerArguments, (com.rabbitmq.client.Consumer)this.q);
            } else {
                LOGGER.debug("Queue {} does not exist, it is likely unavailable, trying to re-create it though, and scheduling subscription.", (Object)queueName);
                topologyRecording.recoverQueueAndBindings(this.channel.getConnection(), queueRecord);
                Duration schedulingPeriod = Duration.ofSeconds(5L);
                int maxRetry = (int)(Duration.ofMinutes(10L).getSeconds() / schedulingPeriod.getSeconds());
                AtomicInteger retryCount = new AtomicInteger(0);
                AtomicReference<Callable<Void>> resubscriptionReference = new AtomicReference<Callable<Void>>();
                Callable<Void> resubscription = () -> {
                    LOGGER.debug("Scheduled re-subscription for {}...", (Object)queueName);
                    if (Utils.exists(this.channel.getConnection(), ch -> ch.queueDeclarePassive(queueName))) {
                        LOGGER.debug("Queue {} exists, re-subscribing", (Object)queueName);
                        this.channel.basicConsume(queueName, this.autoAck, consumerTag, false, false, this.consumerArguments, (com.rabbitmq.client.Consumer)this.q);
                    } else if (retryCount.incrementAndGet() <= maxRetry) {
                        LOGGER.debug("Queue {} does not exist, scheduling re-subscription", (Object)queueName);
                        this.topologyRecoveryScheduledExecutorService.schedule((Callable)resubscriptionReference.get(), schedulingPeriod.getSeconds(), TimeUnit.SECONDS);
                    } else {
                        LOGGER.debug("Max subscription retry count reached {} for queue {}", (Object)retryCount.get(), (Object)queueName);
                    }
                    return null;
                };
                resubscriptionReference.set(resubscription);
                this.topologyRecoveryScheduledExecutorService.schedule(resubscription, schedulingPeriod.getSeconds(), TimeUnit.SECONDS);
            }
        } else {
            if (!queueExists) {
                LOGGER.debug("Queue {} does not exist, trying to re-create it before re-subscribing", (Object)queueName);
                topologyRecording.recoverQueueAndBindings(this.channel.getConnection(), queueRecord);
            }
            this.channel.basicConsume(queueRecord == null ? queueName : queueRecord.name(), this.autoAck, consumerTag, false, false, this.consumerArguments, (com.rabbitmq.client.Consumer)this.q);
        }
    }

    private static boolean queueMayBeDown(TopologyRecording.RecordedQueue queueRecord, TopologyRecording topologyRecording) {
        return queueRecord != null && queueRecord.isClassic() && queueRecord.isDurable() && topologyRecording.isCluster();
    }

    void maybeStopIfNoActivityOrQueueEmpty() {
        LOGGER.debug("Checking consumer activity");
        if (this.exitWhen == PerfTest.EXIT_WHEN.NEVER) {
            return;
        }
        TimestampProvider tp = this.state.getTimestampProvider();
        long lastActivityTimestamp = this.state.getLastActivityTimestamp();
        if (lastActivityTimestamp == -1L) {
            this.state.setLastActivityTimestamp(tp.getCurrentTime());
            return;
        }
        Duration idleDuration = tp.difference(tp.getCurrentTime(), lastActivityTimestamp);
        if (idleDuration.toMillis() > 1000L) {
            LOGGER.debug("Consumer idle for {}", (Object)idleDuration);
            List<String> queues = this.queueNames.get();
            if (this.exitWhen == PerfTest.EXIT_WHEN.IDLE) {
                this.maybeAckCommitBeforeExit();
                LOGGER.debug("Terminating consumer {} because of inactivity", (Object)this);
                this.countDown(STOP_REASON_CONSUMER_IDLE);
            } else if (this.exitWhen == PerfTest.EXIT_WHEN.EMPTY) {
                LOGGER.debug("Checking content of consumer queue(s)");
                boolean empty = false;
                for (String queue : queues) {
                    try {
                        AMQP.Queue.DeclareOk declareOk = this.channel.queueDeclarePassive(queue);
                        LOGGER.debug("Message count for queue {}: {}", (Object)queue, (Object)declareOk.getMessageCount());
                        if (declareOk.getMessageCount() != 0) continue;
                        empty = true;
                    }
                    catch (IOException e) {
                        LOGGER.info("Error when calling queue.declarePassive({}) in consumer {}", (Object)queue, (Object)this);
                    }
                }
                if (empty) {
                    this.maybeAckCommitBeforeExit();
                    LOGGER.debug("Terminating consumer {} because its queue(s) is (are) empty", (Object)this);
                    this.countDown(STOP_REASON_CONSUMER_QUEUE_EMPTY);
                }
            }
        }
    }

    private void maybeAckCommitBeforeExit() {
        if (this.ackEnabled() && this.lastAckedDeliveryTag < this.lastDeliveryTag) {
            LOGGER.debug("Acking/committing before exit");
            try {
                this.dealWithWriteOperation(() -> {
                    this.channel.basicAck(this.lastDeliveryTag, true);
                    if (this.transactionEnabled()) {
                        this.channel.txCommit();
                    }
                }, this.recoveryProcess);
            }
            catch (IOException e) {
                LOGGER.warn("Error while acking/committing on exit: {}", (Object)e.getMessage());
            }
        }
    }

    private static String queueName(TopologyRecording recording, String queue) {
        TopologyRecording.RecordedQueue queueRecord = recording.queue(queue);
        return queueRecord == null ? queue : queueRecord.name();
    }

    private static boolean latencySleep(long delay) {
        try {
            long ms = delay / 1000L;
            Thread.sleep(ms);
            return true;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private static boolean latencyBusyWait(long delay) {
        delay *= 1000L;
        long start = System.nanoTime();
        while (System.nanoTime() - start < delay) {
        }
        return true;
    }

    @FunctionalInterface
    private static interface AckNackOperation {
        public void apply(Channel var1, Envelope var2, boolean var3, boolean var4) throws IOException;
    }

    private static class BusyWaitConsumerLatency
    implements ConsumerLatency {
        private final ValueIndicator<Long> consumerLatenciesIndicator;

        private BusyWaitConsumerLatency(ValueIndicator<Long> consumerLatenciesIndicator) {
            this.consumerLatenciesIndicator = consumerLatenciesIndicator;
        }

        @Override
        public boolean simulateLatency() {
            return Consumer.latencyBusyWait(this.consumerLatenciesIndicator.getValue());
        }
    }

    private static class ThreadSleepConsumerLatency
    implements ConsumerLatency {
        private final ValueIndicator<Long> consumerLatenciesIndicator;

        private ThreadSleepConsumerLatency(ValueIndicator<Long> consumerLatenciesIndicator) {
            this.consumerLatenciesIndicator = consumerLatenciesIndicator;
        }

        @Override
        public boolean simulateLatency() {
            return Consumer.latencySleep(this.consumerLatenciesIndicator.getValue());
        }
    }

    private static class NoWaitConsumerLatency
    implements ConsumerLatency {
        private NoWaitConsumerLatency() {
        }

        @Override
        public boolean simulateLatency() {
            return true;
        }
    }

    private static class VariableConsumerLatency
    implements ConsumerLatency {
        private final ValueIndicator<Long> consumerLatenciesIndicator;

        private VariableConsumerLatency(ValueIndicator<Long> consumerLatenciesIndicator) {
            this.consumerLatenciesIndicator = consumerLatenciesIndicator;
        }

        @Override
        public boolean simulateLatency() {
            long consumerLatencyInMicroSeconds = this.consumerLatenciesIndicator.getValue();
            if (consumerLatencyInMicroSeconds <= 0L) {
                return true;
            }
            if (consumerLatencyInMicroSeconds >= 1000L) {
                return Consumer.latencySleep(consumerLatencyInMicroSeconds);
            }
            return Consumer.latencyBusyWait(consumerLatencyInMicroSeconds);
        }
    }

    private static interface ConsumerLatency {
        public boolean simulateLatency();
    }

    private static class ConsumerState
    implements AgentBase.AgentState {
        private volatile long lastStatsTime;
        private volatile long lastActivityTimestamp = -1L;
        private final AtomicInteger msgCount = new AtomicInteger(0);
        private final TimestampProvider timestampProvider;

        protected ConsumerState(TimestampProvider timestampProvider) {
            this.timestampProvider = timestampProvider;
        }

        @Override
        public long getLastStatsTime() {
            return this.lastStatsTime;
        }

        protected void setLastStatsTime(long lastStatsTime) {
            this.lastStatsTime = lastStatsTime;
        }

        public void setLastActivityTimestamp(long lastActivityTimestamp) {
            this.lastActivityTimestamp = lastActivityTimestamp;
        }

        public long getLastActivityTimestamp() {
            return this.lastActivityTimestamp;
        }

        @Override
        public int getMsgCount() {
            return this.msgCount.get();
        }

        public TimestampProvider getTimestampProvider() {
            return this.timestampProvider;
        }

        protected void setMsgCount(int msgCount) {
            this.msgCount.set(msgCount);
        }

        @Override
        public int incrementMessageCount() {
            return this.msgCount.incrementAndGet();
        }
    }

    private class ConsumerImpl
    extends DefaultConsumer {
        private final AtomicLong receivedMessageCount;

        private ConsumerImpl(Channel channel) {
            super(channel);
            this.receivedMessageCount = new AtomicLong(0L);
            Consumer.this.state.setLastStatsTime(System.nanoTime());
            Consumer.this.state.setMsgCount(0);
        }

        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            this.handleMessage(envelope, properties, body, Consumer.this.channel);
        }

        void handleMessage(Envelope envelope, AMQP.BasicProperties properties, byte[] body, Channel ch) throws IOException {
            this.receivedMessageCount.incrementAndGet();
            Consumer.this.epochMessageCount.incrementAndGet();
            Consumer.this.state.incrementMessageCount();
            long nowTimestamp = Consumer.this.timestampProvider.getCurrentTime();
            Consumer.this.state.setLastActivityTimestamp(nowTimestamp);
            if (Consumer.this.msgLimit == 0 || this.receivedMessageCount.get() <= (long)Consumer.this.msgLimit) {
                long messageTimestamp = (Long)Consumer.this.timestampExtractor.apply(properties, body);
                long diff_time = Consumer.this.timestampProvider.getDifference(nowTimestamp, messageTimestamp);
                Consumer.this.performanceMetrics.received(Consumer.this.id.equals(envelope.getRoutingKey()) ? diff_time : 0L);
                if (Consumer.this.consumerLatency.simulateLatency()) {
                    this.ackIfNecessary(envelope, Consumer.this.epochMessageCount.get(), ch);
                    this.commitTransactionIfNecessary(Consumer.this.epochMessageCount.get(), ch);
                    Consumer.this.lastDeliveryTag = envelope.getDeliveryTag();
                    long now = System.nanoTime();
                    if (Consumer.this.rateLimitation) {
                        if (now - Consumer.this.state.getLastStatsTime() > 1000L) {
                            Consumer.this.state.setLastStatsTime(now);
                            Consumer.this.state.setMsgCount(0);
                        }
                        Consumer.this.rateLimiterCallback.run();
                    }
                }
            }
            if (Consumer.this.msgLimit != 0 && this.receivedMessageCount.get() >= (long)Consumer.this.msgLimit) {
                Consumer.this.countDown(Consumer.STOP_REASON_CONSUMER_REACHED_MESSAGE_LIMIT);
            }
        }

        private void ackIfNecessary(Envelope envelope, long currentMessageCount, Channel ch) throws IOException {
            if (Consumer.this.ackEnabled()) {
                Consumer.this.dealWithWriteOperation(() -> {
                    if (Consumer.this.multiAckEvery == 0) {
                        Consumer.this.ackNackOperation.apply(ch, envelope, false, Consumer.this.requeue);
                        Consumer.this.lastAckedDeliveryTag = envelope.getDeliveryTag();
                    } else if (currentMessageCount % (long)Consumer.this.multiAckEvery == 0L) {
                        Consumer.this.ackNackOperation.apply(ch, envelope, true, Consumer.this.requeue);
                        Consumer.this.lastAckedDeliveryTag = envelope.getDeliveryTag();
                    }
                }, Consumer.this.recoveryProcess);
            }
        }

        private void commitTransactionIfNecessary(long currentMessageCount, Channel ch) throws IOException {
            if (Consumer.this.transactionEnabled() && currentMessageCount % (long)Consumer.this.txSize == 0L) {
                Consumer.this.dealWithWriteOperation(() -> ch.txCommit(), Consumer.this.recoveryProcess);
            }
        }

        public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
            LOGGER.debug("Consumer received shutdown signal, recovery process enabled? {}, condition to trigger connection recovery? {}", (Object)Consumer.this.recoveryProcess.isEnabled(), (Object)Consumer.this.isConnectionRecoveryTriggered(sig));
            if (!Consumer.this.recoveryProcess.isEnabled()) {
                LOGGER.debug("Counting down for consumer");
                Consumer.this.countDown("Consumer shut down");
            }
        }

        public void handleCancel(String consumerTag) throws IOException {
            System.out.printf("Consumer cancelled by broker for tag: %s\n", consumerTag);
            Consumer.this.epochMessageCount.set(0L);
            if (Consumer.this.consumerTagBranchMap.containsKey(consumerTag)) {
                String qName = (String)Consumer.this.consumerTagBranchMap.get(consumerTag);
                TopologyRecording topologyRecording = Consumer.this.topologyRecording();
                TopologyRecording.RecordedQueue queueRecord = topologyRecording.queue(qName);
                Consumer.this.consumeOrScheduleConsume(queueRecord, topologyRecording, consumerTag, qName);
            } else {
                System.out.printf("Could not find queue for consumer tag: %s\n", consumerTag);
            }
        }
    }
}

