/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.exception.AzureException;
import com.azure.core.util.logging.ClientLogger;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

public class ActiveClientTokenManager
implements TokenManager {
    private final ClientLogger logger = new ClientLogger(ActiveClientTokenManager.class);
    private final AtomicBoolean hasScheduled = new AtomicBoolean();
    private final AtomicBoolean hasDisposed = new AtomicBoolean();
    private final Mono<ClaimsBasedSecurityNode> cbsNode;
    private final String tokenAudience;
    private final String scopes;
    private final Timer timer;
    private final Flux<AmqpResponseCode> authorizationResults;
    private FluxSink<AmqpResponseCode> sink;
    private AtomicLong lastRefreshInterval = new AtomicLong();

    public ActiveClientTokenManager(Mono<ClaimsBasedSecurityNode> cbsNode, String tokenAudience, String scopes) {
        this.timer = new Timer(tokenAudience + "-tokenManager");
        this.cbsNode = cbsNode;
        this.tokenAudience = tokenAudience;
        this.scopes = scopes;
        this.authorizationResults = Flux.create(sink -> {
            if (this.hasDisposed.get()) {
                sink.complete();
            } else {
                this.sink = sink;
            }
        });
        this.lastRefreshInterval.set(Duration.ofMinutes(1L).getSeconds() * 1000L);
    }

    @Override
    public Flux<AmqpResponseCode> getAuthorizationResults() {
        return this.authorizationResults;
    }

    @Override
    public Mono<Long> authorize() {
        if (this.hasDisposed.get()) {
            return Mono.error((Throwable)new AzureException("Cannot authorize with CBS node when this token manager has been disposed of."));
        }
        return this.cbsNode.flatMap(cbsNode -> cbsNode.authorize(this.tokenAudience, this.scopes)).map(expiresOn -> {
            Duration between = Duration.between(OffsetDateTime.now(ZoneOffset.UTC), expiresOn);
            long refreshSeconds = (long)Math.floor((double)between.getSeconds() * 0.9);
            long refreshIntervalMS = refreshSeconds * 1000L;
            this.lastRefreshInterval.set(refreshIntervalMS);
            if (!this.hasScheduled.getAndSet(true)) {
                this.logger.info("Scheduling refresh token task.", new Object[0]);
                this.scheduleRefreshTokenTask(refreshIntervalMS);
            }
            return refreshIntervalMS;
        });
    }

    @Override
    public void close() {
        if (!this.hasDisposed.getAndSet(true)) {
            if (this.sink != null) {
                this.sink.complete();
            }
            this.timer.cancel();
        }
    }

    private void scheduleRefreshTokenTask(Long refreshIntervalInMS) {
        try {
            this.timer.schedule((TimerTask)new RefreshAuthorizationToken(), refreshIntervalInMS);
        }
        catch (IllegalStateException e) {
            this.logger.warning("Unable to schedule RefreshAuthorizationToken task.", new Object[]{e});
            this.hasScheduled.set(false);
        }
    }

    private class RefreshAuthorizationToken
    extends TimerTask {
        private RefreshAuthorizationToken() {
        }

        @Override
        public void run() {
            ActiveClientTokenManager.this.logger.info("Refreshing authorization token.", new Object[0]);
            ActiveClientTokenManager.this.authorize().subscribe(refreshIntervalInMS -> {
                if (ActiveClientTokenManager.this.hasDisposed.get()) {
                    ActiveClientTokenManager.this.logger.info("Token manager has been disposed of. Not rescheduling.", new Object[0]);
                    return;
                }
                ActiveClientTokenManager.this.logger.info("Authorization successful. Refreshing token in {} ms.", new Object[]{refreshIntervalInMS});
                ActiveClientTokenManager.this.sink.next((Object)AmqpResponseCode.ACCEPTED);
                ActiveClientTokenManager.this.scheduleRefreshTokenTask(refreshIntervalInMS);
            }, error -> {
                if (error instanceof AmqpException && ((AmqpException)((Object)((Object)error))).isTransient()) {
                    ActiveClientTokenManager.this.logger.error("Error is transient. Rescheduling authorization task.", new Object[]{error});
                    ActiveClientTokenManager.this.scheduleRefreshTokenTask(ActiveClientTokenManager.this.lastRefreshInterval.get());
                } else {
                    ActiveClientTokenManager.this.logger.error("Error occurred while refreshing token that is not retriable. Not scheduling refresh task. Use ActiveClientTokenManager.authorize() to schedule task again.", new Object[]{error});
                    ActiveClientTokenManager.this.hasScheduled.set(false);
                }
                ActiveClientTokenManager.this.sink.error(error);
            });
        }
    }
}

