/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.internal.changefeed.implementation;

import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncItem;
import com.azure.cosmos.CosmosClientException;
import com.azure.cosmos.CosmosItemProperties;
import com.azure.cosmos.FeedResponse;
import com.azure.cosmos.SqlParameter;
import com.azure.cosmos.SqlParameterList;
import com.azure.cosmos.SqlQuerySpec;
import com.azure.cosmos.internal.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.internal.changefeed.Lease;
import com.azure.cosmos.internal.changefeed.LeaseStore;
import com.azure.cosmos.internal.changefeed.LeaseStoreManager;
import com.azure.cosmos.internal.changefeed.LeaseStoreManagerSettings;
import com.azure.cosmos.internal.changefeed.RequestOptionsFactory;
import com.azure.cosmos.internal.changefeed.ServiceItemLease;
import com.azure.cosmos.internal.changefeed.ServiceItemLeaseUpdater;
import com.azure.cosmos.internal.changefeed.exceptions.LeaseLostException;
import com.azure.cosmos.internal.changefeed.implementation.DocumentServiceLeaseStore;
import com.azure.cosmos.internal.changefeed.implementation.DocumentServiceLeaseUpdaterImpl;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link LeaseStoreManager} implementation that keeps leases as items in a Cosmos container.
 *
 * <p>The class is also its own builder: {@link #Builder()} hands out a fresh instance typed as
 * {@link LeaseStoreManager.LeaseStoreManagerBuilderDefinition}; the fluent setters store their
 * arguments on that instance and {@link #build()} validates them and returns the same instance.
 *
 * <p>Argument validation failures are thrown synchronously as {@link IllegalArgumentException}
 * (they are not delivered through the returned publisher).
 */
public class LeaseStoreManagerImpl
implements LeaseStoreManager,
LeaseStoreManager.LeaseStoreManagerBuilderDefinition {
    /** Separator appended to the container name prefix to form the lease item id prefix. */
    private static final String LEASE_STORE_MANAGER_LEASE_SUFFIX = "..";
    /** Status code checked when a lease item is missing. */
    private static final int HTTP_STATUS_CODE_NOT_FOUND = 404;
    /** Status code checked when a lease item already exists. */
    private static final int HTTP_STATUS_CODE_CONFLICT = 409;
    private static final Logger LOGGER = LoggerFactory.getLogger(LeaseStoreManagerImpl.class);

    private final LeaseStoreManagerSettings settings = new LeaseStoreManagerSettings();
    private ChangeFeedContextClient leaseDocumentClient;
    private RequestOptionsFactory requestOptionsFactory;
    private ServiceItemLeaseUpdater leaseUpdater;
    private LeaseStore leaseStore;

    /**
     * Starts a new builder.
     *
     * @return a new, unconfigured instance exposed through the builder interface.
     */
    public static LeaseStoreManager.LeaseStoreManagerBuilderDefinition Builder() {
        return new LeaseStoreManagerImpl();
    }

    /**
     * Sets the client used for all lease item operations.
     *
     * @param leaseContextClient the client; must not be null.
     * @return this builder.
     * @throws IllegalArgumentException if {@code leaseContextClient} is null.
     */
    @Override
    public LeaseStoreManager.LeaseStoreManagerBuilderDefinition leaseContextClient(ChangeFeedContextClient leaseContextClient) {
        if (leaseContextClient == null) {
            throw new IllegalArgumentException("leaseContextClient");
        }
        this.leaseDocumentClient = leaseContextClient;
        return this;
    }

    /**
     * Sets the prefix that every lease item id starts with.
     *
     * @param leasePrefix the prefix; must not be null.
     * @return this builder.
     * @throws IllegalArgumentException if {@code leasePrefix} is null.
     */
    @Override
    public LeaseStoreManager.LeaseStoreManagerBuilderDefinition leasePrefix(String leasePrefix) {
        if (leasePrefix == null) {
            throw new IllegalArgumentException("leasePrefix");
        }
        this.settings.withContainerNamePrefix(leasePrefix);
        return this;
    }

    /**
     * Sets the container that holds the lease items.
     *
     * @param leaseCollectionLink the lease container; must not be null.
     * @return this builder.
     * @throws IllegalArgumentException if {@code leaseCollectionLink} is null.
     */
    @Override
    public LeaseStoreManager.LeaseStoreManagerBuilderDefinition leaseCollectionLink(CosmosAsyncContainer leaseCollectionLink) {
        if (leaseCollectionLink == null) {
            throw new IllegalArgumentException("leaseCollectionLink");
        }
        this.settings.withLeaseCollectionLink(leaseCollectionLink);
        return this;
    }

    /**
     * Sets the factory that produces request and feed options for lease operations.
     *
     * @param requestOptionsFactory the factory; must not be null.
     * @return this builder.
     * @throws IllegalArgumentException if {@code requestOptionsFactory} is null.
     */
    @Override
    public LeaseStoreManager.LeaseStoreManagerBuilderDefinition requestOptionsFactory(RequestOptionsFactory requestOptionsFactory) {
        if (requestOptionsFactory == null) {
            throw new IllegalArgumentException("requestOptionsFactory");
        }
        this.requestOptionsFactory = requestOptionsFactory;
        return this;
    }

    /**
     * Sets the name of this host; it is written as the owner of acquired leases.
     *
     * @param hostName the host name; must not be null.
     * @return this builder.
     * @throws IllegalArgumentException if {@code hostName} is null.
     */
    @Override
    public LeaseStoreManager.LeaseStoreManagerBuilderDefinition hostName(String hostName) {
        if (hostName == null) {
            throw new IllegalArgumentException("hostName");
        }
        this.settings.withHostName(hostName);
        return this;
    }

    /**
     * Validates the configuration, creates the lease updater (if none is set) and the lease
     * store, and returns this instance.
     *
     * @return a {@link Mono} that emits this instance.
     * @throws IllegalArgumentException if the prefix, lease container, host name (null or empty),
     *     client or request options factory has not been supplied.
     */
    @Override
    public Mono<LeaseStoreManager> build() {
        if (this.settings.getContainerNamePrefix() == null) {
            throw new IllegalArgumentException("properties.containerNamePrefix");
        }
        if (this.settings.getLeaseCollectionLink() == null) {
            throw new IllegalArgumentException("properties.leaseCollectionLink");
        }
        String hostName = this.settings.getHostName();
        if (hostName == null || hostName.isEmpty()) {
            throw new IllegalArgumentException("properties.hostName");
        }
        if (this.leaseDocumentClient == null) {
            throw new IllegalArgumentException("leaseDocumentClient");
        }
        if (this.requestOptionsFactory == null) {
            throw new IllegalArgumentException("requestOptionsFactory");
        }
        if (this.leaseUpdater == null) {
            this.leaseUpdater = new DocumentServiceLeaseUpdaterImpl(this.leaseDocumentClient);
        }
        this.leaseStore = new DocumentServiceLeaseStore(
            this.leaseDocumentClient,
            this.settings.getContainerNamePrefix(),
            this.settings.getLeaseCollectionLink(),
            this.requestOptionsFactory);
        return Mono.just(this);
    }

    /**
     * Lists every lease item whose id starts with this manager's lease prefix.
     *
     * @return a {@link Flux} of the matching leases.
     */
    @Override
    public Flux<Lease> getAllLeases() {
        return this.listDocuments(this.getPartitionLeasePrefix()).cast(Lease.class);
    }

    /**
     * Lists the leases whose owner equals the configured host name, ignoring case.
     *
     * @return a {@link Flux} of the leases owned by this host.
     */
    @Override
    public Flux<Lease> getOwnedLeases() {
        return this.getAllLeases().filter(lease -> isOwnedBy(lease.getOwner(), this.settings.getHostName()));
    }

    /**
     * Creates the lease item for the given lease token.
     *
     * @param leaseToken the lease token; must not be null.
     * @param continuationToken the continuation token stored on the new lease.
     * @return a {@link Mono} that emits the created lease, or completes empty when the create call
     *     fails with status 409 (the item already exists); other errors are propagated.
     * @throws IllegalArgumentException if {@code leaseToken} is null.
     */
    @Override
    public Mono<Lease> createLeaseIfNotExist(String leaseToken, String continuationToken) {
        if (leaseToken == null) {
            throw new IllegalArgumentException("leaseToken");
        }
        ServiceItemLease documentServiceLease = new ServiceItemLease()
            .withId(this.getDocumentId(leaseToken))
            .withLeaseToken(leaseToken)
            .withContinuationToken(continuationToken);
        return this.leaseDocumentClient
            .createItem(this.settings.getLeaseCollectionLink(), documentServiceLease, null, false)
            .onErrorResume(ex -> {
                if (hasStatusCode(ex, HTTP_STATUS_CODE_CONFLICT)) {
                    LOGGER.info("Some other host created lease for {}.", leaseToken);
                    return Mono.empty();
                }
                return Mono.error(ex);
            })
            // Not reached after a conflict: the empty Mono above skips the mapper.
            .map(documentResourceResponse -> {
                CosmosItemProperties document = documentResourceResponse.getProperties();
                LOGGER.info("Created lease for partition {}.", leaseToken);
                return documentServiceLease
                    .withId(document.getId())
                    .withEtag(document.getETag())
                    .withTs(document.getString("_ts"));
            });
    }

    /**
     * Deletes the item that backs the given lease.
     *
     * @param lease the lease to delete; it and its id must not be null.
     * @return a {@link Mono} that completes when the delete call finishes; a 404 response is
     *     treated as success, other errors are propagated.
     * @throws IllegalArgumentException if {@code lease} or its id is null.
     */
    @Override
    public Mono<Void> delete(Lease lease) {
        if (lease == null || lease.getId() == null) {
            throw new IllegalArgumentException("lease");
        }
        CosmosAsyncItem itemForLease = this.createItemForLease(lease.getId());
        return this.leaseDocumentClient
            .deleteItem(itemForLease, this.requestOptionsFactory.createRequestOptions(lease))
            .onErrorResume(ex -> {
                if (hasStatusCode(ex, HTTP_STATUS_CODE_NOT_FOUND)) {
                    return Mono.empty();
                }
                return Mono.error(ex);
            })
            .then();
    }

    /**
     * Takes ownership of a lease for this host through the lease updater.
     *
     * <p>The update callback fails with {@link LeaseLostException} when the lease it receives has
     * a non-null owner different (ignoring case) from the owner recorded on {@code lease} at call
     * time; otherwise it sets the owner to the configured host name and copies the properties of
     * {@code lease}.
     *
     * @param lease the lease to acquire; must not be null.
     * @return the {@link Mono} produced by the lease updater.
     * @throws IllegalArgumentException if {@code lease} is null.
     */
    @Override
    public Mono<Lease> acquire(Lease lease) {
        if (lease == null) {
            throw new IllegalArgumentException("lease");
        }
        // Captured up front so the comparison uses the owner seen when acquire was called.
        String oldOwner = lease.getOwner();
        return this.leaseUpdater.updateLease(
            lease,
            this.createItemForLease(lease.getId()),
            this.requestOptionsFactory.createRequestOptions(lease),
            serverLease -> {
                if (isOwnedByAnother(serverLease.getOwner(), oldOwner)) {
                    LOGGER.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner());
                    throw new LeaseLostException(lease);
                }
                serverLease.setOwner(this.settings.getHostName());
                serverLease.setProperties(lease.getProperties());
                return serverLease;
            });
    }

    /**
     * Clears the owner of a lease.
     *
     * <p>The lease item is read first; a 404 on that read fails with {@link LeaseLostException}.
     * The update callback also fails with {@link LeaseLostException} when the lease it receives
     * has a non-null owner different (ignoring case) from the owner of {@code lease}.
     *
     * @param lease the lease to release; must not be null.
     * @return a {@link Mono} that completes when the update finishes.
     * @throws IllegalArgumentException if {@code lease} is null.
     */
    @Override
    public Mono<Void> release(Lease lease) {
        if (lease == null) {
            throw new IllegalArgumentException("lease");
        }
        return this.readExistingLease(lease, "release")
            .flatMap(refreshedLease -> this.leaseUpdater.updateLease(
                refreshedLease,
                this.createItemForLease(refreshedLease.getId()),
                this.requestOptionsFactory.createRequestOptions(lease),
                serverLease -> {
                    if (isOwnedByAnother(serverLease.getOwner(), lease.getOwner())) {
                        LOGGER.info("Partition {} no need to release lease. The lease was already taken by another host '{}'.", lease.getLeaseToken(), serverLease.getOwner());
                        throw new LeaseLostException(lease);
                    }
                    serverLease.setOwner(null);
                    return serverLease;
                }))
            .then();
    }

    /**
     * Renews a lease by re-reading its item and passing it through the lease updater.
     *
     * <p>A 404 on the read fails with {@link LeaseLostException}. The update callback fails with
     * {@link LeaseLostException} unless the lease it receives has a non-null owner equal (ignoring
     * case) to the owner of {@code lease}; a lease without an owner therefore counts as lost.
     *
     * @param lease the lease to renew; must not be null.
     * @return the {@link Mono} produced by the lease updater.
     * @throws IllegalArgumentException if {@code lease} is null.
     */
    @Override
    public Mono<Lease> renew(Lease lease) {
        if (lease == null) {
            throw new IllegalArgumentException("lease");
        }
        return this.readExistingLease(lease, "renew")
            .flatMap(refreshedLease -> this.leaseUpdater.updateLease(
                refreshedLease,
                this.createItemForLease(refreshedLease.getId()),
                this.requestOptionsFactory.createRequestOptions(lease),
                serverLease -> {
                    // Null-safe: an owner-less lease is reported as lost rather than as an NPE.
                    if (!isOwnedBy(serverLease.getOwner(), lease.getOwner())) {
                        LOGGER.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner());
                        throw new LeaseLostException(lease);
                    }
                    return serverLease;
                }));
    }

    /**
     * Writes the properties of {@code lease} to the stored lease through the lease updater.
     *
     * <p>Fails with {@link LeaseLostException} when the owner of {@code lease} is null or differs
     * (ignoring case) from the configured host name, and, inside the update callback, when the
     * lease the callback receives is not owned by the owner of {@code lease}.
     *
     * @param lease the lease carrying the new properties; must not be null.
     * @return the {@link Mono} produced by the lease updater.
     * @throws IllegalArgumentException if {@code lease} is null.
     * @throws LeaseLostException if {@code lease} is not owned by this host.
     */
    @Override
    public Mono<Lease> updateProperties(Lease lease) {
        if (lease == null) {
            throw new IllegalArgumentException("lease");
        }
        // Null-safe: an owner-less lease is reported as lost rather than as an NPE.
        if (!isOwnedBy(lease.getOwner(), this.settings.getHostName())) {
            LOGGER.info("Partition '{}' lease was taken over by owner '{}' before lease item update", lease.getLeaseToken(), lease.getOwner());
            throw new LeaseLostException(lease);
        }
        return this.leaseUpdater.updateLease(
            lease,
            this.createItemForLease(lease.getId()),
            this.requestOptionsFactory.createRequestOptions(lease),
            serverLease -> {
                if (!isOwnedBy(serverLease.getOwner(), lease.getOwner())) {
                    LOGGER.info("Partition '{}' lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner());
                    throw new LeaseLostException(lease);
                }
                serverLease.setProperties(lease.getProperties());
                return serverLease;
            });
    }

    /**
     * Stores a new continuation token on a lease.
     *
     * <p>The lease item is read first (read errors, including 404, are propagated unchanged). The
     * update callback fails with {@link LeaseLostException} when the lease it receives has a
     * non-null owner different (ignoring case) from the owner of {@code lease}.
     *
     * @param lease the lease to checkpoint; must not be null.
     * @param continuationToken the token to store; must be a non-empty string.
     * @return the {@link Mono} produced by the lease updater.
     * @throws IllegalArgumentException if {@code lease} is null or {@code continuationToken} is
     *     null or empty.
     */
    @Override
    public Mono<Lease> checkpoint(Lease lease, String continuationToken) {
        if (lease == null) {
            throw new IllegalArgumentException("lease");
        }
        if (continuationToken == null || continuationToken.isEmpty()) {
            throw new IllegalArgumentException("continuationToken must be a non-empty string");
        }
        CosmosAsyncItem itemForLease = this.createItemForLease(lease.getId());
        return this.leaseDocumentClient
            .readItem(itemForLease, this.requestOptionsFactory.createRequestOptions(lease))
            .map(documentResourceResponse -> ServiceItemLease.fromDocument(documentResourceResponse.getProperties()))
            .flatMap(refreshedLease -> this.leaseUpdater.updateLease(
                refreshedLease,
                this.createItemForLease(lease.getId()),
                this.requestOptionsFactory.createRequestOptions(lease),
                serverLease -> {
                    if (isOwnedByAnother(serverLease.getOwner(), lease.getOwner())) {
                        LOGGER.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner());
                        throw new LeaseLostException(lease);
                    }
                    serverLease.setContinuationToken(continuationToken);
                    return serverLease;
                }));
    }

    /** Delegates to the underlying lease store. */
    @Override
    public Mono<Boolean> isInitialized() {
        return this.leaseStore.isInitialized();
    }

    /** Delegates to the underlying lease store. */
    @Override
    public Mono<Boolean> markInitialized() {
        return this.leaseStore.markInitialized();
    }

    /**
     * Delegates to the underlying lease store.
     *
     * @param lockExpirationTime passed through unchanged to the lease store.
     */
    @Override
    public Mono<Boolean> acquireInitializationLock(Duration lockExpirationTime) {
        return this.leaseStore.acquireInitializationLock(lockExpirationTime);
    }

    /** Delegates to the underlying lease store. */
    @Override
    public Mono<Boolean> releaseInitializationLock() {
        return this.leaseStore.releaseInitializationLock();
    }

    /**
     * Reads the item backing {@code lease}; completes empty on a 404, propagates other errors.
     */
    private Mono<ServiceItemLease> tryGetLease(Lease lease) {
        CosmosAsyncItem itemForLease = this.createItemForLease(lease.getId());
        return this.leaseDocumentClient
            .readItem(itemForLease, this.requestOptionsFactory.createRequestOptions(lease))
            .onErrorResume(ex -> {
                if (hasStatusCode(ex, HTTP_STATUS_CODE_NOT_FOUND)) {
                    return Mono.empty();
                }
                return Mono.error(ex);
            })
            .map(documentResourceResponse -> ServiceItemLease.fromDocument(documentResourceResponse.getProperties()));
    }

    /**
     * Reads the item backing {@code lease}; a 404 is logged and fails with
     * {@link LeaseLostException}, other errors are propagated.
     *
     * @param lease the lease whose item is read.
     * @param operation verb inserted into the log message (for example "renew").
     */
    private Mono<ServiceItemLease> readExistingLease(Lease lease, String operation) {
        CosmosAsyncItem itemForLease = this.createItemForLease(lease.getId());
        return this.leaseDocumentClient
            .readItem(itemForLease, this.requestOptionsFactory.createRequestOptions(lease))
            .onErrorResume(ex -> {
                if (hasStatusCode(ex, HTTP_STATUS_CODE_NOT_FOUND)) {
                    LOGGER.info("Partition {} failed to {} lease. The lease is gone already.", lease.getLeaseToken(), operation);
                    throw new LeaseLostException(lease);
                }
                return Mono.error(ex);
            })
            .map(documentResourceResponse -> ServiceItemLease.fromDocument(documentResourceResponse.getProperties()));
    }

    /**
     * Queries the lease container for items whose id starts with {@code prefix}.
     *
     * @throws IllegalArgumentException if {@code prefix} is null or empty.
     */
    private Flux<ServiceItemLease> listDocuments(String prefix) {
        if (prefix == null || prefix.isEmpty()) {
            throw new IllegalArgumentException("prefix");
        }
        SqlParameter param = new SqlParameter();
        param.setName("@PartitionLeasePrefix");
        param.setValue(prefix);
        SqlQuerySpec querySpec = new SqlQuerySpec(
            "SELECT * FROM c WHERE STARTSWITH(c.id, @PartitionLeasePrefix)",
            new SqlParameterList(param));
        Flux<FeedResponse<CosmosItemProperties>> query = this.leaseDocumentClient.queryItems(
            this.settings.getLeaseCollectionLink(),
            querySpec,
            this.requestOptionsFactory.createFeedOptions());
        return query
            .flatMap(documentFeedResponse -> Flux.fromIterable(documentFeedResponse.getResults()))
            .map(ServiceItemLease::fromDocument);
    }

    /** Builds the lease item id: the lease prefix followed by the lease token. */
    private String getDocumentId(String leaseToken) {
        return this.getPartitionLeasePrefix() + leaseToken;
    }

    /** Builds the id prefix shared by all lease items: container name prefix plus "..". */
    private String getPartitionLeasePrefix() {
        return this.settings.getContainerNamePrefix() + LEASE_STORE_MANAGER_LEASE_SUFFIX;
    }

    /** Obtains the item handle for a lease id from the client's container. */
    private CosmosAsyncItem createItemForLease(String leaseId) {
        // NOTE(review): the second argument is the literal "/id"; confirm this is the intended
        // partition key value for lease items.
        return this.leaseDocumentClient.getContainerClient().getItem(leaseId, "/id");
    }

    /** Returns true when {@code ex} is a {@link CosmosClientException} with the given status code. */
    private static boolean hasStatusCode(Throwable ex, int statusCode) {
        return ex instanceof CosmosClientException
            && ((CosmosClientException) ex).getStatusCode() == statusCode;
    }

    /** Returns true when {@code owner} is non-null and equals {@code expectedOwner}, ignoring case. */
    private static boolean isOwnedBy(String owner, String expectedOwner) {
        return owner != null && owner.equalsIgnoreCase(expectedOwner);
    }

    /** Returns true when {@code owner} is non-null and differs from {@code expectedOwner}, ignoring case. */
    private static boolean isOwnedByAnother(String owner, String expectedOwner) {
        return owner != null && !owner.equalsIgnoreCase(expectedOwner);
    }
}

