/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.exception.AzureException;
import com.azure.core.util.logging.ClientLogger;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class ActiveClientTokenManager
implements TokenManager {
    private final ClientLogger logger = new ClientLogger(ActiveClientTokenManager.class);
    private final AtomicBoolean hasScheduled = new AtomicBoolean();
    private final AtomicBoolean hasDisposed = new AtomicBoolean();
    private final Mono<ClaimsBasedSecurityNode> cbsNode;
    private final String tokenAudience;
    private final String scopes;
    private final Sinks.Many<AmqpResponseCode> authorizationResults = Sinks.many().replay().latest();
    private final Sinks.Many<Duration> durationSource = Sinks.many().multicast().onBackpressureBuffer();
    private final AtomicReference<Duration> lastRefreshInterval = new AtomicReference<Duration>(Duration.ofMinutes(1L));
    private volatile Disposable subscription;

    public ActiveClientTokenManager(Mono<ClaimsBasedSecurityNode> cbsNode, String tokenAudience, String scopes) {
        this.cbsNode = cbsNode;
        this.tokenAudience = tokenAudience;
        this.scopes = scopes;
    }

    @Override
    public Flux<AmqpResponseCode> getAuthorizationResults() {
        return this.authorizationResults.asFlux();
    }

    @Override
    public Mono<Long> authorize() {
        if (this.hasDisposed.get()) {
            return Mono.error((Throwable)new AzureException("Cannot authorize with CBS node when this token manager has been disposed of."));
        }
        return this.cbsNode.flatMap(cbsNode -> cbsNode.authorize(this.tokenAudience, this.scopes)).map(expiresOn -> {
            Duration between = Duration.between(OffsetDateTime.now(ZoneOffset.UTC), expiresOn);
            long refreshSeconds = (long)Math.floor((double)between.getSeconds() * 0.9);
            long refreshIntervalMS = refreshSeconds * 1000L;
            if (!this.hasScheduled.getAndSet(true)) {
                this.logger.info("Scheduling refresh token task. scopes[{}]", new Object[]{this.scopes});
                Duration firstInterval = Duration.ofMillis(refreshIntervalMS);
                this.lastRefreshInterval.set(firstInterval);
                this.authorizationResults.emitNext((Object)AmqpResponseCode.ACCEPTED, (signalType, emitResult) -> {
                    this.logger.verbose("signalType[{}] result[{}] Could not emit ACCEPTED.", new Object[]{signalType, emitResult});
                    return false;
                });
                this.subscription = this.scheduleRefreshTokenTask(firstInterval);
            }
            return refreshIntervalMS;
        });
    }

    @Override
    public void close() {
        if (this.hasDisposed.getAndSet(true)) {
            return;
        }
        this.authorizationResults.emitComplete((signalType, emitResult) -> {
            this.logger.verbose("signalType[{}] result[{}] Could not close authorizationResults.", new Object[]{signalType, emitResult});
            return false;
        });
        this.durationSource.emitComplete((signalType, emitResult) -> {
            this.logger.verbose("signalType[{}] result[{}] Could not close durationSource.", new Object[]{signalType, emitResult});
            return false;
        });
        if (this.subscription != null) {
            this.subscription.dispose();
        }
    }

    private Disposable scheduleRefreshTokenTask(Duration initialRefresh) {
        this.durationSource.emitNext((Object)initialRefresh, (signalType, emitResult) -> {
            this.logger.verbose("signalType[{}] result[{}] Could not emit initial refresh interval.", new Object[]{signalType, emitResult});
            return false;
        });
        return Flux.switchOnNext((Publisher)this.durationSource.asFlux().map(Flux::interval)).takeUntil(duration -> this.hasDisposed.get()).flatMap(delay -> {
            this.logger.info("Refreshing token. scopes[{}] ", new Object[]{this.scopes});
            return this.authorize();
        }).onErrorContinue(error -> error instanceof AmqpException && ((AmqpException)((Object)((Object)error))).isTransient(), (amqpException, interval) -> {
            Duration lastRefresh = this.lastRefreshInterval.get();
            this.logger.error("Error is transient. Rescheduling authorization task at interval {} ms. scopes[{}]", new Object[]{lastRefresh.toMillis(), this.scopes, amqpException});
            this.durationSource.emitNext((Object)lastRefresh, (signalType, emitResult) -> {
                this.logger.verbose("signalType[{}] result[{}] Could not emit lastRefresh[{}].", new Object[]{signalType, emitResult, lastRefresh});
                return false;
            });
        }).subscribe(interval -> {
            this.logger.verbose("Authorization successful. Refreshing token in {} ms. scopes[{}]", new Object[]{interval, this.scopes});
            this.authorizationResults.emitNext((Object)AmqpResponseCode.ACCEPTED, (signalType, emitResult) -> {
                this.logger.verbose("signalType[{}] result[{}] Could not emit ACCEPTED after refresh.", new Object[]{signalType, emitResult});
                return false;
            });
            Duration nextRefresh = Duration.ofMillis(interval);
            this.lastRefreshInterval.set(nextRefresh);
            this.durationSource.emitNext((Object)nextRefresh, (signalType, emitResult) -> {
                this.logger.verbose("signalType[{}] result[{}] Could not emit nextRefresh[{}].", new Object[]{signalType, emitResult, nextRefresh});
                return false;
            });
        }, error -> {
            this.logger.error("Error occurred while refreshing token that is not retriable. Not scheduling refresh task. Use ActiveClientTokenManager.authorize() to schedule task again. audience[{}] scopes[{}]", new Object[]{this.tokenAudience, this.scopes, error});
            if (!this.hasDisposed.getAndSet(true)) {
                this.hasScheduled.set(false);
                this.durationSource.emitComplete((signalType, emitResult) -> {
                    this.logger.verbose("signalType[{}] result[{}] Could not close durationSource.", new Object[]{signalType, emitResult});
                    return false;
                });
                this.authorizationResults.emitError(error, (signalType, emitResult) -> {
                    this.logger.verbose("signalType[{}] result[{}] Could not emit authorization error.", new Object[]{signalType, emitResult, error});
                    return false;
                });
            }
        });
    }
}

