/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.internal.directconnectivity;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosClientException;
import com.azure.cosmos.GoneException;
import com.azure.cosmos.RequestTimeoutException;
import com.azure.cosmos.internal.IAuthorizationTokenProvider;
import com.azure.cosmos.internal.ISessionContainer;
import com.azure.cosmos.internal.Integers;
import com.azure.cosmos.internal.RequestChargeTracker;
import com.azure.cosmos.internal.RxDocumentServiceRequest;
import com.azure.cosmos.internal.SessionTokenHelper;
import com.azure.cosmos.internal.Strings;
import com.azure.cosmos.internal.Utils;
import com.azure.cosmos.internal.directconnectivity.AddressInformation;
import com.azure.cosmos.internal.directconnectivity.AddressSelector;
import com.azure.cosmos.internal.directconnectivity.BarrierRequestHelper;
import com.azure.cosmos.internal.directconnectivity.GatewayServiceConfigurationReader;
import com.azure.cosmos.internal.directconnectivity.HttpUtils;
import com.azure.cosmos.internal.directconnectivity.ReadMode;
import com.azure.cosmos.internal.directconnectivity.ReplicatedResourceClient;
import com.azure.cosmos.internal.directconnectivity.RequestHelper;
import com.azure.cosmos.internal.directconnectivity.StoreReader;
import com.azure.cosmos.internal.directconnectivity.StoreResponse;
import com.azure.cosmos.internal.directconnectivity.StoreResult;
import com.azure.cosmos.internal.directconnectivity.TimeoutHelper;
import com.azure.cosmos.internal.directconnectivity.TransportClient;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.collections4.ComparatorUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
 * Sends write requests to the primary replica of a partition and, for accounts whose default
 * consistency level is STRONG and that report at least one read region, waits on a "write
 * barrier": it polls replicas until one of them reports a global committed LSN that has caught
 * up with the LSN returned by the write.
 *
 * <p>State that must survive a retry of the same request (the already-obtained write response and
 * the LSN being waited for) is stored on {@code request.requestContext}.
 */
public class ConsistencyWriter {
    /** Maximum number of barrier reads issued by a single {@link #waitForWriteBarrierAsync} call. */
    private static final int MAX_NUMBER_OF_WRITE_BARRIER_READ_RETRIES = 30;
    /** Delay between barrier reads once the short-interval retries have been used up. */
    private static final int DELAY_BETWEEN_WRITE_BARRIER_CALLS_IN_MS = 30;
    /** Number of initial barrier retries that use the short interval. */
    private static final int MAX_SHORT_BARRIER_RETRIES_FOR_MULTI_REGION = 4;
    /** Delay between barrier reads for the initial, short-interval retries. */
    private static final int SHORT_BARRIER_RETRY_INTERVAL_IN_MS_FOR_MULTI_REGION = 10;
    private final Logger logger = LoggerFactory.getLogger(ConsistencyWriter.class);
    private final TransportClient transportClient;
    private final AddressSelector addressSelector;
    private final ISessionContainer sessionContainer;
    private final IAuthorizationTokenProvider authorizationTokenProvider;
    private final boolean useMultipleWriteLocations;
    private final GatewayServiceConfigurationReader serviceConfigReader;
    private final StoreReader storeReader;

    /**
     * Creates a writer bound to the given transport, address resolution and session state.
     *
     * @param addressSelector resolves replica addresses and the primary URI for a request
     * @param sessionContainer source of partition-local session tokens for multi-write SESSION requests
     * @param transportClient transport used to send the write to the primary replica
     * @param authorizationTokenProvider passed to {@link BarrierRequestHelper} when building barrier requests
     * @param serviceConfigReader supplies the account's default consistency level
     * @param useMultipleWriteLocations whether the account is configured for multiple write locations
     */
    public ConsistencyWriter(AddressSelector addressSelector, ISessionContainer sessionContainer, TransportClient transportClient, IAuthorizationTokenProvider authorizationTokenProvider, GatewayServiceConfigurationReader serviceConfigReader, boolean useMultipleWriteLocations) {
        this.transportClient = transportClient;
        this.addressSelector = addressSelector;
        this.sessionContainer = sessionContainer;
        this.authorizationTokenProvider = authorizationTokenProvider;
        this.useMultipleWriteLocations = useMultipleWriteLocations;
        this.serviceConfigReader = serviceConfigReader;
        // NOTE(review): the store reader is built without a session container (third argument is
        // null); in this class it is only used for barrier reads and for building store results.
        this.storeReader = new StoreReader(transportClient, addressSelector, null);
    }

    /**
     * Performs the write and, on every signal of the resulting {@link Mono}, hands the session
     * token header value captured before the write back to
     * {@link SessionTokenHelper#setOriginalSessionToken}.
     *
     * @param entity the write request
     * @param timeout overall deadline for the operation
     * @param forceRefresh whether the address cache must be refreshed before resolving replicas
     * @return the store response, or an error ({@link RequestTimeoutException} if {@code timeout}
     *         has already elapsed)
     */
    public Mono<StoreResponse> writeAsync(RxDocumentServiceRequest entity, TimeoutHelper timeout, boolean forceRefresh) {
        if (timeout.isElapsed()) {
            return Mono.error(new RequestTimeoutException());
        }
        // Captured up front because the write path may replace or remove this header.
        String sessionToken = entity.getHeaders().get("x-ms-session-token");
        return this.writePrivateAsync(entity, timeout, forceRefresh).doOnEach(signal -> {
            try {
                SessionTokenHelper.setOriginalSessionToken(entity, sessionToken);
            } catch (Throwable throwable) {
                // Best effort: a failure here must not replace the actual outcome of the write.
                this.logger.error("Unexpected failure in handling orig [{}]: new [{}]", signal, throwable.getMessage(), throwable);
            }
        });
    }

    /**
     * Core write path. Initializes the request context, then either sends the write to the primary
     * replica or, if {@code request.requestContext.globalStrongWriteResponse} is already set (this
     * class sets it in {@link #barrierForGlobalStrong} once a write has been acknowledged), only
     * waits for the write barrier again.
     *
     * @param request the write request; its {@code requestContext} is mutated
     * @param timeout overall deadline for the operation
     * @param forceRefresh whether the address cache must be refreshed before resolving replicas
     * @return the store response of the write, or an error
     */
    Mono<StoreResponse> writePrivateAsync(RxDocumentServiceRequest request, TimeoutHelper timeout, boolean forceRefresh) {
        if (timeout.isElapsed()) {
            return Mono.error(new RequestTimeoutException());
        }
        request.requestContext.timeoutHelper = timeout;
        if (request.requestContext.requestChargeTracker == null) {
            request.requestContext.requestChargeTracker = new RequestChargeTracker();
        }
        if (request.requestContext.cosmosResponseDiagnostics == null) {
            request.requestContext.cosmosResponseDiagnostics = BridgeInternal.createCosmosResponseDiagnostics();
        }
        request.requestContext.forceRefreshAddressCache = forceRefresh;

        if (request.requestContext.globalStrongWriteResponse != null) {
            // The write itself was acknowledged earlier; only the barrier is still outstanding.
            return this.awaitWriteBarrier(request, false);
        }

        Mono<List<AddressInformation>> replicaAddressesObs = this.addressSelector.resolveAddressesAsync(request, forceRefresh);
        return replicaAddressesObs
            .flatMap(replicaAddresses -> this.selectPrimaryUri(request, replicaAddresses))
            .flatMap(primaryUri -> this.writeToPrimary(request, primaryUri));
    }

    /** Records the contacted replicas in the request diagnostics and picks the primary replica URI. */
    private Mono<URI> selectPrimaryUri(RxDocumentServiceRequest request, List<AddressInformation> replicaAddresses) {
        try {
            List<URI> contactedReplicas = new ArrayList<>();
            replicaAddresses.forEach(replicaAddress -> contactedReplicas.add(HttpUtils.toURI(replicaAddress.getPhysicalUri())));
            BridgeInternal.setContactedReplicas(request.requestContext.cosmosResponseDiagnostics, contactedReplicas);
            return Mono.just(AddressSelector.getPrimaryUri(request, replicaAddresses));
        } catch (GoneException e) {
            // Checked exception: surface it through the reactive pipeline.
            return Mono.error(e);
        }
    }

    /** Prepares the session token, sends the write to the primary, records the outcome and applies the barrier. */
    private Mono<StoreResponse> writeToPrimary(RxDocumentServiceRequest request, URI primaryUri) {
        try {
            if (this.useMultipleWriteLocations && RequestHelper.GetConsistencyLevelToUse(this.serviceConfigReader, request) == ConsistencyLevel.SESSION) {
                SessionTokenHelper.setPartitionLocalSessionToken(request, this.sessionContainer);
            } else {
                SessionTokenHelper.validateAndRemoveSessionToken(request);
            }
        } catch (Exception e) {
            return Mono.error(e);
        }
        return this.transportClient.invokeResourceOperationAsync(primaryUri, request)
            .doOnError(t -> this.handleWriteFailure(request, primaryUri, t))
            .flatMap(response -> {
                this.recordStoreResult(request, response, null, primaryUri);
                return this.barrierForGlobalStrong(request, response);
            });
    }

    /**
     * Side-effect handler for a failed write: records the failure in the diagnostics and starts a
     * background address refresh when the failure response carries the
     * {@code x-ms-write-request-trigger-refresh} header with value 1. Never throws; the original
     * error continues to propagate downstream unchanged.
     */
    private void handleWriteFailure(RxDocumentServiceRequest request, URI primaryUri, Throwable error) {
        try {
            Throwable unwrappedException = Exceptions.unwrap(error);
            CosmosClientException ex = Utils.as(unwrappedException, CosmosClientException.class);
            this.recordStoreResult(request, null, ex, primaryUri);
            if (ex == null) {
                // Not a CosmosClientException, so there are no response headers to inspect.
                return;
            }
            String value = ex.getResponseHeaders().get("x-ms-write-request-trigger-refresh");
            if (!Strings.isNullOrWhiteSpace(value)) {
                Integer result = Integers.tryParse(value);
                if (result != null && result == 1) {
                    this.startBackgroundAddressRefresh(request);
                }
            }
        } catch (Throwable throwable) {
            this.logger.error("Unexpected failure in handling orig [{}]", error.getMessage(), error);
            this.logger.error("Unexpected failure in handling orig [{}] : new [{}]", error.getMessage(), throwable.getMessage(), throwable);
        }
    }

    /** Adds the outcome of the write (response or failure) to the request diagnostics; a CosmosClientException while doing so is only logged. */
    private void recordStoreResult(RxDocumentServiceRequest request, StoreResponse response, CosmosClientException failure, URI primaryUri) {
        try {
            BridgeInternal.recordResponse(request.requestContext.cosmosResponseDiagnostics, request, this.storeReader.createStoreResult(response, failure, false, false, primaryUri));
        } catch (CosmosClientException e) {
            this.logger.error("Error occurred while recording response", e);
        }
    }

    /**
     * Decides whether a write needs the global strong barrier: the account's default consistency
     * level must be STRONG and the response must report a positive
     * {@code x-ms-number-of-read-regions} header.
     *
     * @param request unused by this check
     * @param response the write response whose headers are inspected
     * @return {@code true} if the barrier applies
     * @throws NumberFormatException if the header is present but not a valid integer
     */
    boolean isGlobalStrongRequest(RxDocumentServiceRequest request, StoreResponse response) {
        if (this.serviceConfigReader.getDefaultConsistencyLevel() != ConsistencyLevel.STRONG) {
            return false;
        }
        String headerValue = response.getHeaderValue("x-ms-number-of-read-regions");
        return headerValue != null && Integer.parseInt(headerValue) > 0;
    }

    /**
     * Applies the global strong write barrier to an acknowledged write when required.
     *
     * <p>If the barrier applies, the response and its LSN are stored on the request context so a
     * retry can resume at the barrier. When the response's global committed LSN is already at or
     * beyond its LSN the response is returned immediately; otherwise the barrier is awaited.
     *
     * @param request the write request; its {@code requestContext} is mutated
     * @param response the response returned by the primary replica
     * @return the write response, or a {@link GoneException} error if the LSN headers are missing
     *         or the barrier is not met
     */
    Mono<StoreResponse> barrierForGlobalStrong(RxDocumentServiceRequest request, StoreResponse response) {
        try {
            if (!ReplicatedResourceClient.isGlobalStrongEnabled() || !this.isGlobalStrongRequest(request, response)) {
                return Mono.just(response);
            }
            Utils.ValueHolder<Long> lsn = Utils.ValueHolder.initialize(-1L);
            Utils.ValueHolder<Long> globalCommittedLsn = Utils.ValueHolder.initialize(-1L);
            ConsistencyWriter.getLsnAndGlobalCommittedLsn(response, lsn, globalCommittedLsn);
            if (lsn.v == -1L || globalCommittedLsn.v == -1L) {
                this.logger.error("ConsistencyWriter: lsn {} or GlobalCommittedLsn {} is not set for global strong request", lsn.v, globalCommittedLsn.v);
                throw new GoneException("The requested resource is no longer available at the server.");
            }
            request.requestContext.globalStrongWriteResponse = response;
            request.requestContext.globalCommittedSelectedLSN = lsn.v;
            request.requestContext.forceRefreshAddressCache = false;
            this.logger.debug("ConsistencyWriter: globalCommittedLsn {}, lsn {}", globalCommittedLsn.v, lsn.v);
            if (globalCommittedLsn.v < lsn.v) {
                return this.awaitWriteBarrier(request, true);
            }
            return Mono.just(request.requestContext.globalStrongWriteResponse);
        } catch (CosmosClientException e) {
            return Mono.error(e);
        }
    }

    /**
     * Builds a barrier request for {@code request.requestContext.globalCommittedSelectedLSN},
     * waits for the barrier and yields the stored write response on success.
     *
     * @param logFailureAsError log an unmet barrier at ERROR (first attempt) instead of WARN (resumed attempt)
     * @return the stored write response, or a {@link GoneException} error if the barrier was not met
     */
    private Mono<StoreResponse> awaitWriteBarrier(RxDocumentServiceRequest request, boolean logFailureAsError) {
        Mono<RxDocumentServiceRequest> barrierRequestObs = BarrierRequestHelper.createAsync(request, this.authorizationTokenProvider, null, request.requestContext.globalCommittedSelectedLSN);
        return barrierRequestObs.flatMap(barrierRequest ->
            this.waitForWriteBarrierAsync(barrierRequest, request.requestContext.globalCommittedSelectedLSN).flatMap(barrierMet -> {
                if (!barrierMet) {
                    String message = "ConsistencyWriter: Write barrier has not been met for global strong request. SelectedGlobalCommittedLsn: {}";
                    if (logFailureAsError) {
                        this.logger.error(message, request.requestContext.globalCommittedSelectedLSN);
                    } else {
                        this.logger.warn(message, request.requestContext.globalCommittedSelectedLSN);
                    }
                    return Mono.<StoreResponse>error(new GoneException("Global STRONG write barrier has not been met for the request."));
                }
                return Mono.just(request.requestContext.globalStrongWriteResponse);
            }));
    }

    /**
     * Polls replicas with the barrier request until one reports a global committed LSN of at
     * least {@code selectedGlobalCommittedLsn}, or the attempt budget
     * ({@link #MAX_NUMBER_OF_WRITE_BARRIER_READ_RETRIES}) is used up.
     *
     * <p>The first {@link #MAX_SHORT_BARRIER_RETRIES_FOR_MULTI_REGION} retries wait
     * {@link #SHORT_BARRIER_RETRY_INTERVAL_IN_MS_FOR_MULTI_REGION} ms; later ones wait
     * {@link #DELAY_BETWEEN_WRITE_BARRIER_CALLS_IN_MS} ms.
     *
     * @return {@code true} if the barrier was met, {@code false} if the attempts were exhausted;
     *         errors with {@link RequestTimeoutException} if the request deadline elapses first
     */
    private Mono<Boolean> waitForWriteBarrierAsync(RxDocumentServiceRequest barrierRequest, long selectedGlobalCommittedLsn) {
        AtomicInteger writeBarrierRetryCount = new AtomicInteger(MAX_NUMBER_OF_WRITE_BARRIER_READ_RETRIES);
        AtomicLong maxGlobalCommittedLsnReceived = new AtomicLong(0L);

        // One barrier read: emits TRUE when the barrier is met, completes empty otherwise.
        Flux<Boolean> barrierAttempt = Flux.defer(() -> {
            if (barrierRequest.requestContext.timeoutHelper.isElapsed()) {
                return Flux.<Boolean>error(new RequestTimeoutException());
            }
            Mono<List<StoreResult>> storeResultListObs = this.storeReader.readMultipleReplicaAsync(barrierRequest, true, 1, false, false, ReadMode.Strong, false, false);
            Mono<Boolean> attempt = storeResultListObs.flatMap(responses -> {
                if (responses != null && responses.stream().anyMatch(response -> response.globalCommittedLSN >= selectedGlobalCommittedLsn)) {
                    return Mono.just(Boolean.TRUE);
                }
                // Highest LSN in this batch; 0 when there are no responses at all.
                long maxGlobalCommittedLsn = responses == null
                    ? 0L
                    : responses.stream().mapToLong(response -> response.globalCommittedLSN).max().orElse(0L);
                maxGlobalCommittedLsnReceived.accumulateAndGet(maxGlobalCommittedLsn, Math::max);
                // Only the first barrier call may force an address refresh.
                barrierRequest.requestContext.forceRefreshAddressCache = false;
                if (writeBarrierRetryCount.decrementAndGet() == 0 && responses != null) {
                    this.logger.debug("ConsistencyWriter: WaitForWriteBarrierAsync - Last barrier multi-region strong. Responses: {}", responses.stream().map(StoreResult::toString).collect(Collectors.joining("; ")));
                }
                return Mono.empty();
            });
            return attempt.flux();
        });

        return barrierAttempt
            // The repeat factory runs once per subscription, so the per-attempt decisions must be
            // driven by the companion flux, which emits after every completed attempt.
            .repeatWhen(completions -> completions
                .takeWhile(ignored -> writeBarrierRetryCount.get() > 0)
                .concatMap(ignored -> {
                    int attemptsMade = MAX_NUMBER_OF_WRITE_BARRIER_READ_RETRIES - writeBarrierRetryCount.get();
                    int delayInMs = attemptsMade > MAX_SHORT_BARRIER_RETRIES_FOR_MULTI_REGION
                        ? DELAY_BETWEEN_WRITE_BARRIER_CALLS_IN_MS
                        : SHORT_BARRIER_RETRY_INTERVAL_IN_MS_FOR_MULTI_REGION;
                    return Mono.delay(Duration.ofMillis(delayInMs));
                }))
            .take(1L)
            .switchIfEmpty(Mono.defer(() -> {
                this.logger.debug("ConsistencyWriter: Highest global committed lsn received for write barrier call is {}", maxGlobalCommittedLsnReceived.get());
                return Mono.just(Boolean.FALSE);
            }))
            .single();
    }

    /**
     * Reads the {@code lsn} and {@code x-ms-global-Committed-lsn} headers of a response into the
     * given holders. A holder is set to -1 when its header is absent.
     *
     * @param response the response whose headers are read
     * @param lsn receives the LSN, or -1
     * @param globalCommittedLsn receives the global committed LSN, or -1
     * @throws NumberFormatException if a header is present but not a valid long
     */
    static void getLsnAndGlobalCommittedLsn(StoreResponse response, Utils.ValueHolder<Long> lsn, Utils.ValueHolder<Long> globalCommittedLsn) {
        lsn.v = -1L;
        globalCommittedLsn.v = -1L;
        String lsnHeader = response.getHeaderValue("lsn");
        if (lsnHeader != null) {
            lsn.v = Long.parseLong(lsnHeader);
        }
        String globalCommittedLsnHeader = response.getHeaderValue("x-ms-global-Committed-lsn");
        if (globalCommittedLsnHeader != null) {
            globalCommittedLsn.v = Long.parseLong(globalCommittedLsnHeader);
        }
    }

    /**
     * Fire-and-forget re-resolution of the primary URI with the force-refresh flag set. The result
     * is discarded; a failure is only logged.
     *
     * @param request the request whose primary address is re-resolved
     */
    void startBackgroundAddressRefresh(RxDocumentServiceRequest request) {
        this.addressSelector.resolvePrimaryUriAsync(request, true)
            .publishOn(Schedulers.elastic())
            .subscribe(
                r -> { },
                e -> this.logger.warn("Background refresh of the primary address failed with {}", e.getMessage(), e));
    }
}

