/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.changefeed.fullfidelity;

import com.azure.cosmos.ChangeFeedProcessor;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosBridgeInternal;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverFactory;
import com.azure.cosmos.implementation.changefeed.CheckpointFrequency;
import com.azure.cosmos.implementation.changefeed.HealthMonitor;
import com.azure.cosmos.implementation.changefeed.LeaseStoreManager;
import com.azure.cosmos.implementation.changefeed.PartitionLoadBalancingStrategy;
import com.azure.cosmos.implementation.changefeed.PartitionManager;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedContextClientImpl;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
import com.azure.cosmos.implementation.changefeed.common.CheckpointerObserverFactory;
import com.azure.cosmos.implementation.changefeed.common.DefaultObserverFactory;
import com.azure.cosmos.implementation.changefeed.common.EqualPartitionsBalancingStrategy;
import com.azure.cosmos.implementation.changefeed.common.PartitionedByIdCollectionRequestOptionsFactory;
import com.azure.cosmos.implementation.changefeed.common.TraceHealthMonitor;
import com.azure.cosmos.implementation.changefeed.fullfidelity.BootstrapperImpl;
import com.azure.cosmos.implementation.changefeed.fullfidelity.HealthMonitoringPartitionControllerDecorator;
import com.azure.cosmos.implementation.changefeed.fullfidelity.LeaseStoreManagerImpl;
import com.azure.cosmos.implementation.changefeed.fullfidelity.PartitionControllerImpl;
import com.azure.cosmos.implementation.changefeed.fullfidelity.PartitionLoadBalancerImpl;
import com.azure.cosmos.implementation.changefeed.fullfidelity.PartitionManagerImpl;
import com.azure.cosmos.implementation.changefeed.fullfidelity.PartitionProcessorFactoryImpl;
import com.azure.cosmos.implementation.changefeed.fullfidelity.PartitionSupervisorFactoryImpl;
import com.azure.cosmos.implementation.changefeed.fullfidelity.PartitionSynchronizerImpl;
import com.azure.cosmos.models.ChangeFeedProcessorItem;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import com.azure.cosmos.models.ChangeFeedProcessorState;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class ChangeFeedProcessorBuilderImpl
implements ChangeFeedProcessor,
AutoCloseable {
    private final Logger logger = LoggerFactory.getLogger(ChangeFeedProcessorBuilderImpl.class);
    private final Duration sleepTime = Duration.ofSeconds(15L);
    private final Duration lockTime = Duration.ofSeconds(30L);
    private static final int DEFAULT_QUERY_PARTITIONS_MAX_BATCH_SIZE = 100;
    private static final int DEFAULT_DEGREE_OF_PARALLELISM = 25;
    private String hostName;
    private ChangeFeedContextClient feedContextClient;
    private ChangeFeedProcessorOptions changeFeedProcessorOptions;
    private ChangeFeedObserverFactory<ChangeFeedProcessorItem> observerFactory;
    private volatile String databaseResourceId;
    private volatile String collectionResourceId;
    private ChangeFeedContextClient leaseContextClient;
    private PartitionLoadBalancingStrategy loadBalancingStrategy;
    private LeaseStoreManager leaseStoreManager;
    private HealthMonitor healthMonitor;
    private volatile PartitionManager partitionManager;
    private Scheduler scheduler;

    @Override
    public Mono<Void> start() {
        if (this.partitionManager == null) {
            return this.initializeCollectionPropertiesForBuild().flatMap(value -> this.getLeaseStoreManager().flatMap(this::buildPartitionManager)).flatMap(partitionManager1 -> {
                this.partitionManager = partitionManager1;
                return this.partitionManager.start();
            });
        }
        return this.partitionManager.start();
    }

    @Override
    public Mono<Void> stop() {
        if (this.partitionManager == null || !this.partitionManager.isRunning()) {
            throw new IllegalStateException("The ChangeFeedProcessor instance has not fully started");
        }
        return this.partitionManager.stop();
    }

    @Override
    public boolean isStarted() {
        return this.partitionManager != null && this.partitionManager.isRunning();
    }

    @Override
    public Mono<Map<String, Integer>> getEstimatedLag() {
        throw new UnsupportedOperationException("getEstimatedLag() API is not supported on Full Fidelity Change Feed Processor. Use getCurrentState() instead");
    }

    @Override
    public Mono<List<ChangeFeedProcessorState>> getCurrentState() {
        if (this.leaseContextClient == null || this.feedContextClient == null) {
            return Mono.just(Collections.unmodifiableList(new ArrayList()));
        }
        return this.initializeCollectionPropertiesForBuild().flatMap(value -> this.getLeaseStoreManager()).flatMap(leaseStoreManager1 -> leaseStoreManager1.getAllLeases().flatMap(lease -> {
            CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions.createForProcessingFromNow(lease.getFeedRange()).setMaxItemCount(1).allVersionsAndDeletes();
            return this.feedContextClient.createDocumentChangeFeedQuery(this.feedContextClient.getContainerClient(), options, ChangeFeedProcessorItem.class).take(1L).map(feedResponse -> {
                ChangeFeedProcessorState changeFeedProcessorState = new ChangeFeedProcessorState().setHostName(lease.getOwner()).setLeaseToken(lease.getLeaseToken());
                int latestLsn = 0;
                int estimatedLag = 0;
                int currentLsn = 0;
                try {
                    latestLsn = this.getLsnFromEncodedContinuationToken(feedResponse.getContinuationToken());
                    changeFeedProcessorState.setContinuationToken(feedResponse.getContinuationToken());
                    if (Strings.isNullOrWhiteSpace(lease.getContinuationToken())) {
                        estimatedLag = latestLsn - 1;
                    } else {
                        currentLsn = this.getLsnFromEncodedContinuationToken(lease.getContinuationToken());
                        estimatedLag = latestLsn - currentLsn;
                    }
                }
                catch (NumberFormatException ex) {
                    this.logger.warn("Unexpected Cosmos LSN found", (Throwable)ex);
                    changeFeedProcessorState.setEstimatedLag(-1);
                }
                changeFeedProcessorState.setEstimatedLag(estimatedLag);
                return changeFeedProcessorState;
            });
        }).collectList().map(Collections::unmodifiableList));
    }

    public ChangeFeedProcessorBuilderImpl hostName(String hostName) {
        this.hostName = hostName;
        return this;
    }

    public ChangeFeedProcessorBuilderImpl feedContainer(CosmosAsyncContainer feedDocumentClient) {
        if (feedDocumentClient == null) {
            throw new IllegalArgumentException("feedContextClient");
        }
        this.feedContextClient = new ChangeFeedContextClientImpl(feedDocumentClient);
        return this;
    }

    public ChangeFeedProcessorBuilderImpl options(ChangeFeedProcessorOptions changeFeedProcessorOptions) {
        if (changeFeedProcessorOptions == null) {
            throw new IllegalArgumentException("changeFeedProcessorOptions");
        }
        this.changeFeedProcessorOptions = changeFeedProcessorOptions;
        return this;
    }

    public ChangeFeedProcessorBuilderImpl observerFactory(ChangeFeedObserverFactory<ChangeFeedProcessorItem> observerFactory) {
        if (observerFactory == null) {
            throw new IllegalArgumentException("observerFactory");
        }
        this.observerFactory = observerFactory;
        return this;
    }

    public ChangeFeedProcessorBuilderImpl handleChanges(Consumer<List<ChangeFeedProcessorItem>> consumer) {
        return this.observerFactory(new DefaultObserverFactory<ChangeFeedProcessorItem>(consumer));
    }

    public ChangeFeedProcessorBuilderImpl leaseContainer(CosmosAsyncContainer leaseClient) {
        if (leaseClient == null) {
            throw new IllegalArgumentException("leaseClient");
        }
        if (!CosmosBridgeInternal.getContextClient(leaseClient).isContentResponseOnWriteEnabled()) {
            throw new IllegalArgumentException("leaseClient: content response on write setting must be enabled");
        }
        ConsistencyLevel consistencyLevel = CosmosBridgeInternal.getContextClient(leaseClient).getConsistencyLevel();
        if (consistencyLevel == ConsistencyLevel.CONSISTENT_PREFIX || consistencyLevel == ConsistencyLevel.EVENTUAL) {
            this.logger.warn("leaseClient consistency level setting are less then expected which is SESSION");
        }
        this.leaseContextClient = new ChangeFeedContextClientImpl(leaseClient);
        return this;
    }

    public ChangeFeedProcessor build() {
        if (this.hostName == null) {
            throw new IllegalArgumentException("Host name was not specified");
        }
        if (this.observerFactory == null) {
            throw new IllegalArgumentException("Observer was not specified");
        }
        if (this.changeFeedProcessorOptions != null && this.changeFeedProcessorOptions.getLeaseAcquireInterval().compareTo(ChangeFeedProcessorOptions.DEFAULT_ACQUIRE_INTERVAL) < 0) {
            this.logger.warn("Found lower than expected setting for leaseAcquireInterval");
        }
        if (this.changeFeedProcessorOptions == null) {
            this.changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
        }
        this.scheduler = this.changeFeedProcessorOptions.getScheduler();
        this.feedContextClient.setScheduler(this.scheduler);
        this.leaseContextClient.setScheduler(this.scheduler);
        return this;
    }

    private Mono<ChangeFeedProcessor> initializeCollectionPropertiesForBuild() {
        return this.feedContextClient.readDatabase(this.feedContextClient.getDatabaseClient(), null).map(databaseResourceResponse -> {
            this.databaseResourceId = databaseResourceResponse.getProperties().getResourceId();
            return this.databaseResourceId;
        }).flatMap(id -> this.feedContextClient.readContainer(this.feedContextClient.getContainerClient(), null).map(documentCollectionResourceResponse -> {
            this.collectionResourceId = documentCollectionResourceResponse.getProperties().getResourceId();
            return this;
        }));
    }

    private Mono<LeaseStoreManager> getLeaseStoreManager() {
        if (this.leaseStoreManager == null) {
            return this.leaseContextClient.readContainerSettings(this.leaseContextClient.getContainerClient(), null).flatMap(collectionSettings -> {
                boolean isPartitioned;
                boolean bl = isPartitioned = collectionSettings.getPartitionKeyDefinition() != null && collectionSettings.getPartitionKeyDefinition().getPaths() != null && collectionSettings.getPartitionKeyDefinition().getPaths().size() > 0;
                if (!isPartitioned || collectionSettings.getPartitionKeyDefinition().getPaths().size() != 1 || !collectionSettings.getPartitionKeyDefinition().getPaths().get(0).equals("/id")) {
                    return Mono.error((Throwable)new IllegalArgumentException("The lease collection must have partition key equal to id."));
                }
                PartitionedByIdCollectionRequestOptionsFactory requestOptionsFactory = new PartitionedByIdCollectionRequestOptionsFactory();
                String leasePrefix = this.getLeasePrefix();
                return LeaseStoreManagerImpl.builder().leasePrefix(leasePrefix).leaseCollectionLink(this.leaseContextClient.getContainerClient()).leaseContextClient(this.leaseContextClient).requestOptionsFactory(requestOptionsFactory).hostName(this.hostName).build().map(manager -> {
                    this.leaseStoreManager = manager;
                    return this.leaseStoreManager;
                });
            });
        }
        return Mono.just((Object)this.leaseStoreManager);
    }

    private String getLeasePrefix() {
        String optionsPrefix = this.changeFeedProcessorOptions.getLeasePrefix();
        if (optionsPrefix == null) {
            optionsPrefix = "";
        }
        URI uri = this.feedContextClient.getServiceEndpoint();
        return String.format("%s%s_%s_%s", optionsPrefix, uri.getHost(), this.databaseResourceId, this.collectionResourceId);
    }

    private Mono<PartitionManager> buildPartitionManager(LeaseStoreManager leaseStoreManager) {
        CheckpointerObserverFactory<ChangeFeedProcessorItem> factory = new CheckpointerObserverFactory<ChangeFeedProcessorItem>(this.observerFactory, new CheckpointFrequency());
        PartitionSynchronizerImpl synchronizer = new PartitionSynchronizerImpl(this.feedContextClient, this.feedContextClient.getContainerClient(), leaseStoreManager, leaseStoreManager, 25, 100, this.collectionResourceId);
        BootstrapperImpl bootstrapper = new BootstrapperImpl(synchronizer, leaseStoreManager, this.lockTime, this.sleepTime);
        PartitionSupervisorFactoryImpl partitionSupervisorFactory = new PartitionSupervisorFactoryImpl(factory, leaseStoreManager, new PartitionProcessorFactoryImpl(this.feedContextClient, this.changeFeedProcessorOptions, leaseStoreManager, this.feedContextClient.getContainerClient(), this.collectionResourceId), this.changeFeedProcessorOptions, this.scheduler);
        if (this.loadBalancingStrategy == null) {
            this.loadBalancingStrategy = new EqualPartitionsBalancingStrategy(this.hostName, this.changeFeedProcessorOptions.getMinScaleCount(), this.changeFeedProcessorOptions.getMaxScaleCount(), this.changeFeedProcessorOptions.getLeaseExpirationInterval());
        }
        PartitionControllerImpl partitionController = new PartitionControllerImpl(leaseStoreManager, leaseStoreManager, partitionSupervisorFactory, synchronizer, this.scheduler);
        if (this.healthMonitor == null) {
            this.healthMonitor = new TraceHealthMonitor();
        }
        HealthMonitoringPartitionControllerDecorator partitionController2 = new HealthMonitoringPartitionControllerDecorator(partitionController, this.healthMonitor);
        PartitionLoadBalancerImpl partitionLoadBalancer = new PartitionLoadBalancerImpl(partitionController2, leaseStoreManager, this.loadBalancingStrategy, this.changeFeedProcessorOptions.getLeaseAcquireInterval(), this.scheduler);
        PartitionManagerImpl partitionManager = new PartitionManagerImpl(bootstrapper, partitionController, partitionLoadBalancer);
        return Mono.just((Object)partitionManager);
    }

    private int getLsnFromEncodedContinuationToken(String continuationToken) {
        ChangeFeedState changeFeedState = ChangeFeedState.fromString(continuationToken);
        String token = changeFeedState.getContinuation().getCurrentContinuationToken().getToken();
        token = token.replace("\"", "");
        int lsn = Integer.parseInt(token);
        return lsn;
    }

    @Override
    public void close() {
        this.stop().subscribeOn(Schedulers.boundedElastic()).subscribe();
    }
}

