/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.changefeed.implementation;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.ServiceItemLease;
import com.azure.cosmos.implementation.changefeed.ServiceItemLeaseUpdater;
import com.azure.cosmos.implementation.changefeed.exceptions.LeaseConflictException;
import com.azure.cosmos.implementation.changefeed.exceptions.LeaseLostException;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.PartitionKey;
import java.time.Instant;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

class DocumentServiceLeaseUpdaterImpl
implements ServiceItemLeaseUpdater {
    private final Logger logger = LoggerFactory.getLogger(DocumentServiceLeaseUpdaterImpl.class);
    private final int RETRY_COUNT_ON_CONFLICT = 5;
    private final ChangeFeedContextClient client;

    public DocumentServiceLeaseUpdaterImpl(ChangeFeedContextClient client) {
        if (client == null) {
            throw new IllegalArgumentException("client");
        }
        this.client = client;
    }

    @Override
    public Mono<Lease> updateLease(Lease cachedLease, String itemId, PartitionKey partitionKey, CosmosItemRequestOptions requestOptions, Function<Lease, Lease> updateLease) {
        Lease localLease = updateLease.apply(cachedLease);
        if (localLease == null) {
            return Mono.empty();
        }
        localLease.setTimestamp(Instant.now());
        cachedLease.setServiceItemLease(localLease);
        return Mono.just((Object)this).flatMap(value -> this.tryReplaceLease(cachedLease, itemId, partitionKey)).map(leaseDocument -> {
            cachedLease.setServiceItemLease(ServiceItemLease.fromDocument(leaseDocument));
            return cachedLease;
        }).hasElement().flatMap(hasItems -> {
            if (hasItems.booleanValue()) {
                return Mono.just((Object)cachedLease);
            }
            return this.client.readItem(itemId, partitionKey, requestOptions, InternalObjectNode.class).onErrorResume(throwable -> {
                CosmosException ex;
                if (throwable instanceof CosmosException && (ex = (CosmosException)((Object)((Object)((Object)throwable)))).getStatusCode() == 404) {
                    this.logger.info("Partition {} could not be found.", (Object)cachedLease.getLeaseToken());
                    throw new LeaseLostException(cachedLease);
                }
                return Mono.error((Throwable)throwable);
            }).map(cosmosItemResponse -> {
                InternalObjectNode document = BridgeInternal.getProperties(cosmosItemResponse);
                ServiceItemLease serverLease = ServiceItemLease.fromDocument(document);
                this.logger.info("Partition {} update failed because the lease with token '{}' was updated by owner '{}' with token '{}'.", new Object[]{cachedLease.getLeaseToken(), cachedLease.getConcurrencyToken(), serverLease.getOwner(), serverLease.getConcurrencyToken()});
                if (serverLease.getOwner() != null && !serverLease.getOwner().equalsIgnoreCase(cachedLease.getOwner())) {
                    this.logger.info("Partition {} lease was acquired already by owner '{}'", (Object)serverLease.getLeaseToken(), (Object)serverLease.getOwner());
                    throw new LeaseLostException(serverLease);
                }
                cachedLease.setTimestamp(Instant.now());
                cachedLease.setConcurrencyToken(serverLease.getConcurrencyToken());
                throw new LeaseConflictException(cachedLease, "Partition update failed");
            });
        }).retryWhen((Retry)Retry.max((long)5L).filter(throwable -> {
            if (throwable instanceof LeaseConflictException) {
                this.logger.info("Partition {} for the lease with token '{}' failed to update for owner '{}'; will retry.", new Object[]{cachedLease.getLeaseToken(), cachedLease.getConcurrencyToken(), cachedLease.getOwner()});
                return true;
            }
            return false;
        })).onErrorResume(throwable -> {
            if (throwable instanceof LeaseConflictException) {
                this.logger.warn("Partition {} for the lease with token '{}' failed to update for owner '{}'; current continuation token '{}'.", new Object[]{cachedLease.getLeaseToken(), cachedLease.getConcurrencyToken(), cachedLease.getOwner(), cachedLease.getContinuationToken(), throwable});
                return Mono.just((Object)cachedLease);
            }
            return Mono.error((Throwable)throwable);
        });
    }

    private Mono<InternalObjectNode> tryReplaceLease(Lease lease, String itemId, PartitionKey partitionKey) throws LeaseLostException {
        return this.client.replaceItem(itemId, partitionKey, lease, this.getCreateIfMatchOptions(lease)).map(cosmosItemResponse -> BridgeInternal.getProperties(cosmosItemResponse)).onErrorResume(re -> {
            if (re instanceof CosmosException) {
                CosmosException ex = (CosmosException)((Object)((Object)re));
                switch (ex.getStatusCode()) {
                    case 412: {
                        return Mono.empty();
                    }
                    case 409: {
                        throw new LeaseLostException(lease, (Exception)((Object)ex), false);
                    }
                    case 404: {
                        throw new LeaseLostException(lease, (Exception)((Object)ex), true);
                    }
                }
                return Mono.error((Throwable)re);
            }
            return Mono.error((Throwable)re);
        });
    }

    private CosmosItemRequestOptions getCreateIfMatchOptions(Lease lease) {
        CosmosItemRequestOptions createIfMatchOptions = new CosmosItemRequestOptions();
        createIfMatchOptions.setIfMatchETag(lease.getConcurrencyToken());
        return createIfMatchOptions;
    }
}

