/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.client.amqp.impl;

import com.rabbitmq.client.amqp.AmqpException;
import com.rabbitmq.client.amqp.BackOffDelayPolicy;
import com.rabbitmq.client.amqp.Consumer;
import com.rabbitmq.client.amqp.ConsumerBuilder;
import com.rabbitmq.client.amqp.Resource;
import com.rabbitmq.client.amqp.impl.AmqpConnection;
import com.rabbitmq.client.amqp.impl.AmqpConsumerBuilder;
import com.rabbitmq.client.amqp.impl.AmqpMessage;
import com.rabbitmq.client.amqp.impl.DefaultAddressBuilder;
import com.rabbitmq.client.amqp.impl.ExceptionUtils;
import com.rabbitmq.client.amqp.impl.ResourceBase;
import com.rabbitmq.client.amqp.impl.RetryUtils;
import com.rabbitmq.client.amqp.impl.SessionHandler;
import com.rabbitmq.client.amqp.impl.Utils;
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
import com.rabbitmq.qpid.protonj2.client.Delivery;
import com.rabbitmq.qpid.protonj2.client.DeliveryMode;
import com.rabbitmq.qpid.protonj2.client.DeliveryState;
import com.rabbitmq.qpid.protonj2.client.Receiver;
import com.rabbitmq.qpid.protonj2.client.ReceiverOptions;
import com.rabbitmq.qpid.protonj2.client.Session;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientIOException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientLinkRemotelyClosedException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientSessionRemotelyClosedException;
import com.rabbitmq.qpid.protonj2.client.impl.ClientReceiver;
import com.rabbitmq.qpid.protonj2.client.util.DeliveryQueue;
import com.rabbitmq.qpid.protonj2.engine.EventHandler;
import com.rabbitmq.qpid.protonj2.engine.Scheduler;
import com.rabbitmq.qpid.protonj2.engine.impl.ProtonLinkCreditState;
import com.rabbitmq.qpid.protonj2.engine.impl.ProtonReceiver;
import com.rabbitmq.qpid.protonj2.engine.impl.ProtonSessionIncomingWindow;
import com.rabbitmq.qpid.protonj2.types.DescribedType;
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class AmqpConsumer
extends ResourceBase
implements Consumer {
    private static final AtomicLong ID_SEQUENCE = new AtomicLong(0L);
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConsumer.class);
    private volatile ClientReceiver nativeReceiver;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final int initialCredits;
    private final Consumer.MessageHandler messageHandler;
    private final Long id;
    private final String address;
    private final String queue;
    private final Map<String, DescribedType> filters;
    private final Map<String, Object> linkProperties;
    private final ConsumerBuilder.SubscriptionListener subscriptionListener;
    private final AmqpConnection connection;
    private final AtomicReference<PauseStatus> pauseStatus = new AtomicReference<PauseStatus>(PauseStatus.UNPAUSED);
    private final AtomicReference<CountDownLatch> echoedFlowAfterPauseLatch = new AtomicReference();
    private final MetricsCollector metricsCollector;
    private final SessionHandler sessionHandler;
    private final AtomicLong unsettledMessageCount = new AtomicLong(0L);
    private final Runnable replenishCreditOperation = this::replenishCreditIfNeeded;
    private final ExecutorService dispatchingExecutorService;
    private final java.util.function.Consumer<Delivery> nativeHandler;
    private final java.util.function.Consumer<ClientException> nativeCloseHandler;
    private ProtonReceiver protonReceiver;
    private volatile Scheduler protonExecutor;
    private DeliveryQueue protonDeliveryQueue;
    private ProtonSessionIncomingWindow sessionWindow;
    private ProtonLinkCreditState creditState;

    AmqpConsumer(AmqpConsumerBuilder builder) {
        super(builder.listeners());
        this.id = ID_SEQUENCE.getAndIncrement();
        this.initialCredits = builder.initialCredits();
        this.messageHandler = builder.connection().observationCollector().subscribe(builder.queue(), builder.messageHandler());
        DefaultAddressBuilder<?> addressBuilder = Utils.addressBuilder();
        addressBuilder.queue(builder.queue());
        this.address = addressBuilder.address();
        this.queue = builder.queue();
        this.filters = Map.copyOf(builder.filters());
        this.linkProperties = Map.copyOf(builder.properties());
        this.subscriptionListener = Optional.ofNullable(builder.subscriptionListener()).orElse(AmqpConsumerBuilder.NO_OP_SUBSCRIPTION_LISTENER);
        this.connection = builder.connection();
        this.sessionHandler = this.connection.createSessionHandler();
        this.dispatchingExecutorService = this.connection.dispatchingExecutorService();
        this.nativeHandler = this.createNativeHandler(this.messageHandler);
        this.nativeCloseHandler = e -> this.dispatchingExecutorService.submit(() -> {
            boolean ignored = AmqpConsumer.maybeCloseConsumerOnException(this, e);
        });
        this.nativeReceiver = this.createNativeReceiver(this.sessionHandler.session(), this.address, this.linkProperties, this.filters, this.subscriptionListener, this.nativeHandler, this.nativeCloseHandler);
        this.initStateFromNativeReceiver(this.nativeReceiver);
        this.metricsCollector = this.connection.metricsCollector();
        try {
            this.nativeReceiver.addCredit(this.initialCredits);
        }
        catch (ClientException e2) {
            AmqpException ex = ExceptionUtils.convert(e2);
            this.close(ex);
            throw ex;
        }
        this.state(Resource.State.OPEN);
        this.metricsCollector.openConsumer();
    }

    @Override
    public void pause() {
        block6: {
            if (this.pauseStatus.compareAndSet(PauseStatus.UNPAUSED, PauseStatus.PAUSING)) {
                try {
                    CountDownLatch latch = new CountDownLatch(1);
                    this.echoedFlowAfterPauseLatch.set(latch);
                    this.protonExecutor.execute(this::doPause);
                    try {
                        boolean echoed = latch.await(10L, TimeUnit.SECONDS);
                        if (echoed) {
                            this.pauseStatus.set(PauseStatus.PAUSED);
                            break block6;
                        }
                        LOGGER.warn("Did not receive echoed flow to pause receiver");
                        this.pauseStatus.set(PauseStatus.UNPAUSED);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                catch (Exception e) {
                    this.pauseStatus.set(PauseStatus.UNPAUSED);
                }
            }
        }
    }

    @Override
    public void unpause() {
        this.checkOpen();
        if (this.pauseStatus.compareAndSet(PauseStatus.PAUSED, PauseStatus.UNPAUSED)) {
            try {
                this.nativeReceiver.addCredit(this.initialCredits);
            }
            catch (ClientException e) {
                throw ExceptionUtils.convert(e);
            }
        }
    }

    @Override
    public long unsettledMessageCount() {
        return this.unsettledMessageCount.get();
    }

    @Override
    public void close() {
        this.close(null);
    }

    private ClientReceiver createNativeReceiver(Session nativeSession, String address, Map<String, Object> properties, Map<String, DescribedType> filters, ConsumerBuilder.SubscriptionListener subscriptionListener, java.util.function.Consumer<Delivery> nativeHandler, java.util.function.Consumer<ClientException> closeHandler) {
        try {
            filters = new LinkedHashMap<String, DescribedType>(filters);
            ConsumerBuilder.StreamOptions streamOptions = AmqpConsumerBuilder.streamOptions(filters);
            subscriptionListener.preSubscribe(() -> streamOptions);
            ReceiverOptions receiverOptions = (ReceiverOptions)((ReceiverOptions)((ReceiverOptions)new ReceiverOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE)).autoAccept(false).autoSettle(false)).handler(nativeHandler).closeHandler(closeHandler).creditWindow(0).properties(properties);
            Map<String, Object> localSourceFilters = Collections.emptyMap();
            if (!filters.isEmpty()) {
                localSourceFilters = Map.copyOf(filters);
                receiverOptions.sourceOptions().filters(localSourceFilters);
            }
            ClientReceiver receiver = (ClientReceiver)ExceptionUtils.wrapGet(nativeSession.openReceiver(address, receiverOptions).openFuture());
            if (!filters.isEmpty()) {
                Map<String, String> remoteSourceFilters = receiver.source().filters();
                for (Map.Entry<String, Object> localEntry : localSourceFilters.entrySet()) {
                    if (remoteSourceFilters.containsKey(localEntry.getKey())) continue;
                    LOGGER.warn("Missing filter value in attach response: {} => {}", (Object)localEntry.getKey(), localEntry.getValue());
                }
            }
            return receiver;
        }
        catch (ClientException e) {
            throw ExceptionUtils.convert(e, "Error while creating receiver from '%s'", address);
        }
    }

    private java.util.function.Consumer<Delivery> createNativeHandler(Consumer.MessageHandler handler) {
        return delivery -> {
            this.unsettledMessageCount.incrementAndGet();
            this.metricsCollector.consume();
            this.dispatchingExecutorService.submit(() -> {
                AmqpMessage message;
                try {
                    message = new AmqpMessage(delivery.message());
                }
                catch (ClientException e) {
                    LOGGER.warn("Error while decoding message: {}", (Object)e.getMessage());
                    try {
                        delivery.disposition(DeliveryState.rejected("", ""), true);
                    }
                    catch (ClientException ex) {
                        LOGGER.warn("Error while rejecting non-decoded message: {}", (Object)ex.getMessage());
                    }
                    return;
                }
                DeliveryContext context = new DeliveryContext((Delivery)delivery, this.protonExecutor, this.metricsCollector, this.unsettledMessageCount, this.replenishCreditOperation, this);
                handler.handle(context, message);
            });
        };
    }

    private Runnable createReceiveTask(Receiver receiver, Consumer.MessageHandler messageHandler) {
        return () -> {
            try {
                receiver.addCredit(this.initialCredits);
                while (!Thread.currentThread().isInterrupted()) {
                    Delivery delivery = receiver.receive(100L, TimeUnit.MILLISECONDS);
                    if (delivery == null) continue;
                    this.unsettledMessageCount.incrementAndGet();
                    this.metricsCollector.consume();
                    AmqpMessage message = new AmqpMessage(delivery.message());
                    DeliveryContext context = new DeliveryContext(delivery, this.protonExecutor, this.metricsCollector, this.unsettledMessageCount, this.replenishCreditOperation, this);
                    messageHandler.handle(context, message);
                }
            }
            catch (ClientLinkRemotelyClosedException | ClientSessionRemotelyClosedException e) {
                if (ExceptionUtils.notFound(e) || ExceptionUtils.resourceDeleted(e) || ExceptionUtils.unauthorizedAccess(e)) {
                    this.close(ExceptionUtils.convert(e));
                }
            }
            catch (ClientConnectionRemotelyClosedException e) {
            }
            catch (ClientException e) {
                java.util.function.Consumer<String> log = this.closed.get() ? m -> LOGGER.debug(m, (Throwable)e) : m -> LOGGER.warn(m, (Throwable)e);
                log.accept("Error while polling AMQP receiver");
            }
            catch (Exception e) {
                LOGGER.warn("Unexpected error in consumer loop", (Throwable)e);
            }
        };
    }

    void recoverAfterConnectionFailure() {
        this.nativeReceiver = RetryUtils.callAndMaybeRetry(() -> this.createNativeReceiver(this.sessionHandler.sessionNoCheck(), this.address, this.linkProperties, this.filters, this.subscriptionListener, this.nativeHandler, this.nativeCloseHandler), e -> {
            boolean shouldRetry = e instanceof AmqpException.AmqpResourceClosedException && e.getMessage().contains("stream queue") && e.getMessage().contains("does not have a running replica on the local node");
            LOGGER.debug("Retrying receiver creation on consumer recovery: {}", (Object)shouldRetry);
            return shouldRetry;
        }, List.of(Duration.ofSeconds(1L), Duration.ofSeconds(2L), Duration.ofSeconds(3L), BackOffDelayPolicy.TIMEOUT), "Create AMQP receiver to address '%s'", this.address);
        this.initStateFromNativeReceiver(this.nativeReceiver);
        this.pauseStatus.set(PauseStatus.UNPAUSED);
        this.unsettledMessageCount.set(0L);
        try {
            this.nativeReceiver.addCredit(this.initialCredits);
        }
        catch (ClientException e2) {
            throw ExceptionUtils.convert(e2);
        }
    }

    void close(Throwable cause) {
        if (this.closed.compareAndSet(false, true)) {
            this.state(Resource.State.CLOSING, cause);
            this.connection.removeConsumer(this);
            try {
                this.nativeReceiver.close();
                this.sessionHandler.close();
            }
            catch (Exception e) {
                LOGGER.warn("Error while closing receiver", (Throwable)e);
            }
            this.state(Resource.State.CLOSED, cause);
            this.metricsCollector.closeConsumer();
        }
    }

    long id() {
        return this.id;
    }

    String queue() {
        return this.queue;
    }

    private void initStateFromNativeReceiver(ClientReceiver receiver) {
        try {
            Scheduler protonExecutor = receiver.executor();
            CountDownLatch fieldsSetLatch = new CountDownLatch(1);
            protonExecutor.execute(() -> {
                this.protonReceiver = (ProtonReceiver)receiver.protonReceiver();
                this.creditState = this.protonReceiver.getCreditState();
                this.sessionWindow = this.protonReceiver.sessionWindow();
                this.protonDeliveryQueue = receiver.deliveryQueue();
                EventHandler<com.rabbitmq.qpid.protonj2.engine.Receiver> eventHandler = this.protonReceiver.linkCreditUpdatedHandler();
                EventHandler<com.rabbitmq.qpid.protonj2.engine.Receiver> decorator = target -> {
                    eventHandler.handle((com.rabbitmq.qpid.protonj2.engine.Receiver)target);
                    CountDownLatch latch = this.echoedFlowAfterPauseLatch.getAndSet(null);
                    if (latch != null) {
                        latch.countDown();
                    }
                };
                this.protonReceiver.creditStateUpdateHandler((EventHandler)decorator);
                this.protonExecutor = protonExecutor;
                fieldsSetLatch.countDown();
            });
            if (!fieldsSetLatch.await(10L, TimeUnit.SECONDS)) {
                throw new AmqpException("Could not initialize consumer internal state", new Object[0]);
            }
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private void replenishCreditIfNeeded() {
        if (!this.pausedOrPausing() && this.state() == Resource.State.OPEN) {
            int potentialPrefetch;
            int creditWindow = this.initialCredits;
            int currentCredit = this.protonReceiver.getCredit();
            if ((double)currentCredit <= (double)creditWindow * 0.5 && (double)(potentialPrefetch = currentCredit + this.protonDeliveryQueue.size()) <= (double)creditWindow * 0.7) {
                int additionalCredit = creditWindow - potentialPrefetch;
                try {
                    this.protonReceiver.addCredit(additionalCredit);
                }
                catch (Exception ex) {
                    LOGGER.debug("Error caught during credit top-up", (Throwable)ex);
                }
            }
        }
    }

    private void doPause() {
        this.creditState.updateCredit(0);
        this.creditState.updateEcho(true);
        this.sessionWindow.writeFlow(this.protonReceiver);
    }

    boolean pausedOrPausing() {
        return this.pauseStatus.get() != PauseStatus.UNPAUSED;
    }

    private static boolean maybeCloseConsumerOnException(AmqpConsumer consumer, Exception ex) {
        return ExceptionUtils.maybeCloseConsumerOnException(consumer::close, ex);
    }

    private static class DeliveryContext
    implements Consumer.Context {
        private final AtomicBoolean settled = new AtomicBoolean(false);
        private final Delivery delivery;
        private final Scheduler protonExecutor;
        private final MetricsCollector metricsCollector;
        private final AtomicLong unsettledMessageCount;
        private final Runnable replenishCreditOperation;
        private final AmqpConsumer consumer;

        private DeliveryContext(Delivery delivery, Scheduler protonExecutor, MetricsCollector metricsCollector, AtomicLong unsettledMessageCount, Runnable replenishCreditOperation, AmqpConsumer consumer) {
            this.delivery = delivery;
            this.protonExecutor = protonExecutor;
            this.metricsCollector = metricsCollector;
            this.unsettledMessageCount = unsettledMessageCount;
            this.replenishCreditOperation = replenishCreditOperation;
            this.consumer = consumer;
        }

        @Override
        public void accept() {
            if (this.settled.compareAndSet(false, true)) {
                try {
                    this.protonExecutor.execute(this.replenishCreditOperation);
                    this.delivery.disposition(DeliveryState.accepted(), true);
                    this.unsettledMessageCount.decrementAndGet();
                    this.metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.ACCEPTED);
                }
                catch (Exception e) {
                    this.handleException(e, "accept");
                }
            }
        }

        @Override
        public void discard() {
            if (this.settled.compareAndSet(false, true)) {
                try {
                    this.protonExecutor.execute(this.replenishCreditOperation);
                    this.delivery.disposition(DeliveryState.rejected("", ""), true);
                    this.unsettledMessageCount.decrementAndGet();
                    this.metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.DISCARDED);
                }
                catch (Exception e) {
                    this.handleException(e, "discard");
                }
            }
        }

        @Override
        public void discard(Map<String, Object> annotations) {
            if (this.settled.compareAndSet(false, true)) {
                try {
                    annotations = annotations == null ? Collections.emptyMap() : annotations;
                    Utils.checkMessageAnnotations(annotations);
                    this.protonExecutor.execute(this.replenishCreditOperation);
                    this.delivery.disposition(DeliveryState.modified(true, true, annotations), true);
                    this.unsettledMessageCount.decrementAndGet();
                    this.metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.DISCARDED);
                }
                catch (Exception e) {
                    this.handleException(e, "discard (modified)");
                }
            }
        }

        @Override
        public void requeue() {
            if (this.settled.compareAndSet(false, true)) {
                try {
                    this.protonExecutor.execute(this.replenishCreditOperation);
                    this.delivery.disposition(DeliveryState.released(), true);
                    this.unsettledMessageCount.decrementAndGet();
                    this.metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.REQUEUED);
                }
                catch (Exception e) {
                    this.handleException(e, "requeue");
                }
            }
        }

        @Override
        public void requeue(Map<String, Object> annotations) {
            if (this.settled.compareAndSet(false, true)) {
                try {
                    annotations = annotations == null ? Collections.emptyMap() : annotations;
                    Utils.checkMessageAnnotations(annotations);
                    this.protonExecutor.execute(this.replenishCreditOperation);
                    this.delivery.disposition(DeliveryState.modified(false, false, annotations), true);
                    this.unsettledMessageCount.decrementAndGet();
                    this.metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.REQUEUED);
                }
                catch (Exception e) {
                    this.handleException(e, "requeue (modified)");
                }
            }
        }

        private void handleException(Exception ex, String operation) {
            if (AmqpConsumer.maybeCloseConsumerOnException(this.consumer, ex)) {
                return;
            }
            if (ex instanceof ClientIllegalStateException || ex instanceof RejectedExecutionException || ex instanceof ClientIOException) {
                LOGGER.debug("message {} failed: {}", (Object)operation, (Object)ex.getMessage());
            } else if (ex instanceof ClientException) {
                throw ExceptionUtils.convert((ClientException)ex);
            }
        }
    }

    static enum PauseStatus {
        UNPAUSED,
        PAUSING,
        PAUSED;

    }
}

