/*
 * Decompiled with CFR 0.152.
 */
package com.azure.spring.data.cosmos.core;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
import com.azure.cosmos.models.CosmosBulkItemRequestOptions;
import com.azure.cosmos.models.CosmosBulkOperations;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosDatabaseResponse;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosPatchItemRequestOptions;
import com.azure.cosmos.models.CosmosPatchOperations;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlParameter;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.models.UniqueKeyPolicy;
import com.azure.spring.data.cosmos.CosmosFactory;
import com.azure.spring.data.cosmos.common.CosmosUtils;
import com.azure.spring.data.cosmos.config.CosmosConfig;
import com.azure.spring.data.cosmos.config.DatabaseThroughputConfig;
import com.azure.spring.data.cosmos.core.CosmosSchedulers;
import com.azure.spring.data.cosmos.core.ReactiveCosmosOperations;
import com.azure.spring.data.cosmos.core.ResponseDiagnosticsProcessor;
import com.azure.spring.data.cosmos.core.convert.MappingCosmosConverter;
import com.azure.spring.data.cosmos.core.generator.CountQueryGenerator;
import com.azure.spring.data.cosmos.core.generator.FindQuerySpecGenerator;
import com.azure.spring.data.cosmos.core.generator.NativeQueryGenerator;
import com.azure.spring.data.cosmos.core.mapping.event.AfterLoadEvent;
import com.azure.spring.data.cosmos.core.mapping.event.CosmosMappingEvent;
import com.azure.spring.data.cosmos.core.query.CosmosQuery;
import com.azure.spring.data.cosmos.core.query.Criteria;
import com.azure.spring.data.cosmos.core.query.CriteriaType;
import com.azure.spring.data.cosmos.exception.CosmosExceptionUtils;
import com.azure.spring.data.cosmos.repository.support.CosmosEntityInformation;
import com.fasterxml.jackson.databind.JsonNode;
import java.lang.reflect.Field;
import java.util.Optional;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.auditing.IsNewAwareAuditingHandler;
import org.springframework.data.domain.Sort;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactiveCosmosTemplate
implements ReactiveCosmosOperations,
ApplicationContextAware {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveCosmosTemplate.class);
    private final CosmosFactory cosmosFactory;
    private final MappingCosmosConverter mappingCosmosConverter;
    private final ResponseDiagnosticsProcessor responseDiagnosticsProcessor;
    private final boolean queryMetricsEnabled;
    private final int maxDegreeOfParallelism;
    private final int maxBufferedItemCount;
    private final int responseContinuationTokenLimitInKb;
    private final IsNewAwareAuditingHandler cosmosAuditingHandler;
    private final DatabaseThroughputConfig databaseThroughputConfig;
    private boolean pointReadWarningLogged = false;
    private ApplicationContext applicationContext;

    public ReactiveCosmosTemplate(CosmosAsyncClient client, String databaseName, CosmosConfig cosmosConfig, MappingCosmosConverter mappingCosmosConverter, IsNewAwareAuditingHandler cosmosAuditingHandler) {
        this(new CosmosFactory(client, databaseName), cosmosConfig, mappingCosmosConverter, cosmosAuditingHandler);
    }

    public ReactiveCosmosTemplate(CosmosAsyncClient client, String databaseName, CosmosConfig cosmosConfig, MappingCosmosConverter mappingCosmosConverter) {
        this(new CosmosFactory(client, databaseName), cosmosConfig, mappingCosmosConverter, null);
    }

    public ReactiveCosmosTemplate(CosmosFactory cosmosFactory, CosmosConfig cosmosConfig, MappingCosmosConverter mappingCosmosConverter, IsNewAwareAuditingHandler cosmosAuditingHandler) {
        Assert.notNull((Object)cosmosFactory, (String)"CosmosFactory must not be null!");
        Assert.notNull((Object)cosmosConfig, (String)"CosmosConfig must not be null!");
        Assert.notNull((Object)mappingCosmosConverter, (String)"MappingCosmosConverter must not be null!");
        this.mappingCosmosConverter = mappingCosmosConverter;
        this.cosmosFactory = cosmosFactory;
        this.responseDiagnosticsProcessor = cosmosConfig.getResponseDiagnosticsProcessor();
        this.queryMetricsEnabled = cosmosConfig.isQueryMetricsEnabled();
        this.maxDegreeOfParallelism = cosmosConfig.getMaxDegreeOfParallelism();
        this.maxBufferedItemCount = cosmosConfig.getMaxBufferedItemCount();
        this.responseContinuationTokenLimitInKb = cosmosConfig.getResponseContinuationTokenLimitInKb();
        this.cosmosAuditingHandler = cosmosAuditingHandler;
        this.databaseThroughputConfig = cosmosConfig.getDatabaseThroughputConfig();
    }

    public ReactiveCosmosTemplate(CosmosFactory cosmosFactory, CosmosConfig cosmosConfig, MappingCosmosConverter mappingCosmosConverter) {
        this(cosmosFactory, cosmosConfig, mappingCosmosConverter, null);
    }

    private String getDatabaseName() {
        return this.cosmosFactory.getDatabaseName();
    }

    private CosmosAsyncClient getCosmosAsyncClient() {
        return this.cosmosFactory.getCosmosAsyncClient();
    }

    public void setApplicationContext(@NonNull ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @Override
    public Mono<CosmosContainerResponse> createContainerIfNotExists(CosmosEntityInformation<?, ?> information) {
        return this.createDatabaseIfNotExists().publishOn(CosmosSchedulers.SPRING_DATA_COSMOS_PARALLEL).onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler("Failed to create database", throwable, this.responseDiagnosticsProcessor)).flatMap(cosmosDatabaseResponse -> {
            Mono cosmosContainerResponseMono;
            CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor, cosmosDatabaseResponse.getDiagnostics(), null);
            CosmosContainerProperties cosmosContainerProperties = new CosmosContainerProperties(this.getContainerNameOverride(information.getContainerName()), information.getPartitionKeyPath());
            cosmosContainerProperties.setDefaultTimeToLiveInSeconds(information.getTimeToLive());
            cosmosContainerProperties.setIndexingPolicy(information.getIndexingPolicy());
            UniqueKeyPolicy uniqueKeyPolicy = information.getUniqueKeyPolicy();
            if (uniqueKeyPolicy != null) {
                cosmosContainerProperties.setUniqueKeyPolicy(uniqueKeyPolicy);
            }
            CosmosAsyncDatabase database = this.getCosmosAsyncClient().getDatabase(cosmosDatabaseResponse.getProperties().getId());
            if (information.getRequestUnit() == null) {
                cosmosContainerResponseMono = database.createContainerIfNotExists(cosmosContainerProperties);
            } else {
                ThroughputProperties throughputProperties = information.isAutoScale() ? ThroughputProperties.createAutoscaledThroughput((int)information.getRequestUnit()) : ThroughputProperties.createManualThroughput((int)information.getRequestUnit());
                cosmosContainerResponseMono = database.createContainerIfNotExists(cosmosContainerProperties, throughputProperties);
            }
            return cosmosContainerResponseMono.map(cosmosContainerResponse -> {
                CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor, cosmosContainerResponse.getDiagnostics(), null);
                return cosmosContainerResponse;
            }).onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler("Failed to create container", throwable, this.responseDiagnosticsProcessor));
        });
    }

    private Mono<CosmosDatabaseResponse> createDatabaseIfNotExists() {
        if (this.databaseThroughputConfig == null) {
            return this.getCosmosAsyncClient().createDatabaseIfNotExists(this.getDatabaseName());
        }
        ThroughputProperties throughputProperties = this.databaseThroughputConfig.isAutoScale() ? ThroughputProperties.createAutoscaledThroughput((int)this.databaseThroughputConfig.getRequestUnits()) : ThroughputProperties.createManualThroughput((int)this.databaseThroughputConfig.getRequestUnits());
        return this.getCosmosAsyncClient().createDatabaseIfNotExists(this.getDatabaseName(), throughputProperties);
    }

    @Override
    public Mono<CosmosContainerProperties> getContainerProperties(String containerName) {
        containerName = this.getContainerNameOverride(containerName);
        return this.getCosmosAsyncClient().getDatabase(this.getDatabaseName()).getContainer(containerName).read().map(CosmosContainerResponse::getProperties);
    }

    @Override
    public Mono<CosmosContainerProperties> replaceContainerProperties(String containerName, CosmosContainerProperties properties) {
        containerName = this.getContainerNameOverride(containerName);
        return this.getCosmosAsyncClient().getDatabase(this.getDatabaseName()).getContainer(containerName).replace(properties).map(CosmosContainerResponse::getProperties);
    }

    @Override
    public <T> Flux<T> findAll(String containerName, Class<T> domainType) {
        CosmosQuery query = new CosmosQuery(Criteria.getInstance(CriteriaType.ALL));
        return this.find(query, domainType, containerName);
    }

    @Override
    public <T> Flux<T> findAll(Class<T> domainType) {
        return this.findAll(domainType.getSimpleName(), domainType);
    }

    @Override
    public <T> Flux<T> findAll(PartitionKey partitionKey, Class<T> domainType) {
        Assert.notNull((Object)partitionKey, (String)"partitionKey should not be null");
        Assert.notNull(domainType, (String)"domainType should not be null");
        String containerName = this.getContainerName(domainType);
        CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
        cosmosQueryRequestOptions.setPartitionKey(partitionKey);
        cosmosQueryRequestOptions.setQueryMetricsEnabled(this.queryMetricsEnabled);
        cosmosQueryRequestOptions.setMaxDegreeOfParallelism(this.maxDegreeOfParallelism);
        cosmosQueryRequestOptions.setMaxBufferedItemCount(this.maxBufferedItemCount);
        cosmosQueryRequestOptions.setResponseContinuationTokenLimitInKb(this.responseContinuationTokenLimitInKb);
        return this.getCosmosAsyncClient().getDatabase(this.getDatabaseName()).getContainer(containerName).queryItems("SELECT * FROM r", cosmosQueryRequestOptions, JsonNode.class).byPage().publishOn(CosmosSchedulers.SPRING_DATA_COSMOS_PARALLEL).flatMap(cosmosItemFeedResponse -> {
            CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor, cosmosItemFeedResponse.getCosmosDiagnostics(), cosmosItemFeedResponse);
            return Flux.fromIterable((Iterable)cosmosItemFeedResponse.getResults());
        }).map(cosmosItemProperties -> this.emitOnLoadEventAndConvertToDomainObject(domainType, containerName, (JsonNode)cosmosItemProperties)).onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler("Failed to find items", throwable, this.responseDiagnosticsProcessor));
    }

    @Override
    public <T> Mono<T> findById(Object id, Class<T> domainType) {
        Assert.notNull(domainType, (String)"domainType should not be null");
        return this.findById(this.getContainerName(domainType), id, domainType);
    }

    @Override
    public <T> Mono<T> findById(String containerName, Object id, Class<T> domainType) {
        Assert.hasText((String)containerName, (String)"containerName should not be null, empty or only whitespaces");
        Assert.notNull(domainType, (String)"domainType should not be null");
        CosmosEntityInformation<T, T> cosmosEntityInformation = CosmosEntityInformation.getInstance(domainType);
        String containerPartitionKey = cosmosEntityInformation.getPartitionKeyFieldName();
        if ("id".equals(containerPartitionKey) && id != null) {
            return this.findById(id, domainType, new PartitionKey(id));
        }
        if (!this.pointReadWarningLogged) {
            LOGGER.warn("The partitionKey is not id!! Consider using findById(ID id, PartitionKey partitionKey) instead to avoid the need for using a cross partition query which results in higher latency and cost than necessary. See https://aka.ms/PointReadsInSpring for more info.");
            this.pointReadWarningLogged = true;
        }
        String finalContainerName = this.getContainerNameOverride(containerName);
        String query = "select * from root where root.id = @ROOT_ID";
        SqlParameter param = new SqlParameter("@ROOT_ID", (Object)CosmosUtils.getStringIDValue(id));
        SqlQuerySpec sqlQuerySpec = new SqlQuerySpec("select * from root where root.id = @ROOT_ID", new SqlParameter[]{param});
        CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
        options.setQueryMetricsEnabled(this.queryMetricsEnabled);
        options.setMaxDegreeOfParallelism(this.maxDegreeOfParallelism);
        options.setMaxBufferedItemCount(this.maxBufferedItemCount);
        options.setResponseContinuationTokenLimitInKb(this.responseContinuationTokenLimitInKb);
        return this.getCosmosAsyncClient().getDatabase(this.getDatabaseName()).getContainer(finalContainerName).queryItems(sqlQuerySpec, options, JsonNode.class).byPage().publishOn(CosmosSchedulers.SPRING_DATA_COSMOS_PARALLEL).flatMap(cosmosItemFeedResponse -> {
            CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor, cosmosItemFeedResponse.getCosmosDiagnostics(), cosmosItemFeedResponse);
            return Mono.justOrEmpty(cosmosItemFeedResponse.getResults().stream().map(cosmosItem -> this.emitOnLoadEventAndConvertToDomainObject(domainType, finalContainerName, (JsonNode)cosmosItem)).findFirst());
        }).onErrorResume(throwable -> CosmosExceptionUtils.findAPIExceptionHandler("Failed to find item", throwable, this.responseDiagnosticsProcessor)).next();
    }

    @Override
    public <T> Mono<T> findById(Object id, Class<T> domainType, PartitionKey partitionKey) {
        Assert.notNull(domainType, (String)"domainType should not be null");
        String idToFind = CosmosUtils.getStringIDValue(id);
        String containerName = this.getContainerName(domainType);
        return this.getCosmosAsyncClient().getDatabase(this.getDatabaseName()).getContainer(containerName).readItem(idToFind, partitionKey, JsonNode.class).publishOn(CosmosSchedulers.SPRING_DATA_COSMOS_PARALLEL).flatMap(cosmosItemResponse -> {
            CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor, cosmosItemResponse.getDiagnostics(), null);
            return Mono.justOrEmpty(this.emitOnLoadEventAndConvertToDomainObject(domainType, containerName, (JsonNode)cosmosItemResponse.getItem()));
        }).onErrorResume(throwable -> CosmosExceptionUtils.findAPIExceptionHandler("Failed to find item", throwable, this.responseDiagnosticsProcessor));
    }

    @Override
    public <T> Mono<T> insert(T objectToSave, PartitionKey partitionKey) {
        return this.insert(this.getContainerName(objectToSave.getClass()), objectToSave, partitionKey);
    }

    public <T> Mono<T> insert(T objectToSave) {
        return this.insert(this.getContainerName(objectToSave.getClass()), objectToSave, null);
    }

    @Override
    public <T> Mono<T> insert(String containerName, T objectToSave, PartitionKey partitionKey) {
        Assert.hasText((String)containerName, (String)"containerName should not be null, empty or only whitespaces");
        Assert.notNull(objectToSave, (String)"objectToSave should not be null");
        containerName = this.getContainerNameOverride(containerName);
        Class<?> domainType = objectToSave.getClass();
        this.markAuditedIfConfigured(objectToSave);
        this.generateIdIfNullAndAutoGenerationEnabled(objectToSave, domainType);
        JsonNode originalItem = this.mappingCosmosConverter.writeJsonNode(objectToSave);
        CosmosItemRequestOptions options = new CosmosItemRequestOptions();
        return this.getCosmosAsyncClient().getDatabase(this.getDatabaseName()).getContainer(containerName).createItem((Object)originalItem, partitionKey, options).publishOn(CosmosSchedulers.SPRING_DATA_COSMOS_PARALLEL).onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler("Failed to insert item", throwable, this.responseDiagnosticsProcessor)).flatMap(cosmosItemResponse -> {
            CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor, cosmosItemResponse.getDiagnostics(), null);
            return Mono.just(this.toDomainObject(domainType, (JsonNode)cosmosItemResponse.getItem()));
        });
    }

    @Override
    public <T> Mono<T> insert(String containerName, T objectToSave) {
        return this.insert(containerName, objectToSave, null);
    }

    @Override
    public <S extends T, T> Flux<S> insertAll(CosmosEntityInformation<T, ?> entityInformation, Iterable<S> entities) {
        return this.insertAll(entityInformation, Flux.fromIterable(entities));
    }

    @Override
    public <S extends T, T> Flux<S> insertAll(CosmosEntityInformation<T, ?> entityInformation, Flux<S> entities) {
        Assert.notNull(entities, (String)"entities to be inserted should not be null");
        String containerName = entityInformation.getContainerName();
        Class domainType = entityInformation.getJavaType();
        Flux cosmosItemOperationsFlux = entities.map(entity -> {
            this.generateIdIfNullAndAutoGenerationEnabled(entity, domainType);
            JsonNode originalItem = this.mappingCosmosConverter.writeJsonNode(entity);
            PartitionKey partitionKey = new PartitionKey(entityInformation.getPartitionKeyFieldValue(entity));
            CosmosBulkItemRequestOptions options = new CosmosBulkItemRequestOptions();
            this.applyBulkVersioning(domainType, originalItem, options);
            return CosmosBulkOperations.getUpsertItemOperation((Object)originalItem, (PartitionKey)partitionKey, (CosmosBulkItemRequestOptions)options);
        });
        CosmosBulkExecutionOptions cosmosBulkExecutionOptions = new CosmosBulkExecutionOptions();
        cosmosBulkExecutionOptions.setInitialMicroBatchSize(1);
        return this.getCosmosAsyncClient().getDatabase(this.getDatabaseName()).getContainer(containerName).executeBulkOperations(cosmosItemOperationsFlux, cosmosBulkExecutionOptions).publishOn(CosmosSchedulers.SPRING_DATA_COSMOS_PARALLEL).onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler("Failed to insert item(s)", throwable, this.responseDiagnosticsProcessor)).flatMap(r -> {
            CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor, r.getResponse().getCosmosDiagnostics(), null);
            JsonNode responseItem = (JsonNode)r.getResponse().getItem(JsonNode.class);
            return responseItem != null ? Flux.just(this.toDomainObject(domainType, responseItem)) : Flux.empty();
        });
    }

    @Override
    public <T> Mono<T> patch(Object id, PartitionKey partitionKey, Class<T> domainType, CosmosPatchOperations patchOperations) {
        return this.patch(id, partitionKey, domainType, patchOperations, null);
    }

    @Override
    public <T> Mono<T> patch(Object id, PartitionKey partitionKey, Class<T> domainType, CosmosPatchOperations patchOperations, CosmosPatchItemRequestOptions options) {
        Assert.notNull((Object)patchOperations, (String)"expected non-null cosmosPatchOperations");
        String containerName = this.getContainerName(domainType);
        Assert.notNull((Object)id, (String)"id should not be null");
        Assert.notNull((Object)partitionKey, (String)"partitionKey should not be null, empty or only whitespaces");
        Assert.notNull((Object)patchOperations, (String)"patchOperations should not be null, empty or only whitespaces");
        LOGGER.debug("execute patchItem in database {} container {}", (Object)this.getDatabaseName(), (Object)containerName);
        if (options == null) {
            options = new CosmosPatchItemRequestOptions();
        }
        return this.getCosmosAsyncClient().getDatabase(this.getDatabaseName()).getContainer(containerName).patchItem(id.toString(), partitionKey, patchOperations, options, JsonNode.class).publishOn(CosmosSchedulers.SPRING_DATA_COSMOS_PARALLEL).onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler("Failed to patch item", throwable, this.responseDiagnosticsProcessor)).flatMap(cosmosItemResponse -> {
            CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor, cosmosItemResponse.getDiagnostics(), null);
            return Mono.just(this.toDomainObject(domainType, (JsonNode)cosmosItemResponse.getItem()));
        });
    }

    private <T> void generateIdIfNullAndAutoGenerationEnabled(T originalItem, Class<?> type) {
        CosmosEntityInformation<?, ?> entityInfo = CosmosEntityInformation.getInstance(type);
        if (entityInfo.shouldGenerateId() && ReflectionUtils.getField((Field)entityInfo.getIdField(), originalItem) == null) {
            ReflectionUtils.setField((Field)entityInfo.getIdField(), originalItem, (Object)UUID.randomUUID().toString());
        }
    }

    @Override
    public <T> Mono<T> upsert(T object) {
        return this.upsert(this.getContainerName(object.getClass()), object);
    }

    @Override
    public <T> Mono<T> upsert(String containerName, T object) {
        containerName = this.getContainerNameOverride(containerName);
        Class<?> domainType = object.getClass();
        this.markAuditedIfConfigured(object);
        JsonNode originalItem = this.mappingCosmosConverter.writeJsonNode(object);
        CosmosItemRequestOptions options = new CosmosItemRequestOptions();
        this.applyVersioning(object.getClass(), originalItem, options);
        return this.getCosmosAsyncClient().getDatabase(this.getDatabaseName()).getContainer(containerName).upsertItem((Object)originalItem, options).publishOn(CosmosSchedulers.SPRING_DATA_COSMOS_PARALLEL).flatMap(cosmosItemResponse -> {
            CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor, cosmosItemResponse.getDiagnostics(), null);
            return Mono.just(this.toDomainObject(domainType, (JsonNode)cosmosItemResponse.getItem()));
        }).onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler("Failed to upsert item", throwable, this.responseDiagnosticsProcessor));
    }

    @Override
    public Mono<Void> deleteById(String containerName, Object id, PartitionKey partitionKey) {
        return this.deleteById(containerName, id, partitionKey, new CosmosItemRequestOptions());
    }

    private Mono<Void> deleteById(String containerName, Object id, PartitionKey partitionKey, CosmosItemRequestOptions cosmosItemRequestOptions) {
        containerName = this.getContainerNameOverride(containerName);
        Assert.hasText((String)containerName, (String)"container name should not be null, empty or only whitespaces");
        String idToDelete = CosmosUtils.getStringIDValue(id);
        if (partitionKey == null) {
            partitionKey = PartitionKey.NONE;
        }
        return this.getCosmosAsyncClient().getDatabase(this.getDatabaseName()).getContainer(containerName).deleteItem(idToDelete, partitionKey, cosmosItemRequestOptions).publishOn(CosmosSchedulers.SPRING_DATA_COSMOS_PARALLEL).doOnNext(cosmosItemResponse -> CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor, cosmosItemResponse.getDiagnostics(), null)).onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler("Failed to delete item", throwable, this.responseDiagnosticsProcessor)).then();
    }

    @Override
    public <T> Mono<Void> deleteEntity(String containerName, T entity) {
        Assert.notNull(entity, (String)"entity to be deleted should not be null");
        Class<?> domainType = entity.getClass();
        JsonNode originalItem = this.mappingCosmosConverter.writeJsonNode(entity);
        CosmosItemRequestOptions options = new CosmosItemRequestOptions();
        this.applyVersioning(entity.getClass(), originalItem, options);
        return this.deleteItem(originalItem, containerName, domainType).then();
    }

    @Override
    public <S extends T, T> Mono<Void> deleteEntities(CosmosEntityInformation<T, ?> entityInformation, Iterable<S> entities) {
        return this.deleteEntities(entityInformation, Flux.fromIterable(entities));
    }

    @Override
    public <S extends T, T> Mono<Void> deleteEntities(CosmosEntityInformation<T, ?> entityInformation, Flux<S> entities) {
        Assert.notNull(entities, (String)"entities to be deleted should not be null");
        String containerName = entityInformation.getContainerName();
        Class domainType = entityInformation.getJavaType();
        Flux cosmosItemOperationFlux = entities.map(entity -> {
            JsonNode originalItem = this.mappingCosmosConverter.writeJsonNode(entity);
            PartitionKey partitionKey = new PartitionKey(entityInformation.getPartitionKeyFieldValue(entity));
            CosmosBulkItemRequestOptions options = new CosmosBulkItemRequestOptions();
            this.applyBulkVersioning(domainType, originalItem, options);
            return CosmosBulkOperations.getDeleteItemOperation((String)String.valueOf(entityInformation.getId(entity)), (PartitionKey)partitionKey, (CosmosBulkItemRequestOptions)options);
        });
        CosmosBulkExecutionOptions cosmosBulkExecutionOptions = new CosmosBulkExecutionOptions();
        cosmosBulkExecutionOptions.setInitialMicroBatchSize(1);
        return this.getCosmosAsyncClient().getDatabase(this.getDatabaseName()).getContainer(containerName).executeBulkOperations(cosmosItemOperationFlux, cosmosBulkExecutionOptions).publishOn(CosmosSchedulers.SPRING_DATA_COSMOS_PARALLEL).onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler("Failed to delete item(s)", throwable, this.responseDiagnosticsProcessor)).then();
    }

    @Override
    public Mono<Void> deleteAll(@NonNull String containerName, @NonNull Class<?> domainType) {
        Assert.hasText((String)containerName, (String)"container name should not be null, empty or only whitespaces");
        CosmosQuery query = new CosmosQuery(Criteria.getInstance(CriteriaType.ALL));
        return this.delete(query, domainType, containerName).then();
    }

    @Override
    public <T> Flux<T> delete(CosmosQuery query, Class<T> domainType, String containerName) {
        String finalContainerName = this.getContainerNameOverride(containerName);
        Assert.notNull((Object)query, (String)"DocumentQuery should not be null.");
        Assert.notNull(domainType, (String)"domainType should not be null.");
        Assert.hasText((String)containerName, (String)"container name should not be null, empty or only whitespaces");
        CosmosEntityInformation entityInfo = CosmosEntityInformation.getInstance(domainType);
        Flux<JsonNode> results = this.findItems(query, finalContainerName, domainType);
        if (entityInfo.getPartitionKeyFieldName() != null) {
            Flux cosmosItemOperationFlux = results.map(item -> {
                Object object = this.toDomainObject(domainType, (JsonNode)item);
                Object id = entityInfo.getId(object);
                String idString = id != null ? id.toString() : "";
                CosmosBulkItemRequestOptions options = new CosmosBulkItemRequestOptions();
                this.applyBulkVersioning(domainType, (JsonNode)item, options);
                return CosmosBulkOperations.getDeleteItemOperation((String)idString, (PartitionKey)new PartitionKey(entityInfo.getPartitionKeyFieldValue(object)), (CosmosBulkItemRequestOptions)options, object);
            });
            CosmosBulkExecutionOptions cosmosBulkExecutionOptions = new CosmosBulkExecutionOptions();
            cosmosBulkExecutionOptions.setInitialMicroBatchSize(1);
            return this.getCosmosAsyncClient().getDatabase(this.getDatabaseName()).getContainer(containerName).executeBulkOperations(cosmosItemOperationFlux, cosmosBulkExecutionOptions).publishOn(CosmosSchedulers.SPRING_DATA_COSMOS_PARALLEL).onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler("Failed to delete item(s)", throwable, this.responseDiagnosticsProcessor)).map(itemResponse -> itemResponse.getOperation().getContext());
        }
        return results.flatMap(d -> this.deleteItem((JsonNode)d, finalContainerName, domainType));
    }

    @Override
    public <T> Flux<T> find(CosmosQuery query, Class<T> domainType, String containerName) {
        return this.findItems(query, containerName, domainType).map(cosmosItemProperties -> this.emitOnLoadEventAndConvertToDomainObject(domainType, containerName, (JsonNode)cosmosItemProperties));
    }

    @Override
    public Mono<Boolean> exists(CosmosQuery query, Class<?> domainType, String containerName) {
        return this.count(query, containerName).flatMap(count -> Mono.just((Object)(count > 0L ? 1 : 0)));
    }

    @Override
    public Mono<Boolean> existsById(Object id, Class<?> domainType, String containerName) {
        return this.findById(containerName, id, domainType).flatMap(o -> Mono.just((Object)(o != null ? 1 : 0)));
    }

    @Override
    public Mono<Long> count(String containerName) {
        CosmosQuery query = new CosmosQuery(Criteria.getInstance(CriteriaType.ALL));
        return this.count(query, containerName);
    }

    @Override
    public Mono<Long> count(CosmosQuery query, String containerName) {
        SqlQuerySpec querySpec = new CountQueryGenerator().generateCosmos(query);
        return this.getCountValue(querySpec, containerName);
    }

    @Override
    public Mono<Long> count(SqlQuerySpec querySpec, String containerName) {
        return this.getCountValue(querySpec, containerName);
    }

    @Override
    public MappingCosmosConverter getConverter() {
        return this.mappingCosmosConverter;
    }

    @Override
    public <T> Flux<T> runQuery(SqlQuerySpec querySpec, Class<?> domainType, Class<T> returnType) {
        return this.runQuery(querySpec, Sort.unsorted(), domainType, returnType);
    }

    @Override
    public <T> Flux<T> runQuery(SqlQuerySpec querySpec, Sort sort, Class<?> domainType, Class<T> returnType) {
        SqlQuerySpec sortedQuerySpec = NativeQueryGenerator.getInstance().generateSortedQuery(querySpec, sort);
        return this.runQuery(sortedQuerySpec, domainType).map(cosmosItemProperties -> this.emitOnLoadEventAndConvertToDomainObject(returnType, this.getContainerName(domainType), (JsonNode)cosmosItemProperties));
    }

    private Flux<JsonNode> runQuery(SqlQuerySpec querySpec, Class<?> domainType) {
        String containerName = this.getContainerName(domainType);
        CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
        options.setMaxDegreeOfParallelism(this.maxDegreeOfParallelism);
        options.setMaxBufferedItemCount(this.maxBufferedItemCount);
        options.setResponseContinuationTokenLimitInKb(this.responseContinuationTokenLimitInKb);
        return this.getCosmosAsyncClient().getDatabase(this.getDatabaseName()).getContainer(containerName).queryItems(querySpec, options, JsonNode.class).byPage().publishOn(CosmosSchedulers.SPRING_DATA_COSMOS_PARALLEL).flatMap(cosmosItemFeedResponse -> {
            CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor, cosmosItemFeedResponse.getCosmosDiagnostics(), cosmosItemFeedResponse);
            return Flux.fromIterable((Iterable)cosmosItemFeedResponse.getResults());
        }).onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler("Failed to find items", throwable, this.responseDiagnosticsProcessor));
    }

    private Mono<Long> getCountValue(SqlQuerySpec querySpec, String containerName) {
        CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
        options.setQueryMetricsEnabled(this.queryMetricsEnabled);
        options.setMaxDegreeOfParallelism(this.maxDegreeOfParallelism);
        options.setMaxBufferedItemCount(this.maxBufferedItemCount);
        options.setResponseContinuationTokenLimitInKb(this.responseContinuationTokenLimitInKb);
        return this.executeQuery(querySpec, containerName, options).doOnNext(feedResponse -> CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor, feedResponse.getCosmosDiagnostics(), feedResponse)).onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler("Failed to get count value", throwable, this.responseDiagnosticsProcessor)).next().map(r -> ((JsonNode)r.getResults().get(0)).asLong());
    }

    private Flux<FeedResponse<JsonNode>> executeQuery(SqlQuerySpec sqlQuerySpec, String containerName, CosmosQueryRequestOptions options) {
        containerName = this.getContainerNameOverride(containerName);
        return this.getCosmosAsyncClient().getDatabase(this.getDatabaseName()).getContainer(containerName).queryItems(sqlQuerySpec, options, JsonNode.class).byPage().onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler("Failed to execute query", throwable, this.responseDiagnosticsProcessor));
    }

    @Override
    public void deleteContainer(@NonNull String containerName) {
        containerName = this.getContainerNameOverride(containerName);
        Assert.hasText((String)containerName, (String)"containerName should have text.");
        this.getCosmosAsyncClient().getDatabase(this.getDatabaseName()).getContainer(containerName).delete().doOnNext(cosmosContainerResponse -> CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor, cosmosContainerResponse.getDiagnostics(), null)).onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler("Failed to delete container", throwable, this.responseDiagnosticsProcessor)).block();
    }

    @Override
    public String getContainerName(Class<?> domainType) {
        Assert.notNull(domainType, (String)"domainType should not be null");
        return this.getContainerNameOverride(CosmosEntityInformation.getInstance(domainType).getContainerName());
    }

    public String getContainerNameOverride(String containerName) {
        if (this.cosmosFactory.overrideContainerName() != null) {
            return this.cosmosFactory.overrideContainerName();
        }
        Assert.notNull((Object)containerName, (String)"containerName should not be null");
        return containerName;
    }

    private void markAuditedIfConfigured(Object object) {
        if (this.cosmosAuditingHandler != null) {
            this.cosmosAuditingHandler.markAudited(object);
        }
    }

    private <T> Flux<JsonNode> findItems(@NonNull CosmosQuery query, @NonNull String containerName, @NonNull Class<T> domainType) {
        containerName = this.getContainerNameOverride(containerName);
        SqlQuerySpec sqlQuerySpec = new FindQuerySpecGenerator().generateCosmos(query);
        CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
        cosmosQueryRequestOptions.setQueryMetricsEnabled(this.queryMetricsEnabled);
        cosmosQueryRequestOptions.setMaxDegreeOfParallelism(this.maxDegreeOfParallelism);
        cosmosQueryRequestOptions.setMaxBufferedItemCount(this.maxBufferedItemCount);
        cosmosQueryRequestOptions.setResponseContinuationTokenLimitInKb(this.responseContinuationTokenLimitInKb);
        Optional<Object> partitionKeyValue = query.getPartitionKeyValue(domainType);
        partitionKeyValue.ifPresent(o -> {
            LOGGER.debug("Setting partition key {}", o);
            cosmosQueryRequestOptions.setPartitionKey(new PartitionKey(o));
        });
        return this.getCosmosAsyncClient().getDatabase(this.getDatabaseName()).getContainer(containerName).queryItems(sqlQuerySpec, cosmosQueryRequestOptions, JsonNode.class).byPage().publishOn(CosmosSchedulers.SPRING_DATA_COSMOS_PARALLEL).flatMap(cosmosItemFeedResponse -> {
            CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor, cosmosItemFeedResponse.getCosmosDiagnostics(), cosmosItemFeedResponse);
            return Flux.fromIterable((Iterable)cosmosItemFeedResponse.getResults());
        }).onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler("Failed to query items", throwable, this.responseDiagnosticsProcessor));
    }

    private <T> Mono<T> deleteItem(@NonNull JsonNode jsonNode, String containerName, @NonNull Class<T> domainType) {
        containerName = this.getContainerNameOverride(containerName);
        CosmosItemRequestOptions options = new CosmosItemRequestOptions();
        this.applyVersioning(domainType, jsonNode, options);
        return this.getCosmosAsyncClient().getDatabase(this.getDatabaseName()).getContainer(containerName).deleteItem((Object)jsonNode, options).publishOn(CosmosSchedulers.SPRING_DATA_COSMOS_PARALLEL).map(cosmosItemResponse -> {
            CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor, cosmosItemResponse.getDiagnostics(), null);
            return cosmosItemResponse;
        }).flatMap(objectCosmosItemResponse -> Mono.just(this.toDomainObject(domainType, jsonNode))).onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler("Failed to delete item", throwable, this.responseDiagnosticsProcessor));
    }

    private <T> T emitOnLoadEventAndConvertToDomainObject(@NonNull Class<T> domainType, String containerName, JsonNode responseJsonNode) {
        containerName = this.getContainerNameOverride(containerName);
        this.maybeEmitEvent(new AfterLoadEvent<T>(responseJsonNode, domainType, containerName));
        return this.toDomainObject(domainType, responseJsonNode);
    }

    private <T> T toDomainObject(@NonNull Class<T> domainType, JsonNode jsonNode) {
        return this.mappingCosmosConverter.read(domainType, jsonNode);
    }

    private void applyVersioning(Class<?> domainType, JsonNode jsonNode, CosmosItemRequestOptions options) {
        CosmosEntityInformation<?, ?> entityInformation = CosmosEntityInformation.getInstance(domainType);
        if (entityInformation.isVersioned()) {
            options.setIfMatchETag(jsonNode.get("_etag").asText());
        }
    }

    private void applyBulkVersioning(Class<?> domainType, JsonNode jsonNode, CosmosBulkItemRequestOptions options) {
        CosmosEntityInformation<?, ?> entityInformation = CosmosEntityInformation.getInstance(domainType);
        if (entityInformation.isVersioned()) {
            options.setIfMatchETag(jsonNode.get("_etag").asText());
        }
    }

    private void maybeEmitEvent(CosmosMappingEvent<?> event) {
        if (this.canPublishEvent()) {
            this.applicationContext.publishEvent(event);
        }
    }

    private boolean canPublishEvent() {
        return this.applicationContext != null;
    }
}

