/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.changefeed.epkversion;

import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseContainer;
import com.azure.cosmos.implementation.changefeed.LeaseManager;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
import com.azure.cosmos.implementation.changefeed.epkversion.PartitionProcessorHelper;
import com.azure.cosmos.implementation.changefeed.epkversion.PartitionSynchronizer;
import com.azure.cosmos.implementation.changefeed.epkversion.feedRangeGoneHandler.FeedRangeGoneHandler;
import com.azure.cosmos.implementation.changefeed.epkversion.feedRangeGoneHandler.FeedRangeGoneMergeHandler;
import com.azure.cosmos.implementation.changefeed.epkversion.feedRangeGoneHandler.FeedRangeGoneSplitHandler;
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link PartitionSynchronizer} implementation that creates leases for partition key ranges
 * that do not have one yet, and that picks the {@link FeedRangeGoneHandler} to use when the
 * feed range of a lease is gone.
 */
class PartitionSynchronizerImpl implements PartitionSynchronizer {
    private static final Comparator<Range<String>> MIN_RANGE_COMPARATOR = new Range.MinComparator<String>();
    private static final Comparator<Range<String>> MAX_RANGE_COMPARATOR = new Range.MaxComparator<String>();
    private static final Logger logger = LoggerFactory.getLogger(PartitionSynchronizerImpl.class);

    private final ChangeFeedContextClient documentClient;
    private final String collectionSelfLink;
    private final LeaseContainer leaseContainer;
    private final LeaseManager leaseManager;
    private final int degreeOfParallelism;
    // Stored for callers of the constructor; not read anywhere inside this class.
    private final int maxBatchSize;
    private final ChangeFeedProcessorOptions changeFeedProcessorOptions;
    private final ChangeFeedMode changeFeedMode;

    /**
     * Creates a synchronizer.
     *
     * @param documentClient client used to look up the partition key ranges overlapping an EPK range.
     * @param collectionSelfLink self link of the monitored collection; embedded in generated continuation tokens.
     * @param leaseContainer source of the leases that already exist.
     * @param leaseManager used to create leases that are missing.
     * @param degreeOfParallelism concurrency passed to {@code flatMap} when creating leases.
     * @param maxBatchSize retained as a field but not used by this class.
     * @param changeFeedProcessorOptions processor options (max scale count, start-from settings).
     * @param changeFeedMode change feed mode recorded in generated continuation tokens.
     */
    public PartitionSynchronizerImpl(ChangeFeedContextClient documentClient, String collectionSelfLink, LeaseContainer leaseContainer, LeaseManager leaseManager, int degreeOfParallelism, int maxBatchSize, ChangeFeedProcessorOptions changeFeedProcessorOptions, ChangeFeedMode changeFeedMode) {
        this.documentClient = documentClient;
        this.collectionSelfLink = collectionSelfLink;
        this.leaseContainer = leaseContainer;
        this.leaseManager = leaseManager;
        this.degreeOfParallelism = degreeOfParallelism;
        this.maxBatchSize = maxBatchSize;
        this.changeFeedProcessorOptions = changeFeedProcessorOptions;
        this.changeFeedMode = changeFeedMode;
    }

    /**
     * Looks up the partition key ranges overlapping the full range and creates a lease for
     * each range that has none.
     *
     * <p>This is best-effort: any failure is logged and the returned {@link Mono} completes
     * empty instead of signalling the error.
     *
     * @return a {@link Mono} that completes once lease creation has finished or failed.
     */
    @Override
    public Mono<Void> createMissingLeases() {
        return this.documentClient
            .getOverlappingRanges(PartitionKeyInternalHelper.FullRange)
            .flatMap(pkRangeList -> this.createLeases(pkRangeList).then())
            .onErrorResume(throwable -> {
                // Deliberately swallowed after logging; callers never observe this failure.
                logger.error("Create lease failed", throwable);
                return Mono.empty();
            });
    }

    /**
     * Looks up the partition key ranges overlapping the full range and creates the missing
     * leases, seeding each new lease with the continuation token of the matching
     * pkRangeId version lease.
     *
     * <p>Unlike {@link #createMissingLeases()}, failures are logged and then propagated.
     *
     * @param pkRangeIdVersionLeases leases keyed by partition key range id to take continuation tokens from.
     * @return a {@link Mono} that completes once all missing leases were created, or errors.
     */
    @Override
    public Mono<Void> createMissingLeases(List<Lease> pkRangeIdVersionLeases) {
        return this.documentClient
            .getOverlappingRanges(PartitionKeyInternalHelper.FullRange)
            .flatMap(pkRangeList -> this.createLeases(pkRangeList, pkRangeIdVersionLeases).then())
            .doOnError(throwable -> logger.error("Create missing leases from pkRangeIdVersion leases failed", throwable));
    }

    /**
     * Chooses the handler for a lease whose feed range is gone, based on how many partition
     * key ranges currently overlap the lease's EPK range: more than one yields a
     * {@link FeedRangeGoneSplitHandler}, exactly one yields a {@link FeedRangeGoneMergeHandler}.
     *
     * @param lease the lease whose feed range is gone; must not be {@code null}.
     * @return a {@link Mono} emitting the handler, or a {@link RuntimeException} error when no
     *         overlapping partition key range is found.
     * @throws NullPointerException if {@code lease} is {@code null}.
     */
    @Override
    public Mono<FeedRangeGoneHandler> getFeedRangeGoneHandler(Lease lease) {
        Preconditions.checkNotNull(lease, "Argument 'lease' can not be null");
        String leaseToken = lease.getLeaseToken();
        String lastContinuationToken = lease.getContinuationToken();
        logger.info("Lease with token {} is gone due to split or merge; will attempt to resume using continuation token {}.", leaseToken, lastContinuationToken);

        Range<String> leaseRange = ((FeedRangeEpkImpl) lease.getFeedRange()).getRange();
        return this.documentClient
            .getOverlappingRanges(leaseRange)
            .flatMap(pkRangeList -> {
                if (pkRangeList.isEmpty()) {
                    logger.error("Lease with token {} is gone but we failed to find at least one child range", leaseToken);
                    return Mono.error(new RuntimeException(String.format("Lease %s is gone but we failed to find at least one child partition", leaseToken)));
                }

                // Typed as the interface so both branches produce Mono<FeedRangeGoneHandler>.
                FeedRangeGoneHandler handler;
                if (pkRangeList.size() > 1) {
                    handler = new FeedRangeGoneSplitHandler(lease, pkRangeList, this.leaseManager, this.changeFeedProcessorOptions.getMaxScaleCount());
                } else {
                    handler = new FeedRangeGoneMergeHandler(lease, pkRangeList.get(0));
                }
                return Mono.just(handler);
            });
    }

    /**
     * Creates a lease, with no continuation token, for every given partition key range
     * that is not considered covered by an existing lease.
     *
     * @param partitionKeyRanges the current partition key ranges.
     * @return the leases emitted by the lease manager for the ranges that were not covered.
     */
    private Flux<Lease> createLeases(List<PartitionKeyRange> partitionKeyRanges) {
        return this.leaseContainer
            .getAllLeases()
            .collectList()
            .flatMapMany(existingLeases -> Flux.fromIterable(partitionKeyRanges)
                .filter(pkRange -> !sharesBoundaryWithAnyLease(existingLeases, pkRange))
                .flatMap(
                    pkRange -> this.leaseManager.createLeaseIfNotExist(new FeedRangeEpkImpl(pkRange.toRange()), null),
                    this.degreeOfParallelism));
    }

    /**
     * Tells whether any lease's EPK range has the same min as the partition key range's
     * inclusive min, or the same max as its exclusive max.
     *
     * <p>NOTE(review): a match on either boundary alone counts as "covered"; presumably this
     * relies on the lease store always covering the whole collection range - confirm.
     */
    private static boolean sharesBoundaryWithAnyLease(List<Lease> leases, PartitionKeyRange pkRange) {
        return leases.stream().anyMatch(lease -> {
            Range<String> epkRange = ((FeedRangeEpkImpl) lease.getFeedRange()).getRange();
            return epkRange.getMin().equals(pkRange.getMinInclusive())
                || epkRange.getMax().equals(pkRange.getMaxExclusive());
        });
    }

    /**
     * Creates EPK version leases for the partition key ranges that are not covered by an
     * existing EPK version lease, carrying over the continuation token of the matching
     * pkRangeId version lease.
     *
     * @param partitionKeyRanges the current partition key ranges.
     * @param pkRangeIdVersionLeases leases whose lease token is matched against partition key range ids.
     * @return the leases emitted by the lease manager; errors with {@link IllegalStateException}
     *         when a range has no, or more than one, matching pkRangeId version lease.
     */
    private Flux<Lease> createLeases(List<PartitionKeyRange> partitionKeyRanges, List<Lease> pkRangeIdVersionLeases) {
        return this.leaseContainer
            .getAllLeases()
            .collectList()
            .flatMapMany(existingEpkVersionLeases -> {
                // toMap without a merge function fails on duplicate keys, so lease tokens
                // are assumed to be unique here.
                Map<String, Lease> pkRangeIdVersionLeaseMap = pkRangeIdVersionLeases.stream()
                    .collect(Collectors.toMap(Lease::getLeaseToken, lease -> lease));

                return Flux.fromIterable(partitionKeyRanges)
                    .flatMap(partitionKeyRange -> this.findPkRangeIdVersionLease(partitionKeyRange, existingEpkVersionLeases, pkRangeIdVersionLeaseMap))
                    .flatMap(pair -> {
                        FeedRangeEpkImpl feedRangeEpk = new FeedRangeEpkImpl(pair.getLeft().toRange());
                        String leaseContinuationToken = this.getLeaseContinuationToken(feedRangeEpk, pair.getRight().getContinuationToken());
                        return this.leaseManager.createLeaseIfNotExist(feedRangeEpk, leaseContinuationToken);
                    }, this.degreeOfParallelism);
            });
    }

    /**
     * Finds the single pkRangeId version lease to migrate for a partition key range.
     *
     * @param partitionKeyRange the range to find a source lease for.
     * @param existingEpkVersionLeases EPK version leases that already exist.
     * @param pkRangeIdVersionLeaseMap pkRangeId version leases keyed by lease token.
     * @return empty when an existing EPK version lease already satisfies the range check;
     *         the (range, lease) pair when exactly one lease matches the range id or one of
     *         its parent ids; an {@link IllegalStateException} error otherwise.
     */
    private Mono<Pair<PartitionKeyRange, Lease>> findPkRangeIdVersionLease(PartitionKeyRange partitionKeyRange, List<Lease> existingEpkVersionLeases, Map<String, Lease> pkRangeIdVersionLeaseMap) {
        Range<String> pkRange = partitionKeyRange.toRange();

        // Presumably true when a lease's EPK range starts at or before, and ends at or after,
        // the partition key range - this depends on the Range comparators' ordering; confirm.
        boolean anyEpkVersionLeaseMatch = existingEpkVersionLeases.stream().anyMatch(lease -> {
            Range<String> epkRange = ((FeedRangeEpkImpl) lease.getFeedRange()).getRange();
            return MIN_RANGE_COMPARATOR.compare(epkRange, pkRange) <= 0
                && MAX_RANGE_COMPARATOR.compare(epkRange, pkRange) >= 0;
        });
        if (anyEpkVersionLeaseMatch) {
            return Mono.empty();
        }

        // Candidate ids: the range's own id first, then the ids of its parents (if any).
        List<String> possibleMatchingPkRangeId = new ArrayList<>();
        possibleMatchingPkRangeId.add(partitionKeyRange.getId());
        if (partitionKeyRange.getParents() != null) {
            possibleMatchingPkRangeId.addAll(partitionKeyRange.getParents());
        }

        List<Lease> matchedPkRangeIdVersionLeases = new ArrayList<>();
        for (String pkRangeId : possibleMatchingPkRangeId) {
            Lease matched = pkRangeIdVersionLeaseMap.get(pkRangeId);
            if (matched != null) {
                matchedPkRangeIdVersionLeases.add(matched);
            }
        }

        if (matchedPkRangeIdVersionLeases.isEmpty()) {
            return Mono.error(new IllegalStateException("Can not find pkRangeId version lease"));
        }
        if (matchedPkRangeIdVersionLeases.size() == 1) {
            return Mono.just(Pair.of(partitionKeyRange, matchedPkRangeIdVersionLeases.get(0)));
        }
        return Mono.error(new IllegalStateException("A merge has happened, creating epk version lease from pkRangeId version lease is not supported"));
    }

    /**
     * Builds the continuation token string for a new EPK version lease by wrapping the given
     * etag in a {@link FeedRangeContinuation} and serializing a {@link ChangeFeedStateV1}.
     *
     * @param feedRangeEpk the feed range of the lease being created.
     * @param etag the continuation value taken from the pkRangeId version lease.
     * @return the string form of the resulting change feed state.
     */
    private String getLeaseContinuationToken(FeedRangeEpkImpl feedRangeEpk, String etag) {
        FeedRangeContinuation feedRangeContinuation = FeedRangeContinuation.create(this.collectionSelfLink, (FeedRangeInternal) feedRangeEpk, feedRangeEpk.getRange());
        feedRangeContinuation.replaceContinuation(etag);
        ChangeFeedStateV1 changeFeedState = new ChangeFeedStateV1(
            this.collectionSelfLink,
            feedRangeEpk,
            this.changeFeedMode,
            PartitionProcessorHelper.getStartFromSettings(feedRangeEpk, this.changeFeedProcessorOptions, this.changeFeedMode),
            feedRangeContinuation);
        return changeFeedState.toString();
    }
}

