/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.MessageHandler;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.StreamDoesNotExistException;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.impl.AsyncRetry;
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.impl.StreamConsumer;
import com.rabbitmq.stream.impl.StreamEnvironment;
import com.rabbitmq.stream.impl.Utils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ConsumersCoordinator {
    // Subscription IDs are handled as a single byte in this class, which allows 256 distinct values per client.
    static final int MAX_SUBSCRIPTIONS_PER_CLIENT = 256;
    // Offset specification used when a caller passes null.
    private static final OffsetSpecification DEFAULT_OFFSET_SPECIFICATION = OffsetSpecification.next();
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumersCoordinator.class);
    // Used to pick one broker when several candidates are available.
    private final Random random = new Random();
    // Source of the locator client, client parameters, back-off policies and the scheduled executor.
    private final StreamEnvironment environment;
    // Manager pools keyed by the broker's "host:port".
    private final Map<String, ManagerPool> pools = new ConcurrentHashMap<String, ManagerPool>();
    // Creates the client that backs each subscription manager.
    private final Utils.ClientFactory clientFactory;
    // Number of subscription slots allocated by each subscription manager.
    private final int maxConsumersByConnection;

    /**
     * Creates a coordinator that stores its collaborators; no connection is opened here.
     *
     * @param environment environment providing the locator, client parameters, policies and executor
     * @param maxConsumersByConnection number of subscription slots per subscription manager
     * @param clientFactory factory used to create the client of each subscription manager
     */
    ConsumersCoordinator(StreamEnvironment environment, int maxConsumersByConnection, Utils.ClientFactory clientFactory) {
        this.maxConsumersByConnection = maxConsumersByConnection;
        this.clientFactory = clientFactory;
        this.environment = environment;
    }

    /**
     * Builds the key identifying a broker in the pool map.
     *
     * @param broker broker to build the key for
     * @return the broker's host and port joined by a colon
     */
    private static String keyForClientSubscription(Client.Broker broker) {
        StringBuilder key = new StringBuilder();
        key.append(broker.getHost());
        key.append(":");
        key.append(broker.getPort());
        return key.toString();
    }

    /**
     * @return the environment's topology-update back-off policy, used when re-assigning consumers after a
     *     metadata notification
     */
    private BackOffDelayPolicy metadataUpdateBackOffDelayPolicy() {
        BackOffDelayPolicy topologyUpdatePolicy = this.environment.topologyUpdateBackOffDelayPolicy();
        return topologyUpdatePolicy;
    }

    /**
     * Subscribes a consumer to a stream on one of the brokers able to serve it.
     *
     * @param consumer consumer the subscription belongs to
     * @param stream stream to consume from
     * @param offsetSpecification requested starting point, may be {@code null}
     * @param trackingReference offset tracking reference, may be {@code null}
     * @param messageHandler handler invoked for each delivered message
     * @return a callback that cancels the subscription
     * @throws IllegalStateException if no broker could be picked
     */
    Runnable subscribe(StreamConsumer consumer, String stream, OffsetSpecification offsetSpecification, String trackingReference, MessageHandler messageHandler) {
        Client.Broker target = this.pickBroker(this.findBrokersForStream(stream));
        if (target == null) {
            throw new IllegalStateException("No available node to subscribe to");
        }
        SubscriptionTracker tracker = new SubscriptionTracker(consumer, stream, trackingReference, messageHandler);
        String poolKey = ConsumersCoordinator.keyForClientSubscription(target);
        // The pool (and its first client) is only created when no pool exists yet for this broker.
        ManagerPool pool = this.pools.computeIfAbsent(
            poolKey,
            ignoredKey -> new ManagerPool(poolKey, this.environment.clientParametersCopy().host(target.getHost()).port(target.getPort())));
        pool.add(tracker, offsetSpecification, true);
        return () -> tracker.cancel();
    }

    /**
     * @return the environment's locator client, used here for metadata lookups
     */
    private Client locator() {
        Client locatorClient = this.environment.locator();
        return locatorClient;
    }

    /**
     * Finds the brokers a consumer can connect to in order to consume from a stream.
     *
     * <p>Replicas are returned when there are any; otherwise the leader is the only candidate.
     *
     * @param stream name of the stream
     * @return a non-empty list of candidate brokers
     * @throws StreamDoesNotExistException if there is no metadata for the stream or the response code is 2
     * @throws IllegalStateException if the metadata response is not OK for another reason, or if the stream
     *     has neither replicas nor a leader
     */
    List<Client.Broker> findBrokersForStream(String stream) {
        Map<String, Client.StreamMetadata> metadataByStream = this.locator().metadata(stream);
        Client.StreamMetadata streamMetadata = metadataByStream.isEmpty() ? null : metadataByStream.get(stream);
        if (streamMetadata == null) {
            throw new StreamDoesNotExistException(stream);
        }
        if (!streamMetadata.isResponseOk()) {
            // Response code 2 is reported as a missing stream; any other failure is an illegal state.
            if (streamMetadata.getResponseCode() == 2) {
                throw new StreamDoesNotExistException(stream);
            }
            throw new IllegalStateException("Could not get stream metadata, response code: " + Utils.formatConstant(streamMetadata.getResponseCode()));
        }

        List<Client.Broker> candidates;
        List<Client.Broker> replicas = streamMetadata.getReplicas();
        boolean hasReplicas = replicas != null && !replicas.isEmpty();
        if (hasReplicas) {
            LOGGER.debug("Replicas for consuming from {}: {}", stream, replicas);
            candidates = new ArrayList<>(replicas);
        } else {
            Client.Broker leader = streamMetadata.getLeader();
            if (leader == null) {
                throw new IllegalStateException("No node available to consume from stream " + stream);
            }
            candidates = Collections.singletonList(leader);
            LOGGER.debug("Consuming from {} on leader node {}", stream, leader);
        }
        LOGGER.debug("Candidates to consume from {}: {}", stream, candidates);
        return candidates;
    }

    /**
     * Picks one broker among the candidates.
     *
     * @param brokers candidate brokers
     * @return {@code null} when the list is empty, its single element when it has one, otherwise a randomly
     *     chosen element
     */
    private Client.Broker pickBroker(List<Client.Broker> brokers) {
        int candidateCount = brokers.size();
        switch (candidateCount) {
            case 0:
                return null;
            case 1:
                // No need to consume a random number for a single candidate.
                return brokers.get(0);
            default:
                return brokers.get(this.random.nextInt(candidateCount));
        }
    }

    /**
     * Closes every manager pool currently registered in this coordinator.
     */
    public void close() {
        this.pools.values().forEach(ManagerPool::close);
    }

    /**
     * @return the number of manager pools currently registered
     */
    int poolSize() {
        int registeredPools = this.pools.size();
        return registeredPools;
    }

    /**
     * Renders the pools as a JSON-like array: one object per broker, each listing its clients with their
     * number of active consumers.
     *
     * @return the textual representation, with single quotes replaced by double quotes
     */
    @Override
    public String toString() {
        StringBuilder json = new StringBuilder("[ \n");
        boolean firstPool = true;
        for (Map.Entry<String, ManagerPool> poolEntry : this.pools.entrySet()) {
            if (!firstPool) {
                json.append(", \n");
            }
            firstPool = false;
            json.append("  { 'broker' : '").append(poolEntry.getKey()).append("', 'clients' : [ ");
            boolean firstManager = true;
            for (ClientSubscriptionsManager manager : poolEntry.getValue().managers) {
                if (!firstManager) {
                    json.append(", ");
                }
                firstManager = false;
                long consumerCount = manager.subscriptionTrackers.stream().filter(Objects::nonNull).count();
                json.append("{ 'consumer_count' : ").append(consumerCount).append(" }");
            }
            json.append(" ] }");
        }
        json.append("\n]");
        // Quotes are swapped at the very end so the templates above stay readable.
        return json.toString().replace("'", "\"");
    }

    private class ClientSubscriptionsManager {
        // Client (connection) carrying all the subscriptions of this manager.
        private final Client client;
        // Trackers grouped by stream name, used to find the subscriptions affected by a stream event.
        private final Map<String, Set<SubscriptionTracker>> streamToStreamSubscriptions = new ConcurrentHashMap<String, Set<SubscriptionTracker>>();
        // Pool this manager belongs to.
        private final ManagerPool owner;
        // Slots indexed by (unsigned) subscription ID; a null slot is free. The list is replaced by a
        // modified copy on updates, hence the volatile reference.
        // The initial capacity reads the enclosing coordinator's field directly instead of going through a
        // compiler-generated accessor, which does not exist in source code.
        private volatile List<SubscriptionTracker> subscriptionTrackers = new ArrayList<SubscriptionTracker>(ConsumersCoordinator.this.maxConsumersByConnection);

        /**
         * Creates a subscription manager owned by {@code owner} and creates the client it uses.
         *
         * <p>The listeners registered on the client dispatch chunks and messages to the tracker stored in the
         * slot matching the subscription ID, and trigger the re-assignment of consumers when the connection
         * shuts down unexpectedly or when a metadata notification is received for a stream.
         *
         * @param owner pool this manager belongs to; its name is used for logging and as the client factory key
         * @param clientParameters parameters on which the connection name and the listeners are set before the
         *     client is created
         */
        private ClientSubscriptionsManager(ManagerPool owner, Client.ClientParameters clientParameters) {
            this.owner = owner;
            String name = owner.name;
            LOGGER.debug("creating subscription manager on {}", (Object)name);
            // Pre-fill every slot with null so the list can be addressed by subscription ID.
            IntStream.range(0, ConsumersCoordinator.this.maxConsumersByConnection).forEach(i -> this.subscriptionTrackers.add(null));
            // Set once the client is created, so a shutdown during creation does not touch the pool.
            AtomicBoolean clientInitializedInManager = new AtomicBoolean(false);

            // One credit is granted back per received chunk, unless the subscription is gone or closing.
            Client.ChunkListener chunkListener = (client, subscriptionId, offset, messageCount, dataSize) -> {
                SubscriptionTracker subscriptionTracker = this.subscriptionTrackers.get(subscriptionId & 0xFF);
                if (subscriptionTracker != null && !subscriptionTracker.isClosing()) {
                    client.credit(subscriptionId, 1);
                } else {
                    LOGGER.debug("Could not find stream subscription {} or subscription closing, not providing credits", (Object)(subscriptionId & 0xFF));
                }
            };

            Client.CreditNotification creditNotification = (subscriptionId, responseCode) -> LOGGER.debug("Received credit notification for subscription {}: {}", (Object)(subscriptionId & 0xFF), (Object)Utils.formatConstant(responseCode));

            // Records the last delivered offset on the tracker, then hands the message to the application.
            Client.MessageListener messageListener = (subscriptionId, offset, message) -> {
                SubscriptionTracker subscriptionTracker = this.subscriptionTrackers.get(subscriptionId & 0xFF);
                if (subscriptionTracker != null) {
                    subscriptionTracker.offset = offset;
                    subscriptionTracker.messageHandler.handle(new MessageHandlerContext(offset, subscriptionTracker.consumer), message);
                } else {
                    // Log the unsigned ID, like the other listeners, so IDs above 127 do not show up negative.
                    LOGGER.debug("Could not find stream subscription {}", (Object)(subscriptionId & 0xFF));
                }
            };

            Client.ShutdownListener shutdownListener = shutdownContext -> {
                if (clientInitializedInManager.get()) {
                    owner.remove(this);
                }
                if (shutdownContext.isShutdownUnexpected()) {
                    LOGGER.debug("Unexpected shutdown notification on subscription client {}, scheduling consumers re-assignment", (Object)name);
                    ConsumersCoordinator.this.environment.scheduledExecutorService().execute(() -> {
                        if (Thread.currentThread().isInterrupted()) {
                            return;
                        }
                        // The trackers no longer belong to this manager: cancelling them must not call back here.
                        this.subscriptionTrackers.stream().filter(Objects::nonNull).forEach(SubscriptionTracker::detachFromManager);
                        for (Map.Entry<String, Set<SubscriptionTracker>> entry : this.streamToStreamSubscriptions.entrySet()) {
                            if (Thread.currentThread().isInterrupted()) {
                                break;
                            }
                            String stream = entry.getKey();
                            LOGGER.debug("Re-assigning {} consumer(s) to stream {} after disconnection", (Object)entry.getValue().size(), (Object)stream);
                            this.assignConsumersToStream(entry.getValue(), stream, attempt -> ConsumersCoordinator.this.environment.recoveryBackOffDelayPolicy().delay(attempt), false);
                        }
                    });
                }
            };

            Client.MetadataListener metadataListener = (stream, code) -> {
                Set<SubscriptionTracker> affectedSubscriptions;
                LOGGER.debug("Received metadata notification for {}, stream is likely to have become unavailable", (Object)stream);
                synchronized (this) {
                    Set<SubscriptionTracker> subscriptions = this.streamToStreamSubscriptions.remove(stream);
                    if (subscriptions != null && !subscriptions.isEmpty()) {
                        // Build a copy of the slots with the affected subscriptions cleared, then swap it in.
                        ArrayList<SubscriptionTracker> newSubscriptions = new ArrayList<SubscriptionTracker>(ConsumersCoordinator.this.maxConsumersByConnection);
                        for (int i = 0; i < ConsumersCoordinator.this.maxConsumersByConnection; ++i) {
                            newSubscriptions.add(this.subscriptionTrackers.get(i));
                        }
                        for (SubscriptionTracker subscription : subscriptions) {
                            LOGGER.debug("Subscription {} was at offset {}", (Object)subscription.subscriptionIdInClient, (Object)subscription.offset);
                            newSubscriptions.set(subscription.subscriptionIdInClient & 0xFF, null);
                        }
                        this.subscriptionTrackers = newSubscriptions;
                    }
                    affectedSubscriptions = subscriptions;
                }
                if (this.isEmpty()) {
                    this.owner.remove(this);
                }
                if (affectedSubscriptions != null && !affectedSubscriptions.isEmpty()) {
                    ConsumersCoordinator.this.environment.scheduledExecutorService().execute(() -> {
                        if (Thread.currentThread().isInterrupted()) {
                            return;
                        }
                        LOGGER.debug("Trying to move {} subscription(s) (stream {})", (Object)affectedSubscriptions.size(), (Object)stream);
                        // The client is closed after the move if this manager has no subscription left.
                        this.assignConsumersToStream(affectedSubscriptions, stream, ConsumersCoordinator.this.metadataUpdateBackOffDelayPolicy(), this.isEmpty());
                    });
                }
            };

            Utils.ClientFactoryContext clientFactoryContext = Utils.ClientFactoryContext.fromParameters(clientParameters.clientProperty("connection_name", "rabbitmq-stream-consumer").chunkListener(chunkListener).creditNotification(creditNotification).messageListener(messageListener).shutdownListener(shutdownListener).metadataListener(metadataListener)).key(owner.name);
            this.client = ConsumersCoordinator.this.clientFactory.client(clientFactoryContext);
            clientInitializedInManager.set(true);
        }

        /**
         * Asynchronously looks up the brokers able to serve {@code stream} and re-subscribes each of the given
         * trackers, starting from the tracker's last recorded offset.
         *
         * <p>When the lookup yields {@code null} or fails, every affected consumer is closed with
         * {@code closeAfterStreamDeletion()} instead.
         *
         * @param subscriptions trackers to re-assign
         * @param stream stream the trackers consume from
         * @param delayPolicy delay policy handed to the asynchronous retry of the broker lookup
         * @param closeClient whether to close this manager once the trackers have been processed
         */
        private void assignConsumersToStream(Collection<SubscriptionTracker> subscriptions, String stream, BackOffDelayPolicy delayPolicy, boolean closeClient) {
            // Fallback: close all the affected consumers, ignoring individual failures.
            Runnable consumersClosingCallback = () -> {
                for (SubscriptionTracker affectedSubscription : subscriptions) {
                    try {
                        affectedSubscription.consumer.closeAfterStreamDeletion();
                    }
                    catch (Exception e) {
                        LOGGER.debug("Error while closing consumer: {}", (Object)e.getMessage());
                    }
                }
            };
            // NOTE(review): the retry predicate presumably means "do not retry when the stream does not exist" - confirm against AsyncRetry.
            ((CompletableFuture)AsyncRetry.asyncRetry(() -> ConsumersCoordinator.this.findBrokersForStream(stream)).description("Candidate lookup to consume from " + stream).scheduler(ConsumersCoordinator.this.environment.scheduledExecutorService()).retry(ex -> !(ex instanceof StreamDoesNotExistException)).delayPolicy(delayPolicy).build().thenAccept(candidates -> {
                if (candidates == null) {
                    consumersClosingCallback.run();
                } else {
                    for (SubscriptionTracker affectedSubscription : subscriptions) {
                        try {
                            if (affectedSubscription.consumer.isOpen()) {
                                Client.Broker broker = ConsumersCoordinator.this.pickBroker(candidates);
                                LOGGER.debug("Using {} to resume consuming from {}", (Object)broker, (Object)stream);
                                String key = ConsumersCoordinator.keyForClientSubscription(broker);
                                ManagerPool subscriptionPool = ConsumersCoordinator.this.pools.computeIfAbsent(key, s -> new ManagerPool(key, ConsumersCoordinator.this.environment.clientParametersCopy().host(broker.getHost()).port(broker.getPort())));
                                StreamConsumer streamConsumer = affectedSubscription.consumer;
                                // The open state is checked again under the consumer's monitor before subscribing.
                                synchronized (streamConsumer) {
                                    if (affectedSubscription.consumer.isOpen()) {
                                        subscriptionPool.add(affectedSubscription, OffsetSpecification.offset(affectedSubscription.offset), false);
                                    }
                                    // Skips the "has been closed" log below, which is only for consumers found closed at the first check.
                                    continue;
                                }
                            }
                            LOGGER.debug("Not re-assigning consumer because it has been closed");
                        }
                        catch (Exception e) {
                            // A failure for one subscription does not prevent the re-assignment of the others.
                            LOGGER.warn("Error while re-assigning subscription from stream {}", (Object)stream, (Object)e);
                        }
                    }
                    if (closeClient) {
                        this.close();
                    }
                }
            })).exceptionally(ex -> {
                LOGGER.debug("Error while trying to assign {} consumer(s) to {}", new Object[]{subscriptions.size(), stream, ex});
                consumersClosingCallback.run();
                return null;
            });
        }

        /**
         * Registers a tracker in the first free slot and subscribes it on this manager's client.
         *
         * <p>When the tracker has an offset tracking reference and a non-zero offset is stored for it, consuming
         * starts right after the stored offset and the requested specification is not used. On failure the
         * manager state is rolled back and the exception is rethrown.
         *
         * @param subscriptionTracker tracker to register
         * @param offsetSpecification requested starting point; {@code null} means the default specification
         * @param isSubcription {@code true} for an initial subscription, {@code false} for a re-assignment; only
         *     controls whether overriding the requested specification is logged at info level
         * @throws IllegalStateException if this manager has no free subscription slot
         * @throws StreamException if the broker rejects the subscription
         */
        synchronized void add(SubscriptionTracker subscriptionTracker, OffsetSpecification offsetSpecification, boolean isSubcription) {
            List<SubscriptionTracker> previousSubscriptions = this.subscriptionTrackers;
            // Never scan past the actual number of slots, and never hand out an ID that does not fit in a byte.
            int slotCount = Math.min(ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT, previousSubscriptions.size());
            int freeSlot = -1;
            for (int i = 0; i < slotCount; ++i) {
                if (previousSubscriptions.get(i) == null) {
                    freeSlot = i;
                    break;
                }
            }
            if (freeSlot < 0) {
                // Falling back to slot 0 would silently overwrite a live subscription.
                throw new IllegalStateException("No free subscription slot on subscription manager for " + this.owner.name);
            }
            byte subscriptionId = (byte)freeSlot;
            LOGGER.debug("Subscribing to {}, requested offset specification is {}, offset tracking reference is {}", new Object[]{subscriptionTracker.stream, offsetSpecification == null ? DEFAULT_OFFSET_SPECIFICATION : offsetSpecification, subscriptionTracker.offsetTrackingReference});
            try {
                // State is updated before the subscribe call so that the listeners can find the tracker
                // as soon as the first chunk arrives.
                subscriptionTracker.assign(subscriptionId, this);
                this.streamToStreamSubscriptions.computeIfAbsent(subscriptionTracker.stream, s -> ConcurrentHashMap.newKeySet()).add(subscriptionTracker);
                this.subscriptionTrackers = this.update(previousSubscriptions, subscriptionId, subscriptionTracker);

                String offsetTrackingReference = subscriptionTracker.offsetTrackingReference;
                if (offsetTrackingReference != null) {
                    long trackedOffset = this.client.queryOffset(offsetTrackingReference, subscriptionTracker.stream);
                    if (trackedOffset != 0L) {
                        if (offsetSpecification != null && isSubcription) {
                            LOGGER.info("Requested offset specification {} not used in favor of stored offset found for reference {}", (Object)offsetSpecification, (Object)offsetTrackingReference);
                        }
                        LOGGER.debug("Using offset {} to start consuming from {} with consumer {} (instead of {})", new Object[]{trackedOffset, subscriptionTracker.stream, offsetTrackingReference, offsetSpecification});
                        // Resume right after the last stored offset.
                        offsetSpecification = OffsetSpecification.offset(trackedOffset + 1L);
                    }
                }
                if (offsetSpecification == null) {
                    offsetSpecification = DEFAULT_OFFSET_SPECIFICATION;
                }

                Map<String, String> subscriptionProperties = Collections.emptyMap();
                if (offsetTrackingReference != null) {
                    subscriptionProperties = new HashMap<>(1);
                    subscriptionProperties.put("name", offsetTrackingReference);
                }
                Client.Response subscribeResponse = this.client.subscribe(subscriptionId, subscriptionTracker.stream, offsetSpecification, 10, subscriptionProperties);
                if (!subscribeResponse.isOk()) {
                    String message = "Subscription to stream " + subscriptionTracker.stream + " failed with code " + Utils.formatConstant(subscribeResponse.getResponseCode());
                    LOGGER.debug(message);
                    throw new StreamException(message);
                }
            }
            catch (RuntimeException e) {
                // Roll back: detach the tracker, restore the slots, and drop the tracker from its stream set
                // without leaving an empty set behind.
                subscriptionTracker.assign((byte)-1, null);
                this.subscriptionTrackers = previousSubscriptions;
                this.streamToStreamSubscriptions.computeIfPresent(subscriptionTracker.stream, (s, trackersForStream) -> {
                    trackersForStream.remove(subscriptionTracker);
                    return trackersForStream.isEmpty() ? null : trackersForStream;
                });
                throw e;
            }
            LOGGER.debug("Subscribed to {}", (Object)subscriptionTracker.stream);
        }

        /**
         * Unsubscribes a tracker from the client, frees its slot and lets the owning pool dispose of this
         * manager if it became empty.
         *
         * @param subscriptionTracker tracker to remove
         */
        synchronized void remove(SubscriptionTracker subscriptionTracker) {
            byte subscriptionId = subscriptionTracker.subscriptionIdInClient;
            Client.Response response = this.client.unsubscribe(subscriptionId);
            if (!response.isOk()) {
                LOGGER.warn("Unexpected response code when unsubscribing from {}: {} (subscription ID {})", subscriptionTracker.stream, Utils.formatConstant(response.getResponseCode()), subscriptionId);
            }
            this.subscriptionTrackers = this.update(this.subscriptionTrackers, subscriptionId, null);
            // Drop the tracker from its stream set, and the set itself once it holds nothing.
            this.streamToStreamSubscriptions.computeIfPresent(subscriptionTracker.stream, (streamName, trackersForStream) -> {
                if (!trackersForStream.isEmpty()) {
                    trackersForStream.remove(subscriptionTracker);
                }
                return trackersForStream.isEmpty() ? null : trackersForStream;
            });
            this.owner.maybeDisposeManager(this);
        }

        /**
         * Builds a copy of the slot list with one slot replaced; the original list is left untouched.
         *
         * @param original slots to copy from
         * @param index subscription ID of the slot to replace, interpreted as unsigned
         * @param newValue value for the replaced slot, {@code null} to free it
         * @return a new list with as many slots as the coordinator's maximum number of consumers by connection
         */
        private List<SubscriptionTracker> update(List<SubscriptionTracker> original, byte index, SubscriptionTracker newValue) {
            int slotCount = ConsumersCoordinator.this.maxConsumersByConnection;
            int replacedSlot = index & 0xFF;
            List<SubscriptionTracker> copy = new ArrayList<>(slotCount);
            int slot = 0;
            while (slot < slotCount) {
                if (slot == replacedSlot) {
                    copy.add(newValue);
                } else {
                    copy.add(original.get(slot));
                }
                ++slot;
            }
            return copy;
        }

        /**
         * @return {@code true} when the number of occupied slots equals the maximum number of consumers by
         *     connection
         */
        synchronized boolean isFull() {
            int occupiedSlots = this.trackersCount();
            return occupiedSlots == ConsumersCoordinator.this.maxConsumersByConnection;
        }

        /**
         * @return {@code true} when no slot holds a tracker
         */
        synchronized boolean isEmpty() {
            int occupiedSlots = this.trackersCount();
            return occupiedSlots == 0;
        }

        /**
         * @return the number of slots currently holding a tracker
         */
        private synchronized int trackersCount() {
            int occupiedSlots = 0;
            for (SubscriptionTracker tracker : this.subscriptionTrackers) {
                if (tracker != null) {
                    ++occupiedSlots;
                }
            }
            return occupiedSlots;
        }

        /**
         * Closes this manager: unsubscribes the trackers whose consumer is still open, clears the internal
         * state and closes the client. Does nothing when the client is already closed.
         *
         * <p>Unsubscribing is best-effort: a failure is logged and does not prevent the client from closing.
         */
        synchronized void close() {
            if (this.client == null || !this.client.isOpen()) {
                return;
            }
            this.subscriptionTrackers.stream().filter(Objects::nonNull).forEach(tracker -> {
                try {
                    // The client may get closed by a failure on a previous unsubscription, so check each time.
                    if (this.client.isOpen() && tracker.consumer.isOpen()) {
                        this.client.unsubscribe(tracker.subscriptionIdInClient);
                    }
                }
                catch (Exception e) {
                    // Keep going with the other subscriptions, but leave a trace of the failure.
                    LOGGER.debug("Error while unsubscribing subscription {} when closing subscription manager: {}", (Object)(tracker.subscriptionIdInClient & 0xFF), (Object)e.getMessage());
                }
            });
            this.streamToStreamSubscriptions.clear();
            this.subscriptionTrackers.clear();
            if (this.client.isOpen()) {
                this.client.close();
            }
        }
    }

    /**
     * Set of subscription managers (one client each) connected to the same broker.
     *
     * <p>A new manager is created when all the existing ones are full. The pool unregisters itself from the
     * coordinator when its last manager is removed.
     */
    private class ManagerPool {
        private final List<ClientSubscriptionsManager> managers = new CopyOnWriteArrayList<>();
        // Key of this pool in the coordinator ("host:port" of the broker), also used in logs.
        private final String name;
        // Parameters used to create the client of each manager of this pool.
        private final Client.ClientParameters clientParameters;

        /**
         * Creates the pool along with its first manager.
         *
         * @param name key of the pool in the coordinator
         * @param clientParameters parameters for the clients of this pool
         */
        private ManagerPool(String name, Client.ClientParameters clientParameters) {
            this.name = name;
            this.clientParameters = clientParameters;
            LOGGER.debug("Creating client subscription pool on {}", (Object)name);
            this.managers.add(new ClientSubscriptionsManager(this, clientParameters));
        }

        /**
         * Adds a subscription to the first manager with a free slot, creating a new manager if there is none.
         *
         * @param subscriptionTracker tracker to add
         * @param offsetSpecification requested starting point, may be {@code null}
         * @param isSubscription {@code true} for an initial subscription, {@code false} for a re-assignment
         */
        private synchronized void add(SubscriptionTracker subscriptionTracker, OffsetSpecification offsetSpecification, boolean isSubscription) {
            for (ClientSubscriptionsManager manager : this.managers) {
                if (!manager.isFull()) {
                    manager.add(subscriptionTracker, offsetSpecification, isSubscription);
                    return;
                }
            }
            LOGGER.debug("Creating subscription manager on {}, this is subscription manager #{}", (Object)this.name, (Object)(this.managers.size() + 1));
            ClientSubscriptionsManager manager = new ClientSubscriptionsManager(this, this.clientParameters);
            this.managers.add(manager);
            manager.add(subscriptionTracker, offsetSpecification, isSubscription);
        }

        /**
         * Closes and removes the given manager if it has no subscription left.
         *
         * @param clientSubscriptionsManager manager to check
         */
        private synchronized void maybeDisposeManager(ClientSubscriptionsManager clientSubscriptionsManager) {
            if (clientSubscriptionsManager.isEmpty()) {
                clientSubscriptionsManager.close();
                this.remove(clientSubscriptionsManager);
            }
        }

        /**
         * Removes the given manager, and unregisters this pool from the coordinator once it is empty.
         *
         * @param clientSubscriptionsManager manager to remove
         */
        private synchronized void remove(ClientSubscriptionsManager clientSubscriptionsManager) {
            this.managers.remove(clientSubscriptionsManager);
            // Only unregister this very instance: this method can run again after the pool has been
            // unregistered (e.g. from a client shutdown notification), and by then a new pool may have been
            // registered under the same name. Removing by key alone would evict that live pool.
            if (this.managers.isEmpty() && ConsumersCoordinator.this.pools.remove(this.name, this)) {
                LOGGER.debug("Disposed client subscription pool on {} because it was empty", (Object)this.name);
            }
        }

        /**
         * Closes all the managers of this pool and forgets them.
         */
        synchronized void close() {
            for (ClientSubscriptionsManager manager : this.managers) {
                manager.close();
            }
            this.managers.clear();
        }
    }

    /**
     * Context handed to the message handler along with each message: the message offset and the consumer
     * the message was delivered to.
     */
    private static final class MessageHandlerContext implements MessageHandler.Context {
        private final Consumer consumer;
        private final long offset;

        private MessageHandlerContext(long offset, Consumer consumer) {
            this.consumer = consumer;
            this.offset = offset;
        }

        /** @return the offset of the message this context was created for */
        @Override
        public long offset() {
            return offset;
        }

        /** Asks the consumer to store the offset of the message this context was created for. */
        @Override
        public void storeOffset() {
            consumer.store(offset);
        }

        /** @return the consumer the message was delivered to */
        @Override
        public Consumer consumer() {
            return consumer;
        }
    }

    /**
     * State of one subscription: what it consumes, who handles the messages, the last delivered offset, and
     * the manager and subscription ID it is currently assigned to.
     */
    private static class SubscriptionTracker {
        private final StreamConsumer consumer;
        private final String stream;
        private final String offsetTrackingReference;
        private final MessageHandler messageHandler;
        // Last offset delivered to the message handler.
        private volatile long offset;
        // ID of the subscription in the client of the current manager.
        private volatile byte subscriptionIdInClient;
        // Manager currently in charge of this subscription, null when there is none.
        private volatile ClientSubscriptionsManager manager;
        // Set when cancellation has started (false until then).
        private volatile boolean closing;

        private SubscriptionTracker(StreamConsumer consumer, String stream, String offsetTrackingReference, MessageHandler messageHandler) {
            this.messageHandler = messageHandler;
            this.offsetTrackingReference = offsetTrackingReference;
            this.stream = stream;
            this.consumer = consumer;
        }

        /**
         * Marks the subscription as closing and removes it from its manager, if it has one.
         */
        synchronized void cancel() {
            this.closing = true;
            ClientSubscriptionsManager currentManager = this.manager;
            if (currentManager == null) {
                LOGGER.debug("No manager to remove consumer from");
                return;
            }
            LOGGER.debug("Removing consumer from manager");
            currentManager.remove(this);
        }

        /** @return {@code true} once {@link #cancel()} has been called */
        boolean isClosing() {
            return this.closing;
        }

        /**
         * Binds this subscription to a manager under the given subscription ID.
         *
         * @param subscriptionIdInClient ID of the subscription in the manager's client
         * @param manager manager now in charge, {@code null} to unbind
         */
        synchronized void assign(byte subscriptionIdInClient, ClientSubscriptionsManager manager) {
            this.manager = manager;
            this.subscriptionIdInClient = subscriptionIdInClient;
        }

        /** Forgets the current manager, keeping the subscription ID as is. */
        synchronized void detachFromManager() {
            this.manager = null;
        }
    }
}

