/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.client.amqp.impl;

import com.rabbitmq.client.amqp.AmqpException;
import com.rabbitmq.client.amqp.Management;
import com.rabbitmq.client.amqp.impl.AmqpBindingManagement;
import com.rabbitmq.client.amqp.impl.AmqpConnection;
import com.rabbitmq.client.amqp.impl.AmqpExchangeSpecification;
import com.rabbitmq.client.amqp.impl.AmqpManagementParameters;
import com.rabbitmq.client.amqp.impl.AmqpQueueSpecification;
import com.rabbitmq.client.amqp.impl.ExceptionUtils;
import com.rabbitmq.client.amqp.impl.TopologyListener;
import com.rabbitmq.client.amqp.impl.UriUtils;
import com.rabbitmq.qpid.protonj2.client.Delivery;
import com.rabbitmq.qpid.protonj2.client.DeliveryMode;
import com.rabbitmq.qpid.protonj2.client.Message;
import com.rabbitmq.qpid.protonj2.client.Receiver;
import com.rabbitmq.qpid.protonj2.client.ReceiverOptions;
import com.rabbitmq.qpid.protonj2.client.Sender;
import com.rabbitmq.qpid.protonj2.client.SenderOptions;
import com.rabbitmq.qpid.protonj2.client.Session;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientLinkRemotelyClosedException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientSessionRemotelyClosedException;
import com.rabbitmq.qpid.protonj2.types.Binary;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class AmqpManagement
implements Management {
    // Source of the per-instance id assigned in the constructor and shown by toString().
    private static final AtomicLong ID_SEQUENCE = new AtomicLong(0L);
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpManagement.class);
    // Address used for both the management sender and the management receiver links.
    private static final String MANAGEMENT_NODE_ADDRESS = "/management";
    // Reply-to value set on every outgoing management request.
    private static final String REPLY_TO = "$me";
    // Operation names carried in the request message subject.
    private static final String GET = "GET";
    private static final String POST = "POST";
    private static final String PUT = "PUT";
    private static final String DELETE = "DELETE";
    // Response codes; the response subject is parsed as an integer and compared against these values.
    private static final int CODE_200 = 200;
    private static final int CODE_201 = 201;
    private static final int CODE_204 = 204;
    private static final int CODE_400 = 400;
    private static final int CODE_404 = 404;
    private static final int CODE_409 = 409;
    private final AmqpConnection connection;
    private final Long id;
    // Session and link pair (re)created by init().
    private volatile Session session;
    private volatile Sender sender;
    private volatile Receiver receiver;
    // Flipped by close(); reset to false by a successful init().
    private final AtomicBoolean closed = new AtomicBoolean(false);
    // Used both as the link-open timeout in init() and as the per-request response timeout.
    private final Duration rpcTimeout = Duration.ofSeconds(10L);
    // Requests awaiting a response, keyed by request message id (matched against the response correlation id).
    private final ConcurrentMap<UUID, OutstandingRequest> outstandingRequests = new ConcurrentHashMap<UUID, OutstandingRequest>();
    // Handle of the background receive task; null when no receive loop is running.
    private volatile Future<?> receiveLoop;
    private final TopologyListener topologyListener;
    // Generates candidate names when a queue is declared without a name.
    private final Supplier<String> nameSupplier;
    private final AtomicReference<State> state = new AtomicReference<State>(State.CREATED);
    // True while init() is creating the session and links; close() refuses to run in that window.
    private volatile boolean initializing = false;
    // Serializes concurrent init() calls.
    private final Lock initializationLock = new ReentrantLock();
    // Idle time after which the receive loop stops itself.
    private final Duration receiveLoopIdleTimeout;
    // Guards the lazy start of the receive loop in request().
    private final Lock instanceLock = new ReentrantLock();

    /**
     * Creates a management instance bound to the connection described by {@code parameters}.
     * A missing topology listener falls back to {@link TopologyListener#NO_OP} and a missing
     * receive-loop idle timeout falls back to 20 seconds.
     *
     * @param parameters connection, topology listener, name supplier and idle timeout to use
     */
    AmqpManagement(AmqpManagementParameters parameters) {
        this.id = ID_SEQUENCE.getAndIncrement();
        this.connection = parameters.connection();
        if (parameters.topologyListener() == null) {
            this.topologyListener = TopologyListener.NO_OP;
        } else {
            this.topologyListener = parameters.topologyListener();
        }
        this.nameSupplier = parameters.nameSupplier();
        if (parameters.receiveLoopIdleTimeout() == null) {
            this.receiveLoopIdleTimeout = Duration.ofSeconds(20L);
        } else {
            this.receiveLoopIdleTimeout = parameters.receiveLoopIdleTimeout();
        }
    }

    /**
     * Starts the specification of a queue to declare.
     *
     * @return a new queue specification backed by this management instance
     * @throws AmqpException if this management instance is not open
     */
    @Override
    public Management.QueueSpecification queue() {
        this.checkAvailable();
        Management.QueueSpecification specification = new AmqpQueueSpecification(this);
        return specification;
    }

    /**
     * Starts the specification of a queue with the given name.
     *
     * @param name the queue name to set on the specification
     * @return a queue specification with its name already set
     * @throws AmqpException if this management instance is not open
     */
    @Override
    public Management.QueueSpecification queue(String name) {
        this.checkAvailable();
        Management.QueueSpecification specification = this.queue();
        return specification.name(name);
    }

    /**
     * Fetches the description of an existing queue with a GET request on its location.
     *
     * @param name the queue name
     * @return the queue information built from the response body
     * @throws AmqpException if this management instance is not open or the request fails
     */
    @Override
    public Management.QueueInfo queueInfo(String name) {
        this.checkAvailable();
        String location = AmqpManagement.queueLocation(name);
        try {
            OutstandingRequest completed = this.get(location);
            Map<String, Object> description = completed.responseBodyAsMap();
            return new DefaultQueueInfo(description);
        }
        catch (ClientException e) {
            String message = String.format("Error while fetching information for queue '%s'", name);
            LOGGER.debug(message, (Throwable)e);
            throw ExceptionUtils.convert(e, message, new Object[0]);
        }
    }

    /**
     * Returns a deletion handle that delegates to {@link #queueDelete(String)}.
     *
     * @return the queue deletion callback
     */
    @Override
    public Management.QueueDeletion queueDeletion() {
        return queueName -> this.queueDelete(queueName);
    }

    /**
     * Deletes a queue and notifies the topology listener.
     * The listener is notified before the response body is validated.
     *
     * @param name the queue to delete
     * @throws AmqpException if the request fails or the response lacks {@code message_count}
     */
    @Override
    public void queueDelete(String name) {
        this.checkAvailable();
        String location = AmqpManagement.queueLocation(name);
        Map<String, Object> body = this.delete(location, 200);
        this.topologyListener.queueDeleted(name);
        boolean hasMessageCount = body.containsKey("message_count");
        if (hasMessageCount) {
            return;
        }
        throw new AmqpException("Response body should contain message_count", new Object[0]);
    }

    /**
     * Starts the specification of an exchange to declare.
     *
     * @return a new exchange specification backed by this management instance
     * @throws AmqpException if this management instance is not open
     */
    @Override
    public Management.ExchangeSpecification exchange() {
        this.checkAvailable();
        Management.ExchangeSpecification specification = new AmqpExchangeSpecification(this);
        return specification;
    }

    /**
     * Starts the specification of an exchange with the given name.
     *
     * @param name the exchange name to set on the specification
     * @return an exchange specification with its name already set
     * @throws AmqpException if this management instance is not open
     */
    @Override
    public Management.ExchangeSpecification exchange(String name) {
        this.checkAvailable();
        Management.ExchangeSpecification specification = this.exchange();
        return specification.name(name);
    }

    /**
     * Returns a deletion handle that delegates to {@link #exchangeDelete(String)}.
     *
     * @return the exchange deletion callback
     */
    @Override
    public Management.ExchangeDeletion exchangeDeletion() {
        return exchangeName -> this.exchangeDelete(exchangeName);
    }

    /**
     * Deletes an exchange (expecting a 204 response) and notifies the topology listener.
     *
     * @param name the exchange to delete
     * @throws AmqpException if this management instance is not open or the request fails
     */
    @Override
    public void exchangeDelete(String name) {
        this.checkAvailable();
        String location = AmqpManagement.exchangeLocation(name);
        this.delete(location, 204);
        this.topologyListener.exchangeDeleted(name);
    }

    /**
     * Starts the specification of a binding to create.
     *
     * @return a new binding specification backed by this management instance
     * @throws AmqpException if this management instance is not open
     */
    @Override
    public Management.BindingSpecification binding() {
        this.checkAvailable();
        Management.BindingSpecification specification = new AmqpBindingManagement.AmqpBindingSpecification(this);
        return specification;
    }

    /**
     * Starts the specification of a binding to remove.
     *
     * @return a new unbind specification backed by this management instance
     * @throws AmqpException if this management instance is not open
     */
    @Override
    public Management.UnbindSpecification unbind() {
        this.checkAvailable();
        Management.UnbindSpecification specification = new AmqpBindingManagement.AmqpUnbindSpecification(this);
        return specification;
    }

    /**
     * Purges a queue by issuing a DELETE on its {@code /messages} sub-resource.
     *
     * @param queue the queue to purge
     * @return the purge status carrying the {@code message_count} value of the response
     * @throws AmqpException if the request fails or the response does not carry a numeric
     *     {@code message_count}
     */
    @Override
    public Management.PurgeStatus queuePurge(String queue) {
        Map<String, Object> responseBody = this.delete(AmqpManagement.queueLocation(queue) + "/messages", 200);
        // A single instanceof check covers a missing body, a missing key and a non-numeric value;
        // the previous "&&" guard let a non-numeric value through to a ClassCastException.
        Object messageCount = responseBody == null ? null : responseBody.get("message_count");
        if (!(messageCount instanceof Number)) {
            throw new AmqpException("Response body should contain message_count", new Object[0]);
        }
        return new DefaultPurgeStatus(((Number)messageCount).longValue());
    }

    /**
     * Sends a new token to the broker with a PUT on {@code /auth/tokens}, expecting a 204 response.
     *
     * @param token the token, sent as UTF-8 bytes
     * @throws UnsupportedOperationException if the connection does not support setting a token
     * @throws AmqpException if this management instance is not open or the request fails
     */
    void setToken(String token) {
        boolean supported = this.connection.setTokenSupported();
        if (!supported) {
            throw new UnsupportedOperationException("Token renewal requires at least RabbitMQ 4.1.0");
        }
        this.checkAvailable();
        UUID requestId = AmqpManagement.messageId();
        try {
            Binary payload = new Binary(token.getBytes(StandardCharsets.UTF_8));
            Message<Binary> request = Message.create(payload).to("/auth/tokens").subject(PUT);
            OutstandingRequest pending = this.request(request, requestId);
            pending.block();
            AmqpManagement.checkResponse(pending, requestId, 204);
        }
        catch (ClientException e) {
            throw new AmqpException("Error on set-token operation", e);
        }
    }

    /**
     * Closes this management instance: marks it closed, releases pending requests and the
     * receive loop, then closes the receiver, sender and session on a best-effort basis.
     * Only the first call has an effect.
     *
     * @throws AmqpException.AmqpResourceInvalidStateException if initialization is in progress
     */
    @Override
    public void close() {
        if (this.initializing) {
            throw new AmqpException.AmqpResourceInvalidStateException("Management is initializing, retry closing later.", new Object[0]);
        }
        boolean firstClose = this.closed.compareAndSet(false, true);
        if (!firstClose) {
            return;
        }
        this.releaseResources(null, State.CLOSED);
        // Close failures are only logged: one failing resource must not prevent closing the next.
        if (this.receiver != null) {
            try {
                this.receiver.close();
            }
            catch (Exception receiverError) {
                LOGGER.debug("Error while closing management receiver: {}", (Object)receiverError.getMessage());
            }
        }
        if (this.sender != null) {
            try {
                this.sender.close();
            }
            catch (Exception senderError) {
                LOGGER.debug("Error while closing management sender: {}", (Object)senderError.getMessage());
            }
        }
        if (this.session != null) {
            try {
                this.session.close();
            }
            catch (Exception sessionError) {
                LOGGER.debug("Error while closing management session: {}", (Object)sessionError.getMessage());
            }
        }
    }

    /**
     * Opens (or re-opens) the management session together with its paired sender and receiver
     * links, then moves the state to {@code OPEN}.
     *
     * <p>The call is a no-op when the state is already {@code OPEN} or when another
     * initialization is in progress. Any running receive loop is cancelled before the new
     * session is created.
     *
     * @throws AmqpException if the session or links cannot be opened
     */
    void init() {
        String cName = this.connection.name();
        LOGGER.debug("Trying to initialize management for connection {}", (Object)cName);
        State state = this.state();
        if (state == State.OPEN) {
            LOGGER.debug("Not initializing management {} because state is {}", (Object)cName, (Object)state);
            return;
        }
        if (this.initializing) {
            LOGGER.debug("Not initializing management {} because it is already initializing", (Object)cName);
            return;
        }
        // Acquire the lock before entering the try block so that the finally clause never
        // calls unlock() on a lock this thread does not hold.
        this.initializationLock.lock();
        try {
            // Re-check under the lock: another thread may have initialized in the meantime.
            boolean initInProgress = this.initializing;
            state = this.state();
            if (initInProgress || state == State.OPEN) {
                // Logged only when initialization is actually skipped.
                LOGGER.debug("Not initializing management {}: init in progress {}, state {}", new Object[]{cName, initInProgress, state});
                return;
            }
            this.initializing = true;
            LOGGER.debug("Initializing management for connection {} ({}).", (Object)cName, (Object)this);
            this.markUnavailable();
            try {
                if (this.receiveLoop != null) {
                    this.receiveLoop.cancel(true);
                    this.receiveLoop = null;
                }
                LOGGER.debug("Creating management session ({}).", (Object)this);
                this.session = this.connection.nativeConnection().openSession();
                // Sender and receiver share the same link name and the "paired" property.
                String linkPairName = "management-link-pair";
                Map<String, Object> properties = Collections.singletonMap("paired", Boolean.TRUE);
                LOGGER.debug("Creating management sender ({}).", (Object)this);
                this.sender = this.session.openSender(MANAGEMENT_NODE_ADDRESS, (SenderOptions)((SenderOptions)((SenderOptions)new SenderOptions().deliveryMode(DeliveryMode.AT_MOST_ONCE)).linkName(linkPairName)).properties(properties));
                LOGGER.debug("Creating management receiver ({}).", (Object)this);
                this.receiver = this.session.openReceiver(MANAGEMENT_NODE_ADDRESS, ((ReceiverOptions)((ReceiverOptions)((ReceiverOptions)new ReceiverOptions().deliveryMode(DeliveryMode.AT_MOST_ONCE)).linkName(linkPairName)).properties(properties)).creditWindow(100));
                this.sender.openFuture().get(this.rpcTimeout.toMillis(), TimeUnit.MILLISECONDS);
                LOGGER.debug("Management sender created ({}).", (Object)this);
                this.receiver.openFuture().get(this.rpcTimeout.toMillis(), TimeUnit.MILLISECONDS);
                LOGGER.debug("Management receiver created ({}).", (Object)this);
                this.state(State.OPEN);
                this.closed.set(false);
            }
            catch (Exception e) {
                LOGGER.info("Error during management {} initialization: {}", (Object)cName, (Object)e.getMessage());
                throw ExceptionUtils.convert(e);
            }
            finally {
                this.initializing = false;
            }
        }
        finally {
            this.initializationLock.unlock();
        }
    }

    /**
     * Builds the task that polls the management receiver and completes outstanding requests.
     *
     * <p>The task polls every 100 ms, matches each response to a pending request through its
     * correlation id, and stops itself once it has been idle for longer than the configured
     * idle timeout or when its thread is interrupted.
     *
     * @return the receive loop task
     */
    private Runnable receiveTask() {
        return () -> {
            try {
                Duration pollInterval = Duration.ofMillis(100L);
                long idleMillis = 0L;
                while (!Thread.currentThread().isInterrupted()) {
                    Delivery delivery = this.receiver.receive(pollInterval.toMillis(), TimeUnit.MILLISECONDS);
                    if (delivery == null) {
                        // Nothing received during this poll: accumulate idle time.
                        idleMillis += pollInterval.toMillis();
                        if (idleMillis > this.receiveLoopIdleTimeout.toMillis()) {
                            LOGGER.debug("Management receive loop has been idle for more than {}, finishing it.", (Object)this.receiveLoopIdleTimeout);
                            this.receiveLoop = null;
                            return;
                        }
                    } else {
                        idleMillis = 0L;
                        Object correlationId = delivery.message().correlationId();
                        if (!(correlationId instanceof UUID)) {
                            LOGGER.info("Could not correlate inbound message with management request");
                        } else {
                            OutstandingRequest pending = this.outstandingRequests.remove(correlationId);
                            if (pending == null) {
                                LOGGER.info("Could not find outstanding request {}", correlationId);
                            } else {
                                pending.complete(delivery.message());
                            }
                        }
                    }
                }
            }
            catch (ClientConnectionRemotelyClosedException | ClientLinkRemotelyClosedException ignored) {
                // Deliberately ignored: the loop simply ends when the connection or link is closed remotely.
            }
            catch (ClientSessionRemotelyClosedException e) {
                this.markUnavailable();
                LOGGER.info("Management session closed in receive loop: {} ({})", (Object)e.getMessage(), (Object)this);
                AmqpException converted = ExceptionUtils.convert(e);
                this.failRequests(converted);
                if (converted instanceof AmqpException.AmqpSecurityException) {
                    LOGGER.debug("Recovering AMQP management because the failure was a security exception ({}).", (Object)this);
                    this.init();
                }
            }
            catch (ClientException e) {
                // Polling errors are expected once closed, so they are logged at a lower level then.
                if (this.closed.get()) {
                    LOGGER.debug("Error while polling AMQP receiver", (Throwable)e);
                } else {
                    LOGGER.info("Error while polling AMQP receiver", (Throwable)e);
                }
            }
        };
    }

    /**
     * Fails every outstanding request with the given exception and removes it from the
     * pending map, which unblocks the threads waiting on those requests.
     *
     * @param exception the failure handed to each pending request
     */
    private void failRequests(AmqpException exception) {
        // NOTE(review): callers such as close() pass null here; a waiter released with a null
        // exception does not get an exception from block() - confirm this is intended.
        Iterator<Map.Entry<UUID, OutstandingRequest>> iterator = this.outstandingRequests.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<UUID, OutstandingRequest> request = iterator.next();
            LOGGER.info("Failing management request {}", request.getKey());
            request.getValue().fail(exception);
            iterator.remove();
        }
    }

    /**
     * Releases resources and marks this instance unavailable (no explicit target state).
     *
     * @param e the failure handed to outstanding requests
     */
    void releaseResources(AmqpException e) {
        State noExplicitState = null;
        this.releaseResources(e, noExplicitState);
    }

    /**
     * Moves to the given state, cancels the receive loop and fails all outstanding requests.
     *
     * @param e the failure handed to outstanding requests
     * @param state the state to move to, or {@code null} to mark this instance unavailable
     */
    void releaseResources(AmqpException e, State state) {
        if (state != null) {
            this.state(state);
        } else {
            this.markUnavailable();
        }
        if (this.receiveLoop != null) {
            this.receiveLoop.cancel(true);
            this.receiveLoop = null;
        }
        this.failRequests(e);
    }

    /**
     * Declares a queue. When no name is given, names are drawn from the name supplier until a
     * declaration answers 201; responses with code 200 or 409 trigger another attempt.
     *
     * @param name the queue name, or {@code null}/blank to use a generated name
     * @param body the declaration body
     * @return the information of the declared queue
     */
    Management.QueueInfo declareQueue(String name, Map<String, Object> body) {
        boolean named = name != null && !name.isBlank();
        if (named) {
            Response<Map<String, Object>> declared = this.declare(body, AmqpManagement.queueLocation(name), 200, 201);
            return new DefaultQueueInfo(declared.body());
        }
        while (true) {
            String candidate = this.nameSupplier.get();
            Response<Map<String, Object>> attempt = this.declare(body, AmqpManagement.queueLocation(candidate), 200, 201, 409);
            if (attempt.code() == 201) {
                return new DefaultQueueInfo(attempt.body());
            }
        }
    }

    /**
     * Declares an exchange, expecting a 204 response.
     *
     * @param name the exchange name
     * @param body the declaration body
     */
    void declareExchange(String name, Map<String, Object> body) {
        String location = AmqpManagement.exchangeLocation(name);
        this.declare(body, location, 204);
    }

    /**
     * Sends a PUT declaration to {@code target}.
     *
     * @param body the request body
     * @param target the target location
     * @param expectedResponseCodes the response codes considered successful
     * @return the response code and body
     */
    private Response<Map<String, Object>> declare(Map<String, Object> body, String target, int ... expectedResponseCodes) {
        Response<Map<String, Object>> result = this.declare(body, target, PUT, expectedResponseCodes);
        return result;
    }

    /**
     * Sends a request carrying {@code body} to {@code target}, waits for the response and
     * validates its code.
     *
     * @param body the request body
     * @param target the target location
     * @param operation the operation name set as the message subject (e.g. PUT or POST)
     * @param expectedResponseCodes the response codes considered successful
     * @return the response code and body
     * @throws AmqpException if this instance is not open, the request fails or the response
     *     code is unexpected
     */
    private Response<Map<String, Object>> declare(Map<String, Object> body, String target, String operation, int ... expectedResponseCodes) {
        this.checkAvailable();
        UUID requestId = AmqpManagement.messageId();
        try {
            Message<Map<String, Object>> request = Message.create(body).to(target).subject(operation);
            OutstandingRequest outstandingRequest = this.request(request, requestId);
            outstandingRequest.block();
            AmqpManagement.checkResponse(outstandingRequest, requestId, expectedResponseCodes);
            return outstandingRequest.mapResponse();
        }
        catch (ClientException e) {
            // Report the operation actually performed: this method also serves POST (bind).
            throw new AmqpException("Error on " + operation + " operation: " + target, e);
        }
    }

    /**
     * Registers and sends a management request, starting the receive loop if none is running.
     *
     * <p>The request is registered before it is sent so that the receive loop can correlate a
     * response as soon as it arrives. The caller is expected to wait on the returned handle.
     *
     * @param request the message to send; its message id and reply-to are set here
     * @param requestId the id used to correlate the response
     * @return the handle to wait on for the response
     * @throws ClientException if the message cannot be sent
     */
    OutstandingRequest request(Message<?> request, UUID requestId) throws ClientException {
        request.messageId(requestId).replyTo(REPLY_TO);
        OutstandingRequest outstandingRequest = new OutstandingRequest(this.rpcTimeout);
        LOGGER.debug("Enqueueing request {}", (Object)requestId);
        this.outstandingRequests.put(requestId, outstandingRequest);
        LOGGER.debug("Sending request {}", (Object)requestId);
        try {
            this.sender.send(request);
        }
        catch (ClientException | RuntimeException e) {
            // The request never left: drop it so it does not linger in the pending map.
            this.outstandingRequests.remove(requestId, outstandingRequest);
            throw e;
        }
        if (this.receiveLoop == null) {
            this.instanceLock.lock();
            try {
                // Re-check under the lock so that only one receive loop gets started.
                if (this.receiveLoop == null) {
                    Runnable receiveTask = this.receiveTask();
                    LOGGER.debug("Starting management receive loop ({}).", (Object)this);
                    this.receiveLoop = this.connection.environment().executorService().submit(receiveTask);
                    LOGGER.debug("Management initialized ({}).", (Object)this);
                }
            }
            finally {
                this.instanceLock.unlock();
            }
        }
        return outstandingRequest;
    }

    /**
     * Sends a DELETE request to {@code target} and returns the response body.
     *
     * @param target the location to delete
     * @param expectedResponseCode the response code considered successful
     * @return the response body as a map
     * @throws AmqpException if this instance is not open, the request fails or the response
     *     code is unexpected
     */
    private Map<String, Object> delete(String target, int expectedResponseCode) {
        this.checkAvailable();
        UUID requestId = AmqpManagement.messageId();
        try {
            Message request = Message.create((Map)null).to(target).subject(DELETE);
            OutstandingRequest pending = this.request(request, requestId);
            pending.block();
            AmqpManagement.checkResponse(pending, requestId, expectedResponseCode);
            Map<String, Object> responseBody = pending.responseBodyAsMap();
            return responseBody;
        }
        catch (ClientException e) {
            throw new AmqpException("Error on DELETE operation: " + target, e);
        }
    }

    /**
     * Generates the id of a new management request.
     *
     * @return a random UUID
     */
    private static UUID messageId() {
        UUID generated = UUID.randomUUID();
        return generated;
    }

    /**
     * Builds the management location of a queue.
     *
     * @param q the queue name, encoded as a path segment
     * @return {@code /queues/} followed by the encoded name
     */
    private static String queueLocation(String q) {
        String encodedName = UriUtils.encodePathSegment(q);
        return "/queues/" + encodedName;
    }

    /**
     * Builds the management location of an exchange.
     *
     * @param e the exchange name, encoded as a path segment
     * @return {@code /exchanges/} followed by the encoded name
     */
    private static String exchangeLocation(String e) {
        String encodedName = UriUtils.encodePathSegment(e);
        return "/exchanges/" + encodedName;
    }

    /**
     * Validates a completed request: the response must carry the expected correlation id and
     * one of the expected response codes.
     *
     * @param request the completed request
     * @param requestId the id the response must be correlated with
     * @param expectedResponseCodes the response codes considered successful
     * @throws AmqpException.AmqpEntityDoesNotExistException on a 404, or on a 400 whose
     *     explanation reports a missing queue
     * @throws AmqpException on a correlation mismatch or any other unexpected response code
     * @throws ClientException if the response message cannot be read
     */
    private static void checkResponse(OutstandingRequest request, UUID requestId, int ... expectedResponseCodes) throws ClientException {
        Message<?> response = request.responseMessage();
        if (!requestId.equals(response.correlationId())) {
            throw new AmqpException("Unexpected correlation ID", new Object[0]);
        }
        int responseCode = request.response().code();
        if (IntStream.of(expectedResponseCodes).anyMatch(c -> c == responseCode)) {
            return;
        }
        // A String body is the broker's explanation of the failure.
        Object body = request.response().body();
        String explanation = body instanceof String ? (String)body : null;
        if (responseCode == 404 || responseCode == 400 && AmqpManagement.queueDoesNotExist(explanation)) {
            throw new AmqpException.AmqpEntityDoesNotExistException(explanation == null ? "Entity does not exist" : explanation);
        }
        // Append the explanation, properly quoted and closed, only when there is one.
        String detail = explanation != null ? " (message: '" + explanation + "')" : "";
        String message = String.format("Unexpected response code: %d instead of %s%s", responseCode, IntStream.of(expectedResponseCodes).mapToObj(String::valueOf).collect(Collectors.joining(", ")), detail);
        try {
            LOGGER.info("Management request failed: '{}'. Response body: '{}'", (Object)message, request.responseMessage().body());
        }
        catch (Exception e) {
            LOGGER.info("Could not get management request body: {}", (Object)e.getMessage());
        }
        throw new AmqpException(message, new Object[0]);
    }

    /**
     * Tells whether an error explanation reports a missing queue, i.e. contains both
     * {@code no queue '} and {@code in vhost '}.
     *
     * @param explanation the explanation text, possibly {@code null}
     * @return {@code true} if the explanation describes a non-existing queue
     */
    private static boolean queueDoesNotExist(String explanation) {
        if (explanation == null) {
            return false;
        }
        boolean mentionsQueue = explanation.contains("no queue '");
        return mentionsQueue && explanation.contains("in vhost '");
    }

    /**
     * Creates a binding with a POST on {@code /bindings}, expecting a 204 response.
     *
     * @param body the binding description
     */
    void bind(Map<String, Object> body) {
        String bindingsLocation = "/bindings";
        this.declare(body, bindingsLocation, POST, 204);
    }

    /**
     * Removes a binding.
     *
     * <p>Without arguments the binding location is built directly and deleted. With arguments
     * the matching bindings are listed first and the first one whose key and arguments match
     * is deleted; nothing happens when none matches.
     *
     * @param destinationField the name of the destination field used in the binding location
     * @param source the binding source
     * @param destination the binding destination
     * @param key the binding key
     * @param arguments the binding arguments, possibly {@code null} or empty
     */
    void unbind(String destinationField, String source, String destination, String key, Map<String, Object> arguments) {
        boolean withoutArguments = arguments == null || arguments.isEmpty();
        if (withoutArguments) {
            String location = "/bindings/src=" + UriUtils.encodeNonUnreserved(source) + ";" + destinationField + "=" + UriUtils.encodeNonUnreserved(destination) + ";key=" + UriUtils.encodeNonUnreserved(key) + ";args=";
            this.delete(location, 204);
            return;
        }
        String listTarget = this.bindingsTarget(destinationField, source, destination, key);
        List<Map<String, Object>> candidates;
        try {
            candidates = this.get(listTarget).responseBodyAsList();
        }
        catch (ClientException e) {
            throw new AmqpException("Error on GET operation: " + listTarget, e);
        }
        Optional<String> match = AmqpManagement.matchBinding(candidates, key, arguments);
        if (match.isPresent()) {
            this.delete(match.get(), 204);
        }
    }

    /**
     * Finds the location of the first binding whose {@code binding_key} and {@code arguments}
     * equal the given key and arguments (two {@code null} values count as equal).
     *
     * @param bindings the binding descriptions to search
     * @param key the expected binding key
     * @param arguments the expected binding arguments
     * @return the {@code location} of the first match, or empty if there is none
     */
    private static Optional<String> matchBinding(List<Map<String, Object>> bindings, String key, Map<String, Object> arguments) {
        // An empty list yields an empty stream, hence an empty Optional.
        return bindings.stream().filter(candidate -> {
            String candidateKey = (String)candidate.get("binding_key");
            Map candidateArguments = (Map)candidate.get("arguments");
            boolean sameKey = key == null ? candidateKey == null : key.equals(candidateKey);
            if (!sameKey) {
                return false;
            }
            return arguments == null ? candidateArguments == null : arguments.equals(candidateArguments);
        }).map(candidate -> candidate.get("location").toString()).findFirst();
    }

    /**
     * Sends a GET request to {@code target}, waits for the response and expects a 200 code.
     *
     * @param target the location to read
     * @return the completed request, from which the response body can be read
     * @throws ClientException if the request cannot be sent or the response cannot be read
     * @throws AmqpException if this instance is not open or the response is unexpected
     */
    private OutstandingRequest get(String target) throws ClientException {
        this.checkAvailable();
        UUID requestId = AmqpManagement.messageId();
        Message request = Message.create((Map)null).to(target).subject(GET);
        OutstandingRequest pending = this.request(request, requestId);
        pending.block();
        AmqpManagement.checkResponse(pending, requestId, 200);
        return pending;
    }

    /**
     * Builds the query location used to list the bindings between a source and a destination
     * for a given key.
     *
     * @param destinationField the name of the destination query parameter
     * @param source the binding source
     * @param destination the binding destination
     * @param key the binding key
     * @return the {@code /bindings?...} location with encoded parameter values
     */
    private String bindingsTarget(String destinationField, String source, String destination, String key) {
        StringBuilder target = new StringBuilder("/bindings?src=");
        target.append(UriUtils.encodeParameter(source));
        target.append("&").append(destinationField).append("=");
        target.append(UriUtils.encodeParameter(destination));
        target.append("&key=");
        target.append(UriUtils.encodeParameter(key));
        return target.toString();
    }

    /**
     * Returns the topology listener notified of topology changes made through this instance.
     *
     * @return the topology listener
     */
    TopologyListener recovery() {
        TopologyListener listener = this.topologyListener;
        return listener;
    }

    /**
     * Tells whether a receive loop handle is currently set.
     *
     * @return {@code true} if the receive loop reference is not {@code null}
     */
    boolean hasReceiveLoop() {
        Future<?> loop = this.receiveLoop;
        return loop != null;
    }

    /**
     * Ensures this instance is open.
     *
     * @throws AmqpException.AmqpResourceClosedException if the state is {@code CLOSED}
     * @throws AmqpException.AmqpResourceInvalidStateException if the state is neither
     *     {@code OPEN} nor {@code CLOSED}
     */
    private void checkAvailable() {
        // Read the state once so that the checks and the error message agree even if the
        // state changes concurrently.
        State current = this.state();
        if (current == State.CLOSED) {
            throw new AmqpException.AmqpResourceClosedException("Management is closed");
        }
        if (current != State.OPEN) {
            throw new AmqpException.AmqpResourceInvalidStateException("Management is not open, current state is %s", current.name());
        }
    }

    /**
     * Describes this instance as the connection description followed by {@code -} and the
     * instance id.
     */
    @Override
    public String toString() {
        String connectionDescription = this.connection.toString();
        return connectionDescription + "-" + this.id;
    }

    /**
     * Reads the current lifecycle state.
     *
     * @return the current state
     */
    private State state() {
        State current = this.state.get();
        return current;
    }

    /**
     * Sets the lifecycle state unconditionally.
     *
     * @param state the new state
     */
    private void state(State state) {
        AtomicReference<State> holder = this.state;
        holder.set(state);
    }

    /**
     * Moves this instance to the {@code UNAVAILABLE} state.
     */
    void markUnavailable() {
        State unavailable = State.UNAVAILABLE;
        this.state(unavailable);
    }

    /**
     * Tells whether {@link #close()} has been called since the last successful initialization.
     *
     * @return the value of the closed flag
     */
    boolean isClosed() {
        boolean closedFlag = this.closed.get();
        return closedFlag;
    }

    /**
     * Immutable {@link Management.PurgeStatus} holding the message count reported by a purge.
     */
    private static final class DefaultPurgeStatus
    implements Management.PurgeStatus {
        private final long messageCount;

        /**
         * @param purgedMessageCount the {@code message_count} value of the purge response
         */
        private DefaultPurgeStatus(long purgedMessageCount) {
            this.messageCount = purgedMessageCount;
        }

        /** Returns the message count given at construction. */
        @Override
        public long messageCount() {
            return messageCount;
        }
    }

    /**
     * Lifecycle states of the management instance.
     */
    enum State {
        /** Initial state, before any initialization. */
        CREATED,
        /** Session and links are open; operations are allowed. */
        OPEN,
        /** Not usable for now: initialization is pending or resources were released. */
        UNAVAILABLE,
        /** Closed through {@code close()}. */
        CLOSED
    }

    /**
     * Immutable pair of a response code and a response body.
     *
     * @param <T> the body type
     */
    private static class Response<T> {
        private final int code;
        private final T body;

        /**
         * @param responseCode the numeric response code
         * @param responseBody the response body, possibly {@code null}
         */
        private Response(int responseCode, T responseBody) {
            this.code = responseCode;
            this.body = responseBody;
        }

        /** Returns the response code. */
        int code() {
            return code;
        }

        /** Returns the response body. */
        T body() {
            return body;
        }
    }

    /**
     * Immutable {@link Management.QueueInfo} built from the map returned by the broker.
     */
    private static class DefaultQueueInfo
    implements Management.QueueInfo {
        private final String name;
        private final boolean durable;
        private final boolean autoDelete;
        private final boolean exclusive;
        private final Management.QueueType type;
        private final Map<String, Object> arguments;
        private final String leader;
        private final List<String> members;
        private final long messageCount;
        private final int consumerCount;

        /**
         * Reads the queue description from the response map.
         *
         * @param response the response body; the keys read are {@code name}, {@code durable},
         *     {@code auto_delete}, {@code exclusive}, {@code type}, {@code arguments},
         *     {@code leader}, {@code replicas}, {@code message_count} and {@code consumer_count}
         */
        private DefaultQueueInfo(Map<String, Object> response) {
            this.name = (String)response.get("name");
            this.durable = (Boolean)response.get("durable");
            this.autoDelete = (Boolean)response.get("auto_delete");
            this.exclusive = (Boolean)response.get("exclusive");
            String typeName = ((String)response.get("type")).toUpperCase(Locale.ENGLISH);
            this.type = Management.QueueType.valueOf(typeName);
            this.arguments = Map.copyOf((Map)response.get("arguments"));
            this.leader = (String)response.get("leader");
            String[] replicaNames = (String[])response.get("replicas");
            if (replicaNames != null && replicaNames.length != 0) {
                this.members = List.of(replicaNames);
            } else {
                this.members = Collections.emptyList();
            }
            Number messages = (Number)response.get("message_count");
            this.messageCount = messages.longValue();
            Number consumers = (Number)response.get("consumer_count");
            this.consumerCount = consumers.intValue();
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public boolean durable() {
            return durable;
        }

        @Override
        public boolean autoDelete() {
            return autoDelete;
        }

        @Override
        public boolean exclusive() {
            return exclusive;
        }

        @Override
        public Management.QueueType type() {
            return type;
        }

        @Override
        public Map<String, Object> arguments() {
            return arguments;
        }

        @Override
        public String leader() {
            return leader;
        }

        /** Same list as {@link #members()}. */
        @Override
        public List<String> replicas() {
            return this.members();
        }

        @Override
        public List<String> members() {
            return members;
        }

        @Override
        public long messageCount() {
            return messageCount;
        }

        @Override
        public int consumerCount() {
            return consumerCount;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("DefaultQueueInfo{name='");
            text.append(this.name);
            text.append("', durable=").append(this.durable);
            text.append(", autoDelete=").append(this.autoDelete);
            text.append(", exclusive=").append(this.exclusive);
            text.append(", type=").append((Object)this.type);
            text.append(", arguments=").append(this.arguments);
            text.append(", leader='").append(this.leader);
            text.append("', replicas=").append(this.members);
            text.append(", messageCount=").append(this.messageCount);
            text.append(", consumerCount=").append(this.consumerCount);
            text.append("}");
            return text.toString();
        }
    }

    /**
     * A management request awaiting its response. The requesting thread waits in
     * {@link #block()}; another thread releases it with {@link #complete(Message)} or
     * {@link #fail(AmqpException)}.
     */
    private static class OutstandingRequest {
        private final CountDownLatch latch = new CountDownLatch(1);
        private final AtomicReference<Message<?>> responseMessage = new AtomicReference<>();
        private final AtomicReference<Response<?>> response = new AtomicReference<>();
        private final AtomicReference<AmqpException> exception = new AtomicReference<>();
        private final Duration timeout;

        /**
         * @param timeout the maximum time {@link #block()} waits for completion
         */
        private OutstandingRequest(Duration timeout) {
            this.timeout = timeout;
        }

        /**
         * Waits until the request is completed or failed, or the timeout elapses.
         *
         * @throws AmqpException if the wait is interrupted, the request was failed with an
         *     exception, or no response arrived in time
         */
        void block() {
            boolean completed;
            long start = System.nanoTime();
            try {
                completed = this.latch.await(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // Keep the interruption as the cause instead of dropping it.
                throw new AmqpException("Interrupted while waiting for management response", e);
            }
            AmqpException failure = this.exception.get();
            if (failure != null) {
                throw failure;
            }
            if (!completed) {
                Duration duration = Duration.ofNanos(System.nanoTime() - start);
                throw new AmqpException("Could not get management response in %d ms", duration.toMillis());
            }
        }

        /**
         * Records the response and releases the waiting thread. The response code is parsed
         * from the message subject.
         *
         * @param response the response message
         * @throws ClientException if the message body cannot be read
         */
        void complete(Message<?> response) throws ClientException {
            this.responseMessage.set(response);
            this.response.set(new Response<Object>(Integer.parseInt(response.subject()), response.body()));
            this.latch.countDown();
        }

        /**
         * Records a failure and releases the waiting thread.
         *
         * @param e the failure rethrown by {@link #block()}
         */
        void fail(AmqpException e) {
            this.exception.set(e);
            this.latch.countDown();
        }

        /** Returns the raw response message, or {@code null} if none was recorded. */
        Message<?> responseMessage() {
            return this.responseMessage.get();
        }

        /** Returns the response viewed as carrying a map body; the body type is not checked. */
        @SuppressWarnings("unchecked")
        private <K, V> Response<Map<K, V>> mapResponse() {
            // Unchecked: callers only use this for operations that answer with a map body.
            return (Response<Map<K, V>>)this.response.get();
        }

        /** Returns the recorded response code and body, or {@code null} if none was recorded. */
        private Response<?> response() {
            return this.response.get();
        }

        /** Returns the response message body cast to a map. */
        @SuppressWarnings("unchecked")
        private <K, V> Map<K, V> responseBodyAsMap() throws ClientException {
            return (Map<K, V>)this.responseMessage.get().body();
        }

        /** Returns the response message body cast to a list. */
        @SuppressWarnings("unchecked")
        private <T> List<T> responseBodyAsList() throws ClientException {
            return (List<T>)this.responseMessage.get().body();
        }
    }
}

