/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.changefeed.common;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosBridgeInternal;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.CosmosBulkOperationResponse;
import com.azure.cosmos.models.CosmosBulkOperations;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosDatabaseRequestOptions;
import com.azure.cosmos.models.CosmosDatabaseResponse;
import com.azure.cosmos.models.CosmosItemIdentity;
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class ChangeFeedContextClientImpl
implements ChangeFeedContextClient {
    private static final Logger logger = LoggerFactory.getLogger(ChangeFeedContextClientImpl.class);
    private final AsyncDocumentClient documentClient;
    private final CosmosAsyncContainer cosmosContainer;
    private Scheduler scheduler;

    public ChangeFeedContextClientImpl(CosmosAsyncContainer cosmosContainer) {
        this(cosmosContainer, Schedulers.boundedElastic());
    }

    public ChangeFeedContextClientImpl(CosmosAsyncContainer cosmosContainer, Scheduler scheduler) {
        Preconditions.checkNotNull(cosmosContainer, "Argument 'cosmosContainer' can not be null");
        this.cosmosContainer = cosmosContainer;
        this.documentClient = CosmosBridgeInternal.getContextClient(cosmosContainer);
        this.scheduler = scheduler;
    }

    @Override
    public Scheduler getScheduler() {
        return this.scheduler;
    }

    @Override
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    @Override
    public Mono<List<PartitionKeyRange>> getOverlappingRanges(Range<String> range) {
        AsyncDocumentClient clientWrapper = CosmosBridgeInternal.getAsyncDocumentClient(this.cosmosContainer.getDatabase());
        return clientWrapper.getCollectionCache().resolveByNameAsync(null, BridgeInternal.extractContainerSelfLink(this.cosmosContainer), null).flatMap(collection -> clientWrapper.getPartitionKeyRangeCache().tryGetOverlappingRangesAsync(null, collection.getResourceId(), range, true, null)).flatMap(pkRangesValueHolder -> {
            if (pkRangesValueHolder == null || pkRangesValueHolder.v == null) {
                logger.warn("There are no overlapping ranges found for range {}", (Object)range);
                return Mono.just(new ArrayList());
            }
            return Mono.just((Object)((List)pkRangesValueHolder.v));
        }).publishOn(this.scheduler);
    }

    @Override
    public Flux<FeedResponse<PartitionKeyRange>> readPartitionKeyRangeFeed(String partitionKeyRangesOrCollectionLink, CosmosQueryRequestOptions cosmosQueryRequestOptions) {
        return this.documentClient.readPartitionKeyRanges(partitionKeyRangesOrCollectionLink, cosmosQueryRequestOptions).publishOn(this.scheduler);
    }

    @Override
    public <T> Flux<FeedResponse<T>> createDocumentChangeFeedQuery(CosmosAsyncContainer collectionLink, CosmosChangeFeedRequestOptions changeFeedRequestOptions, Class<T> klass) {
        CosmosChangeFeedRequestOptions effectiveRequestOptions = ModelBridgeInternal.disableSplitHandling(changeFeedRequestOptions);
        AsyncDocumentClient clientWrapper = CosmosBridgeInternal.getAsyncDocumentClient(collectionLink.getDatabase());
        Flux feedResponseFlux = clientWrapper.getCollectionCache().resolveByNameAsync(null, BridgeInternal.extractContainerSelfLink(collectionLink), null).flatMapMany(collection -> {
            if (collection == null) {
                throw new IllegalStateException("Collection cannot be null");
            }
            return clientWrapper.queryDocumentChangeFeed((DocumentCollection)collection, effectiveRequestOptions, Document.class).map(response -> {
                List results = response.getResults().stream().map(document -> ModelBridgeInternal.toObjectFromJsonSerializable(document, klass)).collect(Collectors.toList());
                return BridgeInternal.toFeedResponsePage(results, response.getResponseHeaders(), ImplementationBridgeHelpers.FeedResponseHelper.getFeedResponseAccessor().getNoChanges(response), response.getCosmosDiagnostics());
            });
        });
        return feedResponseFlux.publishOn(this.scheduler);
    }

    @Override
    public Mono<CosmosDatabaseResponse> readDatabase(CosmosAsyncDatabase database, CosmosDatabaseRequestOptions options) {
        return database.read().publishOn(this.scheduler);
    }

    @Override
    public Mono<CosmosContainerResponse> readContainer(CosmosAsyncContainer containerLink, CosmosContainerRequestOptions options) {
        return containerLink.read(options).publishOn(this.scheduler);
    }

    @Override
    public <T> Mono<CosmosItemResponse<T>> createItem(CosmosAsyncContainer containerLink, T document, CosmosItemRequestOptions options, boolean disableAutomaticIdGeneration) {
        if (options != null) {
            return containerLink.createItem(document, options).publishOn(this.scheduler);
        }
        return containerLink.createItem(document).publishOn(this.scheduler);
    }

    @Override
    public Mono<CosmosItemResponse<Object>> deleteItem(String itemId, PartitionKey partitionKey, CosmosItemRequestOptions options) {
        return this.cosmosContainer.deleteItem(itemId, partitionKey, options).publishOn(this.scheduler);
    }

    @Override
    public Flux<CosmosBulkOperationResponse<Object>> deleteAllItems(List<CosmosItemIdentity> cosmosItemIdentities) {
        ArrayList<CosmosItemOperation> operations = new ArrayList<CosmosItemOperation>();
        for (CosmosItemIdentity cosmosItemIdentity : cosmosItemIdentities) {
            operations.add(CosmosBulkOperations.getDeleteItemOperation(cosmosItemIdentity.getId(), cosmosItemIdentity.getPartitionKey()));
        }
        return this.cosmosContainer.executeBulkOperations((Flux<CosmosItemOperation>)Flux.fromIterable(operations)).publishOn(this.scheduler);
    }

    @Override
    public <T> Mono<CosmosItemResponse<T>> replaceItem(String itemId, PartitionKey partitionKey, T document, CosmosItemRequestOptions options) {
        return this.cosmosContainer.replaceItem(document, itemId, partitionKey, options).publishOn(this.scheduler);
    }

    @Override
    public <T> Mono<CosmosItemResponse<T>> readItem(String itemId, PartitionKey partitionKey, CosmosItemRequestOptions options, Class<T> itemType) {
        return this.cosmosContainer.readItem(itemId, partitionKey, options, itemType).publishOn(this.scheduler);
    }

    @Override
    public <T> Flux<FeedResponse<T>> queryItems(CosmosAsyncContainer containerLink, SqlQuerySpec querySpec, CosmosQueryRequestOptions options, Class<T> klass) {
        return containerLink.queryItems(querySpec, options, klass).byPage().publishOn(this.scheduler);
    }

    @Override
    public URI getServiceEndpoint() {
        return this.documentClient.getServiceEndpoint();
    }

    @Override
    public Mono<CosmosContainerProperties> readContainerSettings(CosmosAsyncContainer containerLink, CosmosContainerRequestOptions options) {
        return containerLink.read(options).map(CosmosContainerResponse::getProperties);
    }

    @Override
    public CosmosAsyncContainer getContainerClient() {
        return this.cosmosContainer;
    }

    @Override
    public CosmosAsyncDatabase getDatabaseClient() {
        return this.cosmosContainer.getDatabase();
    }

    @Override
    public void close() {
    }
}

