/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Address;
import com.rabbitmq.stream.AddressResolver;
import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConsumerBuilder;
import com.rabbitmq.stream.ConsumerFlowStrategy;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.MessageHandler;
import com.rabbitmq.stream.NoOffsetException;
import com.rabbitmq.stream.ObservationCollector;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ProducerBuilder;
import com.rabbitmq.stream.StreamCreator;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.StreamStats;
import com.rabbitmq.stream.SubscriptionListener;
import com.rabbitmq.stream.compression.CompressionCodecFactory;
import com.rabbitmq.stream.impl.AsyncRetry;
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.impl.Clock;
import com.rabbitmq.stream.impl.Codecs;
import com.rabbitmq.stream.impl.ConsumersCoordinator;
import com.rabbitmq.stream.impl.DefaultExecutorServiceFactory;
import com.rabbitmq.stream.impl.ExecutorServiceFactory;
import com.rabbitmq.stream.impl.OffsetTrackingCoordinator;
import com.rabbitmq.stream.impl.ProducersCoordinator;
import com.rabbitmq.stream.impl.StreamConsumer;
import com.rabbitmq.stream.impl.StreamConsumerBuilder;
import com.rabbitmq.stream.impl.StreamEnvironmentBuilder;
import com.rabbitmq.stream.impl.StreamProducer;
import com.rabbitmq.stream.impl.StreamProducerBuilder;
import com.rabbitmq.stream.impl.StreamStreamCreator;
import com.rabbitmq.stream.impl.ThreadUtils;
import com.rabbitmq.stream.impl.Utils;
import com.rabbitmq.stream.sasl.CredentialsProvider;
import com.rabbitmq.stream.sasl.UsernamePasswordCredentialsProvider;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import java.io.IOException;
import java.net.URI;
import java.net.URLDecoder;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.net.ssl.SSLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class StreamEnvironment
implements Environment {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamEnvironment.class);
    private final EventLoopGroup eventLoopGroup;
    private final ScheduledExecutorService scheduledExecutorService;
    private final ScheduledExecutorService locatorReconnectionScheduledExecutorService;
    private final boolean privateScheduleExecutorService;
    private final Client.ClientParameters clientParametersPrototype;
    private final List<Address> addresses;
    private final List<StreamProducer> producers = new CopyOnWriteArrayList<StreamProducer>();
    private final List<StreamConsumer> consumers = new CopyOnWriteArrayList<StreamConsumer>();
    private final Codec codec;
    private final BackOffDelayPolicy recoveryBackOffDelayPolicy;
    private final BackOffDelayPolicy topologyUpdateBackOffDelayPolicy;
    private final ConsumersCoordinator consumersCoordinator;
    private final ProducersCoordinator producersCoordinator;
    private final OffsetTrackingCoordinator offsetTrackingCoordinator;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AddressResolver addressResolver;
    private final Clock clock = new Clock();
    private final ScheduledFuture<?> clockRefreshFuture;
    private final ByteBufAllocator byteBufAllocator;
    private final AtomicBoolean locatorsInitialized = new AtomicBoolean(false);
    private final Runnable locatorInitializationSequence;
    private final List<Locator> locators;
    private final ExecutorServiceFactory executorServiceFactory;
    private final ObservationCollector<?> observationCollector;
    private final Duration rpcTimeout;

    @SuppressFBWarnings(value={"CT_CONSTRUCTOR_THROW"})
    StreamEnvironment(ScheduledExecutorService scheduledExecutorService, Client.ClientParameters clientParametersPrototype, List<URI> uris, BackOffDelayPolicy recoveryBackOffDelayPolicy, BackOffDelayPolicy topologyBackOffDelayPolicy, AddressResolver addressResolver, int maxProducersByConnection, int maxTrackingConsumersByConnection, int maxConsumersByConnection, StreamEnvironmentBuilder.DefaultTlsConfiguration tlsConfiguration, ByteBufAllocator byteBufAllocator, boolean lazyInit, Function<Utils.ClientConnectionType, String> connectionNamingStrategy, Function<Client.ClientParameters, Client> clientFactory, ObservationCollector<?> observationCollector, boolean forceReplicaForConsumers, boolean forceLeaderForProducers, Duration producerNodeRetryDelay, Duration consumerNodeRetryDelay, int expectedLocatorCount) {
        ScheduledExecutorService executorService;
        String username;
        CredentialsProvider credentialsProvider;
        boolean tls;
        this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy;
        this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy;
        this.byteBufAllocator = byteBufAllocator;
        this.rpcTimeout = Optional.ofNullable(clientParametersPrototype.rpcTimeout()).orElse(Client.DEFAULT_RPC_TIMEOUT);
        clientParametersPrototype = clientParametersPrototype.byteBufAllocator(byteBufAllocator);
        clientParametersPrototype = this.maybeSetUpClientParametersFromUris(uris, clientParametersPrototype);
        this.observationCollector = observationCollector;
        if (tlsConfiguration != null && tlsConfiguration.enabled()) {
            tls = true;
            try {
                SslContext sslContext = tlsConfiguration.sslContext() == null ? SslContextBuilder.forClient().build() : tlsConfiguration.sslContext();
                clientParametersPrototype.sslContext(sslContext);
            }
            catch (SSLException e) {
                throw new StreamException("Error while creating Netty SSL context", e);
            }
        } else {
            tls = false;
        }
        if (uris.isEmpty()) {
            this.addresses = Collections.singletonList(new Address(clientParametersPrototype.host(), clientParametersPrototype.port()));
        } else {
            int defaultPort = tls ? 5551 : 5552;
            this.addresses = uris.stream().map(uriItem -> new Address(uriItem.getHost() == null ? "localhost" : uriItem.getHost(), uriItem.getPort() == -1 ? defaultPort : uriItem.getPort())).collect(Collectors.toList());
        }
        AddressResolver addressResolverToUse = addressResolver;
        if (this.addresses.size() == 1 && "localhost".equals(this.addresses.get(0).host()) && addressResolver == Utils.DEFAULT_ADDRESS_RESOLVER && (credentialsProvider = clientParametersPrototype.credentialsProvider()) instanceof UsernamePasswordCredentialsProvider && "guest".equals(username = ((UsernamePasswordCredentialsProvider)credentialsProvider).getUsername())) {
            Address address = new Address("localhost", clientParametersPrototype.port());
            ConcurrentHashMap.KeySetView passedInAddresses = ConcurrentHashMap.newKeySet();
            addressResolverToUse = addr -> {
                passedInAddresses.add(addr);
                if (passedInAddresses.size() > 1) {
                    LOGGER.warn("Assumed development environment but it seems incorrect.");
                    passedInAddresses.clear();
                }
                return address;
            };
            LOGGER.info("Connecting to localhost with {} user, assuming development environment", (Object)"guest");
            LOGGER.info("Using address resolver to always connect to localhost");
        }
        this.addressResolver = addressResolverToUse;
        int locatorCount = expectedLocatorCount > 0 ? expectedLocatorCount : Math.min(this.addresses.size(), 3);
        LOGGER.debug("Using {} locator connection(s)", (Object)locatorCount);
        List lctrs = IntStream.range(0, locatorCount).mapToObj(i -> {
            Address addr = this.addresses.get(i % this.addresses.size());
            return new Locator(i, addr);
        }).collect(Collectors.toList());
        this.locators = List.copyOf(lctrs);
        this.executorServiceFactory = new DefaultExecutorServiceFactory(this.addresses.size(), 1, "rabbitmq-stream-locator-connection-");
        if (clientParametersPrototype.eventLoopGroup == null) {
            this.eventLoopGroup = Utils.eventLoopGroup();
            this.clientParametersPrototype = clientParametersPrototype.duplicate().eventLoopGroup(this.eventLoopGroup);
        } else {
            this.eventLoopGroup = null;
            this.clientParametersPrototype = clientParametersPrototype.duplicate().eventLoopGroup(clientParametersPrototype.eventLoopGroup);
        }
        if (scheduledExecutorService == null) {
            int threads = Utils.AVAILABLE_PROCESSORS;
            LOGGER.debug("Creating scheduled executor service with {} thread(s)", (Object)threads);
            ThreadFactory threadFactory = ThreadUtils.threadFactory("rabbitmq-stream-environment-scheduler-");
            executorService = Executors.newScheduledThreadPool(threads, threadFactory);
            this.privateScheduleExecutorService = true;
        } else {
            executorService = scheduledExecutorService;
            this.privateScheduleExecutorService = false;
        }
        this.scheduledExecutorService = executorService;
        this.producersCoordinator = new ProducersCoordinator(this, maxProducersByConnection, maxTrackingConsumersByConnection, connectionNamingStrategy, Utils.coordinatorClientFactory(this, producerNodeRetryDelay), forceLeaderForProducers);
        this.consumersCoordinator = new ConsumersCoordinator(this, maxConsumersByConnection, connectionNamingStrategy, Utils.coordinatorClientFactory(this, consumerNodeRetryDelay), forceReplicaForConsumers, Utils.brokerPicker());
        this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this);
        ThreadFactory threadFactory = ThreadUtils.threadFactory("rabbitmq-stream-environment-locator-scheduler-");
        this.locatorReconnectionScheduledExecutorService = Executors.newScheduledThreadPool(this.locators.size(), threadFactory);
        Client.ClientParameters clientParametersForInit = this.locatorParametersCopy();
        Runnable locatorInitSequence = () -> {
            RuntimeException lastException = null;
            for (int i = 0; i < this.locators.size(); ++i) {
                Address address = this.addresses.get(i % this.addresses.size());
                Locator locator = this.locator(i);
                address = addressResolver.resolve(address);
                String connectionName = (String)connectionNamingStrategy.apply(Utils.ClientConnectionType.LOCATOR);
                Client.ClientParameters locatorParameters = clientParametersForInit.duplicate().host(address.host()).port(address.port()).clientProperty("connection_name", connectionName).shutdownListener(this.shutdownListener(locator, connectionNamingStrategy, clientFactory));
                try {
                    Client client = (Client)clientFactory.apply(locatorParameters);
                    locator.client(client);
                    LOGGER.debug("Created locator connection '{}'", (Object)connectionName);
                    LOGGER.debug("Locator connected to {}", (Object)address);
                    continue;
                }
                catch (RuntimeException e) {
                    LOGGER.debug("Error while try to connect to {}: {}", (Object)address, (Object)e.getMessage());
                    lastException = e;
                }
            }
            if (this.locators.stream().allMatch(rec$ -> ((Locator)rec$).isNotSet())) {
                throw lastException == null ? new StreamException("Not locator available") : lastException;
            }
            this.locators.forEach(l -> {
                if (l.isNotSet()) {
                    Client.ShutdownListener shutdownListener = this.shutdownListener((Locator)l, connectionNamingStrategy, clientFactory);
                    Client.ClientParameters newLocatorParameters = this.locatorParametersCopy().shutdownListener(shutdownListener);
                    StreamEnvironment.scheduleLocatorConnection(newLocatorParameters, this.addressResolver, l, connectionNamingStrategy, clientFactory, this.locatorReconnectionScheduledExecutorService, this.recoveryBackOffDelayPolicy, l.label());
                }
            });
        };
        if (lazyInit) {
            this.locatorInitializationSequence = locatorInitSequence;
        } else {
            locatorInitSequence.run();
            this.locatorsInitialized.set(true);
            this.locatorInitializationSequence = () -> {};
        }
        this.codec = clientParametersPrototype.codec() == null ? Codecs.DEFAULT : clientParametersPrototype.codec();
        this.clockRefreshFuture = this.scheduledExecutorService.scheduleAtFixedRate(Utils.namedRunnable(this.clock::refresh, "Background clock refresh", new Object[0]), 1L, 1L, TimeUnit.SECONDS);
    }

    private Client.ShutdownListener shutdownListener(Locator locator, Function<Utils.ClientConnectionType, String> connectionNamingStrategy, Function<Client.ClientParameters, Client> clientFactory) {
        AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<Client.ShutdownListener>();
        Client.ShutdownListener shutdownListener = shutdownContext -> {
            String label = locator.label();
            LOGGER.debug("Locator {} disconnected", (Object)label);
            if (shutdownContext.isShutdownUnexpected()) {
                locator.client(null);
                BackOffDelayPolicy delayPolicy = this.recoveryBackOffDelayPolicy;
                LOGGER.debug("Unexpected locator disconnection for on '{}', scheduling recovery with {}", (Object)label, (Object)delayPolicy);
                Client.ClientParameters newLocatorParameters = this.locatorParametersCopy().shutdownListener((Client.ShutdownListener)shutdownListenerReference.get());
                StreamEnvironment.scheduleLocatorConnection(newLocatorParameters, this.addressResolver, locator, connectionNamingStrategy, clientFactory, this.locatorReconnectionScheduledExecutorService, delayPolicy, label);
            } else {
                LOGGER.debug("Locator connection '{}' closing normally", (Object)label);
            }
        };
        shutdownListenerReference.set(shutdownListener);
        return shutdownListener;
    }

    private static void scheduleLocatorConnection(Client.ClientParameters newLocatorParameters, AddressResolver addressResolver, Locator locator, Function<Utils.ClientConnectionType, String> connectionNamingStrategy, Function<Client.ClientParameters, Client> clientFactory, ScheduledExecutorService scheduler, BackOffDelayPolicy delayPolicy, String locatorLabel) {
        LOGGER.debug("Scheduling locator '{}' connection with delay policy {}", (Object)locatorLabel, (Object)delayPolicy);
        try {
            ((CompletableFuture)AsyncRetry.asyncRetry(() -> {
                LOGGER.debug("Locator reconnection...");
                Address resolvedAddress = addressResolver.resolve(locator.address());
                String connectionName = (String)connectionNamingStrategy.apply(Utils.ClientConnectionType.LOCATOR);
                LOGGER.debug("Trying to reconnect locator on {}, with client connection name '{}'", (Object)resolvedAddress, (Object)connectionName);
                Client newLocator = (Client)clientFactory.apply(newLocatorParameters.host(resolvedAddress.host()).port(resolvedAddress.port()).clientProperty("connection_name", connectionName));
                LOGGER.debug("Created locator connection '{}'", (Object)connectionName);
                LOGGER.debug("Locator connected on {}", (Object)resolvedAddress);
                return newLocator;
            }).description("Locator '%s' connection", locatorLabel).scheduler(scheduler).delayPolicy(delayPolicy).build().thenAccept(locator::client)).exceptionally(ex -> {
                LOGGER.debug("Locator connection failed", ex);
                return null;
            });
        }
        catch (Exception e) {
            LOGGER.debug("Error while scheduling locator '{}' reconnection", (Object)locatorLabel, (Object)e);
        }
    }

    private Locator locator(int i) {
        return this.locators.get(i);
    }

    private static String uriDecode(String s) {
        try {
            return URLDecoder.decode(s.replace("+", "%2B"), "US-ASCII");
        }
        catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

    Client.ClientParameters maybeSetUpClientParametersFromUris(List<URI> uris, Client.ClientParameters clientParametersPrototype) {
        String path;
        String userInfo;
        int port;
        if (uris.isEmpty()) {
            return clientParametersPrototype;
        }
        URI uri = uris.get(0);
        clientParametersPrototype = clientParametersPrototype.duplicate();
        String host = uri.getHost();
        if (host != null) {
            clientParametersPrototype.host(host);
        }
        if ((port = uri.getPort()) != -1) {
            clientParametersPrototype.port(port);
        }
        if ((userInfo = uri.getRawUserInfo()) != null) {
            String[] userPassword = userInfo.split(":");
            if (userPassword.length > 2) {
                throw new IllegalArgumentException("Bad user info in URI " + userInfo);
            }
            clientParametersPrototype.username(StreamEnvironment.uriDecode(userPassword[0]));
            if (userPassword.length == 2) {
                clientParametersPrototype.password(StreamEnvironment.uriDecode(userPassword[1]));
            }
        }
        if ((path = uri.getRawPath()) != null && path.length() > 0) {
            if (path.indexOf(47, 1) != -1) {
                throw new IllegalArgumentException("Multiple segments in path of URI: " + path);
            }
            clientParametersPrototype.virtualHost(StreamEnvironment.uriDecode(uri.getPath().substring(1)));
        }
        return clientParametersPrototype;
    }

    public ByteBufAllocator byteBufAllocator() {
        return this.byteBufAllocator;
    }

    void maybeInitializeLocator() {
        if (this.locatorsInitialized.compareAndSet(false, true)) {
            try {
                this.locatorInitializationSequence.run();
            }
            catch (RuntimeException e) {
                this.locatorsInitialized.set(false);
                throw e;
            }
        }
    }

    @Override
    public StreamCreator streamCreator() {
        this.checkNotClosed();
        return new StreamStreamCreator(this);
    }

    @Override
    public void deleteStream(String stream) {
        this.checkNotClosed();
        this.maybeInitializeLocator();
        Client.Response response = this.locator().client().delete(stream);
        if (!response.isOk()) {
            throw new StreamException("Error while deleting stream " + stream + " (" + Utils.formatConstant(response.getResponseCode()) + ")", response.getResponseCode());
        }
    }

    @Override
    public void deleteSuperStream(String superStream) {
        this.checkNotClosed();
        this.maybeInitializeLocator();
        Client.Response response = this.locator().client().deleteSuperStream(superStream);
        if (!response.isOk()) {
            throw new StreamException("Error while deleting super stream " + superStream + " (" + Utils.formatConstant(response.getResponseCode()) + ")", response.getResponseCode());
        }
    }

    @Override
    public StreamStats queryStreamStats(String stream) {
        this.checkNotClosed();
        this.maybeInitializeLocator();
        Client.StreamStatsResponse response = this.locatorOperation(Utils.namedFunction(client -> client.streamStats(stream), "Query stream stats on stream '%s'", stream));
        if (response.isOk()) {
            Map<String, Long> info = response.getInfo();
            BiFunction<String, String, LongSupplier> offsetSupplierLogic = (key, message) -> {
                if (!info.containsKey(key) || (Long)info.get(key) == -1L) {
                    return () -> {
                        throw new NoOffsetException((String)message);
                    };
                }
                try {
                    long offset = (Long)info.get(key);
                    return () -> offset;
                }
                catch (NumberFormatException e) {
                    return () -> {
                        throw new NoOffsetException((String)message);
                    };
                }
            };
            LongSupplier firstOffsetSupplier = offsetSupplierLogic.apply("first_chunk_id", "No first offset for stream " + stream);
            LongSupplier committedOffsetSupplier = offsetSupplierLogic.apply("committed_chunk_id", "No committed chunk ID for stream " + stream);
            return new DefaultStreamStats(firstOffsetSupplier, committedOffsetSupplier);
        }
        throw Utils.convertCodeToException(response.getResponseCode(), stream, () -> "Error while querying stream stats: " + Utils.formatConstant(response.getResponseCode()) + ".");
    }

    @Override
    public boolean streamExists(String stream) {
        this.checkNotClosed();
        this.maybeInitializeLocator();
        short responseCode = this.locatorOperation(Utils.namedFunction(client -> {
            try {
                return client.streamStats(stream).getResponseCode();
            }
            catch (UnsupportedOperationException e) {
                Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
                return metadata.get(stream).getResponseCode();
            }
        }, "Stream exists for stream '%s'", stream));
        if (responseCode == 1) {
            return true;
        }
        if (responseCode == 2) {
            return false;
        }
        throw Utils.convertCodeToException(responseCode, stream, () -> String.format("Unexpected result when checking if stream '%s' exists: %s.", stream, Utils.formatConstant(responseCode)));
    }

    @Override
    public ProducerBuilder producerBuilder() {
        this.checkNotClosed();
        return new StreamProducerBuilder(this);
    }

    void addProducer(StreamProducer producer) {
        this.producers.add(producer);
    }

    void removeProducer(StreamProducer producer) {
        this.producers.remove(producer);
    }

    void addConsumer(StreamConsumer consumer) {
        this.consumers.add(consumer);
    }

    void removeConsumer(StreamConsumer consumer) {
        this.consumers.remove(consumer);
    }

    @Override
    public ConsumerBuilder consumerBuilder() {
        this.checkNotClosed();
        return new StreamConsumerBuilder(this);
    }

    @Override
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            for (StreamProducer producer : this.producers) {
                try {
                    producer.closeFromEnvironment();
                }
                catch (Exception e) {
                    LOGGER.warn("Error while closing producer, moving on to the next one", (Throwable)e);
                }
            }
            for (StreamConsumer consumer : this.consumers) {
                try {
                    consumer.closeFromEnvironment();
                }
                catch (Exception e) {
                    LOGGER.warn("Error while closing consumer, moving on to the next one", (Throwable)e);
                }
            }
            this.producersCoordinator.close();
            this.consumersCoordinator.close();
            this.offsetTrackingCoordinator.close();
            for (Locator locator : this.locators) {
                try {
                    if (!locator.isSet()) continue;
                    locator.client().close();
                    locator.client(null);
                }
                catch (Exception e) {
                    LOGGER.warn("Error while closing locator client", (Throwable)e);
                }
            }
            try {
                this.executorServiceFactory.close();
            }
            catch (Exception e) {
                LOGGER.info("Error while closing executor service factory: {}", (Object)e.getMessage());
            }
            this.clockRefreshFuture.cancel(false);
            if (this.privateScheduleExecutorService) {
                this.scheduledExecutorService.shutdownNow();
            }
            if (this.locatorReconnectionScheduledExecutorService != null) {
                this.locatorReconnectionScheduledExecutorService.shutdownNow();
            }
            try {
                if (!(this.eventLoopGroup == null || this.eventLoopGroup.isShuttingDown() && this.eventLoopGroup.isShutdown())) {
                    LOGGER.debug("Closing Netty event loop group");
                    this.eventLoopGroup.shutdownGracefully(1L, 10L, TimeUnit.SECONDS).get(10L, TimeUnit.SECONDS);
                }
            }
            catch (InterruptedException e) {
                LOGGER.info("Event loop group closing has been interrupted");
                Thread.currentThread().interrupt();
            }
            catch (ExecutionException e) {
                LOGGER.info("Event loop group closing failed", (Throwable)e);
            }
            catch (TimeoutException e) {
                LOGGER.info("Could not close event loop group in 10 seconds");
            }
        }
    }

    ScheduledExecutorService scheduledExecutorService() {
        return this.scheduledExecutorService;
    }

    Duration rpcTimeout() {
        return this.rpcTimeout;
    }

    void execute(Runnable task, String description, Object ... args) {
        this.scheduledExecutorService().execute(Utils.namedRunnable(task, description, args));
    }

    BackOffDelayPolicy recoveryBackOffDelayPolicy() {
        return this.recoveryBackOffDelayPolicy;
    }

    BackOffDelayPolicy topologyUpdateBackOffDelayPolicy() {
        return this.topologyUpdateBackOffDelayPolicy;
    }

    CompressionCodecFactory compressionCodecFactory() {
        return this.clientParametersPrototype.compressionCodecFactory;
    }

    ObservationCollector<?> observationCollector() {
        return this.observationCollector;
    }

    Runnable registerConsumer(StreamConsumer consumer, String stream, OffsetSpecification offsetSpecification, String trackingReference, SubscriptionListener subscriptionListener, Runnable trackingClosingCallback, MessageHandler messageHandler, Map<String, String> subscriptionProperties, ConsumerFlowStrategy flowStrategy) {
        return this.consumersCoordinator.subscribe(consumer, stream, offsetSpecification, trackingReference, subscriptionListener, trackingClosingCallback, messageHandler, subscriptionProperties, flowStrategy);
    }

    Runnable registerProducer(StreamProducer producer, String reference, String stream) {
        return this.producersCoordinator.registerProducer(producer, reference, stream);
    }

    Locator locator() {
        if (LOGGER.isDebugEnabled()) {
            try {
                LOGGER.debug("Locators: {}", (Object)this.locators.stream().map(l -> l.label() + " is set " + l.isSet()).collect(Collectors.joining(", ")));
            }
            catch (Exception e) {
                LOGGER.debug("Error while listing locators: {}", (Object)e.getMessage());
            }
        }
        return this.locators.stream().filter(rec$ -> ((Locator)rec$).isSet()).findAny().orElseThrow(LocatorNotAvailableException::new);
    }

    <T> T locatorOperation(Function<Client, T> operation) {
        return StreamEnvironment.locatorOperation(operation, this::locator, this.recoveryBackOffDelayPolicy);
    }

    static <T> T locatorOperation(Function<Client, T> operation, Supplier<Locator> locatorSupplier, BackOffDelayPolicy backOffDelayPolicy) {
        int maxAttempt = 3;
        boolean executed = false;
        Exception lastException = null;
        T result = null;
        LOGGER.debug("Starting locator operation '{}'", operation);
        long start = System.nanoTime();
        for (int attempt = 0; attempt < maxAttempt; ++attempt) {
            try {
                Locator locator = locatorSupplier.get();
                Client client = locator.client();
                LOGGER.debug("Using locator {} on {}:{} to run operation '{}'", new Object[]{locator.id(), client.getHost(), client.getPort(), operation});
                result = operation.apply(client);
                LOGGER.debug("Locator operation '{}' succeeded in {}", operation, (Object)Duration.ofNanos(System.nanoTime() - start));
                executed = true;
                break;
            }
            catch (LocatorNotAvailableException e) {
                Duration waitTime = backOffDelayPolicy.delay(attempt);
                LOGGER.debug("No locator available for operation '{}', waiting for {} before retrying", operation, (Object)waitTime);
                try {
                    Thread.sleep(waitTime.toMillis());
                    continue;
                }
                catch (InterruptedException ex) {
                    lastException = ex;
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            catch (Exception e) {
                LOGGER.debug("Exception during locator operation '{}': {}", operation, (Object)Utils.exceptionMessage(e));
                lastException = e;
                break;
            }
        }
        if (!executed) {
            if (lastException == null) {
                throw new LocatorNotAvailableException();
            }
            throw new StreamException("Could not execute operation after " + maxAttempt + " attempts", lastException);
        }
        return result;
    }

    boolean filteringSupported() {
        return this.locatorOperation(Client::filteringSupported);
    }

    Clock clock() {
        return this.clock;
    }

    AddressResolver addressResolver() {
        return this.addressResolver;
    }

    Codec codec() {
        return this.codec;
    }

    Client.ClientParameters clientParametersCopy() {
        return this.clientParametersPrototype.duplicate();
    }

    private Client.ClientParameters locatorParametersCopy() {
        return this.clientParametersPrototype.duplicate().executorServiceFactory(this.executorServiceFactory).dispatchingExecutorServiceFactory(Utils.NO_OP_EXECUTOR_SERVICE_FACTORY);
    }

    TrackingConsumerRegistration registerTrackingConsumer(StreamConsumer streamConsumer, StreamConsumerBuilder.TrackingConfiguration configuration) {
        Runnable closingCallable = this.producersCoordinator.registerTrackingConsumer(streamConsumer);
        OffsetTrackingCoordinator.Registration offsetTrackingRegistration = this.offsetTrackingCoordinator.needTrackingRegistration(configuration) ? this.offsetTrackingCoordinator.registerTrackingConsumer(streamConsumer, configuration) : null;
        Runnable closingSequence = offsetTrackingRegistration == null ? closingCallable : () -> {
            try {
                LOGGER.debug("Executing offset tracking registration closing sequence ");
                offsetTrackingRegistration.closingCallback().run();
                LOGGER.debug("Offset tracking registration closing sequence executed");
            }
            catch (Exception e) {
                LOGGER.warn("Error while executing offset tracking registration closing sequence: {}", (Object)e.getMessage());
            }
            closingCallable.run();
        };
        return new TrackingConsumerRegistration(closingSequence, offsetTrackingRegistration == null ? null : offsetTrackingRegistration.postMessageProcessingCallback(), offsetTrackingRegistration == null ? Utils.NO_OP_LONG_CONSUMER : offsetTrackingRegistration.trackingCallback(), offsetTrackingRegistration == null ? Utils.NO_OP_LONG_SUPPLIER : offsetTrackingRegistration::flush);
    }

    public String toString() {
        return "{ \"locators\" : [" + this.locators.stream().map(l -> Utils.quote(l.label())).collect(Collectors.joining(",")) + "], " + Utils.jsonField("producer_client_count", this.producersCoordinator.clientCount()) + "," + Utils.jsonField("consumer_client_count", this.consumersCoordinator.managerCount()) + ",\"producers\" : " + this.producersCoordinator + ", \"consumers\" : " + this.consumersCoordinator + ", \"offset_tracking\" : " + this.offsetTrackingCoordinator + "}";
    }

    private void checkNotClosed() {
        if (this.closed.get()) {
            throw new IllegalStateException("This environment instance has been closed");
        }
    }

    static class Locator {
        private final long id;
        private final Address address;
        private volatile Optional<Client> client;
        private volatile LocalDateTime lastChanged;

        Locator(long id, Address address) {
            this.id = id;
            this.address = address;
            this.client = Optional.empty();
            this.lastChanged = LocalDateTime.now();
            LOGGER.debug("Locator wrapper '{}' created with no connection at {}", (Object)this.label(), (Object)this.lastChanged);
        }

        Locator client(Client client) {
            Client previous = this.nullableClient();
            this.client = Optional.ofNullable(client);
            LocalDateTime now = LocalDateTime.now();
            LOGGER.debug("Locator wrapper '{}' updated from {} to {}, last changed {}, {} ago", new Object[]{this.label(), previous, client, this.lastChanged, Duration.between(this.lastChanged, now)});
            this.lastChanged = now;
            return this;
        }

        private long id() {
            return this.id;
        }

        private boolean isNotSet() {
            return !this.isSet();
        }

        private boolean isSet() {
            return this.client.isPresent();
        }

        Client client() {
            return this.client.orElseThrow(() -> new LocatorNotAvailableException(this.id));
        }

        private Client nullableClient() {
            return this.client.orElse(null);
        }

        private Address address() {
            return this.address;
        }

        private String label() {
            Client c = this.nullableClient();
            if (c == null) {
                return String.format("%s:%d (id %d)", this.address.host(), this.address.port(), this.id);
            }
            return String.format("%s:%d [id %d, advertised %s:%d]", c.getHost(), c.getPort(), this.id(), c.serverAdvertisedHost(), c.serverAdvertisedPort());
        }

        public String toString() {
            return "Locator{address=" + this.address + ", client=" + this.client + "}";
        }
    }

    static class LocatorNotAvailableException
    extends StreamException {
        public LocatorNotAvailableException() {
            super("Locator not available");
        }

        public LocatorNotAvailableException(long id) {
            super(String.format("Locator %d not available", id));
        }
    }

    static class TrackingConsumerRegistration {
        private final Runnable closingCallback;
        private final Consumer<MessageHandler.Context> postMessageProcessingCallback;
        private final LongConsumer trackingCallback;
        private final LongSupplier flushOperation;

        TrackingConsumerRegistration(Runnable closingCallback, Consumer<MessageHandler.Context> postMessageProcessingCallback, LongConsumer trackingCallback, LongSupplier flushOperation) {
            this.closingCallback = closingCallback;
            this.postMessageProcessingCallback = postMessageProcessingCallback;
            this.trackingCallback = trackingCallback;
            this.flushOperation = flushOperation;
        }

        Runnable closingCallback() {
            return this.closingCallback;
        }

        LongConsumer trackingCallback() {
            return this.trackingCallback;
        }

        Consumer<MessageHandler.Context> postMessageProcessingCallback() {
            return this.postMessageProcessingCallback;
        }

        long flush() {
            return this.flushOperation.getAsLong();
        }
    }

    private static class DefaultStreamStats
    implements StreamStats {
        private final LongSupplier firstOffsetSupplier;
        private final LongSupplier committedOffsetSupplier;

        private DefaultStreamStats(LongSupplier firstOffsetSupplier, LongSupplier committedOffsetSupplier) {
            this.firstOffsetSupplier = firstOffsetSupplier;
            this.committedOffsetSupplier = committedOffsetSupplier;
        }

        @Override
        public long firstOffset() {
            return this.firstOffsetSupplier.getAsLong();
        }

        @Override
        public long committedChunkId() {
            return this.committedOffsetSupplier.getAsLong();
        }

        public String toString() {
            return "StreamStats{firstOffset=" + this.firstOffset() + ", committedOffset=" + this.committedChunkId() + "}";
        }
    }
}

