/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.changefeed.epkversion;

import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseContainer;
import com.azure.cosmos.implementation.changefeed.LeaseManager;
import com.azure.cosmos.implementation.changefeed.epkversion.PartitionSynchronizer;
import com.azure.cosmos.implementation.changefeed.epkversion.feedRangeGoneHandler.FeedRangeGoneHandler;
import com.azure.cosmos.implementation.changefeed.epkversion.feedRangeGoneHandler.FeedRangeGoneMergeHandler;
import com.azure.cosmos.implementation.changefeed.epkversion.feedRangeGoneHandler.FeedRangeGoneSplitHandler;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
import com.azure.cosmos.implementation.routing.Range;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class PartitionSynchronizerImpl
implements PartitionSynchronizer {
    private final Logger logger = LoggerFactory.getLogger(PartitionSynchronizerImpl.class);
    private final ChangeFeedContextClient documentClient;
    private final CosmosAsyncContainer collectionSelfLink;
    private final LeaseContainer leaseContainer;
    private final LeaseManager leaseManager;
    private final int degreeOfParallelism;
    private final int maxBatchSize;

    public PartitionSynchronizerImpl(ChangeFeedContextClient documentClient, CosmosAsyncContainer collectionSelfLink, LeaseContainer leaseContainer, LeaseManager leaseManager, int degreeOfParallelism, int maxBatchSize) {
        this.documentClient = documentClient;
        this.collectionSelfLink = collectionSelfLink;
        this.leaseContainer = leaseContainer;
        this.leaseManager = leaseManager;
        this.degreeOfParallelism = degreeOfParallelism;
        this.maxBatchSize = maxBatchSize;
    }

    @Override
    public Mono<Void> createMissingLeases() {
        return this.documentClient.getOverlappingRanges(PartitionKeyInternalHelper.FullRange).flatMap(pkRangeList -> this.createLeases((List<PartitionKeyRange>)pkRangeList).then()).onErrorResume(throwable -> {
            this.logger.error("Create lease failed", throwable);
            return Mono.empty();
        });
    }

    @Override
    public Mono<FeedRangeGoneHandler> getFeedRangeGoneHandler(Lease lease) {
        Preconditions.checkNotNull(lease, "Argument 'lease' can not be null");
        String leaseToken = lease.getLeaseToken();
        String lastContinuationToken = lease.getContinuationToken();
        this.logger.info("Lease with token {} is gone due to split or merge; will attempt to resume using continuation token {}.", (Object)leaseToken, (Object)lastContinuationToken);
        return this.documentClient.getOverlappingRanges(((FeedRangeEpkImpl)lease.getFeedRange()).getRange()).flatMap(pkRangeList -> {
            if (pkRangeList.size() == 0) {
                this.logger.error("Lease with token {} is gone but we failed to find at least one child range", (Object)leaseToken);
                return Mono.error((Throwable)new RuntimeException(String.format("Lease %s is gone but we failed to find at least one child partition", leaseToken)));
            }
            if (pkRangeList.size() > 1) {
                return Mono.just((Object)new FeedRangeGoneSplitHandler(lease, (List<PartitionKeyRange>)pkRangeList, this.leaseManager));
            }
            return Mono.just((Object)new FeedRangeGoneMergeHandler(lease, (PartitionKeyRange)pkRangeList.get(0)));
        });
    }

    private Flux<Lease> createLeases(List<PartitionKeyRange> partitionKeyRanges) {
        return this.leaseContainer.getAllLeases().collectList().flatMapMany(leaseList -> Flux.fromIterable((Iterable)partitionKeyRanges).flatMap(pkRange -> {
            boolean anyMatch = leaseList.stream().anyMatch(lease -> {
                Range<String> epkRange = ((FeedRangeEpkImpl)lease.getFeedRange()).getRange();
                return epkRange.getMin().equals(pkRange.getMinInclusive()) || epkRange.getMax().equals(pkRange.getMaxExclusive());
            });
            if (!anyMatch) {
                return Mono.just((Object)pkRange);
            }
            return Mono.empty();
        }).flatMap(pkRange -> {
            FeedRangeEpkImpl feedRangeEpk = new FeedRangeEpkImpl(pkRange.toRange());
            return this.leaseManager.createLeaseIfNotExist(feedRangeEpk, null);
        }, this.degreeOfParallelism));
    }
}

