/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.implementation.AmqpEndpointStateUtil;
import com.azure.core.amqp.implementation.AmqpExceptionHandler;
import com.azure.core.amqp.implementation.ClaimsBasedSecurityChannel;
import com.azure.core.amqp.implementation.ConnectionOptions;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorExecutor;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.ReactorSession;
import com.azure.core.amqp.implementation.RequestResponseChannel;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.handler.ConnectionHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.reactor.Reactor;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;

public class ReactorConnection
implements AmqpConnection {
    private static final String CBS_SESSION_NAME = "cbs-session";
    private static final String CBS_ADDRESS = "$cbs";
    private static final String CBS_LINK_NAME = "cbs";
    private final ClientLogger logger = new ClientLogger(ReactorConnection.class);
    private final ConcurrentMap<String, AmqpSession> sessionMap = new ConcurrentHashMap<String, AmqpSession>();
    private final AtomicBoolean hasConnection = new AtomicBoolean();
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final DirectProcessor<AmqpShutdownSignal> shutdownSignals = DirectProcessor.create();
    private final ReplayProcessor<AmqpEndpointState> endpointStates = ReplayProcessor.cacheLastOrDefault((Object)((Object)AmqpEndpointState.UNINITIALIZED));
    private FluxSink<AmqpEndpointState> endpointStatesSink = this.endpointStates.sink(FluxSink.OverflowStrategy.BUFFER);
    private final String connectionId;
    private final Mono<Connection> connectionMono;
    private final ConnectionHandler handler;
    private final ReactorHandlerProvider handlerProvider;
    private final TokenManagerProvider tokenManagerProvider;
    private final MessageSerializer messageSerializer;
    private final ConnectionOptions connectionOptions;
    private final ReactorProvider reactorProvider;
    private final Disposable.Composite subscriptions;
    private final AmqpRetryPolicy retryPolicy;
    private ReactorExecutor executor;
    private ReactorExceptionHandler reactorExceptionHandler;
    private volatile ClaimsBasedSecurityChannel cbsChannel;
    private volatile Connection connection;

    public ReactorConnection(String connectionId, ConnectionOptions connectionOptions, ReactorProvider reactorProvider, ReactorHandlerProvider handlerProvider, TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer) {
        this.connectionOptions = connectionOptions;
        this.reactorProvider = reactorProvider;
        this.connectionId = connectionId;
        this.handlerProvider = handlerProvider;
        this.tokenManagerProvider = Objects.requireNonNull(tokenManagerProvider, "'tokenManagerProvider' cannot be null.");
        this.messageSerializer = messageSerializer;
        this.handler = handlerProvider.createConnectionHandler(connectionId, connectionOptions.getFullyQualifiedNamespace(), connectionOptions.getTransportType(), connectionOptions.getProxyOptions());
        this.retryPolicy = RetryUtil.getRetryPolicy(connectionOptions.getRetry());
        this.connectionMono = Mono.fromCallable(this::getOrCreateConnection).doOnSubscribe(c -> this.hasConnection.set(true));
        this.subscriptions = Disposables.composite((Disposable[])new Disposable[]{this.handler.getEndpointStates().subscribe(state -> {
            this.logger.verbose("Connection state: {}", new Object[]{state});
            this.endpointStatesSink.next((Object)AmqpEndpointStateUtil.getConnectionState(state));
        }, error -> {
            this.logger.error("Error occurred in connection.", new Object[]{error});
            this.endpointStatesSink.error(error);
        }, () -> {
            this.endpointStatesSink.next((Object)AmqpEndpointState.CLOSED);
            this.endpointStatesSink.complete();
        }), this.handler.getErrors().subscribe(error -> {
            this.logger.error("Error occurred in connection.", new Object[]{error});
            this.endpointStatesSink.error(error);
        })});
    }

    @Override
    public Flux<AmqpEndpointState> getEndpointStates() {
        return this.endpointStates;
    }

    @Override
    public Flux<AmqpShutdownSignal> getShutdownSignals() {
        return this.shutdownSignals;
    }

    @Override
    public Mono<ClaimsBasedSecurityNode> getClaimsBasedSecurityNode() {
        Mono cbsNodeMono = RetryUtil.withRetry(this.getEndpointStates().takeUntil(x -> x == AmqpEndpointState.ACTIVE), this.connectionOptions.getRetry().getTryTimeout(), this.retryPolicy).then(Mono.fromCallable(this::getOrCreateCBSNode));
        return this.hasConnection.get() ? cbsNodeMono : this.connectionMono.then(cbsNodeMono);
    }

    @Override
    public String getId() {
        return this.connectionId;
    }

    @Override
    public String getFullyQualifiedNamespace() {
        return this.handler.getHostname();
    }

    @Override
    public int getMaxFrameSize() {
        return this.handler.getMaxFrameSize();
    }

    @Override
    public Map<String, Object> getConnectionProperties() {
        return this.handler.getConnectionProperties();
    }

    @Override
    public Mono<AmqpSession> createSession(String sessionName) {
        AmqpSession existingSession = (AmqpSession)this.sessionMap.get(sessionName);
        if (existingSession != null) {
            return Mono.just((Object)existingSession);
        }
        return this.connectionMono.map(connection -> this.sessionMap.computeIfAbsent(sessionName, key -> {
            SessionHandler handler = this.handlerProvider.createSessionHandler(this.connectionId, this.getFullyQualifiedNamespace(), sessionName, this.connectionOptions.getRetry().getTryTimeout());
            Session session = connection.session();
            BaseHandler.setHandler((Extendable)session, (Handler)handler);
            return this.createSession(sessionName, session, handler);
        }));
    }

    protected AmqpSession createSession(String sessionName, Session session, SessionHandler handler) {
        return new ReactorSession(session, handler, sessionName, this.reactorProvider, this.handlerProvider, this.getClaimsBasedSecurityNode(), this.tokenManagerProvider, this.messageSerializer, this.connectionOptions.getRetry().getTryTimeout());
    }

    @Override
    public boolean removeSession(String sessionName) {
        return sessionName != null && this.sessionMap.remove(sessionName) != null;
    }

    @Override
    public void close() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        if (this.executor != null) {
            this.executor.close();
        }
        this.subscriptions.dispose();
        this.endpointStatesSink.complete();
        HashMap<String, AmqpSession> map = new HashMap<String, AmqpSession>(this.sessionMap);
        this.sessionMap.clear();
        map.forEach((name, session) -> session.close());
    }

    protected Mono<Connection> getReactorConnection() {
        return this.connectionMono;
    }

    protected Mono<RequestResponseChannel> createRequestResponseChannel(String sessionName, String linkName, String entityPath) {
        return this.createSession(sessionName).cast(ReactorSession.class).map(reactorSession -> new RequestResponseChannel(this.getId(), this.getFullyQualifiedNamespace(), linkName, entityPath, reactorSession.session(), this.connectionOptions.getRetry(), this.handlerProvider, this.reactorProvider, this.messageSerializer));
    }

    private synchronized ClaimsBasedSecurityNode getOrCreateCBSNode() {
        if (this.cbsChannel == null) {
            this.logger.info("Setting CBS channel.", new Object[0]);
            this.cbsChannel = new ClaimsBasedSecurityChannel(this.createRequestResponseChannel(CBS_SESSION_NAME, CBS_LINK_NAME, CBS_ADDRESS), this.connectionOptions.getTokenCredential(), this.connectionOptions.getAuthorizationType(), this.connectionOptions.getRetry());
        }
        return this.cbsChannel;
    }

    private synchronized Connection getOrCreateConnection() throws IOException {
        if (this.connection == null) {
            this.logger.info("Creating and starting connection to {}:{}", new Object[]{this.handler.getHostname(), this.handler.getProtocolPort()});
            Reactor reactor = this.reactorProvider.createReactor(this.connectionId, this.handler.getMaxFrameSize());
            this.connection = reactor.connectionToHost(this.handler.getHostname(), this.handler.getProtocolPort(), (Handler)this.handler);
            this.reactorExceptionHandler = new ReactorExceptionHandler();
            this.executor = new ReactorExecutor(reactor, this.connectionOptions.getScheduler(), this.connectionId, this.reactorExceptionHandler, this.connectionOptions.getRetry().getTryTimeout(), this.connectionOptions.getFullyQualifiedNamespace());
            this.executor.start();
        }
        return this.connection;
    }

    private static final class ReactorExceptionHandler
    extends AmqpExceptionHandler {
        private ReactorExceptionHandler() {
        }

        @Override
        public void onConnectionError(Throwable exception) {
            super.onConnectionError(exception);
        }
    }
}

