/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.leases;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.retrieval.AWSExceptionManager;

@KinesisClientInternalApi
public class KinesisShardDetector
implements ShardDetector {
    private static final Logger log = LoggerFactory.getLogger(KinesisShardDetector.class);
    private final Object $lock = new Object[0];
    @NonNull
    private final KinesisAsyncClient kinesisClient;
    @NonNull
    private final StreamIdentifier streamIdentifier;
    private final long listShardsBackoffTimeInMillis;
    private final int maxListShardsRetryAttempts;
    private final long listShardsCacheAllowedAgeInSeconds;
    private final int maxCacheMissesBeforeReload;
    private final int cacheMissWarningModulus;
    private final Duration kinesisRequestTimeout;
    private volatile Map<String, Shard> cachedShardMap = null;
    private volatile Instant lastCacheUpdateTime;
    private AtomicInteger cacheMisses = new AtomicInteger(0);

    @Deprecated
    public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis, int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload, int cacheMissWarningModulus) {
        this(kinesisClient, StreamIdentifier.singleStreamInstance(streamName), listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, cacheMissWarningModulus, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
    }

    public KinesisShardDetector(KinesisAsyncClient kinesisClient, StreamIdentifier streamIdentifier, long listShardsBackoffTimeInMillis, int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload, int cacheMissWarningModulus, Duration kinesisRequestTimeout) {
        this.kinesisClient = kinesisClient;
        this.streamIdentifier = streamIdentifier;
        this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis;
        this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
        this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds;
        this.maxCacheMissesBeforeReload = maxCacheMissesBeforeReload;
        this.cacheMissWarningModulus = cacheMissWarningModulus;
        this.kinesisRequestTimeout = kinesisRequestTimeout;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Shard shard(@NonNull String shardId) {
        Shard shard;
        if (shardId == null) {
            throw new NullPointerException("shardId");
        }
        if (CollectionUtils.isNullOrEmpty(this.cachedShardMap)) {
            KinesisShardDetector kinesisShardDetector = this;
            synchronized (kinesisShardDetector) {
                if (CollectionUtils.isNullOrEmpty(this.cachedShardMap)) {
                    this.listShards();
                }
            }
        }
        if ((shard = this.cachedShardMap.get(shardId)) == null && (this.cacheMisses.incrementAndGet() > this.maxCacheMissesBeforeReload || this.shouldRefreshCache())) {
            KinesisShardDetector kinesisShardDetector = this;
            synchronized (kinesisShardDetector) {
                shard = this.cachedShardMap.get(shardId);
                if (shard == null) {
                    log.info("Too many shard map cache misses or cache is out of date -- forcing a refresh");
                    this.listShards();
                    shard = this.cachedShardMap.get(shardId);
                    if (shard == null) {
                        log.warn("Even after cache refresh shard '{}' wasn't found. This could indicate a bigger problem.", (Object)shardId);
                    }
                    this.cacheMisses.set(0);
                } else {
                    this.cacheMisses.set(0);
                }
            }
        }
        if (shard == null) {
            String message = String.format("Cannot find the shard given the shardId %s. Cache misses: %s", shardId, this.cacheMisses);
            if (this.cacheMisses.get() % this.cacheMissWarningModulus == 0) {
                log.warn(message);
            } else {
                log.debug(message);
            }
        }
        return shard;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<Shard> listShards() {
        Object object = this.$lock;
        synchronized (object) {
            return this.listShardsWithFilter(null);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<Shard> listShardsWithFilter(ShardFilter shardFilter) {
        Object object = this.$lock;
        synchronized (object) {
            ListShardsResponse result;
            ArrayList<Shard> shards = new ArrayList<Shard>();
            String nextToken = null;
            do {
                if ((result = this.listShards(shardFilter, nextToken)) == null) {
                    return null;
                }
                shards.addAll(result.shards());
                nextToken = result.nextToken();
            } while (StringUtils.isNotEmpty((CharSequence)result.nextToken()));
            this.cachedShardMap(shards);
            return shards;
        }
    }

    private ListShardsResponse listShards(ShardFilter shardFilter, String nextToken) {
        AWSExceptionManager exceptionManager = new AWSExceptionManager();
        exceptionManager.add(ResourceNotFoundException.class, t -> t);
        exceptionManager.add(LimitExceededException.class, t -> t);
        exceptionManager.add(ResourceInUseException.class, t -> t);
        exceptionManager.add(KinesisException.class, t -> t);
        ListShardsRequest.Builder request = KinesisRequestsBuilder.listShardsRequestBuilder().shardFilter(shardFilter);
        request = StringUtils.isEmpty((CharSequence)nextToken) ? request.streamName(this.streamIdentifier.streamName()) : request.nextToken(nextToken);
        ListShardsResponse result = null;
        LimitExceededException lastException = null;
        int remainingRetries = this.maxListShardsRetryAttempts;
        while (result == null) {
            try {
                try {
                    result = this.getListShardsResponse((ListShardsRequest)request.build());
                }
                catch (ExecutionException e) {
                    throw exceptionManager.apply(e.getCause());
                }
                catch (InterruptedException e) {
                    log.debug("Interrupted exception caught, shutdown initiated, returning null");
                    return null;
                }
            }
            catch (ResourceInUseException e) {
                log.info("Stream is not in Active/Updating status, returning null (wait until stream is in Active or Updating)");
                return null;
            }
            catch (LimitExceededException e) {
                log.info("Got LimitExceededException when listing shards {}. Backing off for {} millis.", (Object)this.streamIdentifier, (Object)this.listShardsBackoffTimeInMillis);
                try {
                    Thread.sleep(this.listShardsBackoffTimeInMillis);
                }
                catch (InterruptedException ie) {
                    log.debug("Stream {} : Sleep  was interrupted ", (Object)this.streamIdentifier, (Object)ie);
                }
                lastException = e;
            }
            catch (ResourceNotFoundException e) {
                log.warn("Got ResourceNotFoundException when fetching shard list for {}. Stream no longer exists.", (Object)this.streamIdentifier.streamName());
                return (ListShardsResponse)ListShardsResponse.builder().shards(Collections.emptyList()).nextToken(null).build();
            }
            catch (TimeoutException te) {
                throw new RuntimeException(te);
            }
            if (--remainingRetries > 0 || result != null) continue;
            if (lastException != null) {
                throw lastException;
            }
            throw new IllegalStateException("Received null from ListShards call.");
        }
        return result;
    }

    void cachedShardMap(List<Shard> shards) {
        this.cachedShardMap = shards.stream().collect(Collectors.toMap(Shard::shardId, Function.identity()));
        this.lastCacheUpdateTime = Instant.now();
    }

    private boolean shouldRefreshCache() {
        Duration secondsSinceLastUpdate = Duration.between(this.lastCacheUpdateTime, Instant.now());
        String message = String.format("Shard map cache is %d seconds old", secondsSinceLastUpdate.getSeconds());
        if (secondsSinceLastUpdate.compareTo(Duration.of(this.listShardsCacheAllowedAgeInSeconds, ChronoUnit.SECONDS)) > 0) {
            log.info("{}. Age exceeds limit of {} seconds -- Refreshing.", (Object)message, (Object)this.listShardsCacheAllowedAgeInSeconds);
            return true;
        }
        log.debug("{}. Age doesn't exceed limit of {} seconds.", (Object)message, (Object)this.listShardsCacheAllowedAgeInSeconds);
        return false;
    }

    @Override
    public ListShardsResponse getListShardsResponse(ListShardsRequest request) throws ExecutionException, TimeoutException, InterruptedException {
        return (ListShardsResponse)FutureUtils.resolveOrCancelFuture(this.kinesisClient.listShards(request), this.kinesisRequestTimeout);
    }

    @Override
    public List<ChildShard> getChildShards(String shardId) throws InterruptedException, ExecutionException, TimeoutException {
        GetShardIteratorRequest getShardIteratorRequest = (GetShardIteratorRequest)KinesisRequestsBuilder.getShardIteratorRequestBuilder().streamName(this.streamIdentifier.streamName()).shardIteratorType(ShardIteratorType.LATEST).shardId(shardId).build();
        GetShardIteratorResponse getShardIteratorResponse = (GetShardIteratorResponse)FutureUtils.resolveOrCancelFuture(this.kinesisClient.getShardIterator(getShardIteratorRequest), this.kinesisRequestTimeout);
        GetRecordsRequest getRecordsRequest = (GetRecordsRequest)KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(getShardIteratorResponse.shardIterator()).build();
        GetRecordsResponse getRecordsResponse = (GetRecordsResponse)FutureUtils.resolveOrCancelFuture(this.kinesisClient.getRecords(getRecordsRequest), this.kinesisRequestTimeout);
        return getRecordsResponse.childShards();
    }

    @Override
    @NonNull
    public StreamIdentifier streamIdentifier() {
        return this.streamIdentifier;
    }

    AtomicInteger cacheMisses() {
        return this.cacheMisses;
    }
}

