/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.implementation.AmqpEndpointStateUtil;
import com.azure.core.amqp.implementation.AmqpLoggingUtils;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.ReactorDispatcher;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

public class ReactorReceiver
implements AmqpReceiveLink,
AsyncCloseable,
AutoCloseable {
    private final String entityPath;
    private final Receiver receiver;
    private final ReceiveLinkHandler handler;
    private final TokenManager tokenManager;
    private final ReactorDispatcher dispatcher;
    private final Disposable subscriptions;
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final Sinks.Empty<Void> isClosedMono = Sinks.empty();
    private final Flux<Message> messagesProcessor;
    private final AmqpRetryOptions retryOptions;
    private final ClientLogger logger;
    private final Flux<AmqpEndpointState> endpointStates;
    private final AtomicReference<Supplier<Integer>> creditSupplier = new AtomicReference();

    protected ReactorReceiver(AmqpConnection amqpConnection, String entityPath, Receiver receiver, ReceiveLinkHandler handler, TokenManager tokenManager, ReactorDispatcher dispatcher, AmqpRetryOptions retryOptions) {
        this.entityPath = entityPath;
        this.receiver = receiver;
        this.handler = handler;
        this.tokenManager = tokenManager;
        this.dispatcher = dispatcher;
        Map<String, Object> loggingContext = AmqpLoggingUtils.createContextWithConnectionId(handler.getConnectionId());
        loggingContext.put("linkName", this.handler.getLinkName());
        this.logger = new ClientLogger(ReactorReceiver.class, loggingContext);
        this.messagesProcessor = this.handler.getDeliveredMessages().flatMap(delivery -> Mono.create(sink -> {
            try {
                this.dispatcher.invoke(() -> {
                    Message message = this.decodeDelivery((Delivery)delivery);
                    int creditsLeft = receiver.getRemoteCredit();
                    if (creditsLeft > 0) {
                        sink.success((Object)message);
                        return;
                    }
                    Supplier<Integer> supplier = this.creditSupplier.get();
                    Integer credits = supplier.get();
                    if (credits != null && credits > 0) {
                        this.logger.atInfo().addKeyValue("credits", (Object)credits).log("Adding credits.");
                        receiver.flow(credits.intValue());
                    } else {
                        this.logger.atVerbose().addKeyValue("credits", (Object)credits).log("There are no credits to add.");
                    }
                    sink.success((Object)message);
                });
            }
            catch (IOException | RejectedExecutionException e) {
                sink.error((Throwable)e);
            }
        }), 1);
        this.retryOptions = retryOptions;
        this.endpointStates = this.handler.getEndpointStates().map(state -> {
            this.logger.atVerbose().addKeyValue("entityPath", entityPath).log("State {}", new Object[]{state});
            return AmqpEndpointStateUtil.getConnectionState(state);
        }).doOnError(error -> {
            String message = this.isDisposed.getAndSet(true) ? "This was already disposed. Dropping error." : "Freeing resources due to error.";
            this.logger.atInfo().addKeyValue("entityPath", entityPath).log(message);
            this.completeClose();
        }).doOnComplete(() -> {
            String message = this.isDisposed.getAndSet(true) ? "This was already disposed." : "Freeing resources.";
            this.logger.atVerbose().addKeyValue("entityPath", entityPath).log(message);
            this.completeClose();
        }).cache(1);
        this.subscriptions = Disposables.composite((Disposable[])new Disposable[]{this.endpointStates.subscribe(), this.tokenManager.getAuthorizationResults().onErrorResume(error -> {
            Mono<Void> operation = this.closeAsync("Token renewal failure. Disposing receive link.", new ErrorCondition(Symbol.getSymbol((String)AmqpErrorCondition.NOT_ALLOWED.getErrorCondition()), error.getMessage()));
            return operation.then(Mono.empty());
        }).subscribe(response -> this.logger.atVerbose().addKeyValue("response", (Object)response).log("Token refreshed."), error -> {}, () -> {
            this.logger.atVerbose().addKeyValue("entityPath", entityPath).log("Authorization completed.");
            this.closeAsync("Authorization completed. Disposing.", null).subscribe();
        }), amqpConnection.getShutdownSignals().flatMap(signal -> {
            this.logger.verbose("Shutdown signal received.");
            return this.closeAsync("Connection shutdown.", null);
        }).subscribe()});
    }

    @Override
    public Flux<AmqpEndpointState> getEndpointStates() {
        return this.endpointStates.distinct();
    }

    @Override
    public Flux<Message> receive() {
        return this.messagesProcessor;
    }

    @Override
    public Mono<Void> addCredits(int credits) {
        if (this.isDisposed()) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new IllegalStateException("Cannot add credits to closed link: " + this.getLinkName()));
        }
        return Mono.create(sink -> {
            try {
                this.dispatcher.invoke(() -> {
                    this.receiver.flow(credits);
                    sink.success();
                });
            }
            catch (IOException e) {
                sink.error((Throwable)new UncheckedIOException(String.format("connectionId[%s] linkName[%s] Unable to schedule work to add more credits.", this.handler.getConnectionId(), this.getLinkName()), e));
            }
            catch (RejectedExecutionException e) {
                sink.error((Throwable)e);
            }
        });
    }

    @Override
    public int getCredits() {
        return this.receiver.getRemoteCredit();
    }

    @Override
    public void setEmptyCreditListener(Supplier<Integer> creditSupplier) {
        Objects.requireNonNull(creditSupplier, "'creditSupplier' cannot be null.");
        this.creditSupplier.set(creditSupplier);
    }

    @Override
    public String getLinkName() {
        return this.handler.getLinkName();
    }

    @Override
    public String getEntityPath() {
        return this.entityPath;
    }

    @Override
    public String getHostname() {
        return this.handler.getHostname();
    }

    public boolean isDisposed() {
        return this.isDisposed.get();
    }

    public void dispose() {
        this.close();
    }

    @Override
    public void close() {
        this.closeAsync().block(this.retryOptions.getTryTimeout());
    }

    @Override
    public Mono<Void> closeAsync() {
        return this.closeAsync("User invoked close operation.", null);
    }

    protected Message decodeDelivery(Delivery delivery) {
        int messageSize = delivery.pending();
        byte[] buffer = new byte[messageSize];
        int read = this.receiver.recv(buffer, 0, messageSize);
        this.receiver.advance();
        Message message = Proton.message();
        message.decode(buffer, 0, read);
        delivery.settle();
        return message;
    }

    protected Mono<Void> closeAsync(String message, ErrorCondition errorCondition) {
        if (this.isDisposed.getAndSet(true)) {
            return this.getIsClosedMono();
        }
        AmqpLoggingUtils.addErrorCondition(this.logger.atVerbose(), errorCondition).addKeyValue("entityPath", this.entityPath).log("Setting error condition and disposing. {}", new Object[]{message});
        Runnable closeReceiver = () -> {
            if (this.receiver.getLocalState() != EndpointState.CLOSED) {
                this.receiver.close();
                if (this.receiver.getCondition() == null) {
                    this.receiver.setCondition(errorCondition);
                }
            }
        };
        return Mono.fromRunnable(() -> {
            try {
                this.dispatcher.invoke(closeReceiver);
            }
            catch (IOException e) {
                this.logger.warning("IO sink was closed when scheduling work. Manually invoking and completing close.", new Object[]{e});
                closeReceiver.run();
                this.completeClose();
            }
            catch (RejectedExecutionException e) {
                this.logger.info("RejectedExecutionException when scheduling on ReactorDispatcher. Manually invoking and completing close.");
                closeReceiver.run();
                this.completeClose();
            }
        }).then(this.isClosedMono.asMono()).publishOn(Schedulers.boundedElastic());
    }

    protected Mono<Void> getIsClosedMono() {
        return this.isClosedMono.asMono().publishOn(Schedulers.boundedElastic());
    }

    private void completeClose() {
        this.isClosedMono.emitEmpty((signalType, result) -> {
            AmqpLoggingUtils.addSignalTypeAndResult(this.logger.atWarning(), signalType, result).log("Unable to emit shutdown signal.");
            return false;
        });
        this.subscriptions.dispose();
        if (this.tokenManager != null) {
            this.tokenManager.close();
        }
        this.handler.close();
        this.receiver.free();
    }

    public String toString() {
        return String.format("connectionId: [%s] entity path: [%s] linkName: [%s]", this.receiver.getName(), this.entityPath, this.getLinkName());
    }
}

