/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.exception.AzureException;
import com.azure.core.util.logging.ClientLogger;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;

public class ActiveClientTokenManager
implements TokenManager {
    private final ClientLogger logger = new ClientLogger(ActiveClientTokenManager.class);
    private final AtomicBoolean hasScheduled = new AtomicBoolean();
    private final AtomicBoolean hasDisposed = new AtomicBoolean();
    private final Mono<ClaimsBasedSecurityNode> cbsNode;
    private final String tokenAudience;
    private final String scopes;
    private final ReplayProcessor<AmqpResponseCode> authorizationResults = ReplayProcessor.create((int)1);
    private final FluxSink<AmqpResponseCode> authorizationResultsSink = this.authorizationResults.sink(FluxSink.OverflowStrategy.BUFFER);
    private final EmitterProcessor<Duration> durationSource = EmitterProcessor.create();
    private final FluxSink<Duration> durationSourceSink = this.durationSource.sink();
    private final AtomicReference<Duration> lastRefreshInterval = new AtomicReference<Duration>(Duration.ofMinutes(1L));
    private volatile Disposable subscription;

    public ActiveClientTokenManager(Mono<ClaimsBasedSecurityNode> cbsNode, String tokenAudience, String scopes) {
        this.cbsNode = cbsNode;
        this.tokenAudience = tokenAudience;
        this.scopes = scopes;
    }

    @Override
    public Flux<AmqpResponseCode> getAuthorizationResults() {
        return this.authorizationResults;
    }

    @Override
    public Mono<Long> authorize() {
        if (this.hasDisposed.get()) {
            return Mono.error((Throwable)new AzureException("Cannot authorize with CBS node when this token manager has been disposed of."));
        }
        return this.cbsNode.flatMap(cbsNode -> cbsNode.authorize(this.tokenAudience, this.scopes)).map(expiresOn -> {
            Duration between = Duration.between(OffsetDateTime.now(ZoneOffset.UTC), expiresOn);
            long refreshSeconds = (long)Math.floor((double)between.getSeconds() * 0.9);
            long refreshIntervalMS = refreshSeconds * 1000L;
            if (!this.hasScheduled.getAndSet(true)) {
                this.logger.info("Scheduling refresh token task. scopes[{}]", new Object[]{this.scopes});
                Duration firstInterval = Duration.ofMillis(refreshIntervalMS);
                this.lastRefreshInterval.set(firstInterval);
                this.authorizationResultsSink.next((Object)AmqpResponseCode.ACCEPTED);
                this.subscription = this.scheduleRefreshTokenTask(firstInterval);
            }
            return refreshIntervalMS;
        });
    }

    @Override
    public void close() {
        if (this.hasDisposed.getAndSet(true)) {
            return;
        }
        this.authorizationResultsSink.complete();
        this.durationSourceSink.complete();
        if (this.subscription != null) {
            this.subscription.dispose();
        }
    }

    private Disposable scheduleRefreshTokenTask(Duration initialRefresh) {
        this.durationSourceSink.next((Object)initialRefresh);
        return Flux.switchOnNext((Publisher)this.durationSource.map(Flux::interval)).flatMap(delay -> {
            this.logger.info("Refreshing token. scopes[{}] ", new Object[]{this.scopes});
            return this.authorize();
        }).onErrorContinue(error -> error instanceof AmqpException && ((AmqpException)((Object)((Object)error))).isTransient(), (amqpException, interval) -> {
            Duration lastRefresh = this.lastRefreshInterval.get();
            this.logger.error("Error is transient. Rescheduling authorization task at interval {} ms. scopes[{}]", new Object[]{lastRefresh.toMillis(), this.scopes, amqpException});
            this.durationSourceSink.next((Object)this.lastRefreshInterval.get());
        }).subscribe(interval -> {
            this.logger.info("Authorization successful. Refreshing token in {} ms. scopes[{}]", new Object[]{interval, this.scopes});
            this.authorizationResultsSink.next((Object)AmqpResponseCode.ACCEPTED);
            Duration nextRefresh = Duration.ofMillis(interval);
            this.lastRefreshInterval.set(nextRefresh);
            this.durationSourceSink.next((Object)Duration.ofMillis(interval));
        }, error -> {
            this.logger.error("Error occurred while refreshing token that is not retriable. Not scheduling refresh task. Use ActiveClientTokenManager.authorize() to schedule task again. audience[{}] scopes[{}]", new Object[]{this.tokenAudience, this.scopes, error});
            this.hasScheduled.set(false);
            this.durationSourceSink.complete();
            this.authorizationResultsSink.error(error);
        }, () -> this.logger.verbose("Completed refresh token task."));
    }
}

