/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.changefeed.implementation;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosClientException;
import com.azure.cosmos.implementation.CosmosItemProperties;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.ServiceItemLease;
import com.azure.cosmos.implementation.changefeed.ServiceItemLeaseUpdater;
import com.azure.cosmos.implementation.changefeed.exceptions.LeaseConflictException;
import com.azure.cosmos.implementation.changefeed.exceptions.LeaseLostException;
import com.azure.cosmos.models.AccessCondition;
import com.azure.cosmos.models.AccessConditionType;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.PartitionKey;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/**
 * {@link ServiceItemLeaseUpdater} that writes lease documents through a
 * {@link ChangeFeedContextClient} using a conditional (If-Match) replace.
 *
 * <p>When the conditional replace is rejected with HTTP 412, the current lease document is read
 * back, the caller-supplied update is re-applied to it and the replace is attempted again, up to
 * {@link #RETRY_COUNT_ON_CONFLICT} additional times.
 */
class DocumentServiceLeaseUpdaterImpl implements ServiceItemLeaseUpdater {
    private static final Logger LOGGER = LoggerFactory.getLogger(DocumentServiceLeaseUpdaterImpl.class);

    /** Number of re-subscriptions performed after a {@link LeaseConflictException}. */
    private static final int RETRY_COUNT_ON_CONFLICT = 5;

    private static final int HTTP_STATUS_CODE_NOT_FOUND = 404;
    private static final int HTTP_STATUS_CODE_CONFLICT = 409;
    private static final int HTTP_STATUS_CODE_PRECONDITION_FAILED = 412;

    private final ChangeFeedContextClient client;

    /**
     * Creates an updater bound to the given client.
     *
     * @param client the client used to read and replace lease documents; must not be {@code null}.
     * @throws IllegalArgumentException if {@code client} is {@code null}.
     */
    public DocumentServiceLeaseUpdaterImpl(ChangeFeedContextClient client) {
        if (client == null) {
            throw new IllegalArgumentException("client");
        }
        this.client = client;
    }

    /**
     * Applies {@code updateLease} to a lease and persists the result with a conditional replace.
     *
     * <p>The callback is applied to {@code cachedLease} eagerly, when this method is called. If it
     * returns {@code null} an empty {@link Mono} is returned and nothing is written. If the replace
     * is rejected with HTTP 412 the stored lease is read, the callback is re-applied to that stored
     * lease and the replace is retried. Each attempt is built inside {@link Mono#defer} so that a
     * retry sends the refreshed lease and its concurrency token instead of re-sending the request
     * assembled for the first attempt.
     *
     * <p>If every attempt ends in a conflict, a warning is logged and the most recently read server
     * lease is emitted instead of an error.
     *
     * @param cachedLease the lease the update is first applied to.
     * @param itemId the id of the lease document.
     * @param partitionKey the partition key of the lease document.
     * @param requestOptions options used when reading the lease document back after a conflict.
     * @param updateLease the update to apply; may return {@code null} to signal that nothing should
     *     be written.
     * @return a {@link Mono} emitting the stored lease, or completing empty when the callback
     *     returned {@code null}; it fails with {@link LeaseLostException} when the lease document
     *     is reported as missing (404) or conflicting (409), and with the original error for any
     *     other failure.
     */
    @Override
    public Mono<Lease> updateLease(Lease cachedLease, String itemId, PartitionKey partitionKey, CosmosItemRequestOptions requestOptions, Function<Lease, Lease> updateLease) {
        // The first application stays eager so that a null result or an exception thrown by the
        // callback surfaces to the caller exactly as it did before.
        Lease firstCandidate = updateLease.apply(cachedLease);
        if (firstCandidate == null) {
            return Mono.empty();
        }
        firstCandidate.setTimestamp(ZonedDateTime.now(ZoneId.of("UTC")));

        // Single-element holders because lambdas can only capture effectively final locals.
        // current[0]       : the lease to write on the next attempt, or the latest server lease.
        // reapplyUpdate[0] : set after a conflict; tells the next attempt to re-run the callback.
        Lease[] current = new Lease[]{firstCandidate};
        boolean[] reapplyUpdate = new boolean[]{false};

        return Mono.<Lease>defer(() -> {
            if (reapplyUpdate[0]) {
                // current[0] is the server lease read after the conflict; an exception thrown by
                // the callback here becomes an error signal of the returned Mono.
                Lease candidate = updateLease.apply(current[0]);
                if (candidate == null) {
                    return Mono.<Lease>empty();
                }
                candidate.setTimestamp(ZonedDateTime.now(ZoneId.of("UTC")));
                current[0] = candidate;
                reapplyUpdate[0] = false;
            }
            return this.replaceOrSignalConflict(current, reapplyUpdate, itemId, partitionKey, requestOptions);
        }).retry(RETRY_COUNT_ON_CONFLICT, throwable -> {
            if (throwable instanceof LeaseConflictException) {
                LOGGER.info("Partition {} for the lease with token '{}' failed to update for owner '{}'; will retry.", current[0].getLeaseToken(), current[0].getConcurrencyToken(), current[0].getOwner());
                return true;
            }
            return false;
        }).onErrorResume(throwable -> {
            if (throwable instanceof LeaseConflictException) {
                // Retries exhausted: hand back the latest server lease rather than failing.
                // The trailing throwable has no placeholder; SLF4J logs it as the exception.
                LOGGER.warn("Partition {} for the lease with token '{}' failed to update for owner '{}'; current continuation token '{}'.", current[0].getLeaseToken(), current[0].getConcurrencyToken(), current[0].getOwner(), current[0].getContinuationToken(), throwable);
                return Mono.just(current[0]);
            }
            return Mono.error(throwable);
        });
    }

    /**
     * Runs one replace attempt for {@code current[0]}. On success {@code current[0]} is replaced by
     * the lease parsed from the response and emitted; when the replace completes empty (HTTP 412)
     * the server lease is read and a {@link LeaseConflictException} is signalled.
     */
    private Mono<Lease> replaceOrSignalConflict(Lease[] current, boolean[] reapplyUpdate, String itemId, PartitionKey partitionKey, CosmosItemRequestOptions requestOptions) {
        return this.tryReplaceLease(current[0], itemId, partitionKey)
            .<Lease>map(leaseDocument -> {
                current[0] = ServiceItemLease.fromDocument(leaseDocument);
                return current[0];
            })
            .hasElement()
            .flatMap(replaced -> {
                if (replaced) {
                    return Mono.just(current[0]);
                }
                return this.readServerLeaseAndSignalConflict(current, reapplyUpdate, itemId, partitionKey, requestOptions);
            });
    }

    /**
     * Reads the stored lease after a rejected replace, stores it in {@code current[0]}, flags that
     * the update has to be re-applied and always terminates with an error:
     * {@link LeaseConflictException} when the read succeeds, {@link LeaseLostException} when the
     * read reports 404, or the original error otherwise.
     */
    private Mono<Lease> readServerLeaseAndSignalConflict(Lease[] current, boolean[] reapplyUpdate, String itemId, PartitionKey partitionKey, CosmosItemRequestOptions requestOptions) {
        return this.client.readItem(itemId, partitionKey, requestOptions, CosmosItemProperties.class)
            .onErrorResume(throwable -> {
                if (throwable instanceof CosmosClientException
                    && ((CosmosClientException) throwable).getStatusCode() == HTTP_STATUS_CODE_NOT_FOUND) {
                    throw new LeaseLostException(current[0]);
                }
                return Mono.error(throwable);
            })
            .<Lease>map(cosmosItemResponse -> {
                CosmosItemProperties document = BridgeInternal.getProperties(cosmosItemResponse);
                ServiceItemLease serverLease = ServiceItemLease.fromDocument(document);
                LOGGER.info("Partition {} update failed because the lease with token '{}' was updated by owner '{}' with token '{}'.", current[0].getLeaseToken(), current[0].getConcurrencyToken(), serverLease.getOwner(), serverLease.getConcurrencyToken());
                current[0] = serverLease;
                reapplyUpdate[0] = true;
                throw new LeaseConflictException(current[0], "Partition update failed");
            });
    }

    /**
     * Issues a conditional replace of the lease document.
     *
     * @return a {@link Mono} emitting the replaced document's properties, or completing empty when
     *     the service answers 412 (the If-Match condition did not hold).
     * @throws LeaseLostException signalled through the returned {@link Mono} when the service
     *     answers 409 or 404.
     */
    private Mono<CosmosItemProperties> tryReplaceLease(Lease lease, String itemId, PartitionKey partitionKey) throws LeaseLostException {
        return this.client.replaceItem(itemId, partitionKey, lease, this.getCreateIfMatchOptions(lease))
            .map(cosmosItemResponse -> BridgeInternal.getProperties(cosmosItemResponse))
            .onErrorResume(re -> {
                if (re instanceof CosmosClientException) {
                    CosmosClientException ex = (CosmosClientException) re;
                    switch (ex.getStatusCode()) {
                        case HTTP_STATUS_CODE_PRECONDITION_FAILED:
                            // Someone else changed the document; the caller decides how to react.
                            return Mono.empty();
                        case HTTP_STATUS_CODE_CONFLICT:
                            throw new LeaseLostException(lease, ex, false);
                        case HTTP_STATUS_CODE_NOT_FOUND:
                            throw new LeaseLostException(lease, ex, true);
                        default:
                            break;
                    }
                }
                return Mono.error(re);
            });
    }

    /**
     * Builds request options carrying an If-Match access condition set to the lease's concurrency
     * token.
     */
    private CosmosItemRequestOptions getCreateIfMatchOptions(Lease lease) {
        AccessCondition ifMatchCondition = new AccessCondition();
        ifMatchCondition.setType(AccessConditionType.IF_MATCH);
        ifMatchCondition.setCondition(lease.getConcurrencyToken());
        CosmosItemRequestOptions createIfMatchOptions = new CosmosItemRequestOptions();
        createIfMatchOptions.setAccessCondition(ifMatchCondition);
        return createIfMatchOptions;
    }
}

