/*
 * Decompiled with CFR 0.152.
 */
package com.azure.data.cosmos.internal.changefeed.implementation;

import com.azure.data.cosmos.ChangeFeedOptions;
import com.azure.data.cosmos.CommonsBridgeInternal;
import com.azure.data.cosmos.CosmosClientException;
import com.azure.data.cosmos.CosmosItemProperties;
import com.azure.data.cosmos.FeedResponse;
import com.azure.data.cosmos.internal.changefeed.CancellationToken;
import com.azure.data.cosmos.internal.changefeed.ChangeFeedContextClient;
import com.azure.data.cosmos.internal.changefeed.ChangeFeedObserver;
import com.azure.data.cosmos.internal.changefeed.PartitionCheckpointer;
import com.azure.data.cosmos.internal.changefeed.PartitionProcessor;
import com.azure.data.cosmos.internal.changefeed.ProcessorSettings;
import com.azure.data.cosmos.internal.changefeed.exceptions.PartitionNotFoundException;
import com.azure.data.cosmos.internal.changefeed.exceptions.PartitionSplitException;
import com.azure.data.cosmos.internal.changefeed.exceptions.TaskCancelledException;
import com.azure.data.cosmos.internal.changefeed.implementation.ChangeFeedObserverContextImpl;
import com.azure.data.cosmos.internal.changefeed.implementation.ExceptionClassifier;
import com.azure.data.cosmos.internal.changefeed.implementation.StatusCodeErrorType;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class PartitionProcessorImpl
implements PartitionProcessor {
    private static final Logger logger = LoggerFactory.getLogger(PartitionProcessorImpl.class);
    private static final int DefaultMaxItemCount = 100;
    private final ProcessorSettings settings;
    private final PartitionCheckpointer checkpointer;
    private final ChangeFeedObserver observer;
    private final ChangeFeedOptions options;
    private final ChangeFeedContextClient documentClient;
    private volatile RuntimeException resultException;
    private volatile String lastContinuation;
    private volatile boolean isFirstQueryForChangeFeeds;

    public PartitionProcessorImpl(ChangeFeedObserver observer, ChangeFeedContextClient documentClient, ProcessorSettings settings, PartitionCheckpointer checkpointer) {
        this.observer = observer;
        this.documentClient = documentClient;
        this.settings = settings;
        this.checkpointer = checkpointer;
        this.options = new ChangeFeedOptions();
        this.options.maxItemCount(settings.getMaxItemCount());
        CommonsBridgeInternal.partitionKeyRangeIdInternal(this.options, settings.getPartitionKeyRangeId());
        this.options.startFromBeginning(settings.isStartFromBeginning());
        this.options.requestContinuation(settings.getStartContinuation());
        this.options.startDateTime(settings.getStartTime());
    }

    @Override
    public Mono<Void> run(CancellationToken cancellationToken) {
        this.lastContinuation = this.settings.getStartContinuation();
        this.isFirstQueryForChangeFeeds = true;
        this.options.requestContinuation(this.lastContinuation);
        return Flux.just((Object)this).flatMap(value -> {
            if (cancellationToken.isCancellationRequested()) {
                return Flux.empty();
            }
            if (this.isFirstQueryForChangeFeeds) {
                this.isFirstQueryForChangeFeeds = false;
                return Flux.just((Object)value);
            }
            ZonedDateTime stopTimer = ZonedDateTime.now().plus(this.settings.getFeedPollDelay());
            return Mono.just((Object)value).delayElement(Duration.ofMillis(100L)).repeat(() -> {
                ZonedDateTime currentTime = ZonedDateTime.now();
                return !cancellationToken.isCancellationRequested() && currentTime.isBefore(stopTimer);
            }).last();
        }).flatMap(value -> this.documentClient.createDocumentChangeFeedQuery(this.settings.getCollectionSelfLink(), this.options).limitRequest(1L)).flatMap(documentFeedResponse -> {
            if (cancellationToken.isCancellationRequested()) {
                return Flux.error((Throwable)new TaskCancelledException());
            }
            this.lastContinuation = documentFeedResponse.continuationToken();
            if (documentFeedResponse.results() != null && documentFeedResponse.results().size() > 0) {
                return this.dispatchChanges((FeedResponse<CosmosItemProperties>)documentFeedResponse).doFinally(Void2 -> {
                    this.options.requestContinuation(this.lastContinuation);
                    if (cancellationToken.isCancellationRequested()) {
                        throw new TaskCancelledException();
                    }
                }).flux();
            }
            this.options.requestContinuation(this.lastContinuation);
            if (cancellationToken.isCancellationRequested()) {
                return Flux.error((Throwable)new TaskCancelledException());
            }
            return Flux.empty();
        }).doOnComplete(() -> {
            if (this.options.maxItemCount().compareTo(this.settings.getMaxItemCount()) != 0) {
                this.options.maxItemCount(this.settings.getMaxItemCount());
            }
        }).onErrorResume(throwable -> {
            if (throwable instanceof CosmosClientException) {
                CosmosClientException clientException = (CosmosClientException)throwable;
                logger.warn("Exception: partition {}", (Object)this.options.partitionKey().getInternalPartitionKey(), (Object)clientException);
                StatusCodeErrorType docDbError = ExceptionClassifier.classifyClientException(clientException);
                switch (docDbError) {
                    case PARTITION_NOT_FOUND: {
                        this.resultException = new PartitionNotFoundException("Partition not found.", this.lastContinuation);
                    }
                    case PARTITION_SPLIT: {
                        this.resultException = new PartitionSplitException("Partition split.", this.lastContinuation);
                    }
                    case UNDEFINED: {
                        this.resultException = new RuntimeException(clientException);
                    }
                    case MAX_ITEM_COUNT_TOO_LARGE: {
                        if (this.options.maxItemCount() == null) {
                            this.options.maxItemCount(100);
                        } else if (this.options.maxItemCount() <= 1) {
                            logger.error("Cannot reduce maxItemCount further as it's already at {}", (Object)this.options.maxItemCount(), (Object)clientException);
                            this.resultException = new RuntimeException(clientException);
                        }
                        this.options.maxItemCount(this.options.maxItemCount() / 2);
                        logger.warn("Reducing maxItemCount, new value: {}", (Object)this.options.maxItemCount());
                        return Flux.empty();
                    }
                    case TRANSIENT_ERROR: {
                        if (clientException.retryAfterInMilliseconds() <= 0L) break;
                        ZonedDateTime stopTimer = ZonedDateTime.now().plus(clientException.retryAfterInMilliseconds(), ChronoUnit.MILLIS);
                        return Mono.just((Object)clientException.retryAfterInMilliseconds()).delayElement(Duration.ofMillis(100L)).repeat(() -> {
                            ZonedDateTime currentTime = ZonedDateTime.now();
                            return !cancellationToken.isCancellationRequested() && currentTime.isBefore(stopTimer);
                        }).flatMap(values -> Flux.empty());
                    }
                }
                logger.error("Unrecognized DocDbError enum value {}", (Object)docDbError, (Object)clientException);
                this.resultException = new RuntimeException(clientException);
            } else if (throwable instanceof TaskCancelledException) {
                logger.debug("Exception: partition {}", (Object)this.settings.getPartitionKeyRangeId(), throwable);
                this.resultException = (TaskCancelledException)throwable;
            }
            return Flux.error((Throwable)throwable);
        }).repeat(() -> {
            if (cancellationToken.isCancellationRequested()) {
                this.resultException = new TaskCancelledException();
                return false;
            }
            return true;
        }).onErrorResume(throwable -> Flux.empty()).then();
    }

    @Override
    public RuntimeException getResultException() {
        return this.resultException;
    }

    private Mono<Void> dispatchChanges(FeedResponse<CosmosItemProperties> response) {
        ChangeFeedObserverContextImpl context = new ChangeFeedObserverContextImpl(this.settings.getPartitionKeyRangeId(), response, this.checkpointer);
        return this.observer.processChanges(context, response.results());
    }
}

