/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.ExceptionUtil;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.ReactorReceiver;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.MessageWithLockToken;
import com.azure.messaging.servicebus.implementation.ServiceBusReactorSession;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Schedulers;

public class ServiceBusReactorReceiver
extends ReactorReceiver
implements ServiceBusReceiveLink {
    /**
     * Placeholder returned by {@code decodeDelivery} when a delivery only carries a disposition update;
     * {@code receive()} filters it out by identity.
     */
    private static final Message EMPTY_MESSAGE = Proton.message();
    private final ClientLogger logger = new ClientLogger(ServiceBusReactorReceiver.class);
    /** Deliveries that were received but not yet settled, keyed by the lock token's string form. */
    private final ConcurrentHashMap<String, Delivery> unsettledDeliveries = new ConcurrentHashMap<>();
    /** Disposition updates that were dispatched and are awaiting a remote outcome, keyed by lock token. */
    private final ConcurrentHashMap<String, UpdateDispositionWorkItem> pendingUpdates = new ConcurrentHashMap<>();
    /** Flipped to {@code true} by the first close request; later disposition requests are rejected. */
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    /** Periodic timer (period = {@code timeout}) that fails timed-out pending updates. */
    private final Disposable subscription;
    private final Receiver receiver;
    /** {@code true} when the link's sender settle mode is {@code SETTLED}. */
    private final boolean isSettled;
    /** Period of the cleanup timer and lifetime given to each disposition work item. */
    private final Duration timeout;
    private final AmqpRetryPolicy retryPolicy;
    private final ReceiveLinkHandler handler;
    private final ReactorProvider provider;
    private final Mono<String> sessionIdMono;
    private final Mono<OffsetDateTime> sessionLockedUntil;

    /**
     * Creates a Service Bus receive link on top of the base reactor receiver.
     *
     * <p>Besides wiring the fields, the constructor starts a periodic timer (period = {@code timeout}) that
     * fails timed-out disposition updates, and prepares two cached Monos that are resolved once the link
     * endpoint becomes {@link AmqpEndpointState#ACTIVE}: the session id and the session lock expiration.</p>
     *
     * @param connection Connection forwarded to the base receiver.
     * @param entityPath Entity path forwarded to the base receiver and used in log messages.
     * @param receiver Underlying proton-j receiver.
     * @param handler Handler for the receive link; also supplies error contexts.
     * @param tokenManager Token manager forwarded to the base receiver.
     * @param provider Supplies the reactor dispatcher used to schedule work.
     * @param timeout Cleanup timer period and lifetime of each disposition work item.
     * @param retryPolicy Policy used to compute retry delays for rejected disposition updates.
     */
    public ServiceBusReactorReceiver(AmqpConnection connection, String entityPath, Receiver receiver, ReceiveLinkHandler handler, TokenManager tokenManager, ReactorProvider provider, Duration timeout, AmqpRetryPolicy retryPolicy) {
        super(connection, entityPath, receiver, handler, tokenManager, provider.getReactorDispatcher(), retryPolicy.getRetryOptions());
        this.receiver = receiver;
        this.handler = handler;
        this.provider = provider;
        this.isSettled = receiver.getSenderSettleMode() == SenderSettleMode.SETTLED;
        this.timeout = timeout;
        this.retryPolicy = retryPolicy;
        this.subscription = Flux.interval(timeout).subscribe(i -> this.cleanupWorkItems());

        // A value is cached for the maximum expressible duration; errors and empty results are not cached
        // (zero TTL), so a later subscriber re-evaluates them.
        this.sessionIdMono = this.getEndpointStates().filter(x -> x == AmqpEndpointState.ACTIVE).next().flatMap(state -> {
            final Map<?, ?> remoteFilter = ((Source) receiver.getRemoteSource()).getFilter();
            final Object value = remoteFilter.get(ServiceBusReactorSession.SESSION_FILTER);
            if (value == null) {
                this.logger.info("entityPath[{}], linkName[{}]. There is no session id.", entityPath, this.getLinkName());
                return Mono.empty();
            }
            // Must stay a String (not Object) so the pipeline is typed Mono<String>.
            final String actualSessionId = String.valueOf(value);
            return Mono.just(actualSessionId);
        }).cache(value -> Duration.ofMillis(Long.MAX_VALUE), error -> Duration.ZERO, () -> Duration.ZERO);

        this.sessionLockedUntil = this.getEndpointStates().filter(x -> x == AmqpEndpointState.ACTIVE).next().map(state -> {
            if (receiver.getRemoteProperties() != null && receiver.getRemoteProperties().containsKey(ServiceBusReactorSession.LOCKED_UNTIL_UTC)) {
                final long ticks = (Long) receiver.getRemoteProperties().get(ServiceBusReactorSession.LOCKED_UNTIL_UTC);
                return MessageUtils.convertDotNetTicksToOffsetDateTime(ticks);
            }
            this.logger.info("entityPath[{}], linkName[{}]. Locked until not set.", entityPath, this.getLinkName());
            // Fall back to the epoch when the remote properties carry no lock expiration.
            return Instant.EPOCH.atOffset(ZoneOffset.UTC);
        }).cache(value -> Duration.ofMillis(Long.MAX_VALUE), error -> Duration.ZERO, () -> Duration.ZERO);
    }

    /**
     * Updates the disposition of the delivery identified by {@code lockToken}.
     *
     * @param lockToken Lock token of the delivery to update.
     * @param deliveryState State to apply to the delivery.
     * @return A Mono from the internal disposition update, or an error Mono carrying an
     *     {@link IllegalStateException} when this receiver has already been disposed.
     */
    @Override
    public Mono<Void> updateDisposition(String lockToken, DeliveryState deliveryState) {
        if (!isDisposed.get()) {
            return updateDispositionInternal(lockToken, deliveryState);
        }

        final IllegalStateException disposedError =
            new IllegalStateException("Cannot perform operations on a disposed receiver.");
        return FluxUtil.monoError(logger, disposedError);
    }

    /**
     * Receives messages from the link, dropping the {@code EMPTY_MESSAGE} placeholder (compared by identity)
     * and publishing the remaining messages on the bounded elastic scheduler.
     *
     * @return A Flux of decoded messages.
     */
    public Flux<Message> receive() {
        final Flux<Message> withoutPlaceholders = super.receive().filter(received -> received != EMPTY_MESSAGE);
        return withoutPlaceholders.publishOn(Schedulers.boundedElastic());
    }

    /**
     * Gets the cached Mono built in the constructor that resolves the session id from the remote source
     * filter once the link is active; it completes empty when the filter holds no session id.
     *
     * @return The session id Mono.
     */
    @Override
    public Mono<String> getSessionId() {
        return sessionIdMono;
    }

    /**
     * Gets the cached Mono built in the constructor that resolves the session lock expiration from the
     * link's remote properties once the link is active; it yields the epoch (UTC) when the property is absent.
     *
     * @return The session-locked-until Mono.
     */
    @Override
    public Mono<OffsetDateTime> getSessionLockedUntil() {
        return sessionLockedUntil;
    }

    /**
     * Closes the receiver on behalf of the user, with a fixed message and no error condition.
     *
     * @return A Mono produced by {@link #closeAsync(String, ErrorCondition)}.
     */
    public Mono<Void> closeAsync() {
        final String userCloseMessage = "User invoked close operation.";
        return closeAsync(userCloseMessage, null);
    }

    /**
     * Closes the receiver after giving in-flight disposition updates a chance to finish.
     *
     * <p>The first call marks the receiver disposed and fails any timed-out work items. For each remaining
     * pending update, a transactional update is replaced by a {@code Released} disposition, and any other
     * update is simply awaited. Errors from the pending updates are logged and ignored. The cleanup timer
     * is disposed when that wait terminates, after which the base receiver's close is chained.</p>
     *
     * @param message Message forwarded to the base receiver's close.
     * @param errorCondition Error condition forwarded to the base receiver's close; may be {@code null}.
     * @return A Mono for the close; repeated calls return the base receiver's is-closed Mono.
     */
    protected Mono<Void> closeAsync(String message, ErrorCondition errorCondition) {
        if (this.isDisposed.getAndSet(true)) {
            return super.getIsClosedMono();
        }
        this.cleanupWorkItems();

        final Mono<Void> disposeMono;
        if (this.pendingUpdates.isEmpty()) {
            disposeMono = Mono.empty();
        } else {
            final ArrayList<Mono<Void>> pending = new ArrayList<>();
            final StringJoiner lockTokens = new StringJoiner(", ");
            for (UpdateDispositionWorkItem workItem : this.pendingUpdates.values()) {
                if (workItem.hasTimedout()) {
                    continue;
                }
                if (workItem.getDeliveryState() instanceof TransactionalState) {
                    // Transactional updates are not awaited; the delivery is released instead.
                    pending.add(this.updateDispositionInternal(workItem.getLockToken(), Released.getInstance()));
                } else {
                    pending.add(workItem.getMono());
                }
                lockTokens.add(workItem.getLockToken());
            }
            this.logger.info("Waiting for pending updates to complete. Locks: {}", lockTokens.toString());
            disposeMono = Mono.when(pending);
        }

        return disposeMono.onErrorResume(error -> {
            this.logger.info("There was an exception while disposing of all links.", error);
            return Mono.empty();
        }).doFinally(signal -> this.subscription.dispose()).then(super.closeAsync(message, errorCondition));
    }

    /**
     * Turns a proton-j delivery into a message, or routes it to the disposition-update path.
     *
     * <p>The lock token is derived from a 16-byte delivery tag; any other tag maps to
     * {@code MessageUtils.ZERO_LOCK_TOKEN}. A delivery whose token is already tracked as unsettled is treated
     * as a disposition update and yields the {@code EMPTY_MESSAGE} placeholder. Otherwise the payload is read
     * and decoded; on a pre-settled link the delivery is accepted and settled, else it is tracked as
     * unsettled and the receiver is advanced.</p>
     *
     * @param delivery Delivery to decode.
     * @return The decoded message paired with its lock token, or {@code EMPTY_MESSAGE} for a disposition update.
     */
    protected Message decodeDelivery(Delivery delivery) {
        final byte[] tag = delivery.getTag();
        final UUID lockToken;
        if (tag == null || tag.length != 16) {
            lockToken = MessageUtils.ZERO_LOCK_TOKEN;
        } else {
            lockToken = MessageUtils.convertDotNetBytesToUUID(tag);
        }
        final String tokenKey = lockToken.toString();

        // Identity comparison against the sentinel is intentional here: only the exact zero-token instance
        // short-circuits the lookup.
        final boolean isDispositionUpdate = lockToken != MessageUtils.ZERO_LOCK_TOKEN
            && unsettledDeliveries.containsKey(tokenKey);
        if (isDispositionUpdate) {
            updateOutcome(tokenKey, delivery);
            return EMPTY_MESSAGE;
        }

        final int pendingBytes = delivery.pending();
        final byte[] payload = new byte[pendingBytes];
        final int bytesRead = receiver.recv(payload, 0, pendingBytes);
        final Message decoded = Proton.message();
        decoded.decode(payload, 0, bytesRead);

        if (!isSettled) {
            unsettledDeliveries.putIfAbsent(tokenKey, delivery);
            receiver.advance();
        } else {
            delivery.disposition(Accepted.getInstance());
            delivery.settle();
        }
        return new MessageWithLockToken(decoded, lockToken);
    }

    /**
     * Schedules a disposition update for an unsettled delivery and returns a Mono that completes when the
     * work item's sink is signalled.
     *
     * <p>On subscription the work item is started (which arms its timeout) and the disposition is dispatched
     * to the reactor; the work item is registered in {@code pendingUpdates} on the reactor callback, after the
     * disposition has been applied. The returned Mono is cached so that multiple subscribers share a single
     * dispatch.</p>
     *
     * @param lockToken Lock token identifying the unsettled delivery.
     * @param deliveryState State to apply to the delivery.
     * @return A Mono for the update; an error Mono wrapping {@link IllegalArgumentException} when no unsettled
     *     delivery exists for {@code lockToken}, or an {@link AmqpException} signal when dispatching fails.
     */
    private Mono<Void> updateDispositionInternal(String lockToken, DeliveryState deliveryState) {
        final Delivery unsettled = this.unsettledDeliveries.get(lockToken);
        if (unsettled == null) {
            this.logger.warning("entityPath[{}], linkName[{}], deliveryTag[{}]. Delivery not found to update disposition.", this.getEntityPath(), this.getLinkName(), lockToken);
            return FluxUtil.monoError(this.logger, Exceptions.propagate(new IllegalArgumentException("Delivery not on receive link.")));
        }

        final UpdateDispositionWorkItem workItem = new UpdateDispositionWorkItem(lockToken, deliveryState, this.timeout);
        // The explicit <Void> witness keeps the sink typed as MonoSink<Void>; no cast is needed.
        final Mono<Void> result = Mono.<Void>create(sink -> {
            workItem.start(sink);
            try {
                this.provider.getReactorDispatcher().invoke(() -> {
                    unsettled.disposition(deliveryState);
                    this.pendingUpdates.put(lockToken, workItem);
                });
            } catch (IOException error) {
                sink.error(new AmqpException(false, "updateDisposition failed while dispatching to Reactor.", error, this.handler.getErrorContext(this.receiver)));
            }
        }).cache();

        workItem.setMono(result);
        return result;
    }

    /**
     * Handles a delivery that carries the remote outcome of a previously requested disposition update.
     *
     * <p>If the remote state type matches the requested state type, the pending work item completes
     * successfully. Otherwise: a {@code Rejected} outcome is retried according to the retry policy (or
     * failed when the policy returns no delay), a {@code Released} outcome fails the work item with an
     * operation-cancelled {@link AmqpException}, and any other outcome fails it with a generic
     * {@link AmqpException}. Deliveries with no outcome or no matching pending update are logged and
     * ignored.</p>
     *
     * @param lockToken Lock token of the delivery.
     * @param delivery Delivery carrying the remote state.
     */
    private void updateOutcome(String lockToken, Delivery delivery) {
        final DeliveryState remoteState = delivery.getRemoteState();
        this.logger.verbose("entityPath[{}], linkName[{}], deliveryTag[{}], state[{}] Received update disposition delivery.", this.getEntityPath(), this.getLinkName(), lockToken, remoteState);

        final Outcome remoteOutcome;
        if (remoteState instanceof Outcome) {
            remoteOutcome = (Outcome) remoteState;
        } else if (remoteState instanceof TransactionalState) {
            remoteOutcome = ((TransactionalState) remoteState).getOutcome();
        } else {
            remoteOutcome = null;
        }
        if (remoteOutcome == null) {
            this.logger.warning("linkName[{}], deliveryTag[{}]. No outcome associated with delivery. Delivery: {}", this.getLinkName(), lockToken, delivery);
            return;
        }

        final UpdateDispositionWorkItem workItem = this.pendingUpdates.get(lockToken);
        if (workItem == null) {
            this.logger.warning("linkName[{}], deliveryTag[{}]. No pending update for delivery. Delivery: {}", this.getLinkName(), lockToken, delivery);
            return;
        }

        if (remoteState.getType() == workItem.getDeliveryState().getType()) {
            this.completeWorkItem(lockToken, delivery, workItem.getSink(), null);
            return;
        }

        this.logger.info("Received delivery '{}' state '{}' doesn't match expected state '{}'", lockToken, remoteState, workItem.getDeliveryState());
        switch (remoteState.getType()) {
            case Rejected: {
                final Rejected rejected = (Rejected) remoteOutcome;
                final ErrorCondition errorCondition = rejected.getError();
                final Exception exception = ExceptionUtil.toException(errorCondition.getCondition().toString(), errorCondition.getDescription(), this.handler.getErrorContext(this.receiver));
                final Duration retry = this.retryPolicy.calculateRetryDelay(exception, workItem.incrementRetry());
                if (retry == null) {
                    this.logger.info("deliveryTag[{}], state[{}]. Retry attempts exhausted.", lockToken, exception);
                    this.completeWorkItem(lockToken, delivery, workItem.getSink(), exception);
                    break;
                }
                // Remember the failure so a later timeout reports it, and restart the timeout window.
                workItem.setLastException(exception);
                workItem.resetStartTime();
                try {
                    this.provider.getReactorDispatcher().invoke(() -> delivery.disposition(workItem.getDeliveryState()));
                } catch (IOException error) {
                    // The message template uses %s placeholders, so it has to go through String.format.
                    final String failureMessage = String.format("linkName[%s], deliveryTag[%s]. Retrying updateDisposition failed to dispatch to Reactor.", this.getLinkName(), lockToken);
                    final RuntimeException amqpException = this.logger.logExceptionAsError(new AmqpException(false, failureMessage, error, this.handler.getErrorContext(this.receiver)));
                    this.completeWorkItem(lockToken, delivery, workItem.getSink(), amqpException);
                }
                break;
            }
            case Released: {
                final AmqpException cancelled = new AmqpException(false, AmqpErrorCondition.OPERATION_CANCELLED, "AMQP layer unexpectedly aborted or disconnected.", this.handler.getErrorContext(this.receiver));
                this.logger.info("deliveryTag[{}], state[{}]. Completing pending updateState operation with exception.", lockToken, remoteState.getType(), cancelled);
                this.completeWorkItem(lockToken, delivery, workItem.getSink(), cancelled);
                break;
            }
            default: {
                final AmqpException error = new AmqpException(false, remoteOutcome.toString(), this.handler.getErrorContext(this.receiver));
                this.logger.info("deliveryTag[{}], state[{}] Completing pending updateState operation with exception.", lockToken, remoteState.getType(), error);
                this.completeWorkItem(lockToken, delivery, workItem.getSink(), error);
            }
        }
    }

    /**
     * Removes every timed-out work item from {@code pendingUpdates} and fails its sink, using the work item's
     * last recorded exception when there is one and a transient timeout {@link AmqpException} otherwise.
     */
    private void cleanupWorkItems() {
        if (pendingUpdates.isEmpty()) {
            return;
        }

        logger.verbose("linkName[{}]: Cleaning timed out update work tasks.", getLinkName());
        pendingUpdates.forEach((token, item) -> {
            if (item != null && item.hasTimedout()) {
                pendingUpdates.remove(token);

                Throwable failure = item.getLastException();
                if (failure == null) {
                    failure = new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR, "Update disposition request timed out.", handler.getErrorContext(receiver));
                }
                // No delivery is passed: nothing is settled for a timed-out update.
                completeWorkItem(token, null, item.getSink(), failure);
            }
        });
    }

    /**
     * Signals the outcome of a disposition update to its sink.
     *
     * <p>When the delivery is present and remotely settled, it is settled locally before the sink is
     * signalled, and the lock token is removed from both tracking maps afterwards.</p>
     *
     * @param lockToken Lock token of the work item.
     * @param delivery Delivery associated with the update; may be {@code null}.
     * @param sink Sink to signal.
     * @param error Failure to emit, or {@code null} to signal success. Runtime exceptions are logged as errors
     *     before being emitted.
     */
    private void completeWorkItem(String lockToken, Delivery delivery, MonoSink<Void> sink, Throwable error) {
        final boolean settledByRemote = delivery != null && delivery.remotelySettled();
        if (settledByRemote) {
            delivery.settle();
        }

        if (error == null) {
            sink.success();
        } else if (error instanceof RuntimeException) {
            sink.error(logger.logExceptionAsError((RuntimeException) error));
        } else {
            sink.error(error);
        }

        if (settledByRemote) {
            pendingUpdates.remove(lockToken);
            unsettledDeliveries.remove(lockToken);
        }
    }

    /**
     * Bookkeeping for a single in-flight disposition update: the requested state, the sink to signal, the
     * retry count, the last failure seen, and the instant after which the update is considered timed out.
     */
    private static final class UpdateDispositionWorkItem {
        private final String lockToken;
        private final DeliveryState state;
        private final Duration timeout;
        private final AtomicInteger retryAttempts = new AtomicInteger();
        private final AtomicBoolean isDisposed = new AtomicBoolean();
        private Mono<Void> mono;
        private Instant expirationTime;
        private MonoSink<Void> sink;
        private Throwable throwable;

        private UpdateDispositionWorkItem(String lockToken, DeliveryState state, Duration timeout) {
            this.lockToken = lockToken;
            this.state = state;
            this.timeout = timeout;
        }

        /**
         * Binds the sink for this work item and starts its timeout window.
         *
         * @param sink Sink to signal when the update completes; must not be {@code null}.
         */
        private void start(MonoSink<Void> sink) {
            Objects.requireNonNull(sink, "'sink' cannot be null.");
            this.sink = sink;

            // Both disposal and cancellation of the sink mark this work item as disposed.
            final Disposable markDisposed = () -> isDisposed.set(true);
            sink.onDispose(markDisposed);
            sink.onCancel(markDisposed);

            resetStartTime();
        }

        /** Restarts the timeout window: expiration becomes now plus the configured timeout. */
        private void resetStartTime() {
            expirationTime = Instant.now().plus(timeout);
        }

        /** @return {@code true} when the current instant is strictly after the expiration time. */
        private boolean hasTimedout() {
            return Instant.now().isAfter(expirationTime);
        }

        /** @return The retry count after incrementing it. */
        private int incrementRetry() {
            return retryAttempts.incrementAndGet();
        }

        private Throwable getLastException() {
            return throwable;
        }

        private void setLastException(Throwable throwable) {
            this.throwable = throwable;
        }

        private Mono<Void> getMono() {
            return mono;
        }

        private void setMono(Mono<Void> mono) {
            this.mono = mono;
        }

        private MonoSink<Void> getSink() {
            return sink;
        }

        private DeliveryState getDeliveryState() {
            return state;
        }

        public String getLockToken() {
            return lockToken;
        }
    }
}

