/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpLink;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.implementation.AmqpEndpointStateUtil;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.AmqpSendLink;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.ReactorReceiver;
import com.azure.core.amqp.implementation.ReactorSender;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnknownDescribedType;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;

/**
 * An {@link AmqpSession} backed by a proton-j {@link Session}.
 *
 * <p>The session republishes the endpoint states reported by its {@link SessionHandler} and keeps track of
 * the send and receive links it has opened, keyed by link name, so that a repeated request for the same
 * link name returns the already-open link.</p>
 */
public class ReactorSession implements AmqpSession {
    /** Send links opened through this session, keyed by link name. */
    private final ConcurrentMap<String, AmqpSendLink> openSendLinks = new ConcurrentHashMap<>();
    /** Receive links opened through this session, keyed by link name. */
    private final ConcurrentMap<String, AmqpReceiveLink> openReceiveLinks = new ConcurrentHashMap<>();
    /** Flipped to {@code true} by the first call to {@link #close()}. */
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final ClientLogger logger = new ClientLogger(ReactorSession.class);
    /** Replays the most recent endpoint state to new subscribers; starts as UNINITIALIZED. */
    private final ReplayProcessor<AmqpEndpointState> endpointStates =
        ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED);
    /** Sink used to push states into {@link #endpointStates}; must be declared after it. */
    private final FluxSink<AmqpEndpointState> endpointStateSink =
        this.endpointStates.sink(FluxSink.OverflowStrategy.BUFFER);
    private final Session session;
    private final SessionHandler sessionHandler;
    private final String sessionName;
    private final ReactorProvider provider;
    private final TokenManagerProvider tokenManagerProvider;
    private final MessageSerializer messageSerializer;
    private final Duration openTimeout;
    private final Disposable.Composite subscriptions;
    private final ReactorHandlerProvider handlerProvider;
    private final Mono<ClaimsBasedSecurityNode> cbsNodeSupplier;

    /**
     * Creates the session wrapper, subscribes to the handler's endpoint states and errors, and opens the
     * underlying proton-j session.
     *
     * @param session Underlying proton-j session; {@code open()} is invoked on it at the end of construction.
     * @param sessionHandler Handler whose endpoint states and errors are forwarded to
     *     {@link #getEndpointStates()}.
     * @param sessionName Name reported by {@link #getSessionName()}.
     * @param provider Supplies the reactor dispatcher on which links are opened.
     * @param handlerProvider Creates the send/receive link handlers.
     * @param cbsNodeSupplier Passed to the token manager provider when a link is created.
     * @param tokenManagerProvider Creates a token manager per requested link.
     * @param messageSerializer Handed to every sender created by this session.
     * @param openTimeout Value reported by {@link #getOperationTimeout()}.
     */
    public ReactorSession(Session session, SessionHandler sessionHandler, String sessionName,
        ReactorProvider provider, ReactorHandlerProvider handlerProvider,
        Mono<ClaimsBasedSecurityNode> cbsNodeSupplier, TokenManagerProvider tokenManagerProvider,
        MessageSerializer messageSerializer, Duration openTimeout) {
        this.session = session;
        this.sessionHandler = sessionHandler;
        this.handlerProvider = handlerProvider;
        this.sessionName = sessionName;
        this.provider = provider;
        this.cbsNodeSupplier = cbsNodeSupplier;
        this.tokenManagerProvider = tokenManagerProvider;
        this.messageSerializer = messageSerializer;
        this.openTimeout = openTimeout;

        // Explicit Object[] arguments keep the logger calls bound to the (String, Object[]) form the
        // class was originally compiled against.
        Disposable stateSubscription = this.sessionHandler.getEndpointStates().subscribe(state -> {
            this.logger.verbose("Connection state: {}", new Object[]{state});
            this.endpointStateSink.next(AmqpEndpointStateUtil.getConnectionState(state));
        }, error -> {
            this.logger.error("Error occurred in connection.", new Object[]{error});
            this.endpointStateSink.error(error);
        }, () -> {
            // The handler's stream finished: publish a terminal CLOSED state before completing.
            this.endpointStateSink.next(AmqpEndpointState.CLOSED);
            this.endpointStateSink.complete();
        });
        Disposable errorSubscription = this.sessionHandler.getErrors().subscribe(error -> {
            this.logger.error("Error occurred in connection.", new Object[]{error});
            this.endpointStateSink.error(error);
        });
        this.subscriptions = Disposables.composite(stateSubscription, errorSubscription);

        session.open();
    }

    /**
     * Gets the underlying proton-j session.
     *
     * @return The session passed to the constructor.
     */
    Session session() {
        return this.session;
    }

    /**
     * Gets the stream of endpoint states for this session.
     *
     * @return A flux that replays the latest known state and then emits subsequent ones.
     */
    @Override
    public Flux<AmqpEndpointState> getEndpointStates() {
        return this.endpointStates;
    }

    /**
     * Closes every tracked receive link and send link, forgets them, and disposes the handler
     * subscriptions. Only the first invocation does any work.
     */
    @Override
    public void close() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        // NOTE(review): the underlying proton-j session is not closed here and the endpoint-state
        // processor is not completed; confirm whether the owner of this session handles that.
        this.openReceiveLinks.forEach((key, link) -> link.close());
        this.openReceiveLinks.clear();
        this.openSendLinks.forEach((key, link) -> link.close());
        this.openSendLinks.clear();
        this.subscriptions.dispose();
    }

    /**
     * Gets the name of this session.
     *
     * @return The session name supplied at construction.
     */
    @Override
    public String getSessionName() {
        return this.sessionName;
    }

    /**
     * Gets the timeout associated with this session.
     *
     * @return The {@code openTimeout} supplied at construction.
     */
    @Override
    public Duration getOperationTimeout() {
        return this.openTimeout;
    }

    /**
     * Creates, or returns the already-open, send link with the given name.
     *
     * <p>The returned mono first waits for the session to report ACTIVE, then authorizes through a token
     * manager for {@code entityPath}, and finally opens the sender on the reactor dispatcher.</p>
     *
     * @param linkName Name of the link; also the key under which the link is tracked.
     * @param entityPath Address set on the link's target.
     * @param timeout Timeout applied while waiting for the ACTIVE state and handed to the sender.
     * @param retry Retry policy applied while waiting for the ACTIVE state and handed to the sender.
     * @return A mono that emits the send link, or an {@link IOException} error if the work could not be
     *     scheduled on the reactor dispatcher.
     */
    @Override
    public Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout,
        AmqpRetryPolicy retry) {
        TokenManager tokenManager = this.tokenManagerProvider.getTokenManager(this.cbsNodeSupplier, entityPath);
        return awaitActive(timeout, retry).then(tokenManager.authorize().then(Mono.create(sink -> {
            AmqpSendLink existingSender = this.openSendLinks.get(linkName);
            if (existingSender != null) {
                sink.success(existingSender);
                return;
            }

            Sender sender = this.session.sender(linkName);
            Target target = new Target();
            target.setAddress(entityPath);
            sender.setTarget(target);
            sender.setSource(new Source());
            sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);

            SendLinkHandler sendLinkHandler = this.handlerProvider.createSendLinkHandler(
                this.sessionHandler.getConnectionId(), this.sessionHandler.getHostname(), linkName, entityPath);
            BaseHandler.setHandler(sender, sendLinkHandler);

            try {
                // The link is opened and registered on the reactor dispatcher, not on the subscriber thread.
                this.provider.getReactorDispatcher().invoke(() -> {
                    sender.open();
                    // 262144 = 256 * 1024; presumably the maximum message size in bytes (TODO confirm
                    // against ReactorSender's constructor).
                    ReactorSender reactorSender = new ReactorSender(entityPath, sender, sendLinkHandler,
                        this.provider, tokenManager, this.messageSerializer, timeout, retry, 262144);
                    this.openSendLinks.put(linkName, reactorSender);
                    sink.success(reactorSender);
                });
            } catch (IOException e) {
                sink.error(e);
            }
        })));
    }

    /**
     * Creates, or returns the already-open, receive link with the given name, using no source filters,
     * receiver properties or desired capabilities.
     *
     * @param linkName Name of the link; also the key under which the link is tracked.
     * @param entityPath Address set on the link's source.
     * @param timeout Timeout applied while waiting for the ACTIVE state.
     * @param retry Retry policy applied while waiting for the ACTIVE state.
     * @return A mono that emits the receive link.
     */
    @Override
    public Mono<AmqpLink> createConsumer(String linkName, String entityPath, Duration timeout,
        AmqpRetryPolicy retry) {
        return this.createConsumer(linkName, entityPath, timeout, retry, null, null, null).cast(AmqpLink.class);
    }

    /**
     * Stops tracking the link with the given name. The link itself is not closed by this method.
     *
     * <p>Send links are checked first; if one is removed, the receive links are left untouched.</p>
     *
     * @param linkName Name of the link to forget.
     * @return {@code true} if a send or receive link was tracked under that name.
     */
    @Override
    public boolean removeLink(String linkName) {
        return this.openSendLinks.remove(linkName) != null || this.openReceiveLinks.remove(linkName) != null;
    }

    /**
     * Creates, or returns the already-open, receive link with the given name.
     *
     * <p>The returned mono first waits for the session to report ACTIVE, then authorizes through a token
     * manager for {@code entityPath}, and finally opens the receiver on the reactor dispatcher.</p>
     *
     * @param linkName Name of the link; also the key under which the link is tracked.
     * @param entityPath Address set on the link's source.
     * @param timeout Timeout applied while waiting for the ACTIVE state.
     * @param retry Retry policy applied while waiting for the ACTIVE state.
     * @param sourceFilters Filters set on the source; ignored when {@code null} or empty.
     * @param receiverProperties Properties set on the receiver; ignored when {@code null} or empty.
     * @param receiverDesiredCapabilities Desired capabilities set on the receiver; ignored when
     *     {@code null} or empty.
     * @return A mono that emits the receive link, or an {@link IOException} error if the work could not be
     *     scheduled on the reactor dispatcher.
     */
    protected Mono<AmqpReceiveLink> createConsumer(String linkName, String entityPath, Duration timeout,
        AmqpRetryPolicy retry, Map<Symbol, UnknownDescribedType> sourceFilters,
        Map<Symbol, Object> receiverProperties, Symbol[] receiverDesiredCapabilities) {
        TokenManager tokenManager = this.tokenManagerProvider.getTokenManager(this.cbsNodeSupplier, entityPath);
        return awaitActive(timeout, retry).then(tokenManager.authorize().then(Mono.create(sink -> {
            AmqpReceiveLink existingReceiver = this.openReceiveLinks.get(linkName);
            if (existingReceiver != null) {
                sink.success(existingReceiver);
                return;
            }

            Receiver receiver = this.session.receiver(linkName);
            Source source = new Source();
            source.setAddress(entityPath);
            if (sourceFilters != null && !sourceFilters.isEmpty()) {
                source.setFilter(sourceFilters);
            }
            receiver.setSource(source);
            receiver.setTarget(new Target());
            receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
            receiver.setReceiverSettleMode(ReceiverSettleMode.SECOND);
            if (receiverProperties != null && !receiverProperties.isEmpty()) {
                receiver.setProperties(receiverProperties);
            }
            if (receiverDesiredCapabilities != null && receiverDesiredCapabilities.length > 0) {
                receiver.setDesiredCapabilities(receiverDesiredCapabilities);
            }

            ReceiveLinkHandler receiveLinkHandler = this.handlerProvider.createReceiveLinkHandler(
                this.sessionHandler.getConnectionId(), this.sessionHandler.getHostname(), linkName, entityPath);
            BaseHandler.setHandler(receiver, receiveLinkHandler);

            try {
                // The link is opened and registered on the reactor dispatcher, not on the subscriber thread.
                this.provider.getReactorDispatcher().invoke(() -> {
                    receiver.open();
                    ReactorReceiver reactorReceiver =
                        new ReactorReceiver(entityPath, receiver, receiveLinkHandler, tokenManager);
                    this.openReceiveLinks.put(linkName, reactorReceiver);
                    sink.success(reactorReceiver);
                });
            } catch (IOException e) {
                sink.error(e);
            }
        })));
    }

    /**
     * Builds the endpoint-state stream that ends at the first ACTIVE state, wrapped by
     * {@code RetryUtil.withRetry} with the given timeout and retry policy.
     */
    private Flux<AmqpEndpointState> awaitActive(Duration timeout, AmqpRetryPolicy retry) {
        return RetryUtil.withRetry(
            this.getEndpointStates().takeUntil(state -> state == AmqpEndpointState.ACTIVE), timeout, retry);
    }
}

