/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.changefeed.common;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosBridgeInternal;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosDatabaseRequestOptions;
import com.azure.cosmos.models.CosmosDatabaseResponse;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
import java.net.URI;
import java.util.List;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * {@link ChangeFeedContextClient} implementation that forwards each operation to a
 * {@link CosmosAsyncContainer} (or to the {@link AsyncDocumentClient} obtained from it)
 * and publishes the resulting reactive signals on a configurable {@link Scheduler}.
 */
public class ChangeFeedContextClientImpl implements ChangeFeedContextClient {
    private final AsyncDocumentClient documentClient;
    private final CosmosAsyncContainer cosmosContainer;
    // Not final: can be swapped at any time through setScheduler.
    private Scheduler scheduler;

    /**
     * Creates a client that publishes results on {@link Schedulers#boundedElastic()}.
     *
     * @param cosmosContainer the container that item operations are issued against.
     * @throws IllegalArgumentException if {@code cosmosContainer} is null.
     */
    public ChangeFeedContextClientImpl(CosmosAsyncContainer cosmosContainer) {
        this(cosmosContainer, Schedulers.boundedElastic());
    }

    /**
     * Creates a client that publishes results on the given scheduler.
     *
     * @param cosmosContainer the container that item operations are issued against.
     * @param scheduler the scheduler handed to every {@code publishOn} call made by this class;
     *                  it is stored as-is and not validated here.
     * @throws IllegalArgumentException if {@code cosmosContainer} is null.
     */
    public ChangeFeedContextClientImpl(CosmosAsyncContainer cosmosContainer, Scheduler scheduler) {
        if (cosmosContainer == null) {
            throw new IllegalArgumentException("cosmosContainer");
        }
        this.cosmosContainer = cosmosContainer;
        this.documentClient = CosmosBridgeInternal.getContextClient(cosmosContainer);
        this.scheduler = scheduler;
    }

    /**
     * @return the scheduler currently used to publish results.
     */
    @Override
    public Scheduler getScheduler() {
        return this.scheduler;
    }

    /**
     * Replaces the scheduler used to publish results. Only pipelines assembled after this call
     * pick up the new value, because the field is read when each method builds its pipeline.
     *
     * @param scheduler the new scheduler; stored as-is and not validated here.
     */
    @Override
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Reads the partition key ranges through the underlying document client.
     *
     * @param partitionKeyRangesOrCollectionLink link passed straight to the document client.
     * @param cosmosQueryRequestOptions options passed straight to the document client.
     * @return the feed pages, published on the configured scheduler.
     */
    @Override
    public Flux<FeedResponse<PartitionKeyRange>> readPartitionKeyRangeFeed(String partitionKeyRangesOrCollectionLink, CosmosQueryRequestOptions cosmosQueryRequestOptions) {
        return this.documentClient.readPartitionKeyRanges(partitionKeyRangesOrCollectionLink, cosmosQueryRequestOptions).publishOn(this.scheduler);
    }

    /**
     * Runs a change feed query against the given container and converts every returned
     * {@link Document} into an instance of {@code klass}.
     *
     * @param collectionLink the container to query.
     * @param changeFeedRequestOptions the change feed options; they are passed through
     *                                 {@code ModelBridgeInternal.disableSplitHandling} before use.
     * @param klass the type each document is converted to.
     * @param <T> the item type.
     * @return the converted feed pages, published on the configured scheduler.
     */
    @Override
    public <T> Flux<FeedResponse<T>> createDocumentChangeFeedQuery(CosmosAsyncContainer collectionLink, CosmosChangeFeedRequestOptions changeFeedRequestOptions, Class<T> klass) {
        // NOTE(review): presumably split handling is disabled so that partition-split signals
        // surface to the caller instead of being absorbed by the fetcher - confirm against
        // ModelBridgeInternal.disableSplitHandling.
        CosmosChangeFeedRequestOptions effectiveRequestOptions = ModelBridgeInternal.disableSplitHandling(changeFeedRequestOptions);
        AsyncDocumentClient clientWrapper = CosmosBridgeInternal.getAsyncDocumentClient(collectionLink.getDatabase());

        Flux<FeedResponse<T>> feedResponseFlux = clientWrapper.getCollectionCache()
            .resolveByNameAsync(null, BridgeInternal.extractContainerSelfLink(collectionLink), null)
            .flatMapMany(collection -> {
                if (collection == null) {
                    throw new IllegalStateException("Collection cannot be null");
                }
                return clientWrapper.queryDocumentChangeFeed((DocumentCollection) collection, effectiveRequestOptions, Document.class)
                    .map(response -> {
                        List<T> results = response.getResults().stream()
                            .map(document -> ModelBridgeInternal.toObjectFromJsonSerializable(document, klass))
                            .collect(Collectors.toList());
                        // Re-wrap the converted items with the original response headers.
                        return BridgeInternal.toFeedResponsePage(results, response.getResponseHeaders(), false);
                    });
            });
        return feedResponseFlux.publishOn(this.scheduler);
    }

    /**
     * Reads the given database.
     *
     * @param database the database to read.
     * @param options the request options; when null the no-argument read is used.
     * @return the database response, published on the configured scheduler.
     */
    @Override
    public Mono<CosmosDatabaseResponse> readDatabase(CosmosAsyncDatabase database, CosmosDatabaseRequestOptions options) {
        // Honour caller-supplied options instead of silently dropping them; the null branch
        // keeps the previous behaviour for callers that pass no options.
        Mono<CosmosDatabaseResponse> response = options != null ? database.read(options) : database.read();
        return response.publishOn(this.scheduler);
    }

    /**
     * Reads the given container.
     *
     * @param containerLink the container to read.
     * @param options the request options passed to the read call.
     * @return the container response, published on the configured scheduler.
     */
    @Override
    public Mono<CosmosContainerResponse> readContainer(CosmosAsyncContainer containerLink, CosmosContainerRequestOptions options) {
        return containerLink.read(options).publishOn(this.scheduler);
    }

    /**
     * Creates an item in the given container.
     *
     * @param containerLink the container to create the item in.
     * @param document the item to create.
     * @param options the request options; when null the overload without options is used.
     * @param disableAutomaticIdGeneration not used by this implementation.
     * @param <T> the item type.
     * @return the item response, published on the configured scheduler.
     */
    @Override
    public <T> Mono<CosmosItemResponse<T>> createItem(CosmosAsyncContainer containerLink, T document, CosmosItemRequestOptions options, boolean disableAutomaticIdGeneration) {
        if (options != null) {
            return containerLink.createItem(document, options).publishOn(this.scheduler);
        }
        return containerLink.createItem(document).publishOn(this.scheduler);
    }

    /**
     * Deletes an item from the container this client was constructed with.
     *
     * @param itemId the id of the item to delete.
     * @param partitionKey the partition key of the item.
     * @param options the request options passed to the delete call.
     * @return the item response, published on the configured scheduler.
     */
    @Override
    public Mono<CosmosItemResponse<Object>> deleteItem(String itemId, PartitionKey partitionKey, CosmosItemRequestOptions options) {
        return this.cosmosContainer.deleteItem(itemId, partitionKey, options).publishOn(this.scheduler);
    }

    /**
     * Replaces an item in the container this client was constructed with.
     *
     * @param itemId the id of the item to replace.
     * @param partitionKey the partition key of the item.
     * @param document the replacement item.
     * @param options the request options passed to the replace call.
     * @param <T> the item type.
     * @return the item response, published on the configured scheduler.
     */
    @Override
    public <T> Mono<CosmosItemResponse<T>> replaceItem(String itemId, PartitionKey partitionKey, T document, CosmosItemRequestOptions options) {
        return this.cosmosContainer.replaceItem(document, itemId, partitionKey, options).publishOn(this.scheduler);
    }

    /**
     * Reads an item from the container this client was constructed with.
     *
     * @param itemId the id of the item to read.
     * @param partitionKey the partition key of the item.
     * @param options the request options passed to the read call.
     * @param itemType the type the item is read as.
     * @param <T> the item type.
     * @return the item response, published on the configured scheduler.
     */
    @Override
    public <T> Mono<CosmosItemResponse<T>> readItem(String itemId, PartitionKey partitionKey, CosmosItemRequestOptions options, Class<T> itemType) {
        return this.cosmosContainer.readItem(itemId, partitionKey, options, itemType).publishOn(this.scheduler);
    }

    /**
     * Queries items in the given container and exposes the results page by page.
     *
     * @param containerLink the container to query.
     * @param querySpec the query to run.
     * @param options the query options passed to the query call.
     * @param klass the type each item is read as.
     * @param <T> the item type.
     * @return the result pages, published on the configured scheduler.
     */
    @Override
    public <T> Flux<FeedResponse<T>> queryItems(CosmosAsyncContainer containerLink, SqlQuerySpec querySpec, CosmosQueryRequestOptions options, Class<T> klass) {
        return containerLink.queryItems(querySpec, options, klass).byPage().publishOn(this.scheduler);
    }

    /**
     * @return the service endpoint reported by the underlying document client.
     */
    @Override
    public URI getServiceEndpoint() {
        return this.documentClient.getServiceEndpoint();
    }

    /**
     * Reads the given container and returns only its properties.
     *
     * @param containerLink the container to read.
     * @param options the request options passed to the read call.
     * @return the container properties, published on the configured scheduler.
     */
    @Override
    public Mono<CosmosContainerProperties> readContainerSettings(CosmosAsyncContainer containerLink, CosmosContainerRequestOptions options) {
        // Published on the configured scheduler like every other reactive method of this class.
        return containerLink.read(options)
            .map(CosmosContainerResponse::getProperties)
            .publishOn(this.scheduler);
    }

    /**
     * @return the container this client was constructed with.
     */
    @Override
    public CosmosAsyncContainer getContainerClient() {
        return this.cosmosContainer;
    }

    /**
     * @return the database that owns the container this client was constructed with.
     */
    @Override
    public CosmosAsyncDatabase getDatabaseClient() {
        return this.cosmosContainer.getDatabase();
    }

    /**
     * No-op: this method does not close the container or the document client.
     */
    @Override
    public void close() {
    }
}

