/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpManagementNode;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.amqp.exception.SessionErrorContext;
import com.azure.core.amqp.implementation.AmqpChannelProcessor;
import com.azure.core.amqp.implementation.AmqpLoggingUtils;
import com.azure.core.amqp.implementation.ExceptionUtil;
import com.azure.core.amqp.implementation.MessageUtils;
import com.azure.core.amqp.implementation.RequestResponseChannel;
import com.azure.core.amqp.implementation.RequestResponseUtils;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.models.AmqpAnnotatedMessage;
import com.azure.core.amqp.models.DeliveryOutcome;
import com.azure.core.amqp.models.DeliveryState;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LoggingEventBuilder;
import java.util.HashMap;
import java.util.Objects;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

public class ManagementChannel
implements AmqpManagementNode {
    private final TokenManager tokenManager;
    private final AmqpChannelProcessor<RequestResponseChannel> createChannel;
    private final String fullyQualifiedNamespace;
    private final ClientLogger logger;
    private final String entityPath;

    public ManagementChannel(AmqpChannelProcessor<RequestResponseChannel> createChannel, String fullyQualifiedNamespace, String entityPath, TokenManager tokenManager) {
        this.createChannel = Objects.requireNonNull(createChannel, "'createChannel' cannot be null.");
        this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null.");
        this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        HashMap<String, String> globalLoggingContext = new HashMap<String, String>();
        globalLoggingContext.put("entityPath", entityPath);
        this.logger = new ClientLogger(ManagementChannel.class, globalLoggingContext);
        this.tokenManager = Objects.requireNonNull(tokenManager, "'tokenManager' cannot be null.");
    }

    @Override
    public Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message) {
        return this.isAuthorized().then(this.createChannel.flatMap(channel -> {
            Message protonJMessage = MessageUtils.toProtonJMessage(message);
            return channel.sendWithAck(protonJMessage).handle((responseMessage, sink) -> this.handleResponse((Message)responseMessage, (SynchronousSink<AmqpAnnotatedMessage>)sink, channel.getErrorContext())).switchIfEmpty(this.errorIfEmpty((RequestResponseChannel)channel, null));
        }));
    }

    @Override
    public Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message, DeliveryOutcome deliveryOutcome) {
        return this.isAuthorized().then(this.createChannel.flatMap(channel -> {
            Message protonJMessage = MessageUtils.toProtonJMessage(message);
            org.apache.qpid.proton.amqp.transport.DeliveryState protonJDeliveryState = MessageUtils.toProtonJDeliveryState(deliveryOutcome);
            return channel.sendWithAck(protonJMessage, protonJDeliveryState).handle((responseMessage, sink) -> this.handleResponse((Message)responseMessage, (SynchronousSink<AmqpAnnotatedMessage>)sink, channel.getErrorContext())).switchIfEmpty(this.errorIfEmpty((RequestResponseChannel)channel, deliveryOutcome.getDeliveryState()));
        }));
    }

    public Mono<Void> closeAsync() {
        return this.createChannel.flatMap(channel -> channel.closeAsync()).cache();
    }

    private void handleResponse(Message response, SynchronousSink<AmqpAnnotatedMessage> sink, AmqpErrorContext errorContext) {
        if (RequestResponseUtils.isSuccessful(response)) {
            sink.next((Object)MessageUtils.toAmqpAnnotatedMessage(response));
            return;
        }
        AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(response);
        if (statusCode == AmqpResponseCode.NO_CONTENT) {
            sink.next((Object)MessageUtils.toAmqpAnnotatedMessage(response));
            return;
        }
        String errorCondition = RequestResponseUtils.getErrorCondition(response);
        if (statusCode == AmqpResponseCode.NOT_FOUND) {
            AmqpErrorCondition amqpErrorCondition = AmqpErrorCondition.fromString(errorCondition);
            if (amqpErrorCondition == AmqpErrorCondition.MESSAGE_NOT_FOUND) {
                this.logger.info("There was no matching message found.");
                sink.next((Object)MessageUtils.toAmqpAnnotatedMessage(response));
            } else if (amqpErrorCondition == AmqpErrorCondition.SESSION_NOT_FOUND) {
                this.logger.info("There was no matching session found.");
                sink.next((Object)MessageUtils.toAmqpAnnotatedMessage(response));
            }
            return;
        }
        String statusDescription = RequestResponseUtils.getStatusDescription(response);
        LoggingEventBuilder log = this.logger.atWarning().addKeyValue("status", (Object)statusCode);
        AmqpLoggingUtils.addKeyValueIfNotNull(log, "errorDescription", statusDescription);
        AmqpLoggingUtils.addKeyValueIfNotNull(log, "errorCondition", errorCondition);
        log.log("Operation not successful.");
        Exception throwable = ExceptionUtil.toException(errorCondition, statusDescription, errorContext);
        sink.error((Throwable)throwable);
    }

    private <T> Mono<T> errorIfEmpty(RequestResponseChannel channel, DeliveryState deliveryState) {
        return Mono.error(() -> {
            String error = String.format("entityPath[%s] deliveryState[%s] No response received from management channel.", new Object[]{this.entityPath, deliveryState});
            AmqpException exception = new AmqpException(true, error, channel.getErrorContext());
            return this.logger.atError().addKeyValue("deliveryState", (Object)deliveryState).log((RuntimeException)((Object)exception));
        });
    }

    private Mono<Void> isAuthorized() {
        return this.tokenManager.getAuthorizationResults().next().switchIfEmpty(Mono.error(() -> new AmqpException(false, "Did not get response from tokenManager: " + this.entityPath, this.getErrorContext()))).handle((response, sink) -> {
            if (RequestResponseUtils.isSuccessful(response)) {
                sink.complete();
            } else {
                String message = String.format("User does not have authorization to perform operation on entity [%s]. Response: [%s]", this.entityPath, response);
                sink.error((Throwable)ExceptionUtil.amqpResponseCodeToException(response.getValue(), message, this.getErrorContext()));
            }
        });
    }

    private AmqpErrorContext getErrorContext() {
        return new SessionErrorContext(this.fullyQualifiedNamespace, this.entityPath);
    }
}

