/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.ConsumerFlowStrategy;
import com.rabbitmq.stream.MessageHandler;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.StreamDoesNotExistException;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.StreamNotAvailableException;
import com.rabbitmq.stream.SubscriptionListener;
import com.rabbitmq.stream.impl.AsyncRetry;
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.impl.ConnectionStreamException;
import com.rabbitmq.stream.impl.DefaultExecutorServiceFactory;
import com.rabbitmq.stream.impl.ExecutorServiceFactory;
import com.rabbitmq.stream.impl.StreamConsumer;
import com.rabbitmq.stream.impl.StreamEnvironment;
import com.rabbitmq.stream.impl.TimeoutStreamException;
import com.rabbitmq.stream.impl.Utils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ConsumersCoordinator
implements AutoCloseable {
    // Number of subscription slots handled per client connection. Subscription IDs are
    // treated as unsigned bytes (see the "& 0xFF" masks in the listeners below).
    static final int MAX_SUBSCRIPTIONS_PER_CLIENT = 256;
    // Number of replica-only lookup attempts before the leader is accepted as a candidate.
    // NOTE(review): findCandidateNodes(String) uses the literal 5, presumably this constant inlined by the compiler.
    static final int MAX_ATTEMPT_BEFORE_FALLING_BACK_TO_LEADER = 5;
    // Not referenced in the visible part of this file.
    private static final boolean DEBUG = false;
    // Default offset specification: OffsetSpecification.next().
    static final OffsetSpecification DEFAULT_OFFSET_SPECIFICATION = OffsetSpecification.next();
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumersCoordinator.class);
    // Source of client parameters, locator operations, back-off policies and the scheduler.
    private final StreamEnvironment environment;
    // Creates the Client backing each ClientSubscriptionsManager.
    private final Utils.ClientFactory clientFactory;
    // Set from the constructor; not read in the visible part of this file.
    private final int maxConsumersByConnection;
    // Produces the "connection_name" client property for consumer connections.
    private final Function<Utils.ClientConnectionType, String> connectionNamingStrategy;
    // Generates ClientSubscriptionsManager IDs.
    private final AtomicLong managerIdSequence = new AtomicLong(0L);
    // Live managers (one per client connection); closed managers are pruned in addToManager.
    private final NavigableSet<ClientSubscriptionsManager> managers = new ConcurrentSkipListSet<ClientSubscriptionsManager>();
    // Generates SubscriptionTracker IDs.
    private final AtomicLong trackerIdSequence = new AtomicLong(0L);
    // Chooses one broker among the candidates returned by the metadata lookup.
    private final Function<List<Client.Broker>, Client.Broker> brokerPicker;
    // In the visible part of this file, only referenced for removal (lambda$subscribe$0).
    private final List<SubscriptionTracker> trackers = new CopyOnWriteArrayList<SubscriptionTracker>();
    // Passed to every consumer client via its parameters and closed in close().
    private final ExecutorServiceFactory executorServiceFactory = new DefaultExecutorServiceFactory(Utils.AVAILABLE_PROCESSORS, 10, "rabbitmq-stream-consumer-connection-");
    // When true, the initial lookup refuses to fall back to the leader node.
    private final boolean forceReplica;
    // Held for the whole duration of subscribe().
    private final Lock coordinatorLock = new ReentrantLock();
    // Matches TimeoutStreamException instances; not referenced in the visible part of this file.
    private static final Predicate<Exception> RETRY_ON_TIMEOUT = e -> e instanceof TimeoutStreamException;

    /**
     * Creates a coordinator bound to the given environment.
     *
     * @param environment environment providing client parameters, locator operations and schedulers
     * @param maxConsumersByConnection stored as-is in the matching field
     * @param connectionNamingStrategy produces the connection name for consumer connections
     * @param clientFactory factory used to create the client of each subscription manager
     * @param forceReplica whether lookups must refuse the leader as a candidate
     * @param brokerPicker picks one broker among the candidate brokers
     */
    ConsumersCoordinator(StreamEnvironment environment, int maxConsumersByConnection, Function<Utils.ClientConnectionType, String> connectionNamingStrategy, Utils.ClientFactory clientFactory, boolean forceReplica, Function<List<Client.Broker>, Client.Broker> brokerPicker) {
        // collaborators
        this.environment = environment;
        this.clientFactory = clientFactory;
        this.brokerPicker = brokerPicker;
        this.connectionNamingStrategy = connectionNamingStrategy;
        // settings
        this.maxConsumersByConnection = maxConsumersByConnection;
        this.forceReplica = forceReplica;
    }

    /** Returns the back-off delay policy the environment exposes for recovery. */
    private BackOffDelayPolicy recoveryBackOffDelayPolicy() {
        BackOffDelayPolicy recoveryPolicy = this.environment.recoveryBackOffDelayPolicy();
        return recoveryPolicy;
    }

    /** Returns the back-off delay policy the environment exposes for topology updates. */
    private BackOffDelayPolicy metadataUpdateBackOffDelayPolicy() {
        BackOffDelayPolicy topologyUpdatePolicy = this.environment.topologyUpdateBackOffDelayPolicy();
        return topologyUpdatePolicy;
    }

    /**
     * Subscribes a consumer to a stream and returns the action that cancels the subscription.
     *
     * <p>Runs entirely under the coordinator lock: looks up candidate nodes, picks one with the
     * broker picker, creates a tracker and hands it over to a subscription manager.
     *
     * @return a {@link Runnable} that cancels the created subscription tracker
     * @throws IllegalStateException if the broker picker returns no node
     * @throws StreamException if adding to a manager fails with a connection exception (only the
     *     message of the original exception is carried over)
     */
    Runnable subscribe(StreamConsumer consumer, String stream, OffsetSpecification offsetSpecification, String trackingReference, SubscriptionListener subscriptionListener, Runnable trackingClosingCallback, MessageHandler messageHandler, Map<String, String> subscriptionProperties, ConsumerFlowStrategy flowStrategy) {
        return Utils.lock(this.coordinatorLock, () -> {
            List<Utils.BrokerWrapper> candidateNodes = this.findCandidateNodes(stream, this.forceReplica);
            Client.Broker targetNode = ConsumersCoordinator.pickBroker(this.brokerPicker, candidateNodes);
            if (targetNode == null) {
                throw new IllegalStateException("No available node to subscribe to");
            }
            long trackerId = this.trackerIdSequence.getAndIncrement();
            SubscriptionTracker tracker = new SubscriptionTracker(trackerId, consumer, stream, offsetSpecification, trackingReference, subscriptionListener, trackingClosingCallback, messageHandler, subscriptionProperties, flowStrategy);
            try {
                this.addToManager(targetNode, candidateNodes, tracker, offsetSpecification, true);
            }
            catch (ConnectionStreamException connectionFailure) {
                // re-thrown as the generic stream exception, message only
                throw new StreamException(connectionFailure.getMessage());
            }
            Runnable cancellation = tracker::cancel;
            return cancellation;
        });
    }

    /**
     * Assigns a subscription tracker to a subscription manager connected to the given node,
     * reusing an existing open, non-full manager for that node or creating a new one.
     *
     * <p>Loops until a manager accepts the tracker or a non-recoverable exception is thrown.
     *
     * @param node the node the manager must be connected to
     * @param candidates candidate nodes, passed to a newly created manager
     * @param tracker the subscription to register
     * @param offsetSpecification offset to start consuming from
     * @param isInitialSubscription forwarded to the manager's {@code add} call
     */
    private void addToManager(Client.Broker node, List<Utils.BrokerWrapper> candidates, SubscriptionTracker tracker, OffsetSpecification offsetSpecification, boolean isInitialSubscription) {
        // parameters for a potential new client: copy of the environment's, targeting the chosen node
        Client.ClientParameters clientParameters = this.environment.clientParametersCopy().executorServiceFactory(this.executorServiceFactory).host(node.getHost()).port(node.getPort());
        ClientSubscriptionsManager pickedManager = null;
        while (pickedManager == null) {
            // scan existing managers: drop closed ones, stop at the first one on this node with room left
            Iterator<ClientSubscriptionsManager> iterator = this.managers.iterator();
            while (iterator.hasNext()) {
                pickedManager = iterator.next();
                if (pickedManager.isClosed()) {
                    iterator.remove();
                    pickedManager = null;
                    continue;
                }
                if (node.equals(pickedManager.node) && !pickedManager.isFull()) break;
                pickedManager = null;
            }
            if (pickedManager == null) {
                // no suitable manager found, open a new connection
                String name = Utils.keyForNode(node);
                LOGGER.debug("Creating subscription manager on {}", (Object)name);
                pickedManager = new ClientSubscriptionsManager(node, candidates, clientParameters);
                LOGGER.debug("Created subscription manager on {}, id {}", (Object)name, (Object)pickedManager.id);
            }
            try {
                pickedManager.add(tracker, offsetSpecification, isInitialSubscription);
                LOGGER.debug("Assigned tracker {} to manager {} (node {}), subscription ID {}", new Object[]{tracker.label(), pickedManager.id, pickedManager.name, tracker.subscriptionIdInClient});
                // registered only after a successful add (no-op for a manager already in the set)
                this.managers.add(pickedManager);
            }
            catch (IllegalStateException e) {
                // resetting to null makes the enclosing loop try again with another manager
                // NOTE(review): presumably thrown by add() when the manager filled up or closed concurrently - confirm
                pickedManager = null;
            }
            catch (StreamNotAvailableException | ConnectionStreamException | ClientClosedException e) {
                // closing of an unused manager is delegated to the environment, then the error is propagated
                if (pickedManager.isEmpty()) {
                    this.environment.execute(pickedManager::closeIfEmpty, "Consumer manager closing after timeout, consumer %d on stream '%s'", tracker.consumer.id(), tracker.stream);
                }
                throw e;
            }
            catch (RuntimeException e) {
                // any other failure: close the manager in the calling thread if it holds no subscription
                if (pickedManager != null) {
                    pickedManager.closeIfEmpty();
                }
                throw e;
            }
        }
    }

    /** Returns the number of subscription managers currently registered. */
    int managerCount() {
        final int registeredManagers = this.managers.size();
        return registeredManagers;
    }

    /**
     * Looks up the nodes a consumer can connect to for the given stream.
     *
     * <p>Replicas are returned when there are some; the leader is appended to them unless
     * {@code forceReplica} is set. When there is no replica, the leader is the only candidate,
     * which is rejected if {@code forceReplica} is set.
     *
     * @param stream the stream to consume from
     * @param forceReplica whether the leader must be excluded from the candidates
     * @return the candidate nodes, each flagged as leader or not
     * @throws StreamDoesNotExistException if the metadata has no entry for the stream or the
     *     response code is 2
     * @throws IllegalStateException if the metadata response is not OK for another reason, if no
     *     node is available, or if only the leader is available while {@code forceReplica} is set
     */
    List<Utils.BrokerWrapper> findCandidateNodes(String stream, boolean forceReplica) {
        LOGGER.debug("Candidate lookup to consumer from '{}', forcing replica? {}", stream, forceReplica);
        Map<String, Client.StreamMetadata> metadata = this.environment.locatorOperation(Utils.namedFunction(c -> c.metadata(stream), "Candidate lookup to consume from '%s'", stream));
        // a missing entry covers both the empty map and the absent key
        Client.StreamMetadata streamMetadata = metadata.get(stream);
        if (streamMetadata == null) {
            throw new StreamDoesNotExistException(stream);
        }
        if (!streamMetadata.isResponseOk()) {
            // response code 2 is reported as a non-existing stream
            if (streamMetadata.getResponseCode() == 2) {
                throw new StreamDoesNotExistException(stream);
            }
            throw new IllegalStateException("Could not get stream metadata, response code: " + Utils.formatConstant(streamMetadata.getResponseCode()));
        }
        Client.Broker leader = streamMetadata.getLeader();
        List<Client.Broker> replicas = streamMetadata.getReplicas();
        boolean noReplica = replicas == null || replicas.isEmpty();
        if (noReplica && leader == null) {
            throw new IllegalStateException("No node available to consume from stream " + stream);
        }
        List<Utils.BrokerWrapper> brokers;
        if (noReplica) {
            if (forceReplica) {
                throw new IllegalStateException(String.format("Only the leader node is available for consuming from %s and consuming from leader has been deactivated for this consumer", stream));
            }
            brokers = Collections.singletonList(new Utils.BrokerWrapper(leader, true));
            LOGGER.debug("Only leader node {} for consuming from {}", leader, stream);
        } else {
            LOGGER.debug("Replicas for consuming from {}: {}", stream, replicas);
            // mutable list: the leader may be appended below
            brokers = replicas.stream().map(b -> new Utils.BrokerWrapper(b, false)).collect(Collectors.toCollection(ArrayList::new));
            if (!forceReplica && leader != null) {
                brokers.add(new Utils.BrokerWrapper(leader, true));
            }
        }
        LOGGER.debug("Candidates to consume from {}: {}", stream, brokers);
        return brokers;
    }

    /**
     * Builds a stateful candidate lookup for a stream, meant to be called repeatedly.
     *
     * <p>When the coordinator forces replicas, the first 5 invocations exclude the leader (the
     * same value as {@code MAX_ATTEMPT_BEFORE_FALLING_BACK_TO_LEADER}); later invocations accept
     * it. The attempt counter only advances when replicas are forced.
     */
    private Callable<List<Utils.BrokerWrapper>> findCandidateNodes(String stream) {
        final AtomicInteger attempts = new AtomicInteger();
        return () -> {
            // short-circuit: the counter is untouched when forceReplica is off
            boolean replicaOnly = this.forceReplica && attempts.incrementAndGet() <= 5;
            LOGGER.debug("Looking for broker(s) for stream {}, forcing replica {}", stream, replicaOnly);
            return this.findCandidateNodes(stream, replicaOnly);
        };
    }

    /**
     * Closes every registered subscription manager, then the executor service factory.
     *
     * <p>Failures are logged at INFO level and do not stop the closing sequence.
     */
    @Override
    public void close() {
        for (Iterator<ClientSubscriptionsManager> it = this.managers.iterator(); it.hasNext();) {
            ClientSubscriptionsManager current = it.next();
            try {
                // unregister first, then close
                it.remove();
                current.close();
            }
            catch (Exception closingError) {
                LOGGER.info("Error while closing manager {} connected to node {}: {}", current.id, current.name, closingError.getMessage());
            }
        }
        try {
            this.executorServiceFactory.close();
        }
        catch (Exception closingError) {
            LOGGER.info("Error while closing executor service factory: {}", closingError.getMessage());
        }
    }

    /**
     * Renders the coordinator state as a JSON-like string: the number of clients and, for each
     * client, its id, node, consumer count and the list of its non-null subscriptions.
     */
    public String toString() {
        String clientCount = Utils.jsonField("client_count", this.managers.size());
        String clients = this.managers.stream().map(manager -> {
            String header = Utils.jsonField("id", manager.id) + "," + Utils.jsonField("node", manager.name) + "," + Utils.jsonField("consumer_count", manager.trackerCount) + ",";
            // the tracker list reference is read once
            List<SubscriptionTracker> slots = manager.subscriptionTrackers;
            String subscriptions = slots.stream()
                .filter(Objects::nonNull)
                .map(tracker -> "{" + Utils.jsonField("stream", tracker.stream) + "," + Utils.jsonField("id", tracker.id) + "," + Utils.jsonField("subscription_id", tracker.subscriptionIdInClient) + "}")
                .collect(Collectors.joining(","));
            return "{" + header + "\"subscriptions\" : [" + subscriptions + "]" + "}";
        }).collect(Collectors.joining(","));
        return "{" + clientCount + ", " + Utils.quote("clients") + " : [" + clients + "]" + "}";
    }

    /**
     * Finds a free (null) slot among the first 256 entries of {@code list}.
     *
     * <p>Each probe consumes one value of {@code sequence}; the probed index is the unsigned
     * remainder of that value by 256 (the same value as {@code MAX_SUBSCRIPTIONS_PER_CLIENT}),
     * so the sequence can wrap around safely. The slot is only located, not reserved.
     *
     * @param list slots to probe, expected to contain at least 256 entries
     * @param sequence shared probe sequence, advanced once per probe
     * @return the index of a slot holding {@code null}
     * @throws IllegalStateException if no slot is free after a full cycle of probes, instead of
     *     spinning forever
     */
    static <T> int pickSlot(List<T> list, AtomicInteger sequence) {
        final int slotCount = 256;
        int probes = 0;
        while (true) {
            int index = Integer.remainderUnsigned(sequence.getAndIncrement(), slotCount);
            if (list.get(index) == null) {
                return index;
            }
            // After each full cycle, make sure a free slot still exists. The explicit scan (rather
            // than trusting the probe count) stays correct if the sequence is advanced elsewhere.
            if (++probes % slotCount == 0 && list.stream().limit(slotCount).noneMatch(Objects::isNull)) {
                throw new IllegalStateException("No free slot available among " + slotCount + " slots");
            }
        }
    }

    /**
     * Unwraps the brokers, leaving out the ones flagged as leader when there is more than one
     * candidate. A single candidate is always kept, leader or not.
     */
    private static List<Client.Broker> keepReplicasIfPossible(Collection<Utils.BrokerWrapper> brokers) {
        Predicate<Utils.BrokerWrapper> eligible;
        if (brokers.size() > 1) {
            eligible = wrapper -> !wrapper.isLeader();
        } else {
            eligible = wrapper -> true;
        }
        return brokers.stream().filter(eligible).map(Utils.BrokerWrapper::broker).collect(Collectors.toList());
    }

    /**
     * Applies the picker to the candidates, after leaders have been filtered out when other
     * candidates exist.
     *
     * @return whatever the picker returns for the filtered list
     */
    static Client.Broker pickBroker(Function<List<Client.Broker>, Client.Broker> picker, Collection<Utils.BrokerWrapper> candidates) {
        List<Client.Broker> eligibleBrokers = ConsumersCoordinator.keepReplicasIfPossible(candidates);
        return picker.apply(eligibleBrokers);
    }

    /**
     * Renders a tracker as {@code {"stream":...,"node":...}}; the node is the host and port of the
     * client of the tracker's manager, or {@code null} when there is no manager or client.
     *
     * <p>NOTE(review): compiler-generated lambda body emitted by the decompiler; no call site is
     * visible in this part of the file.
     */
    private static /* synthetic */ String lambda$toString$7(SubscriptionTracker t) {
        String prefix = "{" + Utils.quote("stream") + ":" + Utils.quote(t.stream) + "," + Utils.quote("node") + ":";
        ClientSubscriptionsManager owner = t.manager;
        Client client = owner == null ? null : owner.client;
        String node = client == null ? "null" : Utils.quote(client.getHost() + ":" + client.getPort());
        return prefix + node + "}";
    }

    /**
     * Removes the tracker from the coordinator's tracker list, then cancels it. A failure of the
     * removal is logged at DEBUG level and does not prevent the cancellation.
     *
     * <p>NOTE(review): compiler-generated lambda body emitted by the decompiler; no call site is
     * visible in this part of the file.
     */
    private /* synthetic */ void lambda$subscribe$0(SubscriptionTracker subscriptionTracker) {
        try {
            this.trackers.remove(subscriptionTracker);
        }
        catch (Exception removalError) {
            // best effort: cancellation below must still happen
            LOGGER.debug("Error while removing subscription tracker from list");
        }
        subscriptionTracker.cancel();
    }

    private static class DefaultConsumerFlowStrategyContext
    implements ConsumerFlowStrategy.Context {
        private final byte subscriptionId;
        private final Client client;
        private final long messageCount;

        private DefaultConsumerFlowStrategyContext(byte subscriptionId, Client client, long messageCount) {
            this.subscriptionId = subscriptionId;
            this.client = client;
            this.messageCount = messageCount;
        }

        @Override
        public void credits(int credits) {
            try {
                this.client.credit(this.subscriptionId, credits);
            }
            catch (Exception e) {
                LOGGER.info("Error while providing {} credit(s) to subscription {}: {}", new Object[]{credits, this.subscriptionId, e.getMessage()});
            }
        }

        @Override
        public long messageCount() {
            return this.messageCount;
        }
    }

    /**
     * Stream exception with the fixed message "Client already closed".
     *
     * <p>Handled by {@code addToManager} and {@code recoverSubscription} together with
     * connection and stream-availability exceptions.
     */
    private static class ClientClosedException
    extends StreamException {
        public ClientClosedException() {
            super("Client already closed");
        }
    }

    /**
     * Subscription context holding the stream name and a replaceable offset specification.
     */
    private static final class DefaultSubscriptionContext
    implements SubscriptionListener.SubscriptionContext {
        private final String name;
        // volatile: may be replaced through offsetSpecification(OffsetSpecification)
        private volatile OffsetSpecification offsetSpecification;

        private DefaultSubscriptionContext(OffsetSpecification computedOffsetSpecification, String name) {
            this.name = name;
            this.offsetSpecification = computedOffsetSpecification;
        }

        /** Returns the stream name given at construction. */
        @Override
        public String stream() {
            return this.name;
        }

        /** Returns the current offset specification. */
        @Override
        public OffsetSpecification offsetSpecification() {
            return this.offsetSpecification;
        }

        /** Replaces the offset specification. */
        @Override
        public void offsetSpecification(OffsetSpecification offsetSpecification) {
            this.offsetSpecification = offsetSpecification;
        }

        public String toString() {
            return "SubscriptionContext{offsetSpecification=" + this.offsetSpecification + "}";
        }
    }

    private class ClientSubscriptionsManager
    implements Comparable<ClientSubscriptionsManager> {
        // Unique ID taken from the coordinator's managerIdSequence.
        private final long id;
        // Node the client is actually connected to (derived from the client in the constructor).
        private final Client.Broker node;
        // Connection carrying all the subscriptions of this manager.
        private final Client client;
        // Key of the node (Utils.keyForNode), used in logs and toString.
        private final String name;
        // Subscriptions grouped by stream name.
        private final Map<String, Set<SubscriptionTracker>> streamToStreamSubscriptions = new ConcurrentHashMap<String, Set<SubscriptionTracker>>();
        // 256 slots indexed by unsigned subscription ID, null when free. The metadata listener
        // builds a modified copy and installs it through setSubscriptionTrackers.
        private volatile List<SubscriptionTracker> subscriptionTrackers = this.createSubscriptionTrackerList();
        // NOTE(review): presumably the probe sequence passed to pickSlot - the call site is not visible here.
        private final AtomicInteger consumerIndexSequence = new AtomicInteger(0);
        // Reported as "consumer_count" in the coordinator's toString.
        private volatile int trackerCount;
        // Set to true by the shutdown listener once the client is initialized in this manager.
        private final AtomicBoolean closed = new AtomicBoolean(false);
        // Guards the tracker list update in the metadata listener.
        private final Lock subscriptionManagerLock = new ReentrantLock();

        /**
         * Creates the manager and its client connection, wiring all the client listeners
         * (chunk, credit, message, shutdown, metadata, consumer update) to the subscription
         * trackers held by this manager.
         *
         * @param targetNode node the connection should target
         * @param candidates candidate nodes, passed unwrapped to the client factory context
         * @param clientParameters base parameters, completed here with connection name and listeners
         */
        private ClientSubscriptionsManager(Client.Broker targetNode, List<Utils.BrokerWrapper> candidates, Client.ClientParameters clientParameters) {
            this.id = ConsumersCoordinator.this.managerIdSequence.getAndIncrement();
            this.trackerCount = 0;
            // the listeners are created before this.name is assigned, so they read the name through this reference
            AtomicReference<String> nameReference = new AtomicReference<String>();
            // flipped at the very end of the constructor; lets the shutdown listener know the manager is fully set up
            AtomicBoolean clientInitializedInManager = new AtomicBoolean(false);
            // Chunk listener: starts the flow strategy for the chunk when the subscription is known
            // and its consumer is open; returns null otherwise.
            Client.ChunkListener chunkListener = (client, subscriptionId, offset, messageCount, dataSize) -> {
                ConsumerFlowStrategy.MessageProcessedCallback processCallback;
                // subscription IDs are bytes, "& 0xFF" turns them into an index in [0, 255]
                SubscriptionTracker subscriptionTracker = this.subscriptionTrackers.get(subscriptionId & 0xFF);
                if (subscriptionTracker != null && subscriptionTracker.consumer.isOpen()) {
                    processCallback = subscriptionTracker.flowStrategy.start(new DefaultConsumerFlowStrategyContext(subscriptionId, client, messageCount));
                } else {
                    LOGGER.debug("Could not find stream subscription {} or subscription closing, not providing credits", (Object)(subscriptionId & 0xFF));
                    processCallback = null;
                }
                return processCallback;
            };
            // Credit notification: logging only.
            Client.CreditNotification creditNotification = (subscriptionId, responseCode) -> {
                SubscriptionTracker subscriptionTracker = this.subscriptionTrackers.get(subscriptionId & 0xFF);
                String stream = subscriptionTracker == null ? "?" : subscriptionTracker.stream;
                LOGGER.debug("Received credit notification for subscription {} (stream '{}'): {}", new Object[]{subscriptionId & 0xFF, stream, Utils.formatConstant(responseCode)});
            };
            // Message listener: records the offset reached (used when the subscription is
            // recovered), then dispatches the message to the handler.
            Client.MessageListener messageListener = (subscriptionId, offset, chunkTimestamp, committedChunkId, chunkContext, message) -> {
                SubscriptionTracker subscriptionTracker = this.subscriptionTrackers.get(subscriptionId & 0xFF);
                if (subscriptionTracker != null) {
                    subscriptionTracker.offset = offset;
                    subscriptionTracker.hasReceivedSomething = true;
                    subscriptionTracker.messageHandler.handle(new MessageHandlerContext(offset, chunkTimestamp, committedChunkId, subscriptionTracker.consumer, (ConsumerFlowStrategy.MessageProcessedCallback)chunkContext), message);
                } else {
                    LOGGER.debug("Could not find stream subscription {} in manager {}, node {} for message listener", new Object[]{subscriptionId, this.id, nameReference.get()});
                }
            };
            // Ignored-message listener: the message is not handed to the handler but is still
            // reported as processed to the chunk's callback.
            Client.MessageIgnoredListener messageIgnoredListener = (subscriptionId, offset, chunkTimestamp, committedChunkId, chunkContext) -> {
                SubscriptionTracker subscriptionTracker = this.subscriptionTrackers.get(subscriptionId & 0xFF);
                if (subscriptionTracker != null) {
                    MessageHandlerContext messageHandlerContext = new MessageHandlerContext(offset, chunkTimestamp, committedChunkId, subscriptionTracker.consumer, (ConsumerFlowStrategy.MessageProcessedCallback)chunkContext);
                    ((ConsumerFlowStrategy.MessageProcessedCallback)chunkContext).processed(messageHandlerContext);
                } else {
                    LOGGER.debug("Could not find stream subscription {} in manager {}, node {} for message ignored listener", new Object[]{subscriptionId, this.id, nameReference.get()});
                }
            };
            // Shutdown listener: marks the manager closed and unregisters it; on an unexpected
            // shutdown, schedules the re-assignment of all its consumers.
            Client.ShutdownListener shutdownListener = shutdownContext -> {
                if (clientInitializedInManager.get()) {
                    this.closed.set(true);
                    ConsumersCoordinator.this.managers.remove(this);
                }
                if (shutdownContext.isShutdownUnexpected()) {
                    LOGGER.debug("Unexpected shutdown notification on subscription connection {}, scheduling consumers re-assignment", nameReference.get());
                    LOGGER.debug("Subscription connection has {} consumer(s) over {} stream(s) to recover", (Object)this.subscriptionTrackers.stream().filter(Objects::nonNull).count(), (Object)this.streamToStreamSubscriptions.size());
                    // the recovery runs on the environment's scheduled executor, not in the listener's thread
                    ConsumersCoordinator.this.environment.scheduledExecutorService().execute(Utils.namedRunnable(() -> {
                        if (Thread.currentThread().isInterrupted()) {
                            return;
                        }
                        // detach the active trackers from this manager before re-assigning them
                        this.subscriptionTrackers.stream().filter(Objects::nonNull).filter(t -> t.state() == SubscriptionState.ACTIVE).forEach(SubscriptionTracker::detachFromManager);
                        for (Map.Entry<String, Set<SubscriptionTracker>> entry : this.streamToStreamSubscriptions.entrySet()) {
                            if (Thread.currentThread().isInterrupted()) {
                                LOGGER.debug("Interrupting consumer re-assignment task");
                                break;
                            }
                            String stream = entry.getKey();
                            Set<SubscriptionTracker> trackersToReAssign = entry.getValue();
                            if (trackersToReAssign == null || trackersToReAssign.isEmpty()) {
                                LOGGER.debug("No consumer to re-assign to stream {} after disconnection", (Object)stream);
                                continue;
                            }
                            LOGGER.debug("Re-assigning {} consumer(s) to stream {} after disconnection", (Object)trackersToReAssign.size(), (Object)stream);
                            // maybeCloseClient is false here
                            this.assignConsumersToStream(trackersToReAssign, stream, ConsumersCoordinator.this.recoveryBackOffDelayPolicy(), false);
                        }
                    }, "Consumers re-assignment after disconnection from %s", nameReference.get()));
                }
            };
            // Metadata listener: removes the subscriptions of the affected stream from this
            // manager under the manager lock, then schedules their re-assignment.
            Client.MetadataListener metadataListener = (stream, code) -> {
                Set<SubscriptionTracker> affectedSubscriptions;
                LOGGER.debug("Received metadata notification for '{}', stream is likely to have become unavailable", (Object)stream);
                this.subscriptionManagerLock.lock();
                try {
                    Set<SubscriptionTracker> subscriptions = this.streamToStreamSubscriptions.remove(stream);
                    if (subscriptions != null && !subscriptions.isEmpty()) {
                        // work on a copy of the slots and install it in one go
                        List<SubscriptionTracker> newSubscriptions = this.createSubscriptionTrackerList();
                        for (int i = 0; i < 256; ++i) {
                            newSubscriptions.set(i, this.subscriptionTrackers.get(i));
                        }
                        for (SubscriptionTracker subscription : subscriptions) {
                            LOGGER.debug("Subscription {} ({}) was at offset {} (received something? {})", new Object[]{subscription.subscriptionIdInClient, subscription.label(), subscription.offset, subscription.hasReceivedSomething});
                            // free the slot and unlink the consumer from this client
                            newSubscriptions.set(subscription.subscriptionIdInClient & 0xFF, null);
                            subscription.consumer.setSubscriptionClient(null);
                        }
                        this.setSubscriptionTrackers(newSubscriptions);
                    }
                    affectedSubscriptions = subscriptions;
                }
                finally {
                    this.subscriptionManagerLock.unlock();
                }
                if (affectedSubscriptions != null && !affectedSubscriptions.isEmpty()) {
                    ConsumersCoordinator.this.environment.scheduledExecutorService().execute(Utils.namedRunnable(() -> {
                        if (Thread.currentThread().isInterrupted()) {
                            return;
                        }
                        LOGGER.debug("Trying to move {} subscription(s) (stream '{}')", (Object)affectedSubscriptions.size(), (Object)stream);
                        // maybeCloseClient is true: this manager is closed afterwards if it ends up empty
                        this.assignConsumersToStream(affectedSubscriptions, stream, ConsumersCoordinator.this.metadataUpdateBackOffDelayPolicy(), true);
                    }, "Consumers re-assignment after metadata update on stream '%s'", stream));
                }
            };
            // Consumer update listener: only single active consumers are notified; the result is
            // the offset specification returned by the consumer, or null in all other cases.
            Client.ConsumerUpdateListener consumerUpdateListener = (client, subscriptionId, active) -> {
                OffsetSpecification result = null;
                SubscriptionTracker subscriptionTracker = this.subscriptionTrackers.get(subscriptionId & 0xFF);
                if (subscriptionTracker != null) {
                    if (Utils.isSac(subscriptionTracker.subscriptionProperties)) {
                        result = subscriptionTracker.consumer.consumerUpdate(active);
                    } else {
                        LOGGER.debug("Subscription {} is not a single active consumer, nothing to do.", (Object)subscriptionId);
                    }
                } else {
                    LOGGER.debug("Could not find stream subscription {} for consumer update", (Object)subscriptionId);
                }
                return result;
            };
            String connectionName = ConsumersCoordinator.this.connectionNamingStrategy.apply(Utils.ClientConnectionType.CONSUMER);
            Utils.ClientFactoryContext clientFactoryContext = new Utils.ClientFactoryContext(clientParameters.clientProperty("connection_name", connectionName).chunkListener(chunkListener).creditNotification(creditNotification).messageListener(messageListener).messageIgnoredListener(messageIgnoredListener).shutdownListener(shutdownListener).metadataListener(metadataListener).consumerUpdateListener(consumerUpdateListener), Utils.keyForNode(targetNode), candidates.stream().map(Utils.BrokerWrapper::broker).collect(Collectors.toList()));
            this.client = ConsumersCoordinator.this.clientFactory.client(clientFactoryContext);
            // the node is derived from the created client rather than copied from targetNode
            this.node = Utils.brokerFromClient(this.client);
            this.name = Utils.keyForNode(this.node);
            nameReference.set(this.name);
            LOGGER.debug("creating subscription manager on {}", (Object)this.name);
            LOGGER.debug("Created consumer connection '{}'", (Object)connectionName);
            // from now on a shutdown notification closes and unregisters this manager
            clientInitializedInManager.set(true);
        }

        /**
         * Asynchronously looks up candidate nodes for the stream, retrying with the given delay
         * policy, then tries to recover each subscription on the candidates found.
         *
         * <p>The lookup is not retried when the stream does not exist. When the lookup yields
         * nothing or fails, the consumers of the subscriptions are closed.
         *
         * @param subscriptions the subscriptions to re-assign
         * @param stream the stream the subscriptions consume from
         * @param delayPolicy delay policy between lookup attempts
         * @param maybeCloseClient whether to close this manager afterwards if it is empty
         */
        private void assignConsumersToStream(Collection<SubscriptionTracker> subscriptions, String stream, BackOffDelayPolicy delayPolicy, boolean maybeCloseClient) {
            // fallback when recovery is not possible: close every affected consumer, best effort
            Runnable consumersClosingCallback = () -> {
                LOGGER.debug("Running consumer closing callback after recovery failure, closing {} subscription(s)", (Object)subscriptions.size());
                for (SubscriptionTracker affectedSubscription : subscriptions) {
                    try {
                        affectedSubscription.consumer.closeAfterStreamDeletion();
                    }
                    catch (Exception e) {
                        LOGGER.debug("Error while closing consumer: {}", (Object)e.getMessage());
                    }
                }
            };
            // retry on any exception except StreamDoesNotExistException
            ((CompletableFuture)AsyncRetry.asyncRetry(ConsumersCoordinator.this.findCandidateNodes(stream)).description("Candidate lookup to consume from '%s'", stream).scheduler(ConsumersCoordinator.this.environment.scheduledExecutorService()).retry(ex -> !(ex instanceof StreamDoesNotExistException)).delayPolicy(delayPolicy).build().thenAccept(candidateNodes -> {
                List candidates = candidateNodes;
                if (candidates == null) {
                    LOGGER.debug("No candidate nodes to consume from '{}'", (Object)stream);
                    consumersClosingCallback.run();
                } else {
                    for (SubscriptionTracker affectedSubscription : subscriptions) {
                        this.maybeRecoverSubscription(candidates, affectedSubscription);
                    }
                    if (maybeCloseClient) {
                        this.closeIfEmpty();
                    }
                }
            })).exceptionally(ex -> {
                // the lookup (or the recovery stage above) failed: close the consumers
                LOGGER.debug("Error while trying to assign {} consumer(s) to {}", new Object[]{subscriptions.size(), stream, ex});
                consumersClosingCallback.run();
                if (maybeCloseClient) {
                    this.closeIfEmpty();
                }
                return null;
            });
        }

        /** Creates a mutable list of 256 free (null) subscription slots. */
        private List<SubscriptionTracker> createSubscriptionTrackerList() {
            return new ArrayList<SubscriptionTracker>(Collections.<SubscriptionTracker>nCopies(256, null));
        }

        /**
         * Recovers the subscription if its tracker can be moved from ACTIVE to RECOVERING;
         * otherwise only logs the current state. A recovery failure is logged at WARN level and
         * not propagated.
         */
        private void maybeRecoverSubscription(List<Utils.BrokerWrapper> candidates, SubscriptionTracker tracker) {
            boolean recoveryStarted = tracker.compareAndSet(SubscriptionState.ACTIVE, SubscriptionState.RECOVERING);
            if (!recoveryStarted) {
                LOGGER.debug("Not recovering consumer tracker {}, state is {}, expected is {}", tracker.label(), tracker.state(), SubscriptionState.ACTIVE);
                return;
            }
            try {
                this.recoverSubscription(candidates, tracker);
            }
            catch (Exception recoveryError) {
                LOGGER.warn("Error while recovering consumer tracker {}. Reason: {}", tracker.label(), Utils.exceptionMessage(recoveryError));
            }
        }

        /**
         * Re-assigns a tracker to a manager, retrying until the re-assignment succeeds or is abandoned.
         *
         * <p>Each attempt picks a broker among {@code candidates}, takes the consumer lock, and hands
         * the tracker to {@code addToManager}, resuming from the last tracked offset when the tracker
         * has already received something, or from its initial offset specification otherwise.
         *
         * <p>Outcomes of an attempt:
         * <ul>
         *   <li>consumer closed (before or after taking its lock): give up silently;</li>
         *   <li>{@code StreamNotAvailableException}, {@code ConnectionStreamException} or
         *       {@code ClientClosedException}: refresh the candidates and try again;</li>
         *   <li>{@code StreamException} with code 3 (logged as "Subscription ID already existing"):
         *       try again with the same candidates;</li>
         *   <li>any other exception: log and give up.</li>
         * </ul>
         *
         * <p>NOTE(review): this is decompiler output; the decompiler reported that it removed a
         * self-catching try block and lifted jumps to return sites, so the control flow may not match
         * the original source exactly.
         *
         * @param candidates brokers to pick from; replaced by a fresh lookup after a connection-level failure
         * @param tracker the subscription to re-assign
         */
        private void recoverSubscription(List<Utils.BrokerWrapper> candidates, SubscriptionTracker tracker) {
            // Never set to true in this method: the loop is left only through the return statements below.
            boolean reassignmentCompleted = false;
            while (!reassignmentCompleted) {
                try {
                    if (tracker.consumer.isOpen()) {
                        Client.Broker broker = ConsumersCoordinator.pickBroker(ConsumersCoordinator.this.brokerPicker, candidates);
                        LOGGER.debug("Using {} to resume consuming from {}", (Object)broker, (Object)tracker.stream);
                        tracker.consumer.lock();
                        try {
                            // Re-check under the consumer lock: the consumer may have been closed meanwhile.
                            if (!tracker.consumer.isOpen()) return;
                            OffsetSpecification offsetSpecification = tracker.hasReceivedSomething ? OffsetSpecification.offset(tracker.offset) : tracker.initialOffsetSpecification;
                            // Last argument is false; presumably the "initial subscription" flag — confirm against addToManager.
                            ConsumersCoordinator.this.addToManager(broker, candidates, tracker, offsetSpecification, false);
                            return;
                        }
                        finally {
                            tracker.consumer.unlock();
                        }
                    } else {
                        LOGGER.debug("Not re-assigning consumer {} (stream '{}') because it has been closed", (Object)tracker.consumer.id(), (Object)tracker.stream);
                    }
                    return;
                }
                catch (StreamNotAvailableException | ConnectionStreamException | ClientClosedException e) {
                    LOGGER.debug("Consumer {} re-assignment on stream {} timed out or connection closed or stream not available, refreshing candidates and retrying", (Object)tracker.consumer.id(), (Object)tracker.stream);
                    // The predicate is false only for StreamDoesNotExistException; presumably it is the
                    // retry condition of callAndMaybeRetry — confirm against Utils.
                    candidates = Utils.callAndMaybeRetry(ConsumersCoordinator.this.findCandidateNodes(tracker.stream), ex -> !(ex instanceof StreamDoesNotExistException), ConsumersCoordinator.this.recoveryBackOffDelayPolicy(), "Candidate lookup to consume from '%s' (subscription recovery)", tracker.stream);
                }
                catch (StreamException e) {
                    LOGGER.warn("Stream error while re-assigning subscription from stream {} (name {})", new Object[]{tracker.stream, tracker.offsetTrackingReference, e});
                    // Code 3 is treated as transient: loop again with the same candidates.
                    if (e.getCode() == 3) {
                        LOGGER.debug("Subscription ID already existing, retrying");
                        continue;
                    }
                    LOGGER.debug("Not re-assigning consumer '{}' because of '{}'", (Object)tracker.label(), (Object)e.getMessage());
                    return;
                }
                catch (Exception e) {
                    LOGGER.warn("Error while re-assigning subscription from stream {} (name {})", new Object[]{tracker.stream, tracker.offsetTrackingReference, e});
                    LOGGER.debug("Not re-assigning consumer '{}' because of '{}'", (Object)tracker.label(), (Object)e.getMessage());
                    return;
                }
            }
        }

        /**
         * Fails fast when the underlying client is no longer open.
         *
         * @throws ClientClosedException if {@code client.isOpen()} returns {@code false}
         */
        private void checkNotClosed() {
            if (this.client.isOpen()) {
                return;
            }
            throw new ClientClosedException();
        }

        /**
         * Registers a tracker in this manager and subscribes it on the underlying client.
         *
         * <p>Runs entirely under {@code subscriptionManagerLock}. The steps are:
         * <ol>
         *   <li>reject the request if the manager is full or closed, or if the client is not open;</li>
         *   <li>pick a free slot, assign it to the tracker and publish an updated tracker list;</li>
         *   <li>when the tracker has an offset tracking reference, query the stored offset and, if the
         *       query is OK and the offset is not 0, start from the stored offset + 1 instead of the
         *       requested specification;</li>
         *   <li>let the subscription listener see (and possibly change) the offset specification;</li>
         *   <li>send the subscribe request.</li>
         * </ol>
         * If anything in steps 2-5 throws a {@code RuntimeException}, the slot assignment, the tracker
         * list and the per-stream index are rolled back before the exception is rethrown. On success
         * the tracker state becomes {@code ACTIVE}.
         *
         * <p>NOTE(review): this is decompiler output; the decompiler reported that it removed a
         * self-catching try block, so the control flow may not match the original source exactly.
         *
         * @param subscriptionTracker the tracker to register
         * @param offsetSpecification requested starting point, or {@code null} to use the default one
         * @param isInitialSubscription whether this is the first subscription of the tracker; only
         *     used to decide whether to log that a stored offset overrides the requested one
         * @throws IllegalStateException if the manager is full or closed
         * @throws ClientClosedException if the client is found closed at one of the checkpoints
         * @throws RuntimeException the exception built by {@code Utils.convertCodeToException} when the
         *     subscribe response is not OK, or any failure of the offset query or subscribe request
         */
        void add(SubscriptionTracker subscriptionTracker, OffsetSpecification offsetSpecification, boolean isInitialSubscription) {
            this.subscriptionManagerLock.lock();
            try {
                if (this.isFull()) {
                    LOGGER.debug("Cannot add subscription tracker for stream '{}', manager is full", (Object)subscriptionTracker.stream);
                    throw new IllegalStateException("Cannot add subscription tracker, the manager is full");
                }
                if (this.isClosed()) {
                    LOGGER.debug("Cannot add subscription tracker for stream '{}', manager is closed", (Object)subscriptionTracker.stream);
                    throw new IllegalStateException("Cannot add subscription tracker, the manager is closed");
                }
                this.checkNotClosed();
                byte subscriptionId = (byte)ConsumersCoordinator.pickSlot(this.subscriptionTrackers, this.consumerIndexSequence);
                // Kept so the tracker list can be restored if the subscription fails.
                List<SubscriptionTracker> previousSubscriptions = this.subscriptionTrackers;
                LOGGER.debug("Subscribing to {}, requested offset specification is {}, offset tracking reference is {}, properties are {}, subscription ID is {}", new Object[]{subscriptionTracker.stream, offsetSpecification == null ? DEFAULT_OFFSET_SPECIFICATION : offsetSpecification, subscriptionTracker.offsetTrackingReference, subscriptionTracker.subscriptionProperties, subscriptionId});
                try {
                    subscriptionTracker.assign(subscriptionId, this);
                    this.streamToStreamSubscriptions.computeIfAbsent(subscriptionTracker.stream, s -> ConcurrentHashMap.newKeySet()).add(subscriptionTracker);
                    this.setSubscriptionTrackers(this.update(previousSubscriptions, subscriptionId, subscriptionTracker));
                    String offsetTrackingReference = subscriptionTracker.offsetTrackingReference;
                    if (offsetTrackingReference != null) {
                        this.checkNotClosed();
                        Client.QueryOffsetResponse queryOffsetResponse = Utils.callAndMaybeRetry(() -> this.client.queryOffset(offsetTrackingReference, subscriptionTracker.stream), RETRY_ON_TIMEOUT, "Offset query for consumer %s on stream '%s' (reference %s)", subscriptionTracker.consumer.id(), subscriptionTracker.stream, offsetTrackingReference);
                        if (queryOffsetResponse.isOk() && queryOffsetResponse.getOffset() != 0L) {
                            if (offsetSpecification != null && isInitialSubscription) {
                                LOGGER.info("Requested offset specification {} not used in favor of stored offset found for reference {}", (Object)offsetSpecification, (Object)offsetTrackingReference);
                            }
                            LOGGER.debug("Using offset {} to start consuming from {} with consumer {} (instead of {})", new Object[]{queryOffsetResponse.getOffset(), subscriptionTracker.stream, offsetTrackingReference, offsetSpecification});
                            // Resume right after the stored offset.
                            offsetSpecification = OffsetSpecification.offset(queryOffsetResponse.getOffset() + 1L);
                        }
                    }
                    offsetSpecification = offsetSpecification == null ? DEFAULT_OFFSET_SPECIFICATION : offsetSpecification;
                    DefaultSubscriptionContext subscriptionContext = new DefaultSubscriptionContext(offsetSpecification, subscriptionTracker.stream);
                    subscriptionTracker.subscriptionListener.preSubscribe(subscriptionContext);
                    LOGGER.info("Computed offset specification {}, offset specification used after subscription listener {}", (Object)offsetSpecification, (Object)subscriptionContext.offsetSpecification());
                    this.checkNotClosed();
                    Client.Response subscribeResponse = Utils.callAndMaybeRetry(() -> this.client.subscribe(subscriptionId, subscriptionTracker.stream, subscriptionContext.offsetSpecification(), subscriptionTracker.flowStrategy.initialCredits(), subscriptionTracker.subscriptionProperties), RETRY_ON_TIMEOUT, "Subscribe request for consumer %s on stream '%s'", subscriptionTracker.consumer.id(), subscriptionTracker.stream);
                    if (!subscribeResponse.isOk()) {
                        String message = "Subscription to stream " + subscriptionTracker.stream + " failed with code " + Utils.formatConstant(subscribeResponse.getResponseCode());
                        LOGGER.debug(message);
                        if (subscribeResponse.getResponseCode() == 3 && LOGGER.isDebugEnabled()) {
                            LOGGER.debug("Subscription ID already exists");
                            // Subscription IDs are unsigned bytes: mask before indexing (as update() does),
                            // otherwise IDs above 127 turn into negative indexes and the lookup throws,
                            // hiding the real subscription failure behind an IndexOutOfBoundsException.
                            SubscriptionTracker initialTracker = previousSubscriptions.get(subscriptionId & 0xFF);
                            if (initialTracker == null) {
                                // The slot may be empty locally; this diagnostic must never throw.
                                LOGGER.debug("No tracker was registered locally with sub ID {}", (Object)subscriptionId);
                            } else {
                                LOGGER.debug("Initial tracker with sub ID {}: consumer {}, stream {}, name {}", new Object[]{subscriptionId, initialTracker.consumer.id(), initialTracker.stream, initialTracker.offsetTrackingReference});
                            }
                        }
                        throw Utils.convertCodeToException(subscribeResponse.getResponseCode(), subscriptionTracker.stream, () -> message);
                    }
                }
                catch (RuntimeException e) {
                    // Roll back everything done since the slot was picked, then let the caller decide.
                    subscriptionTracker.assign((byte)-1, null);
                    this.setSubscriptionTrackers(previousSubscriptions);
                    this.streamToStreamSubscriptions.computeIfAbsent(subscriptionTracker.stream, s -> ConcurrentHashMap.newKeySet()).remove(subscriptionTracker);
                    this.maybeCleanStreamToStreamSubscriptions(subscriptionTracker.stream);
                    throw e;
                }
                subscriptionTracker.state(SubscriptionState.ACTIVE);
                LOGGER.debug("Subscribed to '{}'", (Object)subscriptionTracker.stream);
            }
            finally {
                this.subscriptionManagerLock.unlock();
            }
        }

        /**
         * Drops the per-stream entry for {@code stream} when it is absent or holds no tracker;
         * a non-empty entry is left untouched.
         *
         * @param stream name of the stream whose entry should be checked
         */
        private void maybeCleanStreamToStreamSubscriptions(String stream) {
            // Returning null from the remapping function removes (or does not create) the mapping.
            this.streamToStreamSubscriptions.compute(stream, (key, current) -> current != null && !current.isEmpty() ? current : null);
        }

        /**
         * Unsubscribes a tracker from the client and removes it from this manager's bookkeeping,
         * all under {@code subscriptionManagerLock}.
         *
         * <p>The unsubscribe request is only sent when the client is open. A non-OK response is
         * logged at WARN level and a {@code TimeoutStreamException} at DEBUG level; in both cases
         * the tracker is still removed from the tracker list and from the per-stream index.
         * The manager is closed afterwards if it has become empty.
         *
         * @param subscriptionTracker the tracker to remove
         */
        void remove(SubscriptionTracker subscriptionTracker) {
            Utils.lock(this.subscriptionManagerLock, () -> {
                byte slotId = subscriptionTracker.subscriptionIdInClient;
                try {
                    Client.Response response = Utils.callAndMaybeRetry(() -> {
                        if (!this.client.isOpen()) {
                            // Nothing to unsubscribe from on a closed client.
                            return Client.responseOk();
                        }
                        return this.client.unsubscribe(slotId);
                    }, RETRY_ON_TIMEOUT, "Unsubscribe request for consumer %d on stream '%s'", subscriptionTracker.consumer.id(), subscriptionTracker.stream);
                    if (!response.isOk()) {
                        LOGGER.warn("Unexpected response code when unsubscribing from {}: {} (subscription ID {})", new Object[]{subscriptionTracker.stream, Utils.formatConstant(response.getResponseCode()), slotId});
                    }
                }
                catch (TimeoutStreamException timeout) {
                    LOGGER.debug("Reached timeout when trying to unsubscribe consumer {} from stream '{}'", (Object)subscriptionTracker.consumer.id(), (Object)subscriptionTracker.stream);
                }
                List<SubscriptionTracker> withoutTracker = this.update(this.subscriptionTrackers, slotId, null);
                this.setSubscriptionTrackers(withoutTracker);
                this.streamToStreamSubscriptions.compute(subscriptionTracker.stream, (key, remaining) -> {
                    if (remaining != null && !remaining.isEmpty()) {
                        remaining.remove(subscriptionTracker);
                        if (!remaining.isEmpty()) {
                            return remaining;
                        }
                    }
                    // Absent or emptied entry: drop the mapping.
                    return null;
                });
                this.closeIfEmpty();
            });
        }

        /**
         * Returns a copy of {@code original} in which one slot holds {@code newValue}.
         * The input list is not modified.
         *
         * @param original the current 256-slot tracker list
         * @param index slot to overwrite, interpreted as an unsigned byte (0..255)
         * @param newValue value for that slot; may be {@code null} to free it
         * @return a new 256-slot list
         */
        private List<SubscriptionTracker> update(List<SubscriptionTracker> original, byte index, SubscriptionTracker newValue) {
            List<SubscriptionTracker> copy = this.createSubscriptionTrackerList();
            int target = Byte.toUnsignedInt(index);
            IntStream.range(0, 256).forEach(slot -> copy.set(slot, slot == target ? newValue : original.get(slot)));
            return copy;
        }

        /**
         * Publishes a new tracker list and recomputes {@code trackerCount} as the number of
         * non-null slots in it.
         *
         * @param trackers the list that becomes the current tracker list
         */
        private void setSubscriptionTrackers(List<SubscriptionTracker> trackers) {
            this.subscriptionTrackers = trackers;
            int occupied = 0;
            for (SubscriptionTracker slot : this.subscriptionTrackers) {
                if (slot != null) {
                    ++occupied;
                }
            }
            this.trackerCount = occupied;
        }

        /**
         * @return {@code true} when the number of registered trackers equals the coordinator's
         *     {@code maxConsumersByConnection}
         */
        boolean isFull() {
            return this.trackerCount == ConsumersCoordinator.this.maxConsumersByConnection;
        }

        /**
         * @return {@code true} when no tracker is registered ({@code trackerCount} is 0)
         */
        boolean isEmpty() {
            return this.trackerCount == 0;
        }

        /**
         * Reports whether this manager is closed. This is not a pure query: when the underlying
         * client is found not open, the manager is closed first.
         *
         * @return the value of the {@code closed} flag after that check
         */
        boolean isClosed() {
            boolean clientStillOpen = this.client.isOpen();
            if (!clientStillOpen) {
                this.close();
            }
            return this.closed.get();
        }

        /**
         * Closes this manager if it holds no tracker; the check and the close happen under
         * {@code subscriptionManagerLock}.
         */
        void closeIfEmpty() {
            Utils.lock(this.subscriptionManagerLock, () -> {
                if (!this.isEmpty()) {
                    return;
                }
                this.close();
            });
        }

        /**
         * Closes this manager at most once, under {@code subscriptionManagerLock}.
         *
         * <p>The first call removes the manager from the coordinator's {@code managers}. If the
         * client is non-null and open, every registered tracker is then unsubscribed (best effort,
         * failures are logged at DEBUG level), its slot is cleared, the per-stream index is emptied
         * and the client is closed. Later calls do nothing.
         */
        void close() {
            Utils.lock(this.subscriptionManagerLock, () -> {
                boolean firstClose = this.closed.compareAndSet(false, true);
                if (!firstClose) {
                    return;
                }
                ConsumersCoordinator.this.managers.remove(this);
                LOGGER.debug("Closing consumer subscription manager on {}, id {}", (Object)this.name, (Object)this.id);
                if (this.client == null || !this.client.isOpen()) {
                    return;
                }
                for (int slot = 0; slot < this.subscriptionTrackers.size(); ++slot) {
                    SubscriptionTracker tracker = this.subscriptionTrackers.get(slot);
                    if (tracker != null) {
                        try {
                            // Only unsubscribe while both ends are still open.
                            if (this.client.isOpen() && tracker.consumer.isOpen()) {
                                this.client.unsubscribe(tracker.subscriptionIdInClient);
                            }
                        }
                        catch (Exception failure) {
                            LOGGER.debug("Error while unsubscribing from {}, registration {}", (Object)tracker.stream, (Object)tracker.subscriptionIdInClient);
                        }
                        this.subscriptionTrackers.set(slot, null);
                    }
                }
                this.streamToStreamSubscriptions.clear();
                if (this.client.isOpen()) {
                    this.client.close();
                }
            });
        }

        /**
         * Orders managers by ascending {@code id}.
         *
         * @param o the manager to compare with
         * @return the result of {@code Long.compare} on the two ids
         */
        @Override
        public int compareTo(ClientSubscriptionsManager o) {
            return Long.compare(this.id, o.id);
        }

        /**
         * Two managers are equal when they are of the exact same class and share the same {@code id}.
         *
         * @param o the object to compare with; may be {@code null}
         * @return {@code true} if {@code o} is this manager or a manager with the same id
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null) {
                return false;
            }
            if (o.getClass() != this.getClass()) {
                return false;
            }
            return ((ClientSubscriptionsManager)o).id == this.id;
        }

        /**
         * Hash code derived from {@code id} only, the same field {@code equals} compares.
         *
         * @return {@code Objects.hash(id)}
         */
        public int hashCode() {
            return Objects.hash(this.id);
        }
    }

    /**
     * Immutable {@link MessageHandler.Context} handed to a message handler for one message.
     * It carries the message coordinates and delegates actions to the owning consumer and
     * to the flow strategy's processed-callback.
     */
    private static final class MessageHandlerContext
    implements MessageHandler.Context {
        private final long offset;
        private final long timestamp;
        private final long committedOffset;
        private final StreamConsumer consumer;
        private final ConsumerFlowStrategy.MessageProcessedCallback processedCallback;

        /**
         * @param messageOffset value returned by {@link #offset()} and stored by {@link #storeOffset()}
         * @param messageTimestamp value returned by {@link #timestamp()}
         * @param committedChunk value returned by {@link #committedChunkId()}
         * @param owner consumer the context belongs to
         * @param onProcessed callback invoked by {@link #processed()}
         */
        private MessageHandlerContext(long messageOffset, long messageTimestamp, long committedChunk, StreamConsumer owner, ConsumerFlowStrategy.MessageProcessedCallback onProcessed) {
            this.consumer = owner;
            this.processedCallback = onProcessed;
            this.offset = messageOffset;
            this.timestamp = messageTimestamp;
            this.committedOffset = committedChunk;
        }

        // ---- message coordinates ----

        @Override
        public long offset() {
            return this.offset;
        }

        @Override
        public long timestamp() {
            return this.timestamp;
        }

        @Override
        public long committedChunkId() {
            return this.committedOffset;
        }

        // ---- owning consumer ----

        @Override
        public String stream() {
            return this.consumer.stream();
        }

        @Override
        public Consumer consumer() {
            return this.consumer;
        }

        // ---- actions ----

        /** Asks the consumer to store this message's offset. */
        @Override
        public void storeOffset() {
            this.consumer.store(this.offset);
        }

        /** Notifies the processed-callback, passing this context. */
        @Override
        public void processed() {
            this.processedCallback.processed(this);
        }
    }

    /** Lifecycle states of a subscription tracker. */
    private static enum SubscriptionState {
        // Initial state of every tracker, set when the tracker is created.
        OPENING,
        // Set by ClientSubscriptionsManager.add once the subscribe request has succeeded.
        ACTIVE,
        // Entered from ACTIVE (compare-and-set) by maybeRecoverSubscription before re-assignment.
        RECOVERING,
        // Set by SubscriptionTracker.cancel.
        CLOSED;

    }

    /**
     * Book-keeping for one consumer subscription: what to consume (stream, offsets, properties),
     * who consumes it (consumer, handler, listener) and where it currently lives (manager and
     * subscription ID inside that manager's client).
     *
     * <p>Changes to the manager assignment and cancellation run under {@code subscriptionTrackerLock}.
     */
    private static class SubscriptionTracker {
        private final long id;
        private final String stream;
        private final OffsetSpecification initialOffsetSpecification;
        private final String offsetTrackingReference;
        private final MessageHandler messageHandler;
        private final StreamConsumer consumer;
        private final SubscriptionListener subscriptionListener;
        private final Runnable trackingClosingCallback;
        private final Map<String, String> subscriptionProperties;
        // Used as the resume point during recovery once hasReceivedSomething is true.
        private volatile long offset;
        private volatile boolean hasReceivedSomething = false;
        // Slot of this tracker in its manager; reset to -1 when an assignment is rolled back.
        private volatile byte subscriptionIdInClient;
        private volatile ClientSubscriptionsManager manager;
        private final AtomicReference<SubscriptionState> state = new AtomicReference<SubscriptionState>(SubscriptionState.OPENING);
        private final ConsumerFlowStrategy flowStrategy;
        private final Lock subscriptionTrackerLock = new ReentrantLock();

        /**
         * Creates a tracker in the {@code OPENING} state. When an offset tracking reference is given,
         * the subscription properties are copied into an unmodifiable map that also carries the
         * reference under the {@code "name"} key; otherwise the given map is used as is.
         */
        private SubscriptionTracker(long id, StreamConsumer consumer, String stream, OffsetSpecification initialOffsetSpecification, String offsetTrackingReference, SubscriptionListener subscriptionListener, Runnable trackingClosingCallback, MessageHandler messageHandler, Map<String, String> subscriptionProperties, ConsumerFlowStrategy flowStrategy) {
            this.id = id;
            this.consumer = consumer;
            this.stream = stream;
            this.initialOffsetSpecification = initialOffsetSpecification;
            this.offsetTrackingReference = offsetTrackingReference;
            this.subscriptionListener = subscriptionListener;
            this.trackingClosingCallback = trackingClosingCallback;
            this.messageHandler = messageHandler;
            this.flowStrategy = flowStrategy;
            if (this.offsetTrackingReference != null) {
                ConcurrentHashMap<String, String> named = new ConcurrentHashMap<String, String>(subscriptionProperties.size() + 1);
                named.putAll(subscriptionProperties);
                named.put("name", this.offsetTrackingReference);
                this.subscriptionProperties = Collections.unmodifiableMap(named);
            } else {
                this.subscriptionProperties = subscriptionProperties;
            }
        }

        /**
         * Runs the tracking closing callback, removes this tracker from its manager (if any) and
         * moves the tracker to the {@code CLOSED} state.
         */
        void cancel() {
            Utils.lock(this.subscriptionTrackerLock, () -> {
                LOGGER.debug("Calling tracking consumer closing callback (may be no-op)");
                this.trackingClosingCallback.run();
                if (this.manager == null) {
                    LOGGER.debug("No manager to remove consumer from");
                } else {
                    LOGGER.debug("Removing tracker {} from manager", (Object)this.label());
                    this.manager.remove(this);
                }
                this.state(SubscriptionState.CLOSED);
            });
        }

        /**
         * Records the slot and manager of this tracker and points the consumer at the manager's
         * client, or at no client when {@code manager} is {@code null}.
         *
         * @param subscriptionIdInClient slot of the tracker in the manager
         * @param manager the new manager, or {@code null} to clear the assignment
         */
        void assign(byte subscriptionIdInClient, ClientSubscriptionsManager manager) {
            Utils.lock(this.subscriptionTrackerLock, () -> {
                this.subscriptionIdInClient = subscriptionIdInClient;
                this.manager = manager;
                if (this.manager != null) {
                    this.consumer.setSubscriptionClient(this.manager.client);
                } else if (this.consumer != null) {
                    this.consumer.setSubscriptionClient(null);
                }
            });
        }

        /** Clears the manager reference and the consumer's subscription client. */
        void detachFromManager() {
            Utils.lock(this.subscriptionTrackerLock, () -> {
                this.manager = null;
                this.consumer.setSubscriptionClient(null);
            });
        }

        /** Unconditionally sets the state. */
        void state(SubscriptionState state) {
            this.state.set(state);
        }

        /**
         * Atomically moves the state from {@code expected} to {@code newValue}.
         *
         * @return {@code true} if the state was {@code expected} and has been replaced
         */
        boolean compareAndSet(SubscriptionState expected, SubscriptionState newValue) {
            return this.state.compareAndSet(expected, newValue);
        }

        /** @return the current state */
        SubscriptionState state() {
            return this.state.get();
        }

        /** @return a short description of this tracker for log messages */
        String label() {
            long consumerId = this.consumer.id();
            return String.format("[id=%d, stream=%s, name=%s, consumer=%d]", this.id, this.stream, this.offsetTrackingReference, consumerId);
        }
    }
}

