/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.amqp.implementation.AmqpLoggingUtils;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.exception.AzureException;
import com.azure.core.util.logging.ClientLogger;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class ActiveClientTokenManager
implements TokenManager {
    private static final ClientLogger LOGGER = new ClientLogger(ActiveClientTokenManager.class);
    private final AtomicBoolean hasScheduled = new AtomicBoolean();
    private final AtomicBoolean hasDisposed = new AtomicBoolean();
    private final Mono<ClaimsBasedSecurityNode> cbsNode;
    private final String tokenAudience;
    private final String scopes;
    private final Sinks.Many<AmqpResponseCode> authorizationResults = Sinks.many().replay().latest();
    private final Sinks.Many<Duration> durationSource = Sinks.many().multicast().onBackpressureBuffer();
    private final AtomicReference<Duration> lastRefreshInterval = new AtomicReference<Duration>(Duration.ofMinutes(1L));
    private volatile Disposable subscription;

    public ActiveClientTokenManager(Mono<ClaimsBasedSecurityNode> cbsNode, String tokenAudience, String scopes) {
        this.cbsNode = cbsNode;
        this.tokenAudience = tokenAudience;
        this.scopes = scopes;
    }

    @Override
    public Flux<AmqpResponseCode> getAuthorizationResults() {
        return this.authorizationResults.asFlux();
    }

    @Override
    public Mono<Long> authorize() {
        if (this.hasDisposed.get()) {
            return Mono.error((Throwable)new AzureException("Cannot authorize with CBS node when this token manager has been disposed of."));
        }
        return this.cbsNode.flatMap(cbsNode -> cbsNode.authorize(this.tokenAudience, this.scopes)).map(expiresOn -> {
            Duration between = Duration.between(OffsetDateTime.now(ZoneOffset.UTC), expiresOn);
            long refreshSeconds = (long)Math.floor((double)between.getSeconds() * 0.9);
            long refreshIntervalMS = refreshSeconds * 1000L;
            if (!this.hasScheduled.getAndSet(true)) {
                LOGGER.atInfo().addKeyValue("scopes", this.scopes).log("Scheduling refresh token task.");
                Duration firstInterval = Duration.ofMillis(refreshIntervalMS);
                this.lastRefreshInterval.set(firstInterval);
                this.authorizationResults.emitNext((Object)AmqpResponseCode.ACCEPTED, (signalType, emitResult) -> {
                    AmqpLoggingUtils.addSignalTypeAndResult(LOGGER.atVerbose(), signalType, emitResult).log("Could not emit ACCEPTED.");
                    return false;
                });
                this.subscription = this.scheduleRefreshTokenTask(firstInterval);
            }
            return refreshIntervalMS;
        });
    }

    @Override
    public void close() {
        if (this.hasDisposed.getAndSet(true)) {
            return;
        }
        this.authorizationResults.emitComplete((signalType, emitResult) -> {
            AmqpLoggingUtils.addSignalTypeAndResult(LOGGER.atVerbose(), signalType, emitResult).log("Could not close authorizationResults.");
            return false;
        });
        this.durationSource.emitComplete((signalType, emitResult) -> {
            AmqpLoggingUtils.addSignalTypeAndResult(LOGGER.atVerbose(), signalType, emitResult).log("Could not close durationSource.");
            return false;
        });
        if (this.subscription != null) {
            this.subscription.dispose();
        }
    }

    private Disposable scheduleRefreshTokenTask(Duration initialRefresh) {
        this.durationSource.emitNext((Object)initialRefresh, (signalType, emitResult) -> {
            AmqpLoggingUtils.addSignalTypeAndResult(LOGGER.atVerbose(), signalType, emitResult).log("Could not emit initial refresh interval.");
            return false;
        });
        return Flux.switchOnNext((Publisher)this.durationSource.asFlux().map(Flux::interval)).takeUntil(duration -> this.hasDisposed.get()).flatMap(delay -> {
            LOGGER.atInfo().addKeyValue("scopes", this.scopes).log("Refreshing token.");
            return this.authorize();
        }).onErrorContinue(error -> error instanceof AmqpException && ((AmqpException)((Object)((Object)error))).isTransient(), (amqpException, interval) -> {
            Duration lastRefresh = this.lastRefreshInterval.get();
            LOGGER.atError().addKeyValue("scopes", this.scopes).addKeyValue("interval_ms", interval).log("Error is transient. Rescheduling authorization task.", new Object[]{amqpException});
            this.durationSource.emitNext((Object)lastRefresh, (signalType, emitResult) -> {
                AmqpLoggingUtils.addSignalTypeAndResult(LOGGER.atVerbose(), signalType, emitResult).addKeyValue("lastRefresh", (Object)lastRefresh).log("Could not emit lastRefresh.");
                return false;
            });
        }).subscribe(interval -> {
            LOGGER.atVerbose().addKeyValue("scopes", this.scopes).addKeyValue("interval_ms", interval).log("Authorization successful. Refreshing token.");
            this.authorizationResults.emitNext((Object)AmqpResponseCode.ACCEPTED, (signalType, emitResult) -> {
                AmqpLoggingUtils.addSignalTypeAndResult(LOGGER.atVerbose(), signalType, emitResult).log("Could not emit ACCEPTED after refresh.");
                return false;
            });
            Duration nextRefresh = Duration.ofMillis(interval);
            this.lastRefreshInterval.set(nextRefresh);
            this.durationSource.emitNext((Object)nextRefresh, (signalType, emitResult) -> {
                AmqpLoggingUtils.addSignalTypeAndResult(LOGGER.atVerbose(), signalType, emitResult).addKeyValue("nextRefresh", (Object)nextRefresh).log("Could not emit nextRefresh.");
                return false;
            });
        }, error -> {
            LOGGER.atError().addKeyValue("scopes", this.scopes).addKeyValue("audience", this.tokenAudience).log("Error occurred while refreshing token that is not retriable. Not scheduling refresh task. Use ActiveClientTokenManager.authorize() to schedule task again.", new Object[]{error});
            if (!this.hasDisposed.getAndSet(true)) {
                this.hasScheduled.set(false);
                this.durationSource.emitComplete((signalType, emitResult) -> {
                    AmqpLoggingUtils.addSignalTypeAndResult(LOGGER.atVerbose(), signalType, emitResult).log("Could not close durationSource.");
                    return false;
                });
                this.authorizationResults.emitError(error, (signalType, emitResult) -> {
                    AmqpLoggingUtils.addSignalTypeAndResult(LOGGER.atVerbose(), signalType, emitResult).log("Could not emit authorization error.", new Object[]{error});
                    return false;
                });
            }
        });
    }
}

