/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.changefeed.fullfidelity;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseContainer;
import com.azure.cosmos.implementation.changefeed.LeaseManager;
import com.azure.cosmos.implementation.changefeed.PartitionSynchronizer;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link PartitionSynchronizer} implementation that creates leases for the partition key ranges of a container.
 *
 * <p>Partition key range ids are used directly as lease tokens: {@link #createMissingLeases()} creates a lease
 * for every range id that has no lease yet, and {@link #splitPartition(Lease)} creates leases for the child
 * ranges whose parent list contains the token of a lease that is gone.
 */
class PartitionSynchronizerImpl implements PartitionSynchronizer {
    private final Logger logger = LoggerFactory.getLogger(PartitionSynchronizerImpl.class);
    private final ChangeFeedContextClient documentClient;
    private final CosmosAsyncContainer collectionSelfLink;
    private final LeaseContainer leaseContainer;
    private final LeaseManager leaseManager;
    private final int degreeOfParallelism;
    private final int maxBatchSize;
    // Stored for callers that construct this class; not read by any method in this class.
    private final String collectionResourceId;

    /**
     * Creates a synchronizer.
     *
     * @param documentClient client used to read the partition key range feed.
     * @param collectionSelfLink container whose partition key ranges are enumerated.
     * @param leaseContainer source of the leases that already exist.
     * @param leaseManager used to create leases that do not exist yet.
     * @param degreeOfParallelism concurrency passed to {@code flatMap} when creating leases.
     * @param maxBatchSize value passed as the max item count of the partition key range feed request.
     * @param collectionResourceId resource id of the container (only stored by this class).
     */
    public PartitionSynchronizerImpl(ChangeFeedContextClient documentClient, CosmosAsyncContainer collectionSelfLink, LeaseContainer leaseContainer, LeaseManager leaseManager, int degreeOfParallelism, int maxBatchSize, String collectionResourceId) {
        this.documentClient = documentClient;
        this.collectionSelfLink = collectionSelfLink;
        this.leaseContainer = leaseContainer;
        this.leaseManager = leaseManager;
        this.degreeOfParallelism = degreeOfParallelism;
        this.maxBatchSize = maxBatchSize;
        this.collectionResourceId = collectionResourceId;
    }

    /**
     * Creates a lease for every partition key range that does not have one yet.
     *
     * <p>This is best-effort: any error raised while enumerating ranges or creating leases is logged and the
     * returned {@link Mono} still completes normally.
     *
     * @return a {@link Mono} that completes once the pass has finished (or has been abandoned after an error).
     */
    @Override
    public Mono<Void> createMissingLeases() {
        return this.enumPartitionKeyRanges()
            .map(Resource::getId)
            .collectList()
            .flatMap(partitionKeyRangeIds -> {
                Set<String> leaseTokens = new HashSet<>(partitionKeyRangeIds);
                return this.createLeases(leaseTokens).then();
            })
            .onErrorResume(throwable -> {
                // Deliberately best-effort, but never drop the cause silently.
                this.logger.warn("Failed to create missing leases; skipping this synchronization pass.", throwable);
                return Mono.empty();
            });
    }

    /**
     * Creates leases for the child partitions of a lease whose partition has split.
     *
     * <p>Every child lease is created with the continuation token of the parent lease.
     *
     * @param lease the lease of the partition that is gone; must not be {@code null}.
     * @return the leases returned by the lease manager for each child partition; the flux fails with a
     *         {@link RuntimeException} when no partition key range lists the lease token among its parents.
     * @throws IllegalArgumentException if {@code lease} is {@code null} (thrown eagerly, not via the flux).
     */
    @Override
    public Flux<Lease> splitPartition(Lease lease) {
        if (lease == null) {
            throw new IllegalArgumentException("lease");
        }
        final String leaseToken = lease.getLeaseToken();
        final String lastContinuationToken = lease.getContinuationToken();
        this.logger.info("Partition {} is gone due to split; will attempt to resume using continuation token {}.", leaseToken, lastContinuationToken);

        return this.enumPartitionKeyRanges()
            // Children of the split are the ranges that name the old lease token as one of their parents.
            .filter(range -> range != null && range.getParents() != null && range.getParents().contains(leaseToken))
            .map(Resource::getId)
            .collectList()
            .flatMapMany(addedLeaseTokens -> {
                if (addedLeaseTokens.isEmpty()) {
                    this.logger.error("Partition {} had split but we failed to find at least one child partition", leaseToken);
                    throw new RuntimeException(String.format("Partition %s had split but we failed to find at least one child partition", leaseToken));
                }
                return Flux.fromIterable(addedLeaseTokens);
            })
            .flatMap(addedRangeId -> this.leaseManager.createLeaseIfNotExist(addedRangeId, lastContinuationToken), this.degreeOfParallelism)
            .map(newLease -> {
                this.logger.info("Partition {} split into new partition with lease token {} and continuation token {}.", leaseToken, newLease.getLeaseToken(), lastContinuationToken);
                return newLease;
            });
    }

    /**
     * Reads the partition key range feed of the container and flattens the pages into individual ranges.
     *
     * <p>A read failure is logged and converted into an empty (or truncated) flux, so callers cannot
     * distinguish "no ranges" from "read failed" other than through the log.
     *
     * @return the partition key ranges that could be read.
     */
    private Flux<PartitionKeyRange> enumPartitionKeyRanges() {
        String partitionKeyRangesPath = BridgeInternal.extractContainerSelfLink(this.collectionSelfLink);
        CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
        // No continuation token (start from the beginning); maxBatchSize is passed as the max item count.
        ModelBridgeInternal.setQueryRequestOptionsContinuationTokenAndMaxItemCount(cosmosQueryRequestOptions, null, this.maxBatchSize);

        return this.documentClient.readPartitionKeyRangeFeed(partitionKeyRangesPath, cosmosQueryRequestOptions)
            .map(FeedResponse::getResults)
            .flatMap(Flux::fromIterable)
            .onErrorResume(throwable -> {
                // Keep the existing fallback, but record why the enumeration stopped.
                this.logger.warn("Failed to read partition key ranges from {}; continuing with the ranges read so far.", partitionKeyRangesPath, throwable);
                return Flux.empty();
            });
    }

    /**
     * Creates leases for those of the given tokens that no existing lease already covers.
     *
     * @param leaseTokens candidate lease tokens; the set itself is not modified.
     * @return the leases returned by the lease manager for the tokens that had no lease.
     */
    private Flux<Lease> createLeases(Set<String> leaseTokens) {
        // Working copy: tokens are removed as existing leases are seen, leaving only the missing ones.
        Set<String> addedLeaseTokens = new HashSet<>(leaseTokens);

        return this.leaseContainer.getAllLeases()
            .doOnNext(lease -> {
                if (lease != null) {
                    addedLeaseTokens.remove(lease.getLeaseToken());
                }
            })
            // thenMany subscribes to the second flux only after all leases have been read, and
            // fromIterable iterates the set on subscription, so it sees the already-pruned set.
            .thenMany(Flux.fromIterable(addedLeaseTokens)
                .flatMap(addedRangeId -> this.leaseManager.createLeaseIfNotExist(addedRangeId, null), this.degreeOfParallelism));
    }
}

