/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.transactions;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.RequestTracer;
import com.couchbase.client.core.error.CasMismatchException;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.error.DocumentExistsException;
import com.couchbase.client.core.error.DocumentNotFoundException;
import com.couchbase.client.core.error.ErrorCodeAndMessage;
import com.couchbase.client.core.error.FeatureNotAvailableException;
import com.couchbase.client.core.error.TimeoutException;
import com.couchbase.client.core.error.context.ErrorContext;
import com.couchbase.client.core.error.context.QueryErrorContext;
import com.couchbase.client.core.error.context.ReducedKeyValueErrorContext;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.msg.kv.MutationToken;
import com.couchbase.client.core.msg.query.QueryRequest;
import com.couchbase.client.core.node.NodeIdentifier;
import com.couchbase.client.core.retry.reactor.Jitter;
import com.couchbase.client.core.retry.reactor.Retry;
import com.couchbase.client.core.retry.reactor.RetryExhaustedException;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.ReactiveCollection;
import com.couchbase.client.java.ReactiveScope;
import com.couchbase.client.java.SDKAccessUtil;
import com.couchbase.client.java.Scope;
import com.couchbase.client.java.codec.JsonSerializer;
import com.couchbase.client.java.codec.RawJsonTranscoder;
import com.couchbase.client.java.codec.Transcoder;
import com.couchbase.client.java.env.ClusterEnvironment;
import com.couchbase.client.java.json.JsonArray;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.kv.InsertOptions;
import com.couchbase.client.java.kv.LookupInOptions;
import com.couchbase.client.java.kv.LookupInResult;
import com.couchbase.client.java.kv.LookupInSpec;
import com.couchbase.client.java.kv.MutateInMacro;
import com.couchbase.client.java.kv.MutateInOptions;
import com.couchbase.client.java.kv.MutateInResult;
import com.couchbase.client.java.kv.MutateInSpec;
import com.couchbase.client.java.kv.MutationResult;
import com.couchbase.client.java.kv.RemoveOptions;
import com.couchbase.client.java.kv.StoreSemantics;
import com.couchbase.client.java.query.QueryMetaData;
import com.couchbase.client.java.query.QueryOptions;
import com.couchbase.client.java.query.QueryResult;
import com.couchbase.client.java.query.QueryScanConsistency;
import com.couchbase.client.java.query.QueryStatus;
import com.couchbase.client.java.query.ReactiveQueryResult;
import com.couchbase.transactions.LockTokens;
import com.couchbase.transactions.TransactionAttempt;
import com.couchbase.transactions.TransactionContext;
import com.couchbase.transactions.TransactionDurabilityLevel;
import com.couchbase.transactions.TransactionGetResult;
import com.couchbase.transactions.TransactionInsertOptions;
import com.couchbase.transactions.TransactionJsonDocumentStatus;
import com.couchbase.transactions.TransactionQueryOptions;
import com.couchbase.transactions.TransactionReplaceOptions;
import com.couchbase.transactions.TransactionResult;
import com.couchbase.transactions.TransactionsReactive;
import com.couchbase.transactions.atr.ATRIds;
import com.couchbase.transactions.cleanup.CleanupRequest;
import com.couchbase.transactions.cleanup.TransactionsCleanup;
import com.couchbase.transactions.components.ATREntry;
import com.couchbase.transactions.components.ATRUtil;
import com.couchbase.transactions.components.ActiveTransactionRecord;
import com.couchbase.transactions.components.DocRecord;
import com.couchbase.transactions.components.DocumentGetter;
import com.couchbase.transactions.components.DocumentMetadata;
import com.couchbase.transactions.components.DurabilityLevelUtil;
import com.couchbase.transactions.components.SerializationUtil;
import com.couchbase.transactions.config.MergedTransactionConfig;
import com.couchbase.transactions.deferred.TransactionSerializedContext;
import com.couchbase.transactions.error.RetryTransaction;
import com.couchbase.transactions.error.TransactionCommitAmbiguous;
import com.couchbase.transactions.error.TransactionExpired;
import com.couchbase.transactions.error.TransactionFailed;
import com.couchbase.transactions.error.attempts.ActiveTransactionRecordEntryNotFound;
import com.couchbase.transactions.error.attempts.ActiveTransactionRecordFull;
import com.couchbase.transactions.error.attempts.ActiveTransactionRecordNotFound;
import com.couchbase.transactions.error.external.CommitNotPermitted;
import com.couchbase.transactions.error.external.ConcurrentOperationsDetectedOnSameDocument;
import com.couchbase.transactions.error.external.ForwardCompatibilityFailure;
import com.couchbase.transactions.error.external.PreviousOperationFailed;
import com.couchbase.transactions.error.external.RollbackNotPermitted;
import com.couchbase.transactions.error.external.TransactionAlreadyAborted;
import com.couchbase.transactions.error.external.TransactionAlreadyCommitted;
import com.couchbase.transactions.error.external.TransactionOperationFailed;
import com.couchbase.transactions.error.internal.AttemptExpired;
import com.couchbase.transactions.error.internal.AttemptNotFoundOnQuery;
import com.couchbase.transactions.error.internal.ErrorClasses;
import com.couchbase.transactions.error.internal.ForwardCompatibilityRequiresRetry;
import com.couchbase.transactions.error.internal.RetryAtrCommit;
import com.couchbase.transactions.error.internal.RetryOperation;
import com.couchbase.transactions.error.internal.TransactionOperationFailedBuilder;
import com.couchbase.transactions.forwards.ForwardCompatibility;
import com.couchbase.transactions.forwards.ForwardCompatibilityStages;
import com.couchbase.transactions.forwards.Supported;
import com.couchbase.transactions.log.IllegalDocumentState;
import com.couchbase.transactions.log.TransactionLogger;
import com.couchbase.transactions.query.QueryAccessor;
import com.couchbase.transactions.support.AttemptStates;
import com.couchbase.transactions.support.OptionsWrapperUtil;
import com.couchbase.transactions.support.SpanWrapper;
import com.couchbase.transactions.support.SpanWrapperUtil;
import com.couchbase.transactions.support.StagedMutation;
import com.couchbase.transactions.support.StagedMutationType;
import com.couchbase.transactions.util.DebugUtil;
import com.couchbase.transactions.util.LogDeferThrowable;
import com.couchbase.transactions.util.MonoBridge;
import com.couchbase.transactions.util.ReactiveLock;
import com.couchbase.transactions.util.ReactiveWaitGroup;
import com.couchbase.transactions.util.SchedulerUtil;
import com.couchbase.transactions.util.TriFunction;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple2;

public class AttemptContextReactive {
    // Individual flag bits; together they occupy the low four bits covered by STATE_BITS_MASK_BITS.
    public static int TRANSACTION_STATE_BIT_COMMIT_NOT_ALLOWED = 0x1;
    public static int TRANSACTION_STATE_BIT_APP_ROLLBACK_NOT_ALLOWED = 0x2;
    public static int TRANSACTION_STATE_BIT_SHOULD_NOT_ROLLBACK = 0x4;
    public static int TRANSACTION_STATE_BIT_SHOULD_NOT_RETRY = 0x8;
    // Shift distance of the "final error" field; its mask 0x70 selects bits 4-6.
    public static int STATE_BITS_POSITION_FINAL_ERROR = 4;
    public static int STATE_BITS_MASK_FINAL_ERROR = 0x70;
    // Mask selecting the four flag bits above (0b1111).
    public static int STATE_BITS_MASK_BITS = 0xF;
    // Packed attempt state; presumably combined from the TRANSACTION_STATE_BIT_* / STATE_BITS_* constants - verify against users of this field.
    private final AtomicInteger stateBits = new AtomicInteger(0);
    private final MergedTransactionConfig config;
    // Mutations staged by this attempt. The "Locked" suffix follows the naming of the *Locked methods,
    // which assert that the attempt lock is held before touching this list.
    private final ArrayList<StagedMutation> stagedMutationsLocked = new ArrayList<>();
    private final String attemptId;
    private final TransactionContext overall;
    // Value of System.nanoTime() captured at construction, wrapped in a Duration.
    private final Duration startTimeClient;
    // NOTE(review): declared without an initializer and not assigned by either constructor visible here,
    // so it is null (not Optional.empty()) until some other code assigns it.
    private Optional<Duration> startTimeServer;
    private Optional<String> atrId = Optional.empty();
    private Optional<ReactiveCollection> atrCollection = Optional.empty();
    final TransactionLogger LOGGER;
    private AttemptStates state = AttemptStates.NOT_STARTED;
    private final TransactionsReactive parent;
    private final SpanWrapper attemptSpan;
    // Once set, errorIfExpiredAndNotInExpiryOvertimeMode skips its expiry check.
    private volatile boolean expiryOvertimeMode = false;
    @Nullable
    private volatile NodeIdentifier queryTarget = null;
    // Incremented once per query statement issued by this attempt.
    private final AtomicInteger queryStatementIdx = new AtomicInteger(0);
    // Debug/feature switches read from system properties once per instance.
    private final boolean lockDebugging = Boolean.parseBoolean(System.getProperty("com.couchbase.transactions.debug.lock", "true"));
    private final boolean monoBridgeDebugging = Boolean.parseBoolean(System.getProperty("com.couchbase.transactions.debug.monoBridge", "false"));
    private final boolean threadSafetyEnabled = Boolean.parseBoolean(System.getProperty("com.couchbase.transactions.threadSafety", "true"));
    // Both depend on lockDebugging, so they must stay declared after it (initializers run in textual order).
    private final ReactiveWaitGroup kvOps = new ReactiveWaitGroup(this, this.lockDebugging);
    private final ReactiveLock mutex = new ReactiveLock(this, this.lockDebugging);
    // Read from "com.couchbase.transactions.expiryThresholdMs" (default 10); presumably milliseconds, per the property name.
    private final int EXPIRY_THRESHOLD = Integer.parseInt(System.getProperty("com.couchbase.transactions.expiryThresholdMs", "10"));
    // String identifiers for the individual stages/hooks of an attempt. Within this file they are
    // passed as the hook argument of doKVOperation (e.g. HOOK_GET, HOOK_INSERT) and of
    // queryWrapperLocked (e.g. HOOK_QUERY_KV_GET).
    // NOTE(review): these are public, static and non-final, so they can be reassigned at runtime.
    public static String HOOK_ROLLBACK = "rollback";
    public static String HOOK_GET = "get";
    public static String HOOK_INSERT = "insert";
    public static String HOOK_REPLACE = "replace";
    public static String HOOK_REMOVE = "remove";
    public static String HOOK_BEFORE_COMMIT = "commit";
    public static String HOOK_ROLLBACK_DOC = "rollbackDoc";
    public static String HOOK_DELETE_INSERTED = "deleteInserted";
    public static String HOOK_REMOVE_STAGED_INSERT = "removeStagedInsert";
    public static String HOOK_CREATE_STAGED_INSERT = "createdStagedInsert";
    public static String HOOK_REMOVE_DOC = "removeDoc";
    public static String HOOK_COMMIT_DOC = "commitDoc";
    // Query-mode hooks.
    public static String HOOK_QUERY = "query";
    public static String HOOK_QUERY_BEGIN_WORK = "queryBeginWork";
    public static String HOOK_QUERY_COMMIT = "queryCommit";
    public static String HOOK_QUERY_ROLLBACK = "queryRollback";
    public static String HOOK_QUERY_KV_GET = "queryKvGet";
    public static String HOOK_QUERY_KV_REPLACE = "queryKvReplace";
    public static String HOOK_QUERY_KV_REMOVE = "queryKvRemove";
    public static String HOOK_QUERY_KV_INSERT = "queryKvInsert";
    public static String HOOK_BEFORE_RETRY = "beforeRetry";
    // ATR-related hooks.
    public static String HOOK_ATR_COMMIT = "atrCommit";
    public static String HOOK_ATR_COMMIT_AMBIGUITY_RESOLUTION = "atrCommitAmbiguityResolution";
    public static String HOOK_ATR_ABORT = "atrAbort";
    public static String HOOK_ATR_ROLLBACK_COMPLETE = "atrRollbackComplete";
    public static String HOOK_ATR_PENDING = "atrPending";
    public static String HOOK_ATR_COMPLETE = "atrComplete";
    // Fixed delay (3 ms) used by RETRY_OPERATION_UNTIL_EXPIRY_WITH_FIXED_RETRY below.
    public static Duration DEFAULT_DELAY_RETRYING_OPERATION = Duration.ofMillis(3L);
    // Retries only when a RetryOperation is raised, using exponential backoff (arguments 1 ms and 100 ms)
    // with random jitter. NOTE(review): no retry limit is configured here; the "until expiry" in the name
    // presumably relies on callers raising a different error once the attempt expires - confirm.
    private static final reactor.util.retry.Retry RETRY_OPERATION_UNTIL_EXPIRY = Retry.anyOf((Class[])new Class[]{RetryOperation.class}).exponentialBackoff(Duration.of(1L, ChronoUnit.MILLIS), Duration.of(100L, ChronoUnit.MILLIS)).jitter(Jitter.random()).toReactorRetry();
    // Same trigger as above, but with a constant DEFAULT_DELAY_RETRYING_OPERATION between retries.
    private static final reactor.util.retry.Retry RETRY_OPERATION_UNTIL_EXPIRY_WITH_FIXED_RETRY = Retry.anyOf((Class[])new Class[]{RetryOperation.class}).fixedBackoff(DEFAULT_DELAY_RETRYING_OPERATION).toReactorRetry();
    // Backing list handed out (without copying) by finalMutationTokens().
    private final List<MutationToken> finalMutationTokens = new ArrayList<MutationToken>();

    /**
     * Creates a fresh attempt belonging to the given transaction.
     *
     * @param overall    the transaction this attempt is part of; must not be null
     * @param config     merged transaction configuration; must not be null
     * @param attemptId  identifier of this attempt; must not be null
     * @param parent     owning transactions object; must not be null
     * @param parentSpan optional parent span for the "attempt" span created here
     * @throws NullPointerException if any of the first four arguments is null
     */
    public AttemptContextReactive(TransactionContext overall, MergedTransactionConfig config, String attemptId, TransactionsReactive parent, Optional<SpanWrapper> parentSpan) {
        // Validate in the same order as before; each check throws a message-less NullPointerException.
        this.overall = Objects.requireNonNull(overall);
        this.config = Objects.requireNonNull(config);
        this.attemptId = Objects.requireNonNull(attemptId);
        this.parent = Objects.requireNonNull(parent);
        this.LOGGER = overall.LOGGER;
        this.startTimeClient = Duration.ofNanos(System.nanoTime());
        // Created last so every other field is already populated when 'this' is handed out.
        this.attemptSpan = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "attempt", parentSpan.orElse(null));
    }

    /**
     * Serialises the current state of this attempt into a JSON object.
     *
     * <p>The output contains the transaction and attempt ids, the ATR id and location (when known),
     * the elapsed client-side time of the transaction in milliseconds, the server start time (when
     * known) and one entry per staged mutation. The keys written here are the ones read back by the
     * pill-based constructor of this class.
     *
     * @param includeMutationBody when true, the Base64-encoded content of every non-REMOVE staged
     *                            mutation is included under "content"
     * @return the populated JSON object
     */
    private JsonObject dehydrate(boolean includeMutationBody) {
        JsonObject ob = JsonObject.create();
        ob.put("transactionId", this.transactionId());
        ob.put("attemptId", this.attemptId);
        this.atrId.ifPresent(v -> ob.put("atrId", v));
        this.atrCollection.ifPresent(ac -> {
            ob.put("atrBucket", ac.bucketName());
            ob.put("atrScope", ac.scopeName());
            ob.put("atrCollection", ac.name());
        });
        long transactionElapsedTimeMillis = (System.nanoTime() - this.overall.startTimeClient().toNanos()) / 1000000L;
        ob.put("transactionElapsedTimeMillis", transactionElapsedTimeMillis);
        // startTimeServer is declared without an initializer and no constructor assigns it, so it may
        // still be null here; treat null the same as "not known" instead of throwing.
        Optional<Duration> serverStart = this.startTimeServer;
        if (serverStart != null) {
            serverStart.ifPresent(v -> ob.put("startTimeServerMillis", v.toMillis()));
        }
        JsonArray mutations = JsonArray.create();
        for (StagedMutation sm : this.stagedMutationsLocked) {
            JsonObject m = JsonObject.create();
            m.put("type", sm.type.toString());
            // Removes carry no body; other mutations only carry one when the caller asked for it.
            if (sm.type != StagedMutationType.REMOVE && includeMutationBody) {
                m.put("content", Base64.getEncoder().encodeToString(sm.content));
            }
            sm.doc.serialize(m);
            mutations.add(m);
        }
        ob.put("mutations", mutations);
        return ob;
    }

    /**
     * Builds the JSON description of this attempt (ids, remaining time, config, staged mutations and
     * ATR location) in the shape assembled below. Must be called with the attempt lock held.
     *
     * @return the assembled JSON object
     */
    private JsonObject makeQueryTxDataLocked() {
        this.assertLocked("makeQueryTxData");
        ClusterEnvironment env = this.parent.cleanup().clusterData().cluster().environment();

        JsonObject idSection = JsonObject.create().put("txn", this.transactionId()).put("atmpt", this.attemptId);
        JsonObject stateSection = JsonObject.create().put("timeLeftMs", this.expiryRemainingMillis());
        // The configured KV timeout wins; otherwise fall back to the environment's durable KV timeout.
        JsonObject configSection = JsonObject.create()
                .put("kvTimeoutMs", this.config.keyValueTimeout().orElse(env.timeoutConfig().kvDurableTimeout()).toMillis())
                .put("durabilityLevel", this.config.transactionDurabilityLevel().name())
                .put("numAtrs", this.config.numAtrs());

        JsonObject out = JsonObject.create();
        out.put("id", idSection);
        out.put("state", stateSection);
        out.put("config", configSection);

        JsonArray mutations = JsonArray.create();
        for (StagedMutation sm : this.stagedMutationsLocked) {
            JsonObject entry = JsonObject.create()
                    .put("scp", sm.doc.collection().scopeName())
                    .put("coll", sm.doc.collection().name())
                    .put("bkt", sm.doc.collection().bucketName())
                    .put("id", sm.doc.id())
                    .put("cas", Long.toString(sm.doc.cas()))
                    .put("type", sm.type.name());
            mutations.add(entry);
        }
        out.put("mutations", mutations);

        if (this.atrCollection.isPresent() && this.atrId.isPresent()) {
            // ATR already chosen: report its id and full location.
            ReactiveCollection atr = this.atrCollection.get();
            out.put("atr", JsonObject.create().put("id", this.atrId.get()).put("bkt", atr.bucketName()).put("scp", atr.scopeName()).put("coll", atr.name()));
        } else if (this.config.metadataCollection().isPresent()) {
            // No ATR yet: report only the configured metadata collection, without an id.
            Collection mc = this.config.metadataCollection().get();
            out.put("atr", JsonObject.create().put("bkt", mc.bucketName()).put("scp", mc.scopeName()).put("coll", mc.name()));
        }
        return out;
    }

    /**
     * Recreates an attempt from a previously serialised JSON "pill".
     *
     * <p>Reads "attemptId", "atrId", "atrBucket", "atrScope", "atrCollection" and the "mutations"
     * array from the pill, puts the attempt into the PENDING state and rebuilds the staged mutations.
     * A mutation whose "type" is neither "INSERT" nor "REPLACE" is treated as a REMOVE; non-REMOVE
     * mutations have their "content" decoded from Base64.
     *
     * @param pill       serialised attempt state
     * @param overall    the transaction this attempt is part of
     * @param config     merged transaction configuration
     * @param parent     owning transactions object
     * @param parentSpan optional parent span for the "attempt" span created here
     */
    public AttemptContextReactive(JsonObject pill, TransactionContext overall, MergedTransactionConfig config, TransactionsReactive parent, Optional<SpanWrapper> parentSpan) {
        this.LOGGER = overall.LOGGER;
        this.overall = overall;
        this.config = config;
        this.attemptId = pill.getString("attemptId");
        this.startTimeClient = Duration.ofNanos(System.nanoTime());
        this.atrId = Optional.of(pill.getString("atrId"));
        String atrBucketName = pill.getString("atrBucket");
        String atrScopeName = pill.getString("atrScope");
        String atrCollectionName = pill.getString("atrCollection");
        this.atrCollection = Optional.of(parent.cleanup().clusterData().getBucketFromName(atrBucketName).scope(atrScopeName).collection(atrCollectionName));
        this.parent = parent;
        this.state = AttemptStates.PENDING;

        for (Object rawEntry : pill.getArray("mutations")) {
            JsonObject entry = (JsonObject) rawEntry;
            TransactionGetResult doc = TransactionGetResult.createFrom(entry, parent.cleanup().clusterData());

            // Anything that is not explicitly INSERT or REPLACE falls back to REMOVE.
            StagedMutationType type;
            switch (entry.getString("type")) {
                case "INSERT":
                    type = StagedMutationType.INSERT;
                    break;
                case "REPLACE":
                    type = StagedMutationType.REPLACE;
                    break;
                default:
                    type = StagedMutationType.REMOVE;
                    break;
            }

            // Only inserts and replaces carry a body.
            byte[] content = type == StagedMutationType.REMOVE ? null : Base64.getDecoder().decode(entry.getString("content"));
            this.stagedMutationsLocked.add(new StagedMutation(UUID.randomUUID().toString(), doc, content, type, null));
        }

        this.attemptSpan = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "attempt", parentSpan.orElse(null));
    }

    /** @return the span created for this attempt in the constructor */
    SpanWrapper span() {
        return attemptSpan;
    }

    /** @return the System.nanoTime() reading taken when this attempt was constructed, as a Duration */
    Duration startTimeClient() {
        return startTimeClient;
    }

    /** @return the identifier of this attempt */
    public String attemptId() {
        return attemptId;
    }

    /** @return the identifier of the transaction this attempt belongs to, as reported by the transaction context */
    public String transactionId() {
        return overall.transactionId();
    }

    /**
     * Collects the staged mutations of type REPLACE, in staging order, into a new list.
     * Asserts that the lock is held and that the attempt is not in query mode.
     */
    private List<StagedMutation> stagedReplacesLocked() {
        this.assertLocked("stagedReplaces");
        this.assertNotQueryMode("stagedReplaces");
        List<StagedMutation> replaces = new ArrayList<>();
        for (StagedMutation mutation : this.stagedMutationsLocked) {
            if (mutation.type == StagedMutationType.REPLACE) {
                replaces.add(mutation);
            }
        }
        return replaces;
    }

    /**
     * Collects the staged mutations of type REMOVE, in staging order, into a new list.
     * Asserts that the lock is held and that the attempt is not in query mode.
     */
    private List<StagedMutation> stagedRemovesLocked() {
        this.assertLocked("stagedRemoves");
        this.assertNotQueryMode("stagedRemoves");
        List<StagedMutation> removes = new ArrayList<>();
        for (StagedMutation mutation : this.stagedMutationsLocked) {
            if (mutation.type == StagedMutationType.REMOVE) {
                removes.add(mutation);
            }
        }
        return removes;
    }

    /**
     * Collects the staged mutations of type INSERT, in staging order, into a new list.
     * Asserts that the attempt is not in query mode and that the lock is held.
     */
    private List<StagedMutation> stagedInsertsLocked() {
        this.assertNotQueryMode("stagedInserts");
        this.assertLocked("stagedInserts");
        List<StagedMutation> inserts = new ArrayList<>();
        for (StagedMutation mutation : this.stagedMutationsLocked) {
            if (mutation.type == StagedMutationType.INSERT) {
                inserts.add(mutation);
            }
        }
        return inserts;
    }

    /**
     * Looks for a mutation this attempt has already staged against the given document.
     * Staged replaces are searched first; staged inserts are only consulted when no replace matches.
     * Staged removes are not considered here.
     *
     * @param collection collection of the document
     * @param id         document id
     * @return the matching staged replace or insert, or empty when there is none
     */
    private Optional<StagedMutation> checkForOwnWriteLocked(ReactiveCollection collection, String id) {
        this.assertLocked("checkForOwnWrite");
        this.assertNotQueryMode("checkForOwnWrite");

        // A staged mutation targets the same document when bucket, scope, collection and id all match.
        java.util.function.Predicate<StagedMutation> sameDocument = v ->
                v.doc.collection().bucketName().equals(collection.bucketName())
                        && v.doc.collection().scopeName().equals(collection.scopeName())
                        && v.doc.collection().name().equals(collection.name())
                        && v.doc.id().equals(id);

        for (StagedMutation replace : this.stagedReplacesLocked()) {
            if (sameDocument.test(replace)) {
                return Optional.of(replace);
            }
        }
        for (StagedMutation insert : this.stagedInsertsLocked()) {
            if (sameDocument.test(insert)) {
                return Optional.of(insert);
            }
        }
        return Optional.empty();
    }

    /**
     * Performs the client-side expiry check for a stage unless expiry-overtime-mode is already on.
     *
     * @param stage name of the stage being checked, used in logs and the error message
     * @param docId optional id of the document involved, forwarded to the expiry check
     * @return an empty Mono when the check is skipped or passes, otherwise a Mono failing with AttemptExpired
     */
    private Mono<Void> errorIfExpiredAndNotInExpiryOvertimeMode(String stage, Optional<String> docId) {
        if (!this.expiryOvertimeMode) {
            if (!this.hasExpiredClientSide(stage, docId)) {
                return Mono.empty();
            }
            this.LOGGER.info(this.attemptId, "has expired in stage %s", stage);
            return Mono.error(new AttemptExpired(this, "Attempt has expired in stage " + stage));
        }
        this.LOGGER.info(this.attemptId, "not doing expiry check in %s as already in expiry-overtime-mode", stage);
        return Mono.empty();
    }

    /**
     * Checks for client-side expiry; on expiry switches the attempt into expiry-overtime-mode and
     * throws a FAIL_EXPIRY operation failure that raises TRANSACTION_EXPIRED.
     *
     * @param stage name of the stage being checked, used in logs
     * @param docId optional id of the document involved, forwarded to the expiry check
     */
    private void checkExpiryPreCommitAndSetExpiryOvertimeMode(String stage, Optional<String> docId) {
        if (!this.hasExpiredClientSide(stage, docId)) {
            return;
        }
        this.LOGGER.info(this.attemptId, "has expired in stage %s, setting expiry-overtime-mode", stage);
        this.expiryOvertimeMode = true;
        TransactionOperationFailed failure = this.operationFailed(
                TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY)
                        .raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED)
                        .build());
        throw failure;
    }

    /**
     * Fetches a document inside this attempt.
     *
     * @param collection collection holding the document
     * @param id         document id
     * @return a Mono emitting the document, or an empty Optional when it is not found
     */
    public Mono<Optional<TransactionGetResult>> getOptional(ReactiveCollection collection, String id) {
        return getInternal(collection, id);
    }

    /**
     * Runs a get as a KV operation via doKVOperation, picking the query-based or KV-based
     * implementation depending on whether the attempt is in query mode when the operation starts.
     */
    private Mono<Optional<TransactionGetResult>> getInternal(ReactiveCollection collection, String id) {
        String description = "get " + DebugUtil.docId(collection, id);
        return this.doKVOperation(description, "user.get", HOOK_GET, collection, id,
                (operationId, span, lockToken) -> Mono.defer(() -> this.queryModeLocked()
                        ? this.getWithQueryLocked(collection, id, (ReactiveLock.Waiter)lockToken)
                        : this.getWithKVLocked(collection, id, Optional.empty(), (SpanWrapper)span, (ReactiveLock.Waiter)lockToken)));
    }

    /**
     * Gets a document over KV while the attempt lock is held on entry.
     *
     * <p>Flow, as implemented below:
     * <ol>
     *   <li>If this attempt has staged a replace or insert of the document, release the lock and
     *       return the staged content marked OWN_WRITE.</li>
     *   <li>If this attempt has staged a remove of the document, release the lock and return empty.</li>
     *   <li>Otherwise release the lock, fetch the document through DocumentGetter and map any error
     *       to a TransactionOperationFailed (or retry the get, see below).</li>
     *   <li>On success finish the span, log, and run the forward-compatibility check for the GETS stage.</li>
     * </ol>
     *
     * @param collection               collection holding the document
     * @param id                       document id
     * @param resolvingMissingATREntry attempt id passed on to DocumentGetter when this call is a retry
     *                                 after a missing ATR / ATR entry; empty on the first call
     * @param pspan                    span whose elapsed time is logged and which is finished on success
     * @param lockToken                token of the currently held lock, released by this method
     * @return a Mono emitting the document, or an empty Optional when not found or removed by this attempt
     */
    private Mono<Optional<TransactionGetResult>> getWithKVLocked(ReactiveCollection collection, String id, Optional<String> resolvingMissingATREntry, SpanWrapper pspan, ReactiveLock.Waiter lockToken) {
        return Mono.defer(() -> {
            this.assertLocked("getWithKV");
            this.LOGGER.info(this.attemptId, "getting doc %s, resolvingMissingATREntry=%s", DebugUtil.docId(collection, id), resolvingMissingATREntry.orElse("<empty>"));
            // Read-your-own-writes: a staged replace/insert is answered from the staged content.
            Optional<StagedMutation> ownWrite = this.checkForOwnWriteLocked(collection, id);
            if (ownWrite.isPresent()) {
                this.LOGGER.info(this.attemptId, "found own-write of mutated doc %s", DebugUtil.docId(collection, id));
                return this.unlock(lockToken, "found own-write of mutation").then(Mono.just(Optional.of(TransactionGetResult.createFrom(ownWrite.get().doc, ownWrite.get().content, TransactionJsonDocumentStatus.OWN_WRITE))));
            }
            // A document this attempt has staged for removal is reported as not found.
            Optional<TransactionGetResult> ownRemove = this.stagedRemovesLocked().stream().filter(v -> v.doc.collection().bucketName().equals(collection.bucketName()) && v.doc.collection().scopeName().equals(collection.scopeName()) && v.doc.collection().name().equals(collection.name()) && v.doc.id().equals(id)).findFirst().map(v -> v.doc);
            if (ownRemove.isPresent()) {
                this.LOGGER.info(this.attemptId, "found own-write of removed doc %s", DebugUtil.docId(collection, id));
                return this.unlock(lockToken, "found own-write of removed").then(Mono.just(Optional.empty()));
            }
            // The lock is released before the actual fetch; everything after unlock() runs without it.
            return this.beforeUnlockGet(this, id).then(this.unlock(lockToken, "standard")).then(this.beforeDocGet(this, id)).then(DocumentGetter.getAsync(this.cluster(), this.LOGGER, collection, this.config, id, this.attemptId, false, pspan, this.getTranscoder(), resolvingMissingATREntry)).publishOn(SchedulerUtil.scheduler).onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                this.LOGGER.warn(this.attemptId, "got error while getting doc %s in %dus: %s", DebugUtil.docId(collection, id), pspan.elapsed(), AttemptContextReactive.dbg(err));
                // Forward-compatibility problems become FAIL_OTHER; only the "requires retry" flavour retries the transaction.
                if (err instanceof ForwardCompatibilityRequiresRetry || err instanceof ForwardCompatibilityFailure) {
                    TransactionOperationFailedBuilder error = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause(new ForwardCompatibilityFailure());
                    if (err instanceof ForwardCompatibilityRequiresRetry) {
                        error.retryTransaction();
                    }
                    return Mono.error((Throwable)this.operationFailed(error.build()));
                }
                // Errors already classified as an operation failure are propagated unchanged.
                if (ec == ErrorClasses.TRANSACTION_OPERATION_FAILED) {
                    return Mono.error((Throwable)err);
                }
                // Missing ATR or ATR entry: re-acquire the lock and retry the get, passing the attempt id
                // carried by the error as resolvingMissingATREntry. If the retry fails, the newly taken
                // lock is released before the error is propagated.
                if (err instanceof ActiveTransactionRecordNotFound || err instanceof ActiveTransactionRecordEntryNotFound) {
                    String attemptIdToCheck = err instanceof ActiveTransactionRecordNotFound ? ((ActiveTransactionRecordNotFound)err).attemptId() : ((ActiveTransactionRecordEntryNotFound)err).attemptId();
                    return this.lock("get relock").flatMap(newLockToken -> this.getWithKVLocked(collection, id, Optional.of(attemptIdToCheck), pspan, (ReactiveLock.Waiter)newLockToken).onErrorResume(e -> this.unlock((ReactiveLock.Waiter)newLockToken, "relock error").then(Mono.error((Throwable)e))));
                }
                // FAIL_HARD: fail without rolling back the attempt.
                if (ec == ErrorClasses.FAIL_HARD) {
                    return Mono.error((Throwable)this.operationFailed(builder.doNotRollbackAttempt().build()));
                }
                // FAIL_TRANSIENT: fail and ask for the transaction to be retried.
                if (ec == ErrorClasses.FAIL_TRANSIENT) {
                    return Mono.error((Throwable)this.operationFailed(builder.retryTransaction().build()));
                }
                // Anything else: fail with the builder's default settings.
                return Mono.error((Throwable)this.operationFailed(builder.build()));
            }).flatMap(v -> {
                // Success path: close the span and log the outcome before calling afterGetComplete.
                long elapsed = pspan.finish();
                if (v.isPresent()) {
                    this.LOGGER.info(this.attemptId, "completed get of %s in %dus", v.get(), elapsed);
                } else {
                    this.LOGGER.info(this.attemptId, "completed get of %s, could not find, in %dus", DebugUtil.docId(collection, id), elapsed);
                }
                return this.afterGetComplete(this, id).thenReturn(v);
            }).flatMap(doc -> {
                // Only documents that were actually found go through the forward-compatibility check.
                if (doc.isPresent()) {
                    return this.forwardCompatibilityCheck(ForwardCompatibilityStages.GETS, doc.flatMap(v -> v.links().forwardCompatibility())).thenReturn(doc);
                }
                return Mono.just((Object)doc);
            });
        });
    }

    /** @return a new JSON object containing only {@code "kv": true} */
    private JsonObject makeTxdata() {
        JsonObject txdata = JsonObject.create();
        txdata.put("kv", true);
        return txdata;
    }

    /**
     * Gets a document through the query service by running {@code EXECUTE __get} with the keyspace
     * and document id as parameters. The attempt lock must be held on entry.
     *
     * <p>An empty row set or a DocumentNotFoundException yields an empty Optional. A
     * TransactionOperationFailed is propagated as-is; any other error is wrapped via operationFailed.
     * The lock is released after a successful result (including the not-found case) and the span is
     * finished when the chain terminates.
     *
     * @param collection collection holding the document
     * @param id         document id
     * @param lockToken  token of the currently held lock
     * @return a Mono emitting the document, or an empty Optional when it does not exist
     */
    private Mono<Optional<TransactionGetResult>> getWithQueryLocked(ReactiveCollection collection, String id, ReactiveLock.Waiter lockToken) {
        return Mono.defer(() -> {
            this.assertLocked("getWithQuery");
            SpanWrapper querySpan = SpanWrapperUtil.createOp(this, this.tracer(), collection, id, "user.query_get", this.attemptSpan);
            int statementIdx = this.queryStatementIdx.getAndIncrement();
            // Held in a reference so the query wrapper can swap the token; the final unlock uses whatever it holds then.
            AtomicReference<ReactiveLock.Waiter> lockRef = new AtomicReference<>(lockToken);
            JsonArray params = JsonArray.create().add(AttemptContextReactive.makeKeyspace(collection)).add(id);
            return this.queryWrapperLocked(statementIdx, null, "EXECUTE __get", QueryOptions.queryOptions().parameters(params), HOOK_QUERY_KV_GET, false, true, this.makeTxdata(), params, querySpan, false, lockRef, true)
                    .map(result -> {
                        List<JsonObject> rows = result.rowsAsObject();
                        Optional<TransactionGetResult> fetched = Optional.empty();
                        if (!rows.isEmpty()) {
                            // Only the first row is used.
                            JsonObject row = rows.get(0);
                            String scas = row.getString("scas");
                            long cas = Long.parseLong(scas);
                            JsonObject doc = row.getObject("doc");
                            JsonObject txnMeta = row.getObject("txnMeta");
                            this.logger().info(this.attemptId, "got doc %s from query with scas=%s meta=%s", DebugUtil.docId(collection, id), scas, String.valueOf(txnMeta));
                            fetched = Optional.of(new TransactionGetResult(id, doc.toBytes(), cas, collection, null, null, null, this.getTranscoder(), Optional.ofNullable(txnMeta)));
                        }
                        return fetched;
                    })
                    .onErrorResume(err -> {
                        ErrorClasses ec = ErrorClasses.classify(err);
                        TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                        if (err instanceof DocumentNotFoundException) {
                            // A missing document is a normal outcome of a get, not a failure.
                            return Mono.just(Optional.empty());
                        }
                        if (err instanceof TransactionOperationFailed) {
                            return Mono.error(err);
                        }
                        return Mono.error(this.operationFailed(builder.build()));
                    })
                    // Placed after onErrorResume so the not-found case above also releases the lock.
                    .flatMap(result -> this.unlock(lockRef.get(), "getWithQueryLocked end", false).thenReturn(result))
                    .doOnTerminate(() -> querySpan.finish());
        });
    }

    /**
     * Fetches a document inside this attempt, failing when it does not exist.
     *
     * @param collection collection holding the document
     * @param id         document id
     * @return a Mono emitting the document, or failing with a FAIL_DOC_NOT_FOUND operation failure
     *         (caused by a DocumentNotFoundException) when the document is absent
     */
    public Mono<TransactionGetResult> get(ReactiveCollection collection, String id) {
        return this.getOptional(collection, id).flatMap(maybeDoc -> {
            if (!maybeDoc.isPresent()) {
                DocumentNotFoundException notFound = new DocumentNotFoundException((ErrorContext)ReducedKeyValueErrorContext.create((String)id));
                TransactionOperationFailed failure = this.operationFailed(
                        TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_DOC_NOT_FOUND).cause((Throwable)notFound).build());
                return Mono.error(failure);
            }
            return Mono.just(maybeDoc.get());
        });
    }

    /**
     * Reports whether the attempt counts as expired at the given place, either because the overall
     * transaction says so or because hasExpiredClientSideHook reports expiry. Both sources are
     * always consulted, and each positive answer is logged.
     *
     * @param place name of the stage asking, used in logs and passed to the hook
     * @param docId optional id of the document involved, passed to the hook
     * @return true when either source reports expiry
     */
    boolean hasExpiredClientSide(String place, Optional<String> docId) {
        boolean expiredOverall = overall.hasExpiredClientSide();
        boolean expiredByHook = hasExpiredClientSideHook(this, place, docId);
        if (expiredOverall) {
            LOGGER.info(attemptId, "expired in %s", place);
        }
        if (expiredByHook) {
            LOGGER.info(attemptId, "fake expiry in %s", place);
        }
        return expiredOverall || expiredByHook;
    }

    /** @return the internal list of final mutation tokens (the live list, not a copy) */
    List<MutationToken> finalMutationTokens() {
        return finalMutationTokens;
    }

    /** @return the id of the ATR used by this attempt, or empty when none has been set */
    public Optional<String> atrId() {
        return atrId;
    }

    /** @return the collection holding this attempt's ATR, or empty when none has been set */
    public Optional<ReactiveCollection> atrCollection() {
        return atrCollection;
    }

    /**
     * Inserts a document inside this attempt using {@link TransactionInsertOptions#DEFAULT}.
     *
     * @param collection collection to insert into
     * @param id         document id
     * @param content    document content
     * @return a Mono emitting the result of the insert
     */
    public Mono<TransactionGetResult> insert(ReactiveCollection collection, String id, Object content) {
        return insert(collection, id, content, TransactionInsertOptions.DEFAULT);
    }

    /**
     * Chooses the collection in which the ATR lives: the configured metadata collection when there
     * is one, otherwise the default collection of the document's bucket.
     *
     * @param docCollection collection of the document being mutated
     * @return the collection to use for the ATR
     */
    private ReactiveCollection getAtrCollection(ReactiveCollection docCollection) {
        Optional<Collection> configured = this.config.metadataCollection();
        if (!configured.isPresent()) {
            return this.parent.cleanup().clusterData().getBucketFromName(docCollection.bucketName()).defaultCollection();
        }
        return configured.get().reactive();
    }

    /**
     * Formats a collection as a keyspace string of the form
     * {@code default:`bucket`.`scope`.`collection`}. Names are inserted verbatim, without escaping.
     *
     * @param collection the collection to describe
     * @return the keyspace string
     */
    private static String makeKeyspace(ReactiveCollection collection) {
        String bucket = collection.bucketName();
        String scope = collection.scopeName();
        String name = collection.name();
        return "default:`" + bucket + "`.`" + scope + "`.`" + name + "`";
    }

    /**
     * Inserts a document inside this attempt, run as a KV operation via doKVOperation.
     *
     * @param collection collection to insert into
     * @param id         document id
     * @param content    document content
     * @param options    insert options
     * @return a Mono emitting the result of the insert
     */
    public Mono<TransactionGetResult> insert(ReactiveCollection collection, String id, Object content, TransactionInsertOptions options) {
        String description = "insert " + DebugUtil.docId(collection, id);
        return this.doKVOperation(description, "user.insert", HOOK_INSERT, collection, id,
                (operationId, span, lockToken) -> this.insertInternal((String)operationId, collection, id, content, (SpanWrapper)span, (ReactiveLock.Waiter)lockToken, options));
    }

    /**
     * Dispatches an insert to either the query path or the key-value path.
     *
     * <p>The options are built first, on subscription, and then
     * {@code queryModeLocked()} decides the route.
     *
     * @param operationId id generated for this operation
     * @param collection  collection to insert into
     * @param id          id of the document to insert
     * @param content     content of the new document
     * @param span        span covering the operation
     * @param lockToken   token for the lock held by the caller
     * @param options     unbuilt insert options
     * @return a Mono producing the inserted document's transactional view
     */
    private Mono<TransactionGetResult> insertInternal(String operationId, ReactiveCollection collection, String id, Object content, SpanWrapper span, ReactiveLock.Waiter lockToken, TransactionInsertOptions options) {
        return Mono.defer(() -> {
            TransactionInsertOptions.BuiltOptions builtOptions = options.build();
            return this.queryModeLocked()
                    ? this.insertWithQueryLocked(collection, id, content, lockToken)
                    : this.insertWithKVLocked(operationId, collection, id, content, span, lockToken, builtOptions);
        });
    }

    /**
     * Stages an insert through the key-value path. Asserts that the lock is held on entry.
     *
     * <p>If this attempt has already staged an INSERT or REPLACE for the same
     * document, the returned Mono fails with {@link DocumentExistsException}.
     * Otherwise the ATR is initialised if needed, the {@code beforeUnlockInsert}
     * hook runs, the lock is released, and then:
     * <ul>
     *   <li>a previously staged REMOVE is turned into a staged replace of that document;</li>
     *   <li>in every other case a staged insert is created.</li>
     * </ul>
     *
     * @param operationId id generated for this operation
     * @param collection  collection to insert into
     * @param id          id of the document to insert
     * @param content     content of the new document
     * @param span        span covering the operation
     * @param lockToken   token for the lock held by the caller; released here before staging
     * @param built       built insert options
     * @return a Mono producing the staged document
     */
    private Mono<TransactionGetResult> insertWithKVLocked(String operationId, ReactiveCollection collection, String id, Object content, SpanWrapper span, ReactiveLock.Waiter lockToken, TransactionInsertOptions.BuiltOptions built) {
        this.assertLocked("insertWithKV");
        Optional<StagedMutation> staged = this.findStagedMutationLocked(collection, id);
        if (staged.isPresent()) {
            StagedMutationType stagedType = staged.get().type;
            boolean alreadyWritten = stagedType == StagedMutationType.INSERT || stagedType == StagedMutationType.REPLACE;
            if (alreadyWritten) {
                return Mono.error(new DocumentExistsException(null));
            }
        }

        // Evaluated only after the lock has been released by the chain below.
        Mono<TransactionGetResult> stageMutation = Mono.defer(() -> {
            boolean overStagedRemove = staged.isPresent() && staged.get().type == StagedMutationType.REMOVE;
            if (overStagedRemove) {
                return this.createStagedReplace(operationId, staged.get().doc, content, span, false);
            }
            return this.createStagedInsert(operationId, collection, id, content, span, built, Optional.empty());
        });

        return this.initAtrIfNeededLocked(collection, id, span)
                .then(this.beforeUnlockInsert(this, id))
                .then(this.unlock(lockToken, "standard"))
                .then(stageMutation);
    }

    /**
     * Performs an insert through the query service by executing {@code EXECUTE __insert}.
     *
     * <p>The content is encoded with the configured transcoder and re-parsed as a
     * {@link JsonObject} for use as a query parameter. On success the lock is
     * released and a {@link TransactionGetResult} is built from the {@code scas}
     * value of the first result row.
     *
     * <p>The returned result carries the transcoder-encoded bytes of
     * {@code content}. Previously it carried {@code content.toString()}, which is
     * only valid JSON when the content object's {@code toString()} happens to be
     * its JSON form, so other content types produced a result holding non-JSON bytes.
     *
     * <p>Error handling: {@code TransactionOperationFailed} and
     * {@link DocumentExistsException} are propagated unchanged; any other error is
     * classified and wrapped via {@code operationFailed}.
     *
     * @param collection collection to insert into
     * @param id         id of the document to insert
     * @param content    content of the new document; must encode to a JSON object
     * @param lockToken  token for the lock held by the caller
     * @return a Mono producing the inserted document's transactional view
     */
    private Mono<TransactionGetResult> insertWithQueryLocked(ReactiveCollection collection, String id, Object content, ReactiveLock.Waiter lockToken) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), collection, id, "user.query_insert", this.attemptSpan);
            Transcoder.EncodedValue encoded = this.getTranscoder().encode(content);
            // Keep the serialized form: it feeds the query parameter and the returned result.
            byte[] encodedContent = (byte[])encoded.encoded();
            JsonObject reencoded = JsonObject.fromJson(encodedContent);
            JsonArray params = JsonArray.create().add(AttemptContextReactive.makeKeyspace(collection)).add(id).add(reencoded).add(JsonObject.create());
            int sidx = this.queryStatementIdx.getAndIncrement();
            AtomicReference<ReactiveLock.Waiter> lt = new AtomicReference<ReactiveLock.Waiter>(lockToken);
            return this.queryWrapperLocked(sidx, null, "EXECUTE __insert", QueryOptions.queryOptions().parameters(params), HOOK_QUERY_KV_INSERT, false, true, this.makeTxdata(), params, span, false, lt, true).flatMap(result -> this.unlock((ReactiveLock.Waiter)lt.get(), "insertWithQueryLocked end", false).thenReturn(result)).map(result -> {
                JsonObject row = (JsonObject)result.rowsAsObject().get(0);
                String scas = row.getString("scas");
                long cas = Long.parseLong(scas);
                return TransactionGetResult.createFromInsert(collection, id, encodedContent, this.transactionId(), this.attemptId, null, null, null, null, cas, this.getTranscoder());
            }).onErrorResume(err -> {
                if (err instanceof TransactionOperationFailed) {
                    return Mono.error((Throwable)err);
                }
                if (err instanceof DocumentExistsException) {
                    return Mono.error((Throwable)err);
                }
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailed out = this.operationFailed(TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err).build());
                return Mono.error((Throwable)out);
            }).doOnTerminate(() -> span.finish());
        });
    }

    /**
     * Picks an ATR id for the given vbucket by delegating to
     * {@code ATRIds.randomAtrIdForVbucket}.
     *
     * @param self            the attempt context; not used by this implementation
     * @param vbucketIdForDoc vbucket of the document being mutated
     * @param numAtrs         number of ATRs to choose from
     * @return the selected ATR id
     */
    protected String randomAtrIdForVbucket(AttemptContextReactive self, Integer vbucketIdForDoc, int numAtrs) {
        final String selected = ATRIds.randomAtrIdForVbucket(vbucketIdForDoc, numAtrs);
        return selected;
    }

    /**
     * Replaces a document within this attempt using {@link TransactionReplaceOptions#DEFAULT}.
     *
     * @param doc     the document to replace, as previously fetched in this transaction
     * @param content the new content
     * @return the result of the three-argument {@code replace} overload
     */
    public Mono<TransactionGetResult> replace(TransactionGetResult doc, Object content) {
        return replace(doc, content, TransactionReplaceOptions.DEFAULT);
    }

    /**
     * Acquires the attempt mutex, waiting at most for the time remaining before expiry.
     *
     * <p>When {@code threadSafetyEnabled} is false no lock is taken and the
     * returned Mono completes empty, so no token is emitted.
     *
     * @param dbg debug label passed to the mutex
     * @return a Mono emitting the lock token, or empty when thread safety is disabled
     */
    private Mono<ReactiveLock.Waiter> lock(String dbg) {
        return Mono.defer(() -> {
            if (!this.threadSafetyEnabled) {
                return Mono.empty();
            }
            return this.mutex.lock(dbg, Duration.ofMillis(this.expiryRemainingMillis()));
        });
    }

    /**
     * Releases the attempt mutex for the given token.
     *
     * <p>Does nothing (completes empty) when {@code threadSafetyEnabled} is false.
     *
     * @param lockToken         token obtained from {@code lock}
     * @param dbgExtra          debug label passed to the mutex
     * @param removeFromWaiters forwarded unchanged to {@code mutex.unlock}
     * @return a Mono completing once the unlock has been performed
     */
    private Mono<Void> unlock(ReactiveLock.Waiter lockToken, String dbgExtra, boolean removeFromWaiters) {
        return Mono.defer(() -> {
            if (!this.threadSafetyEnabled) {
                return Mono.empty();
            }
            return this.mutex.unlock(lockToken, dbgExtra, removeFromWaiters);
        });
    }

    /**
     * Releases the attempt mutex with {@code removeFromWaiters} set to {@code false}.
     *
     * @param lockToken token obtained from {@code lock}
     * @param dbgExtra  debug label passed to the mutex
     * @return a Mono completing once the unlock has been performed
     */
    private Mono<Void> unlock(ReactiveLock.Waiter lockToken, String dbgExtra) {
        return unlock(lockToken, dbgExtra, false);
    }

    /**
     * Takes the attempt lock and then registers a new entry with {@code kvOps}.
     *
     * <p>NOTE(review): {@code lock} completes empty when {@code threadSafetyEnabled}
     * is false, in which case this Mono also completes without emitting. Confirm
     * callers never run with thread safety disabled.
     *
     * @param dbg debug label used for both the lock and the {@code kvOps} entry
     * @return a Mono emitting both tokens bundled in a {@code LockTokens}
     */
    private Mono<LockTokens> lockAndIncKVOps(String dbg) {
        return this.lock(dbg).flatMap(mutexToken ->
                this.kvOps.add(dbg).map(groupToken ->
                        new LockTokens(mutexToken, (ReactiveWaitGroup.Waiter)groupToken)));
    }

    /**
     * Waits for all entries registered with {@code kvOps} to finish and then takes the lock.
     *
     * <p>After the lock is obtained the waiting count is checked again. If it is
     * still above zero, the lock is released and the whole procedure is repeated
     * with {@code " still waiting for KV ops"} appended to the debug label.
     *
     * @param dbg debug label for logging and locking
     * @return a Mono emitting the lock token
     */
    private Mono<ReactiveLock.Waiter> waitForAllKVOpsThenLock(String dbg) {
        return Mono.fromRunnable(() -> {
                    this.assertNotLocked(dbg);
                    this.logger().info(this.attemptId, "waiting for %d KV ops finish for %s", this.kvOps.waitingCount(), dbg);
                })
                .then(Mono.defer(() -> {
                    if (!this.threadSafetyEnabled) {
                        return Mono.empty();
                    }
                    return this.kvOps.await(Duration.ofMillis(this.expiryRemainingMillis()));
                }))
                .then(this.lock(dbg))
                .flatMap(acquired -> {
                    if (this.kvOps.waitingCount() <= 0) {
                        return Mono.just(acquired);
                    }
                    // Entries are still outstanding: give the lock back and start over.
                    String retryDbg = dbg + " still waiting for KV ops";
                    return this.unlock(acquired, retryDbg).then(this.waitForAllKVOpsThenLock(retryDbg));
                });
    }

    /**
     * Waits for outstanding operations, takes the lock, and runs the supplied work.
     *
     * <p>{@code span} is declared nullable, so it is now null-checked before being
     * failed or finished. Previously a {@code null} span caused a
     * {@link NullPointerException} inside the {@code doOnError} and
     * {@code doOnTerminate} callbacks, which turned a successful run into a
     * failure and masked the original error on the failure path.
     *
     * <p>NOTE(review): the {@code flatMap} step calls a synthetic lambda method
     * left behind by the decompiler. Its body is not visible here, so whether it
     * also dereferences {@code span} could not be checked.
     *
     * @param dbg         debug label for waiting and locking
     * @param span        optional span; failed on error and finished on termination when non-null
     * @param doUnderLock supplier of the work to run once the lock is held
     * @return a Mono completing when the work has finished
     */
    private Mono<Void> waitForAllOpsThenDoUnderLock(String dbg, @Nullable SpanWrapper span, Supplier<Mono<Void>> doUnderLock) {
        return Mono.defer(() -> this.waitForAllOps(dbg).then(this.lock(dbg)).flatMap(arg_0 -> this.lambda$waitForAllOpsThenDoUnderLock$44(dbg, span, (Supplier)doUnderLock, arg_0)).doOnError(err -> {
            if (span != null) {
                span.failWith((Throwable)err);
            }
        }).doOnTerminate(() -> {
            if (span != null) {
                span.finish();
            }
        }));
    }

    /**
     * Waits for all entries registered with {@code kvOps} to finish.
     *
     * <p>First asserts that the lock is not held and logs the current waiting
     * count. The wait itself is bounded by the time remaining before expiry and
     * is skipped when {@code threadSafetyEnabled} is false.
     *
     * @param dbg debug label used in the assertion and log line
     * @return a Mono completing once the wait is over
     */
    private Mono<Void> waitForAllOps(String dbg) {
        return Mono.fromRunnable(() -> {
                    this.assertNotLocked(dbg);
                    this.logger().info(this.attemptId, "waiting for %d KV ops in %s", this.kvOps.waitingCount(), dbg);
                })
                .then(Mono.defer(() -> {
                    if (!this.threadSafetyEnabled) {
                        return Mono.empty();
                    }
                    return this.kvOps.await(Duration.ofMillis(this.expiryRemainingMillis()));
                }));
    }

    /**
     * Replaces a document within this attempt.
     *
     * <p>The work is routed through {@code doKVOperation} under the span name
     * {@code "user.replace"} and the stage {@code HOOK_REPLACE}; the actual
     * replace is performed by {@code replaceInternalLocked}.
     *
     * @param doc     the document to replace, as previously fetched in this transaction
     * @param content the new content
     * @param options replace options; not read by this method body
     * @return a Mono producing the replaced document's transactional view
     */
    public Mono<TransactionGetResult> replace(TransactionGetResult doc, Object content, TransactionReplaceOptions options) {
        String lockDebug = "replace " + DebugUtil.docId(doc).toString();
        return this.doKVOperation(lockDebug, "user.replace", HOOK_REPLACE, doc.collection(), doc.id(),
                (operationId, span, lockToken) -> this.replaceInternalLocked(operationId, doc, content, span, lockToken));
    }

    /**
     * Optionally wraps an internal Mono in a {@code MonoBridge}.
     *
     * <p>When {@code threadSafetyEnabled} is false the internal Mono is returned
     * unchanged. Otherwise the bridge's {@code external()} Mono is returned; the
     * logger is passed to the bridge only if {@code monoBridgeDebugging} is set.
     *
     * @param debug    debug label handed to the bridge
     * @param internal the Mono to expose
     * @param <T>      element type
     * @return the Mono callers should subscribe to
     */
    private <T> Mono<T> createMonoBridge(String debug, Mono<T> internal) {
        if (!this.threadSafetyEnabled) {
            return internal;
        }
        return new MonoBridge<T>(internal, debug, this, this.monoBridgeDebugging ? this.LOGGER : null).external();
    }

    /**
     * Common wrapper for the key-value style operations.
     *
     * <p>On subscription this:
     * <ol>
     *   <li>generates a random operation id and appends its first five characters to the lock debug label;</li>
     *   <li>creates a span named {@code spanName} under {@code attemptSpan};</li>
     *   <li>takes the lock and registers with {@code kvOps} via {@code lockAndIncKVOps};</li>
     *   <li>fails early if {@code canPerformOperation} returns an error, or if the attempt has
     *       expired (setting {@code expiryOvertimeMode} and raising {@code TRANSACTION_EXPIRED});</li>
     *   <li>runs {@code op} with the operation id, the span and the mutex token.</li>
     * </ol>
     *
     * <p>On CANCEL or ON_ERROR the mutex is released here. On normal completion it is not,
     * so {@code op} is presumably expected to have released it itself (verify against each
     * caller). In every case the {@code kvOps} entry is marked done and the span finished.
     *
     * <p>NOTE(review): the cleanup uses {@code block()} inside {@code doFinally}, which
     * blocks the thread that delivers the terminal signal.
     *
     * <p>NOTE(review): the span is created before the lock is requested, but it is only
     * finished by the {@code doFinally} inside the {@code flatMap}. If
     * {@code lockAndIncKVOps} errors or completes empty, this method never finishes it.
     *
     * @param lockDebugOrig base debug label for locking and the bridge
     * @param spanName      name of the span to create
     * @param stageName     stage name used for the expiry check and error message
     * @param docCollection collection of the document involved
     * @param docId         id of the document involved
     * @param op            the operation to run; receives (operationId, span, mutex token)
     * @param <T>           result type of the operation
     * @return the operation's Mono, wrapped by {@code createMonoBridge}
     */
    private <T> Mono<T> doKVOperation(String lockDebugOrig, String spanName, String stageName, ReactiveCollection docCollection, String docId, TriFunction<String, SpanWrapper, ReactiveLock.Waiter, Mono<T>> op) {
        return this.createMonoBridge(lockDebugOrig, Mono.defer(() -> {
            String operationId = UUID.randomUUID().toString();
            String lockDebug = lockDebugOrig + " - " + operationId.substring(0, 5);
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), docCollection, docId, spanName, this.attemptSpan);
            return this.lockAndIncKVOps(lockDebug).subscribeOn(SchedulerUtil.scheduler).flatMap(lockTokens -> Mono.defer(() -> {
                // Precondition checks, evaluated while holding the lock.
                TransactionOperationFailed returnEarly = this.canPerformOperation(lockDebug);
                if (returnEarly != null) {
                    return Mono.error((Throwable)returnEarly);
                }
                if (this.hasExpiredClientSide(stageName, Optional.of(docId))) {
                    this.LOGGER.info(this.attemptId, "has expired in stage %s, setting expiry-overtime-mode", stageName);
                    this.expiryOvertimeMode = true;
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY).cause(new AttemptExpired(this, "Attempt expired in stage " + stageName)).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED).build()));
                }
                return Mono.empty();
            // op.apply(...) is invoked here at assembly time; the Mono it returns is only
            // subscribed to after the precondition checks above have completed.
            }).then((Mono)op.apply(operationId, span, lockTokens.mutexToken)).doFinally(v -> {
                if (v == SignalType.CANCEL || v == SignalType.ON_ERROR) {
                    this.LOGGER.info(this.attemptId, "doKVOperation %s got signal %s", lockDebug, v);
                    // removeFromWaiters is true only for cancellation.
                    this.unlock(lockTokens.mutexToken, "doKVOperation", v == SignalType.CANCEL).block();
                }
                this.kvOps.done(lockTokens.waitGroupToken).block();
                span.finish();
            }));
        }));
    }

    /**
     * Common wrapper for query-style operations.
     *
     * <p>On subscription a statement index is taken from {@code queryStatementIdx} and
     * appended to the debug label. The lock is then acquired, its token stored in an
     * {@link AtomicReference} and handed to {@code op} together with the statement index.
     *
     * <p>Unlike {@code doKVOperation}, the lock is released here on every terminal signal
     * (complete, error or cancel), using whatever token the reference holds at that time.
     * Presumably {@code op} may swap the token in the reference — confirm against callers.
     *
     * <p>NOTE(review): the unlock uses {@code block()} inside {@code doFinally}, which
     * blocks the thread that delivers the terminal signal.
     *
     * @param lockDebugIn base debug label for locking and the bridge
     * @param op          the operation to run; receives (statement index, lock token reference)
     * @param <T>         result type of the operation
     * @return the operation's Mono, wrapped by {@code createMonoBridge}
     */
    private <T> Mono<T> doQueryOperation(String lockDebugIn, BiFunction<Integer, AtomicReference<ReactiveLock.Waiter>, Mono<T>> op) {
        return Mono.defer(() -> {
            int sidx = this.queryStatementIdx.getAndIncrement();
            String lockDebug = lockDebugIn + " q" + sidx;
            return this.createMonoBridge(lockDebug, Mono.defer(() -> {
                AtomicReference lt = new AtomicReference();
                return this.lock(lockDebug).subscribeOn(SchedulerUtil.scheduler).flatMap(lockToken -> {
                    lt.set(lockToken);
                    return ((Mono)op.apply(sidx, lt)).doFinally(v -> {
                        if (v == SignalType.CANCEL || v == SignalType.ON_ERROR) {
                            this.LOGGER.info(this.attemptId, "doQueryOperation %s got signal %s", lockDebug, v);
                        }
                        // Always unlock; removeFromWaiters is true only for cancellation.
                        this.unlock((ReactiveLock.Waiter)lt.get(), "doQueryOperation", v == SignalType.CANCEL).block();
                    });
                });
            }));
        });
    }

    /**
     * Logs the replace and dispatches it to either the query path or the key-value path,
     * depending on {@code queryModeLocked()}.
     *
     * @param operationId id generated for this operation
     * @param doc         the document to replace
     * @param content     the new content
     * @param pspan       parent span for the operation
     * @param lockToken   token for the lock held by the caller
     * @return a Mono producing the replaced document's transactional view
     */
    private Mono<TransactionGetResult> replaceInternalLocked(String operationId, TransactionGetResult doc, Object content, SpanWrapper pspan, ReactiveLock.Waiter lockToken) {
        this.LOGGER.info(this.attemptId, "replace doc %s, operationId = %s", doc, operationId);
        return this.queryModeLocked()
                ? this.replaceWithQueryLocked(doc, content, lockToken)
                : this.replaceWithKVLocked(operationId, doc, content, pspan, lockToken);
    }

    /**
     * Stages a replace through the key-value path.
     *
     * <p>While still on the calling path, the existing staged mutation for the document
     * (if any) is looked up and the attempt state is sampled. Then the
     * {@code beforeUnlockReplace} hook runs and the lock is released. After that:
     * <ul>
     *   <li>if the document was already staged as REMOVE in this attempt, the Mono fails
     *       with {@code FAIL_DOC_NOT_FOUND};</li>
     *   <li>otherwise blocking transactions are checked and the ATR is initialised if the
     *       attempt had not started;</li>
     *   <li>a previously staged INSERT is re-staged as an insert using the document's CAS;
     *       anything else becomes a staged replace.</li>
     * </ul>
     *
     * @param operationId id generated for this operation
     * @param doc         the document to replace
     * @param content     the new content
     * @param pspan       parent span for the operation
     * @param lockToken   token for the lock held by the caller; released here before staging
     * @return a Mono producing the staged document
     */
    private Mono<TransactionGetResult> replaceWithKVLocked(String operationId, TransactionGetResult doc, Object content, SpanWrapper pspan, ReactiveLock.Waiter lockToken) {
        Optional<StagedMutation> staged = this.findStagedMutationLocked(doc);
        boolean atrMayBeUnwritten = this.state == AttemptStates.NOT_STARTED;

        // Evaluated only after the lock has been released by the chain at the bottom.
        Mono<TransactionGetResult> stageAfterUnlock = Mono.defer(() -> {
            if (staged.isPresent() && staged.get().type == StagedMutationType.REMOVE) {
                return Mono.error(this.operationFailed(TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_DOC_NOT_FOUND).cause(new DocumentNotFoundException(null)).build()));
            }
            return this.checkAndHandleBlockingTxn(doc, pspan, ForwardCompatibilityStages.WRITE_WRITE_CONFLICT_REPLACING, staged)
                    .then(this.initATRIfNeeded(atrMayBeUnwritten, doc.collection(), doc.id(), pspan))
                    .then(Mono.defer(() -> {
                        boolean overStagedInsert = staged.isPresent() && staged.get().type == StagedMutationType.INSERT;
                        if (overStagedInsert) {
                            return this.createStagedInsert(operationId, doc.collection(), doc.id(), content, pspan, TransactionInsertOptions.insertOptions().build(), Optional.of(doc.cas()));
                        }
                        return this.createStagedReplace(operationId, doc, content, pspan, doc.links().isDeleted());
                    }));
        });

        return this.beforeUnlockReplace(this, doc.id())
                .then(this.unlock(lockToken, "standard"))
                .then(stageAfterUnlock);
    }

    /**
     * Selects an ATR and marks it pending if the attempt has not started yet.
     *
     * <p>The state is checked on subscription. When it is anything other than
     * {@code NOT_STARTED} the Mono completes immediately.
     *
     * @param docCollection collection of the document being mutated
     * @param docId         id of the document being mutated
     * @param pspan         parent span for the ATR write
     * @return a Mono completing once the ATR is pending, or immediately if nothing was needed
     */
    private Mono<Void> initAtrIfNeededLocked(ReactiveCollection docCollection, String docId, SpanWrapper pspan) {
        return Mono.defer(() -> {
            if (this.state != AttemptStates.NOT_STARTED) {
                return Mono.empty();
            }
            return Mono.fromCallable(() -> this.selectAtrLocked(docCollection, docId))
                    .flatMap(chosen -> this.atrPendingLocked(chosen, pspan))
                    .then();
        });
    }

    /**
     * Variant of {@code initAtrIfNeededLocked} for callers that do not hold the lock.
     *
     * <p>When {@code mayNeedToWriteATR} is set, the locked initialisation is run
     * through {@code doUnderLock} with a {@code null} span argument. Otherwise
     * the Mono completes immediately.
     *
     * @param mayNeedToWriteATR whether the caller saw the attempt in its not-started state
     * @param docCollection     collection of the document being mutated
     * @param docId             id of the document being mutated
     * @param pspan             parent span for the ATR write
     * @return a Mono completing once any needed initialisation is done
     */
    private Mono<Void> initATRIfNeeded(boolean mayNeedToWriteATR, ReactiveCollection docCollection, String docId, SpanWrapper pspan) {
        return Mono.defer(() -> {
            if (!mayNeedToWriteATR) {
                return Mono.empty();
            }
            String lockDebug = "before ATR " + DebugUtil.docId(docCollection, docId);
            return this.doUnderLock(lockDebug, null, () -> this.initAtrIfNeededLocked(docCollection, docId, pspan));
        });
    }

    /**
     * Chooses the ATR id and ATR collection for this attempt and records both.
     *
     * <p>The ATR id is derived from the vbucket of the first mutated document. The
     * collection is the configured metadata collection if present, otherwise the
     * one returned by {@code getAtrCollection}.
     *
     * @param docCollection collection of the first mutated document
     * @param docId         id of the first mutated document
     * @return the collection chosen for the ATR
     * @throws IllegalStateException if an ATR id has already been recorded
     */
    private ReactiveCollection selectAtrLocked(ReactiveCollection docCollection, String docId) {
        if (this.atrId.isPresent()) {
            throw new IllegalStateException("Internal bug: two operations have concurrently initialised the ATR");
        }

        long vbucket = ATRIds.vbucketForKey(docId, 1024);
        String chosenAtrId = this.randomAtrIdForVbucket(this, (int)vbucket, this.config.numAtrs());
        this.atrId = Optional.of(chosenAtrId);

        ReactiveCollection chosenCollection;
        if (this.config.metadataCollection().isPresent()) {
            chosenCollection = this.config.metadataCollection().get().reactive();
        } else {
            chosenCollection = this.getAtrCollection(docCollection);
        }
        this.atrCollection = Optional.of(chosenCollection);

        this.LOGGER.info(this.attemptId, "First mutated doc in txn is '%s' on vbucket %d, so using atr %s", DebugUtil.docId(docCollection, docId), vbucket, chosenAtrId);
        return chosenCollection;
    }

    /**
     * Performs a replace through the query service by executing {@code EXECUTE __update}.
     *
     * <p>The content is encoded with the configured transcoder and re-parsed as a
     * {@link JsonObject} for use as a query parameter. The transaction data sent with the
     * statement carries the document's CAS as {@code "scas"} and, if present, its
     * {@code "txnMeta"}. On success the lock is released and a new
     * {@link TransactionGetResult} is built from the {@code "scas"} and {@code "doc"} fields
     * of the first result row.
     *
     * <p>Error handling: a {@code TransactionOperationFailed} is propagated unchanged;
     * {@code FAIL_DOC_NOT_FOUND} and {@code FAIL_CAS_MISMATCH} are wrapped with a
     * transaction retry; any other error is wrapped without one.
     *
     * <p>The lock is only released here on the success path. Presumably the caller's
     * wrapper releases it on error — confirm.
     *
     * @param doc       the document to replace
     * @param content   the new content; must encode to a JSON object
     * @param lockToken token for the lock held by the caller
     * @return a Mono producing the replaced document's transactional view
     */
    private Mono<TransactionGetResult> replaceWithQueryLocked(TransactionGetResult doc, Object content, ReactiveLock.Waiter lockToken) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "user.query_replace", this.attemptSpan);
            Transcoder.EncodedValue encoded = this.getTranscoder().encode(content);
            JsonObject reencoded = JsonObject.fromJson((byte[])encoded.encoded());
            JsonObject txData = this.makeTxdata().put("scas", Long.toString(doc.cas()));
            doc.txnMeta().ifPresent(v -> txData.put("txnMeta", v));
            // Positional parameters: keyspace, document id, new content, empty options object.
            JsonArray params = JsonArray.create().add(AttemptContextReactive.makeKeyspace(doc.collection())).add(doc.id()).add(reencoded).add(JsonObject.create());
            int sidx = this.queryStatementIdx.getAndIncrement();
            AtomicReference<ReactiveLock.Waiter> lt = new AtomicReference<ReactiveLock.Waiter>(lockToken);
            return this.queryWrapperLocked(sidx, null, "EXECUTE __update", QueryOptions.queryOptions().parameters(params), HOOK_QUERY_KV_REPLACE, false, true, txData, params, span, false, lt, true).flatMap(result -> this.unlock((ReactiveLock.Waiter)lt.get(), "replaceWithQueryLocked end", false).thenReturn(result)).map(result -> {
                List rows = result.rowsAsObject();
                JsonObject row = (JsonObject)rows.get(0);
                String scas = row.getString("scas");
                long cas = Long.parseLong(scas);
                JsonObject updatedDoc = row.getObject("doc");
                return new TransactionGetResult(doc.id(), updatedDoc.toBytes(), cas, doc.collection(), null, null, null, this.getTranscoder(), Optional.empty());
            }).onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                // The builder is created before the pass-through check below, so it is
                // constructed even when the error is propagated unchanged.
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                if (err instanceof TransactionOperationFailed) {
                    return Mono.error((Throwable)err);
                }
                if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND || ec == ErrorClasses.FAIL_CAS_MISMATCH) {
                    TransactionOperationFailed out = this.operationFailed(builder.retryTransaction().build());
                    return Mono.error((Throwable)out);
                }
                TransactionOperationFailed out = this.operationFailed(builder.build());
                return Mono.error((Throwable)out);
            }).doOnTerminate(() -> span.finish());
        });
    }

    /**
     * Performs a remove through the query service by executing {@code EXECUTE __delete}.
     *
     * <p>The transaction data sent with the statement carries the document's CAS as
     * {@code "scas"} and, if present, its {@code "txnMeta"}. On success the lock is released.
     *
     * <p>Error handling: a {@code TransactionOperationFailed} is propagated unchanged;
     * {@code FAIL_DOC_NOT_FOUND} and {@code FAIL_CAS_MISMATCH} are wrapped with a
     * transaction retry; any other error is wrapped without one.
     *
     * <p>The lock is only released here on the success path. Presumably the caller's
     * wrapper releases it on error — confirm.
     *
     * @param doc       the document to remove
     * @param lockToken token for the lock held by the caller
     * @return a Mono completing once the remove has been executed
     */
    private Mono<Void> removeWithQueryLocked(TransactionGetResult doc, ReactiveLock.Waiter lockToken) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "user.query_remove", this.attemptSpan);
            JsonObject txData = this.makeTxdata().put("scas", Long.toString(doc.cas()));
            doc.txnMeta().ifPresent(v -> txData.put("txnMeta", v));
            // Positional parameters: keyspace, document id, empty options object.
            JsonArray params = JsonArray.create().add(AttemptContextReactive.makeKeyspace(doc.collection())).add(doc.id()).add(JsonObject.create());
            int sidx = this.queryStatementIdx.getAndIncrement();
            AtomicReference<ReactiveLock.Waiter> lt = new AtomicReference<ReactiveLock.Waiter>(lockToken);
            return this.queryWrapperLocked(sidx, null, "EXECUTE __delete", QueryOptions.queryOptions().parameters(params), HOOK_QUERY_KV_REMOVE, false, true, txData, params, span, false, lt, true).flatMap(result -> this.unlock((ReactiveLock.Waiter)lt.get(), "removeWithQueryLocked end", false)).onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                // The builder is created before the pass-through check below, so it is
                // constructed even when the error is propagated unchanged.
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                if (err instanceof TransactionOperationFailed) {
                    return Mono.error((Throwable)err);
                }
                if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND || ec == ErrorClasses.FAIL_CAS_MISMATCH) {
                    TransactionOperationFailed out = this.operationFailed(builder.retryTransaction().build());
                    return Mono.error((Throwable)out);
                }
                TransactionOperationFailed out = this.operationFailed(builder.build());
                return Mono.error((Throwable)out);
            }).doOnTerminate(() -> span.finish());
        });
    }

    /**
     * Runs a forward-compatibility check and converts any failure into an operation failure.
     *
     * <p>Every error from {@code ForwardCompatibility.check} becomes a {@code FAIL_OTHER}
     * operation failure whose cause is a new {@code ForwardCompatibilityFailure}; the
     * original error is not attached. A transaction retry is requested only when the
     * original error is a {@code ForwardCompatibilityRequiresRetry}.
     *
     * @param stage the stage at which the check is being made
     * @param fc    forward-compatibility data to check, if any
     * @return a Mono completing if the check passes
     */
    private Mono<Void> forwardCompatibilityCheck(ForwardCompatibilityStages stage, Optional<ForwardCompatibility> fc) {
        return ForwardCompatibility.check(stage, fc, this.logger(), Supported.SUPPORTED)
                .onErrorResume(checkFailure -> {
                    TransactionOperationFailedBuilder failure = TransactionOperationFailedBuilder
                            .createError(this, ErrorClasses.FAIL_OTHER)
                            .cause(new ForwardCompatibilityFailure());
                    boolean retryRequested = checkFailure instanceof ForwardCompatibilityRequiresRetry;
                    if (retryRequested) {
                        failure.retryTransaction();
                    }
                    return Mono.error(this.operationFailed(failure.build()));
                });
    }

    /**
     * Inspects the ATR entry of the transaction that has staged a write on {@code doc} to
     * decide whether this attempt may overwrite it.
     *
     * <p>Steps: an expiry check, the {@code beforeCheckATREntryForBlockingDoc} hook, then a
     * lookup of the blocking attempt's entry in its ATR.
     * <ul>
     *   <li>Entry absent: proceed.</li>
     *   <li>Entry present: a forward-compatibility check is made first; then states
     *       {@code COMPLETED} and {@code ROLLED_BACK} allow proceeding, and any other state
     *       raises {@code RetryOperation}.</li>
     * </ul>
     * {@code RetryOperation} is retried with exponential backoff (50 ms to 500 ms) under a
     * one-second timeout.
     *
     * <p>Final error mapping: {@link DocumentNotFoundException} is treated as success;
     * everything else, including retry exhaustion, becomes a
     * {@code FAIL_WRITE_WRITE_CONFLICT} operation failure that requests a transaction retry.
     *
     * <p>Note that {@code doc.links().atrId().get()} and {@code stagedAttemptId().get()} are
     * evaluated when this method is called, not on subscription, and throw if absent.
     *
     * @param doc        the document carrying the blocking transaction's links
     * @param collection the collection holding the blocking transaction's ATR
     * @param span       span passed to the ATR lookup
     * @return a Mono completing when it is safe to proceed
     */
    private Mono<Void> checkATREntryForBlockingDocInternal(TransactionGetResult doc, ReactiveCollection collection, SpanWrapper span) {
        return Mono.fromRunnable(() -> this.checkExpiryPreCommitAndSetExpiryOvertimeMode("staging.check_atr_entry_blocking_doc", Optional.empty())).then(this.beforeCheckATREntryForBlockingDoc(this, doc.links().atrId().get())).then(ActiveTransactionRecord.findEntryForTransaction(collection, doc.links().atrId().get(), doc.links().stagedAttemptId().get(), this.config, span, this.logger()).flatMap(atrEntry -> {
            if (atrEntry.isPresent()) {
                ATREntry ae = (ATREntry)atrEntry.get();
                this.LOGGER.info(this.attemptId, "fetched ATR entry for blocking txn: hasExpired=%s entry=%s", ae.hasExpired(), ae);
                return this.forwardCompatibilityCheck(ForwardCompatibilityStages.WRITE_WRITE_CONFLICT_READING_ATR, ae.forwardCompatibility()).then(Mono.defer(() -> {
                    switch (ae.state()) {
                        case COMPLETED: 
                        case ROLLED_BACK: {
                            this.LOGGER.info(this.attemptId, "ATR entry state of %s indicates we can proceed to overwrite", new Object[]{((ATREntry)atrEntry.get()).state()});
                            return Mono.empty();
                        }
                    }
                    // Any other state: signal the retry loop below to look again.
                    return Mono.error((Throwable)new RetryOperation());
                }));
            }
            this.LOGGER.info(this.attemptId, "blocking txn %s's entry has been removed indicating the txn expired, so proceeding to overwrite", doc.links().stagedAttemptId().get());
            return Mono.empty();
        })).retryWhen(Retry.anyOf((Class[])new Class[]{RetryOperation.class}).exponentialBackoff(Duration.ofMillis(50L), Duration.ofMillis(500L)).timeout(Duration.ofSeconds(1L)).toReactorRetry()).publishOn(SchedulerUtil.scheduler).onErrorResume(err -> {
            if (err instanceof RetryExhaustedException) {
                this.LOGGER.info(this.attemptId, "still blocked by a valid transaction, retrying to unlock documents");
            } else {
                if (err instanceof DocumentNotFoundException) {
                    this.LOGGER.info(this.attemptId, "blocking txn's ATR has been removed so proceeding to overwrite");
                    return Mono.empty();
                }
                this.LOGGER.warn(this.attemptId, "got error in checkATREntryForBlockingDoc: %s", AttemptContextReactive.dbg(err));
            }
            return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_WRITE_WRITE_CONFLICT).cause((Throwable)err).retryTransaction().build()));
        }).then();
    }

    /**
     * Resolves the blocking transaction's ATR collection from the document's links and
     * delegates to {@code checkATREntryForBlockingDocInternal} under a new
     * {@code "staging.check_atr_blocking"} span.
     *
     * <p>The span is marked failed on error and finished on any terminal signal.
     *
     * @param doc   the document carrying the blocking transaction's links
     * @param pspan parent span
     * @return a Mono completing when it is safe to proceed
     */
    private Mono<Void> checkATREntryForBlockingDoc(TransactionGetResult doc, SpanWrapper pspan) {
        return Mono.defer(() -> {
            SpanWrapper checkSpan = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "staging.check_atr_blocking", pspan);
            ReactiveCollection blockingAtrCollection = this.parent.cleanup()
                    .clusterData()
                    .cluster()
                    .bucket(doc.links().atrBucketName().get())
                    .scope(doc.links().atrScopeName().get())
                    .collection(doc.links().atrCollectionName().get())
                    .reactive();
            return this.checkATREntryForBlockingDocInternal(doc, blockingAtrCollection, checkSpan)
                    .doFinally(signal -> checkSpan.finish())
                    .doOnError(failure -> AttemptContextReactive.failSpan(checkSpan, failure));
        });
    }

    /**
     * Builds a loggable description of an ATR by delegating to {@code ATRUtil.getAtrDebug}.
     *
     * @param collection the collection holding the ATR
     * @param atrId      the ATR id, if known
     * @return the argument produced by {@code ATRUtil.getAtrDebug}
     */
    private RedactableArgument getAtrDebug(ReactiveCollection collection, Optional<String> atrId) {
        final RedactableArgument description = ATRUtil.getAtrDebug(collection, atrId);
        return description;
    }

    /**
     * Builds a loggable description of an ATR whose collection may not be known yet, by
     * delegating to {@code ATRUtil.getAtrDebug}.
     *
     * @param collection the collection holding the ATR, if known
     * @param atrId      the ATR id, if known
     * @return the argument produced by {@code ATRUtil.getAtrDebug}
     */
    private RedactableArgument getAtrDebug(Optional<ReactiveCollection> collection, Optional<String> atrId) {
        final RedactableArgument description = ATRUtil.getAtrDebug(collection, atrId);
        return description;
    }

    /**
     * Passes remove options through {@code OptionsWrapperUtil.wrap} together with this
     * attempt's configuration.
     *
     * @param opts  the options to wrap
     * @param pspan parent span for the operation
     * @param core  the SDK core
     * @return the options returned by {@code OptionsWrapperUtil.wrap}
     */
    private RemoveOptions wrap(RemoveOptions opts, SpanWrapper pspan, Core core) {
        return OptionsWrapperUtil.wrap(opts, pspan, config, core);
    }

    /**
     * Passes mutate-in options through {@code OptionsWrapperUtil.wrap} together with this
     * attempt's configuration.
     *
     * @param opts  the options to wrap
     * @param pspan parent span for the operation
     * @param core  the SDK core
     * @return the options returned by {@code OptionsWrapperUtil.wrap}
     */
    private MutateInOptions wrap(MutateInOptions opts, SpanWrapper pspan, Core core) {
        return OptionsWrapperUtil.wrap(opts, pspan, config, core);
    }

    /**
     * Obtains mutate-in options from {@code OptionsWrapperUtil.wrap} without supplying
     * any caller-built options.
     *
     * @param pspan parent span for the operation
     * @param core  the SDK core
     * @return the options returned by {@code OptionsWrapperUtil.wrap}
     */
    private MutateInOptions wrap(SpanWrapper pspan, Core core) {
        return OptionsWrapperUtil.wrap(pspan, config, core);
    }

    /**
     * Computes how many milliseconds remain before the configured expiration time is reached.
     *
     * <p>The result is the configured expiration time minus the time elapsed since the
     * start of the transaction, clamped to the range
     * {@code [0, configured expiration time]}.
     *
     * @return remaining time in milliseconds, never negative
     */
    long expiryRemainingMillis() {
        long elapsedMillis = this.overall.timeSinceStartOfTransactionsMillis(System.nanoTime());
        long budgetMillis = this.config.expirationTime().toMillis();

        long remainingMillis = budgetMillis - elapsedMillis;
        if (remainingMillis > budgetMillis) {
            remainingMillis = budgetMillis;
        }
        if (remainingMillis < 0L) {
            remainingMillis = 0L;
        }
        return remainingMillis;
    }

    /**
     * Looks up the request tracer of the cluster environment.
     *
     * @return the tracer returned by the environment's {@code requestTracer()}
     */
    private RequestTracer tracer() {
        ClusterEnvironment environment = this.cluster().environment();
        return environment.requestTracer();
    }

    /**
     * Writes this attempt's entry into the ATR document with state {@code PENDING}.
     * Asserts that the lock is held.
     *
     * <p>The entry is written under the path {@code attempts.<attemptId>} with these
     * sub-fields: {@code tid} (transaction id), {@code st} (state), {@code tst} (the CAS
     * macro), {@code exp} (remaining expiry in milliseconds) and {@code d} (durability
     * level). The document body is replaced with a single zero byte, and the mutation uses
     * UPSERT store semantics.
     *
     * <p>On success the mutation's CAS is stored in {@code startTimeServer} and the attempt
     * state becomes {@code PENDING}.
     *
     * <p>Error mapping, in order:
     * <ul>
     *   <li>already in expiry-overtime mode: mapped to an expired failure;</li>
     *   <li>{@code FAIL_EXPIRY}: overtime mode is set and the operation fails;</li>
     *   <li>{@code FAIL_ATR_FULL}: fails with {@code ActiveTransactionRecordFull} as cause;</li>
     *   <li>{@code FAIL_AMBIGUOUS}: this method is retried after a delay;</li>
     *   <li>{@code FAIL_PATH_ALREADY_EXISTS}: treated as success, but the Mono completes
     *       empty, so the success callback (state change and start time) does not run on
     *       that path;</li>
     *   <li>{@code FAIL_TRANSIENT}: fails requesting a transaction retry;</li>
     *   <li>{@code FAIL_HARD}: fails without rolling back the attempt;</li>
     *   <li>anything else: fails with the default builder settings.</li>
     * </ul>
     *
     * <p>NOTE(review): the span is created before the {@code atrId} presence check, and the
     * early-error return for a missing id does not finish it.
     *
     * @param collection the collection holding the ATR
     * @param pspan      parent span
     * @return a Mono emitting the mutation result
     */
    private Mono<MutateInResult> atrPendingLocked(ReactiveCollection collection, SpanWrapper pspan) {
        return Mono.defer(() -> {
            this.assertLocked("atrPending");
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), collection, this.atrId.orElse(null), "atr.pending", pspan);
            String prefix = "attempts." + this.attemptId;
            if (!this.atrId.isPresent()) {
                return Mono.error((Throwable)new IllegalStateException("atrId not present"));
            }
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "about to set ATR %s to Pending", this.getAtrDebug(collection, this.atrId));
                return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_ATR_PENDING, Optional.empty());
            }).then(this.beforeAtrPending(this)).then(collection.mutateIn(this.atrId.get(), Arrays.asList(MutateInSpec.insert((String)(prefix + "." + "tid"), (Object)this.transactionId()).xattr().createPath(), MutateInSpec.insert((String)(prefix + "." + "st"), (Object)AttemptStates.PENDING.name()).xattr(), MutateInSpec.insert((String)(prefix + "." + "tst"), (Object)MutateInMacro.CAS), MutateInSpec.insert((String)(prefix + "." + "exp"), (Object)this.expiryRemainingMillis()).xattr(), MutateInSpec.insert((String)(prefix + "." + "d"), (Object)DurabilityLevelUtil.convertDurabilityLevel(this.config.durabilityLevel())).xattr(), MutateInSpec.replace((String)"", (Object)new byte[]{0})), this.wrap(((MutateInOptions)MutateInOptions.mutateInOptions().clientContext(OptionsWrapperUtil.createClientContext("atrPending"))).storeSemantics(StoreSemantics.UPSERT), span, collection.core()))).publishOn(SchedulerUtil.scheduler).flatMap(v -> this.afterAtrPending(this).map(x -> v)).onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder out = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                this.LOGGER.info(this.attemptId, "error while setting ATR %s to Pending in %dus: %s", this.getAtrDebug(collection, this.atrId), span.elapsed(), AttemptContextReactive.dbg(err));
                if (this.expiryOvertimeMode) {
                    return this.mapErrorInOvertimeToExpired(true, HOOK_ATR_PENDING, (Throwable)err, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED);
                }
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    return this.setExpiryOvertimeModeAndFail((Throwable)err, HOOK_ATR_PENDING, ec);
                }
                if (ec == ErrorClasses.FAIL_ATR_FULL) {
                    return Mono.error((Throwable)this.operationFailed(out.cause(new ActiveTransactionRecordFull(this, (Throwable)err)).build()));
                }
                if (ec == ErrorClasses.FAIL_AMBIGUOUS) {
                    this.LOGGER.info(this.attemptId, "retrying the op on %s to resolve ambiguity", new Object[]{ec});
                    // Recursive retry; the current span becomes the parent of the retry's span.
                    return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION, (Scheduler)SchedulerUtil.scheduler).then(this.atrPendingLocked(collection, span));
                }
                if (ec == ErrorClasses.FAIL_PATH_ALREADY_EXISTS) {
                    this.LOGGER.info(this.attemptId, "assuming this is caused by resolved ambiguity, and proceeding as though successful", new Object[]{ec});
                    return Mono.empty();
                }
                if (ec == ErrorClasses.FAIL_TRANSIENT) {
                    this.LOGGER.info(this.attemptId, "transient error likely to be solved by retry", new Object[]{ec});
                    return Mono.error((Throwable)this.operationFailed(out.retryTransaction().build()));
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    return Mono.error((Throwable)this.operationFailed(out.doNotRollbackAttempt().build()));
                }
                return Mono.error((Throwable)this.operationFailed(out.build()));
            }).doOnNext(v -> {
                long elapsed = span.finish();
                this.LOGGER.info(this.attemptId, "set ATR %s to Pending in %dus, got CAS (start time) %s", this.getAtrDebug(collection, this.atrId), elapsed, this.dbg((MutationResult)v));
                // The mutation's CAS value is stored, interpreted as nanoseconds.
                this.startTimeServer = Optional.of(Duration.ofNanos(v.cas()));
                this.setStateLocked(AttemptStates.PENDING);
            }).doOnError(err -> {
                AttemptContextReactive.failSpan(span, err);
                span.finish();
            });
        });
    }

    /**
     * Changes the attempt state. Asserts that the lock is held, then logs the change.
     *
     * @param newState the state to move to
     */
    private void setStateLocked(AttemptStates newState) {
        String assertLabel = "setState " + newState;
        this.assertLocked(assertLabel);
        this.logger().info(this.attemptId, "changed state to %s", new Object[]{newState});
        this.state = newState;
    }

    /**
     * Looks up the transcoder configured on the cluster environment.
     *
     * @return the transcoder returned by the environment's {@code transcoder()}
     */
    private Transcoder getTranscoder() {
        ClusterEnvironment environment = this.parent.cleanup().clusterData().cluster().environment();
        return environment.transcoder();
    }

    /**
     * Records an error on a span by calling {@code failWith}. The span is not finished here.
     *
     * @param span the span to mark as failed
     * @param err  the error to record
     */
    private static void failSpan(final SpanWrapper span, final Throwable err) {
        span.failWith(err);
    }

    /**
     * Stages a replace on a document by writing transaction metadata and the new content
     * into the document's extended attributes. Asserts that the lock is NOT held.
     *
     * <p>The mutation upserts {@code txn} (metadata built by {@code createDocumentMetadata}),
     * {@code txn.op.stgd} (the encoded new content) and {@code txn.op.crc32} (the value
     * CRC32C macro), guarded by the document's current CAS. On success the document's CAS
     * is updated in place and a {@code REPLACE} staged mutation is recorded.
     *
     * <p>Errors are routed through {@code handleErrorOnStagedMutation}.
     *
     * <p>NOTE(review): on the success path {@code span.finish()} is called both in the
     * success callback and again in {@code doFinally}; presumably finishing twice is
     * harmless — verify.
     *
     * @param operationId   id generated for this operation
     * @param doc           the document to replace; its CAS is mutated on success
     * @param content       the new content
     * @param pspan         parent span
     * @param accessDeleted forwarded to the mutate-in options' {@code accessDeleted}
     * @return a Mono emitting {@code doc} once the staged write is recorded
     */
    private Mono<TransactionGetResult> createStagedReplace(String operationId, TransactionGetResult doc, Object content, SpanWrapper pspan, boolean accessDeleted) {
        return Mono.defer(() -> {
            this.assertNotLocked("createStagedReplace");
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "staging.replace", pspan);
            Transcoder.EncodedValue encoded = this.getTranscoder().encode(content);
            JsonObject txn = this.createDocumentMetadata("replace", operationId, doc);
            return this.beforeStagedReplace(this, doc.id()).then(doc.collection().mutateIn(doc.id(), Arrays.asList(MutateInSpec.upsert((String)"txn", (Object)txn).xattr().createPath(), MutateInSpec.upsert((String)"txn.op.stgd", (Object)encoded.encoded()).xattr(), MutateInSpec.upsert((String)"txn.op.crc32", (Object)MutateInMacro.VALUE_CRC_32C).xattr()), this.wrap(((MutateInOptions)MutateInOptions.mutateInOptions().accessDeleted(accessDeleted).clientContext(OptionsWrapperUtil.createClientContext("createStagedReplace"))).cas(doc.cas()), span, doc.collection().core()))).publishOn(SchedulerUtil.scheduler).doOnSubscribe(v -> this.LOGGER.info(this.attemptId, "about to replace doc %s with cas %d, accessDeleted=%s", DebugUtil.docId(doc), doc.cas(), accessDeleted)).flatMap(result -> this.afterStagedReplaceComplete(this, doc.id()).map(x -> result)).flatMap(updatedDoc -> {
                long elapsed = span.finish();
                // Carry the new CAS forward on the caller's document object.
                doc.cas(updatedDoc.cas());
                this.LOGGER.info(this.attemptId, "replaced doc %s got %s, in %dus", DebugUtil.docId(doc), this.dbg((MutationResult)updatedDoc), elapsed);
                return this.addStagedMutation(new StagedMutation(operationId, doc, encoded.encoded(), StagedMutationType.REPLACE, (MutateInResult)updatedDoc));
            }).thenReturn((Object)doc).onErrorResume(err -> this.handleErrorOnStagedMutation("replacing", doc, (Throwable)err, span.elapsed()).thenReturn((Object)doc)).doFinally(v -> span.finish()).doOnError(err -> AttemptContextReactive.failSpan(span, err));
        });
    }

    /**
     * Assembles the transaction metadata JSON that the staging methods of this class upsert at the
     * {@code txn} path.
     *
     * <p>The result always carries an {@code id} section (transaction, attempt and operation ids), an
     * {@code atr} section (ATR id plus bucket/scope/collection of the ATR collection) and an {@code op}
     * section holding the operation type. A {@code restore} section is added only when {@code doc} is
     * non-null and exposes at least one of CAS, exptime or revid in its document metadata.
     *
     * @param opType      value stored under {@code op.type}
     * @param operationId value stored under {@code id.op}
     * @param doc         source of the optional {@code restore} fields; may be {@code null}
     * @return the assembled metadata object
     */
    private JsonObject createDocumentMetadata(String opType, String operationId, @Nullable TransactionGetResult doc) {
        JsonObject opSection = JsonObject.create().put("type", opType);

        JsonObject idSection = JsonObject.create()
                .put("txn", this.transactionId())
                .put("atmpt", this.attemptId)
                .put("op", operationId);

        JsonObject atrSection = JsonObject.create()
                .put("id", this.atrId.get())
                .put("bkt", this.atrCollection.get().bucketName())
                .put("scp", this.atrCollection.get().scopeName())
                .put("coll", this.atrCollection.get().name());

        JsonObject metadata = JsonObject.create()
                .put("id", idSection)
                .put("atr", atrSection)
                .put("op", opSection);

        JsonObject restoreSection = JsonObject.create();
        if (doc != null) {
            doc.documentMetadata().flatMap(DocumentMetadata::cas).ifPresent(casValue -> restoreSection.put("CAS", casValue));
            doc.documentMetadata().flatMap(DocumentMetadata::exptime).ifPresent(expiry -> restoreSection.put("exptime", (Number)expiry));
            doc.documentMetadata().flatMap(DocumentMetadata::revid).ifPresent(revision -> restoreSection.put("revid", revision));
        }

        boolean hasRestoreData = restoreSection.size() > 0;
        if (hasRestoreData) {
            metadata.put("restore", restoreSection);
        }
        return metadata;
    }

    /**
     * Stages a remove of {@code doc}: writes the transaction metadata and a CRC macro
     * ({@code txn.op.crc32}) as extended attributes, using the document's current CAS. No staged content
     * is written.
     *
     * <p>On success the CAS held by {@code doc} is overwritten with the CAS of the mutation result and a
     * {@code StagedMutationType.REMOVE} entry (with {@code null} content) is recorded through
     * {@code addStagedMutation}. Errors are handed to {@code handleErrorOnStagedMutation}.
     *
     * @param operationId   id of this operation; stored in the metadata and in the recorded staged mutation
     * @param doc           the document being removed; its CAS is mutated in place on success
     * @param pspan         parent of the {@code staging.remove} span created here
     * @param accessDeleted forwarded to {@code MutateInOptions.accessDeleted}
     * @return a Mono that completes once the staged mutation has been recorded
     */
    private Mono<Void> createStagedRemove(String operationId, TransactionGetResult doc, SpanWrapper pspan, boolean accessDeleted) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "staging.remove", pspan);
            JsonObject txn = this.createDocumentMetadata("remove", operationId, doc);
            // Chain: before-hook -> sub-document mutateIn (CAS-guarded) -> after-hook -> record staged mutation.
            return this.beforeStagedRemove(this, doc.id()).doOnSubscribe(x -> this.LOGGER.info(this.attemptId, "about to remove doc %s with cas %d", DebugUtil.docId(doc), doc.cas())).then(doc.collection().mutateIn(doc.id(), Arrays.asList(MutateInSpec.upsert((String)"txn", (Object)txn).xattr().createPath(), MutateInSpec.upsert((String)"txn.op.crc32", (Object)MutateInMacro.VALUE_CRC_32C).xattr()), this.wrap(((MutateInOptions)MutateInOptions.mutateInOptions().accessDeleted(accessDeleted).clientContext(OptionsWrapperUtil.createClientContext("createdStagedRemove"))).cas(doc.cas()), span, doc.collection().core()))).publishOn(SchedulerUtil.scheduler).flatMap(updatedDoc -> this.afterStagedRemoveComplete(this, doc.id()).thenReturn(updatedDoc)).flatMap(updatedDoc -> {
                long elapsed = span.finish();
                this.LOGGER.info(this.attemptId, "staged remove of doc %s got %s, in %dus", DebugUtil.docId(doc), this.dbg((MutationResult)updatedDoc), elapsed);
                // Keep the caller's document handle in step with the CAS produced by the staged write.
                doc.cas(updatedDoc.cas());
                return this.addStagedMutation(new StagedMutation(operationId, doc, null, StagedMutationType.REMOVE, (MutateInResult)updatedDoc));
            }).then().onErrorResume(err -> this.handleErrorOnStagedMutation("removing", doc, (Throwable)err, span.elapsed())).doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * Acquires the lock via {@code lock(dbg)}, runs the deferred work, and on any terminal signal finishes
     * {@code span} (when non-null) and releases the lock.
     *
     * @param dbg         debug label passed to {@code lock}
     * @param span        optional span to finish when the work terminates; may be {@code null}
     * @param whileLocked supplier handed to the deferred work
     * @return the Mono produced by the deferred work, with the finish/unlock step attached
     */
    private Mono<Void> doUnderLock(String dbg, @Nullable SpanWrapper span, Supplier<Mono<Void>> whileLocked) {
        // NOTE(review): lambda$doUnderLock$110 is a compiler-generated lambda name emitted by the decompiler; its
        // body is not visible here. Presumably it invokes whileLocked.get() - confirm against the original source.
        return this.lock(dbg).flatMap(lockToken -> Mono.defer(() -> AttemptContextReactive.lambda$doUnderLock$110((Supplier)whileLocked)).doFinally(v -> {
            if (span != null) {
                span.finish();
            }
            // block() waits synchronously for the unlock Mono inside the doFinally callback.
            this.unlock((ReactiveLock.Waiter)lockToken, "doUnderLock on signal " + v).block();
        }));
    }

    /**
     * Records {@code sm} in the staged-mutation collection while holding the lock, first dropping any
     * entry already recorded for the same document.
     *
     * @param sm the staged mutation to record
     * @return a Mono that completes once the collection has been updated
     */
    private Mono<Void> addStagedMutation(StagedMutation sm) {
        return Mono.defer(() -> {
            String lockLabel = "addStagedMutation " + DebugUtil.docId(sm.doc);
            Supplier<Mono<Void>> replaceEntry = () -> Mono.fromRunnable(() -> {
                this.removeStagedMutationLocked(sm.doc);
                this.stagedMutationsLocked.add(sm);
            });
            return this.doUnderLock(lockLabel, null, replaceEntry);
        });
    }

    /**
     * Maps an error raised while staging a replace or remove onto the transaction-level failure to raise.
     *
     * <ul>
     *   <li>{@code FAIL_EXPIRY}: delegated to {@code setExpiryOvertimeModeAndFail}.</li>
     *   <li>{@code FAIL_DOC_NOT_FOUND}, {@code FAIL_CAS_MISMATCH}, {@code FAIL_AMBIGUOUS},
     *       {@code FAIL_TRANSIENT}: fail the operation and ask for the transaction to be retried.</li>
     *   <li>{@code FAIL_HARD}: fail the operation without rolling back the attempt.</li>
     *   <li>anything else: fail the operation with the default builder settings.</li>
     * </ul>
     *
     * <p>Every failure built here carries {@code err} as its cause. Previously the
     * {@code FAIL_DOC_NOT_FOUND} / {@code FAIL_CAS_MISMATCH} branch built a fresh error without the cause,
     * which discarded the underlying exception.
     *
     * @param stage   description of the stage being performed, used for logging and expiry handling
     * @param doc     the document the mutation was targeting (logging only)
     * @param err     the error raised by the staging attempt
     * @param elapsed elapsed time reported in the log message
     * @return a Mono that signals the mapped failure
     */
    private Mono<Void> handleErrorOnStagedMutation(String stage, TransactionGetResult doc, Throwable err, long elapsed) {
        ErrorClasses ec = ErrorClasses.classify(err);
        TransactionOperationFailedBuilder out = TransactionOperationFailedBuilder.createError(this, ec).cause(err);
        this.LOGGER.info(this.attemptId, "error while %s doc %s in %dus: %s", stage, DebugUtil.docId(doc), elapsed, AttemptContextReactive.dbg(err));
        if (this.expiryOvertimeMode) {
            this.LOGGER.warn(this.attemptId, "should not reach here in expiryOvertimeMode");
        }
        if (ec == ErrorClasses.FAIL_EXPIRY) {
            return this.setExpiryOvertimeModeAndFail(err, stage, ec);
        }
        // All four classes are retryable; reuse the builder so the original cause is preserved.
        if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND || ec == ErrorClasses.FAIL_CAS_MISMATCH || ec == ErrorClasses.FAIL_AMBIGUOUS || ec == ErrorClasses.FAIL_TRANSIENT) {
            return Mono.error((Throwable)this.operationFailed(out.retryTransaction().build()));
        }
        if (ec == ErrorClasses.FAIL_HARD) {
            return Mono.error((Throwable)this.operationFailed(out.doNotRollbackAttempt().build()));
        }
        return Mono.error((Throwable)this.operationFailed(out.build()));
    }

    /**
     * Convenience overload that looks up the staged mutation recorded for {@code doc}, keyed by its
     * collection and id.
     *
     * @param doc the document to look up
     * @return the matching staged mutation, if any
     */
    private Optional<StagedMutation> findStagedMutationLocked(TransactionGetResult doc) {
        ReactiveCollection owningCollection = doc.collection();
        String documentId = doc.id();
        return this.findStagedMutationLocked(owningCollection, documentId);
    }

    /**
     * Returns the first recorded staged mutation whose document matches the given bucket, scope,
     * collection name and document id. Asserts that the lock is held.
     *
     * @param collection collection the document lives in
     * @param docId      id of the document
     * @return the first matching staged mutation, or empty when none matches
     */
    private Optional<StagedMutation> findStagedMutationLocked(ReactiveCollection collection, String docId) {
        this.assertLocked("findStagedMutation");
        for (StagedMutation candidate : this.stagedMutationsLocked) {
            if (!candidate.doc.collection().bucketName().equals(collection.bucketName())) {
                continue;
            }
            if (!candidate.doc.collection().scopeName().equals(collection.scopeName())) {
                continue;
            }
            if (!candidate.doc.collection().name().equals(collection.name())) {
                continue;
            }
            if (candidate.doc.id().equals(docId)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /**
     * Drops every recorded staged mutation whose document has the same bucket, scope, collection name
     * and id as {@code doc}. Asserts that the lock is held.
     *
     * @param doc the document whose staged mutations should be removed
     */
    private void removeStagedMutationLocked(TransactionGetResult doc) {
        this.assertLocked("removeStagedMutation");
        ReactiveCollection target = doc.collection();
        this.stagedMutationsLocked.removeIf(staged -> {
            ReactiveCollection other = staged.doc.collection();
            if (!other.bucketName().equals(target.bucketName())) {
                return false;
            }
            if (!other.scopeName().equals(target.scopeName())) {
                return false;
            }
            if (!other.name().equals(target.name())) {
                return false;
            }
            return staged.doc.id().equals(doc.id());
        });
    }

    /**
     * Wraps {@code err} for logging by delegating to {@code DebugUtil.dbg}.
     *
     * @param err the error to wrap
     * @return the wrapper produced by {@code DebugUtil.dbg}
     */
    private static LogDeferThrowable dbg(Throwable err) {
        LogDeferThrowable deferred = DebugUtil.dbg(err);
        return deferred;
    }

    /**
     * Resolves a staged insert that found the target document already present. The document is fetched
     * (it may be a tombstone) and its state decides the outcome:
     *
     * <ul>
     *   <li>deleted and not in a transaction: the insert is re-staged over it using its CAS;</li>
     *   <li>not deleted and not in a transaction: fails with {@code FAIL_DOC_ALREADY_EXISTS};</li>
     *   <li>staged by this attempt with the same operation id: treated as a resolved ambiguity and recorded
     *       as a staged insert;</li>
     *   <li>staged by this attempt with a different operation id: fails with {@code FAIL_CAS_MISMATCH};</li>
     *   <li>staged elsewhere but not as an insert: fails with {@code FAIL_DOC_ALREADY_EXISTS};</li>
     *   <li>staged elsewhere as an insert: checked for a blocking transaction, then overwritten via
     *       {@code overwriteStagedInsert};</li>
     *   <li>document not found by the get: fails with {@code FAIL_DOC_NOT_FOUND} and requests a retry.</li>
     * </ul>
     *
     * @param operationId id of the insert operation
     * @param collection  collection holding the document
     * @param id          document id
     * @param content     content being inserted
     * @param pspan       parent span for the operations performed here
     * @param options     insert options forwarded to the re-staging calls
     * @return a Mono emitting the resulting document handle, or signalling the mapped failure
     */
    private Mono<TransactionGetResult> handleDocExistsDuringStagedInsert(String operationId, ReactiveCollection collection, String id, Object content, SpanWrapper pspan, TransactionInsertOptions.BuiltOptions options) {
        // Common prefix for every log line emitted by this method.
        String bp = "DocExists on " + DebugUtil.docId(collection, id) + ": ";
        return this.beforeGetDocInExistsDuringStagedInsert(this, id).then(DocumentGetter.justGetDoc(collection, this.config, id, pspan, this.getTranscoder(), true, this.logger())).publishOn(SchedulerUtil.scheduler).doOnSubscribe(x -> this.LOGGER.info(this.attemptId, "%s getting doc (which may be a tombstone)", bp)).onErrorResume(err -> {
            ErrorClasses ec = ErrorClasses.classify(err);
            TransactionOperationFailedBuilder e = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
            this.LOGGER.warn(this.attemptId, "%s got error while getting doc: %s", bp, AttemptContextReactive.dbg(err));
            // Only these two classes request a retry of the transaction; everything else fails as built.
            if (ec == ErrorClasses.FAIL_TRANSIENT || ec == ErrorClasses.FAIL_PATH_NOT_FOUND) {
                e.retryTransaction();
            }
            return Mono.error((Throwable)this.operationFailed(e.build()));
        }).flatMap(v -> {
            if (v.isPresent()) {
                Tuple2 results = (Tuple2)v.get();
                TransactionGetResult r = (TransactionGetResult)results.getT1();
                LookupInResult lir = (LookupInResult)results.getT2();
                this.LOGGER.info(this.attemptId, "%s doc %s exists inTransaction=%s isDeleted=%s", bp, DebugUtil.docId(collection, id), r.links(), lir.isDeleted());
                return this.forwardCompatibilityCheck(ForwardCompatibilityStages.WRITE_WRITE_CONFLICT_INSERTING_GET, r.links().forwardCompatibility()).then(Mono.defer(() -> {
                    // Tombstone with no transaction metadata: stage the insert over it, guarded by its CAS.
                    if (lir.isDeleted() && !r.links().isDocumentInTransaction()) {
                        this.LOGGER.info(this.attemptId, "%s doc %s is a regular tombstone without txn metadata, proceeding to overwrite", bp, DebugUtil.docId(collection, id));
                        return this.createStagedInsert(operationId, collection, id, content, pspan, options, Optional.of(r.cas()));
                    }
                    // Live document outside any transaction.
                    if (!r.links().isDocumentInTransaction()) {
                        this.LOGGER.info(this.attemptId, "%s doc %s exists but is not in txn, raising DocumentAlreadyExistsException", bp, DebugUtil.docId(collection, id));
                        return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_DOC_ALREADY_EXISTS).cause((Throwable)new DocumentExistsException((ErrorContext)ReducedKeyValueErrorContext.create((String)id))).build()));
                    }
                    // Document was staged by this very attempt.
                    if (r.links().stagedAttemptId().get().equals(this.attemptId)) {
                        if (r.links().stagedOperationId().isPresent() && r.links().stagedOperationId().get().equals(operationId)) {
                            this.LOGGER.info(this.attemptId, "%s doc %s has the same operation id, must be a resolved ambiguity, proceeding", bp, DebugUtil.docId(collection, id));
                            return this.addStagedMutation(new StagedMutation(operationId, r, r.links().stagedContent().get().getBytes(StandardCharsets.UTF_8), StagedMutationType.INSERT, null)).thenReturn((Object)r);
                        }
                        this.LOGGER.info(this.attemptId, "%s doc %s has the same attempt id but a different operation id, must be racing with a concurrent attempt to write the same doc", bp, DebugUtil.docId(collection, id));
                        return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_CAS_MISMATCH).cause(new ConcurrentOperationsDetectedOnSameDocument()).build()));
                    }
                    // Staged by someone else, but not as an insert.
                    if (!r.links().op().get().equals("insert")) {
                        this.LOGGER.info(this.attemptId, "%s doc %s is in a txn but is not a staged insert, raising DocumentAlreadyExistsException", bp, DebugUtil.docId(collection, id));
                        return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_DOC_ALREADY_EXISTS).cause((Throwable)new DocumentExistsException((ErrorContext)ReducedKeyValueErrorContext.create((String)id))).build()));
                    }
                    // Staged insert from another attempt: check whether it blocks us, then overwrite it.
                    return this.checkAndHandleBlockingTxn(r, pspan, ForwardCompatibilityStages.WRITE_WRITE_CONFLICT_INSERTING, Optional.empty()).then(this.overwriteStagedInsert(operationId, collection, id, content, pspan, options, bp, r, lir));
                }));
            }
            this.LOGGER.info(this.attemptId, "%s completed get of %s, could not find, throwing to retry txn which should succeed now", bp, DebugUtil.docId(collection, id));
            return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_DOC_NOT_FOUND).retryTransaction().build()));
        });
    }

    /**
     * Replaces another attempt's staged insert with this attempt's staged insert.
     *
     * <p>If the existing document is deleted (per {@code lir.isDeleted()}), the insert is staged directly
     * over it using its CAS. Otherwise the document is removed first and the insert is staged with no CAS.
     * Errors from the removal are mapped to an operation failure; {@code FAIL_DOC_NOT_FOUND},
     * {@code FAIL_CAS_MISMATCH} and {@code FAIL_TRANSIENT} additionally request a transaction retry.
     *
     * @param operationId id of the insert operation
     * @param collection  collection holding the document
     * @param id          document id
     * @param content     content being inserted
     * @param pspan       parent span for the operations performed here
     * @param options     insert options forwarded to {@code createStagedInsert}
     * @param bp          log prefix supplied by the caller
     * @param r           the fetched document; asserted to be in a transaction as a staged insert
     * @param lir         the lookup result used to test whether the document is deleted
     * @return a Mono emitting the document handle produced by {@code createStagedInsert}
     */
    private Mono<TransactionGetResult> overwriteStagedInsert(String operationId, ReactiveCollection collection, String id, Object content, SpanWrapper pspan, TransactionInsertOptions.BuiltOptions options, String bp, TransactionGetResult r, LookupInResult lir) {
        return Mono.defer(() -> {
            assert (r.links().isDocumentInTransaction());
            assert (r.links().op().get().equals("insert"));
            if (lir.isDeleted()) {
                return this.createStagedInsert(operationId, collection, id, content, pspan, options, Optional.of(r.cas()));
            }
            this.LOGGER.info(this.attemptId, "%s removing %s as it's a protocol 1.0 staged insert", bp, DebugUtil.docId(collection, id));
            // Remove the existing document, then stage the insert afresh (no CAS, so insert semantics apply).
            return this.beforeOverwritingStagedInsertRemoval(this, id).then(collection.remove(id, this.wrap(RemoveOptions.removeOptions(), pspan, collection.core()))).onErrorResume(err -> {
                this.LOGGER.warn(this.attemptId, "%s hit error %s while removing %s", bp, DebugUtil.dbg(err), DebugUtil.docId(collection, id));
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder out = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND || ec == ErrorClasses.FAIL_CAS_MISMATCH || ec == ErrorClasses.FAIL_TRANSIENT) {
                    out.retryTransaction();
                }
                return Mono.error((Throwable)this.operationFailed(out.build()));
            }).then(this.createStagedInsert(operationId, collection, id, content, pspan, options, Optional.empty()));
        });
    }

    /**
     * Stages an insert by writing the transaction metadata, the encoded content ({@code txn.op.stgd}) and a
     * CRC macro ({@code txn.op.crc32}) as extended attributes, with {@code accessDeleted(true)} and
     * {@code createAsDeleted(true)}.
     *
     * <p>When {@code cas} is present the mutation uses that CAS with {@code StoreSemantics.REPLACE};
     * otherwise CAS 0 with {@code StoreSemantics.INSERT}. On success a {@code StagedMutationType.INSERT}
     * entry is recorded and a {@code TransactionGetResult} built from the insert is emitted.
     *
     * <p>Error handling, in order: {@code FeatureNotAvailableException} fails immediately; in expiry
     * overtime mode the error is mapped to expired; {@code FAIL_EXPIRY} enters expiry overtime mode and
     * fails; {@code FAIL_AMBIGUOUS} retries this method after {@code DEFAULT_DELAY_RETRYING_OPERATION};
     * {@code FAIL_TRANSIENT} fails and requests a transaction retry; {@code FAIL_HARD} fails without
     * rollback; {@code FAIL_DOC_ALREADY_EXISTS} / {@code FAIL_CAS_MISMATCH} delegate to
     * {@code handleDocExistsDuringStagedInsert}; anything else fails as built.
     *
     * @param operationId id of the insert operation
     * @param collection  collection to insert into
     * @param id          document id
     * @param content     content to stage, encoded with {@code getTranscoder()}
     * @param pspan       parent of the {@code staging.insert} span created here
     * @param options     insert options, forwarded on retries and to the doc-exists handler
     * @param cas         CAS of an existing document to stage over, or empty for a fresh insert
     * @return a Mono emitting the document handle for the staged insert
     */
    private Mono<TransactionGetResult> createStagedInsert(String operationId, ReactiveCollection collection, String id, Object content, SpanWrapper pspan, TransactionInsertOptions.BuiltOptions options, Optional<Long> cas) {
        return Mono.defer(() -> {
            this.assertNotLocked("createStagedInsert");
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), collection, id, "staging.insert", pspan);
            Transcoder.EncodedValue encoded = this.getTranscoder().encode(content);
            JsonObject txn = this.createDocumentMetadata("insert", operationId, null);
            // Chain: expiry check -> before-hook -> sub-document mutateIn -> after-hook -> record staged mutation.
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "about to insert staged doc %s as shadow document, cas=%s, operationId=%s", DebugUtil.docId(collection, id), cas, operationId);
                return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_CREATE_STAGED_INSERT, Optional.of(id));
            }).then(this.beforeStagedInsert(this, id)).then(collection.mutateIn(id, Arrays.asList(MutateInSpec.upsert((String)"txn", (Object)txn).xattr().createPath(), MutateInSpec.upsert((String)"txn.op.stgd", (Object)encoded.encoded()).xattr(), MutateInSpec.upsert((String)"txn.op.crc32", (Object)MutateInMacro.VALUE_CRC_32C).xattr()), this.wrap(((MutateInOptions)MutateInOptions.mutateInOptions().clientContext(OptionsWrapperUtil.createClientContext("createStagedInsert"))).accessDeleted(true).createAsDeleted(true).cas(cas.orElse(0L).longValue()).storeSemantics(cas.isPresent() ? StoreSemantics.REPLACE : StoreSemantics.INSERT), span, collection.core()))).publishOn(SchedulerUtil.scheduler).flatMap(updatedDoc -> this.afterStagedInsertComplete(this, id).thenReturn(updatedDoc)).doOnNext(updatedDoc -> {
                long elapsed = span.finish();
                this.LOGGER.info(this.attemptId, "inserted doc %s got %s, in %dus", DebugUtil.docId(collection, id), this.dbg((MutationResult)updatedDoc), elapsed);
            }).flatMap(updatedDoc -> {
                TransactionGetResult out = TransactionGetResult.createFromInsert(collection, id, encoded.encoded(), this.transactionId(), this.attemptId, this.atrId.get(), this.atrCollection.get().bucketName(), this.atrCollection.get().scopeName(), this.atrCollection.get().name(), updatedDoc.cas(), this.getTranscoder());
                return this.addStagedMutation(new StagedMutation(operationId, out, encoded.encoded(), StagedMutationType.INSERT, (MutateInResult)updatedDoc)).thenReturn((Object)out);
            }).onErrorResume(err -> {
                this.LOGGER.info(this.attemptId, "got err while staging insert of %s: %s", DebugUtil.docId(collection, id), AttemptContextReactive.dbg(err));
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder out = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                if (err instanceof FeatureNotAvailableException) {
                    return Mono.error((Throwable)this.operationFailed(out.build()));
                }
                if (this.expiryOvertimeMode) {
                    return this.mapErrorInOvertimeToExpired(true, HOOK_CREATE_STAGED_INSERT, (Throwable)err, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED);
                }
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    return this.setExpiryOvertimeModeAndFail((Throwable)err, HOOK_CREATE_STAGED_INSERT, ec);
                }
                // Ambiguous result: wait, then retry the same staged insert (this method's span becomes the parent).
                if (ec == ErrorClasses.FAIL_AMBIGUOUS) {
                    return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION, (Scheduler)SchedulerUtil.scheduler).then(this.createStagedInsert(operationId, collection, id, content, span, options, cas));
                }
                if (ec == ErrorClasses.FAIL_TRANSIENT) {
                    return Mono.error((Throwable)this.operationFailed(out.retryTransaction().build()));
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    return Mono.error((Throwable)this.operationFailed(out.doNotRollbackAttempt().build()));
                }
                // The document already exists (or changed underneath us): inspect it to decide how to proceed.
                if (ec == ErrorClasses.FAIL_DOC_ALREADY_EXISTS || ec == ErrorClasses.FAIL_CAS_MISMATCH) {
                    return this.handleDocExistsDuringStagedInsert(operationId, collection, id, content, span, options);
                }
                return Mono.error((Throwable)this.operationFailed(out.build()));
            }).doOnError(err -> AttemptContextReactive.failSpan(span, err)).doOnTerminate(() -> span.finish());
        });
    }

    /**
     * Removes {@code doc} as part of this attempt by running {@code removeInternalLocked} through
     * {@code doKVOperation}.
     *
     * @param doc the document to remove
     * @return a Mono that completes when the remove operation has finished
     */
    public Mono<Void> remove(TransactionGetResult doc) {
        String opDescription = "remove " + DebugUtil.docId(doc);
        return this.doKVOperation(opDescription, "user.remove", HOOK_REMOVE, doc.collection(), doc.id(), (operationId, span, lockToken) -> {
            return this.removeInternalLocked((String)operationId, doc, (SpanWrapper)span, (ReactiveLock.Waiter)lockToken).thenReturn((Object)1);
        }).then();
    }

    /**
     * Logs the remove and dispatches it to the query-based or the KV-based implementation, depending on
     * {@code queryModeLocked()}.
     *
     * @param operationId id of the remove operation
     * @param doc         the document to remove
     * @param span        span for the operation (used by the KV path)
     * @param lockToken   lock token passed on to the chosen implementation
     * @return the Mono returned by the chosen implementation
     */
    private Mono<Void> removeInternalLocked(String operationId, TransactionGetResult doc, SpanWrapper span, ReactiveLock.Waiter lockToken) {
        return Mono.defer(() -> {
            this.LOGGER.info(this.attemptId, "remove doc %s, operationId=%s", DebugUtil.docId(doc), operationId);
            return this.queryModeLocked()
                    ? this.removeWithQueryLocked(doc, lockToken)
                    : this.removeWithKVLocked(operationId, doc, span, lockToken);
        });
    }

    /**
     * KV implementation of remove. While still locked it captures whether the attempt has not started yet
     * and any staged mutation already recorded for {@code doc}; it then releases the lock and proceeds:
     *
     * <ul>
     *   <li>previous staged REMOVE of the same doc: fails with {@code FAIL_DOC_NOT_FOUND};</li>
     *   <li>previous staged INSERT of the same doc: delegates to {@code removeStagedInsert};</li>
     *   <li>otherwise: checks for a blocking transaction, initialises the ATR if needed and stages the
     *       remove.</li>
     * </ul>
     *
     * @param operationId id of the remove operation
     * @param doc         the document to remove
     * @param span        span for the operations performed here
     * @param lockToken   token of the currently held lock, released before any KV work
     * @return a Mono that completes once the remove has been staged
     */
    private Mono<Void> removeWithKVLocked(String operationId, TransactionGetResult doc, SpanWrapper span, ReactiveLock.Waiter lockToken) {
        return Mono.defer(() -> {
            // Both values are read before the unlock below.
            boolean mayNeedToWriteATR = this.state == AttemptStates.NOT_STARTED;
            Optional<StagedMutation> existing = this.findStagedMutationLocked(doc);
            return this.beforeUnlockRemove(this, doc.id()).then(this.unlock(lockToken, "standard")).then(Mono.defer(() -> {
                if (existing.isPresent()) {
                    StagedMutation op = (StagedMutation)existing.get();
                    this.LOGGER.info(this.attemptId, "found previous write of %s as %s on remove", new Object[]{DebugUtil.docId(doc), op.type});
                    if (op.type == StagedMutationType.REMOVE) {
                        return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_DOC_NOT_FOUND).cause((Throwable)new DocumentNotFoundException(null)).build()));
                    }
                    if (op.type == StagedMutationType.INSERT) {
                        return this.removeStagedInsert(doc, span);
                    }
                }
                return this.checkAndHandleBlockingTxn(doc, span, ForwardCompatibilityStages.WRITE_WRITE_CONFLICT_REMOVING, existing).then(this.initATRIfNeeded(mayNeedToWriteATR, doc.collection(), doc.id(), span)).then(this.createStagedRemove(operationId, doc, span, doc.links().isDeleted()));
            }));
        });
    }

    /**
     * Decides whether a staged write already present on {@code doc} prevents this attempt from writing.
     *
     * <ul>
     *   <li>No staged write: nothing to do.</li>
     *   <li>Staged by this transaction and this attempt: fails with {@code FAIL_CAS_MISMATCH} when no
     *       staged mutation is recorded locally, or when the recorded one has a different CAS.</li>
     *   <li>Staged by this transaction otherwise: allowed to continue.</li>
     *   <li>Staged by another transaction: runs the forward-compatibility check and then the ATR entry
     *       check when the ATR id and bucket name are known; otherwise continues.</li>
     * </ul>
     *
     * @param doc         the document about to be written
     * @param pspan       parent span for the ATR check
     * @param stage       forward-compatibility stage to check against
     * @param existingOpt staged mutation recorded locally for {@code doc}, if any
     * @return a Mono that completes when the write may proceed, or signals the failure
     */
    private Mono<Void> checkAndHandleBlockingTxn(TransactionGetResult doc, SpanWrapper pspan, ForwardCompatibilityStages stage, Optional<StagedMutation> existingOpt) {
        if (!doc.links().hasStagedWrite()) {
            return Mono.empty();
        }

        boolean sameTransaction = doc.links().stagedTransactionId().get().equals(this.transactionId());
        if (sameTransaction) {
            boolean sameAttempt = doc.links().stagedAttemptId().get().equals(this.attemptId);
            if (sameAttempt) {
                if (!existingOpt.isPresent()) {
                    this.LOGGER.info(this.attemptId, "concurrent op race detected on doc %s: can see the KV result of another op, but stagedMutation not yet written", DebugUtil.docId(doc));
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_CAS_MISMATCH).cause(new ConcurrentOperationsDetectedOnSameDocument()).build()));
                }
                StagedMutation recorded = existingOpt.get();
                if (recorded.doc.cas() != doc.cas()) {
                    this.LOGGER.info(this.attemptId, "concurrent op race detected on doc %s: have read a document before a concurrent op wrote its stagedMutation", DebugUtil.docId(doc));
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_CAS_MISMATCH).cause(new ConcurrentOperationsDetectedOnSameDocument()).build()));
                }
            }
            this.LOGGER.info(this.attemptId, "doc %s has been written by a different attempt in transaction, ok to continue", DebugUtil.docId(doc));
            return Mono.empty();
        }

        boolean atrKnown = doc.links().atrId().isPresent() && doc.links().atrBucketName().isPresent();
        if (!atrKnown) {
            this.LOGGER.info(this.attemptId, "doc %s is in another txn %s, cannot check ATR entry - probably a bug, so proceeding to overwrite", DebugUtil.docId(doc), doc.links().stagedAttemptId().get());
            return Mono.empty();
        }

        this.LOGGER.info(this.attemptId, "doc %s is in another txn %s, checking ATR entry %s/%s/%s to see if blocked", DebugUtil.docId(doc), doc.links().stagedAttemptId().get(), doc.links().atrBucketName().orElse(""), doc.links().atrCollectionName().orElse(""), doc.links().atrId().orElse(""));
        return this.forwardCompatibilityCheck(stage, doc.links().forwardCompatibility()).then(this.checkATREntryForBlockingDoc(doc, pspan));
    }

    /**
     * Converts staged mutations to document records by extracting each mutation's document and
     * delegating to {@code listToDocRecords}.
     *
     * @param docs staged mutations to convert
     * @return one JSON record per staged mutation, in the same order
     */
    private List<JsonObject> listStagedToDocRecords(List<StagedMutation> docs) {
        List<TransactionGetResult> stagedDocs = new ArrayList<TransactionGetResult>();
        for (StagedMutation staged : docs) {
            stagedDocs.add(staged.doc);
        }
        return this.listToDocRecords(stagedDocs);
    }

    /**
     * Builds one JSON record per document containing its id ({@code id}), bucket ({@code bkt}), scope
     * ({@code scp}) and collection name ({@code col}).
     *
     * @param docs documents to describe
     * @return the records, in the same order as {@code docs}
     */
    private List<JsonObject> listToDocRecords(List<TransactionGetResult> docs) {
        List<JsonObject> records = new ArrayList<JsonObject>();
        for (TransactionGetResult document : docs) {
            JsonObject record = JsonObject.create()
                    .put("id", document.id())
                    .put("bkt", document.collection().bucketName())
                    .put("scp", document.collection().scopeName())
                    .put("col", document.collection().name());
            records.add(record);
        }
        return records;
    }

    /**
     * Produces the three xattr upsert specs that list this attempt's staged inserts ({@code ins}),
     * replaces ({@code rep}) and removes ({@code rem}) under {@code attempts.<attemptId>}.
     *
     * @return the specs in the order inserts, replaces, removes
     */
    private List<MutateInSpec> addDocsToBuilder() {
        String attemptPath = "attempts." + this.attemptId;
        List<JsonObject> insertRecords = this.listStagedToDocRecords(this.stagedInsertsLocked());
        List<JsonObject> replaceRecords = this.listStagedToDocRecords(this.stagedReplacesLocked());
        List<JsonObject> removeRecords = this.listStagedToDocRecords(this.stagedRemovesLocked());
        return Arrays.asList(
                MutateInSpec.upsert((String)(attemptPath + ".ins"), insertRecords).xattr(),
                MutateInSpec.upsert((String)(attemptPath + ".rep"), replaceRecords).xattr(),
                MutateInSpec.upsert((String)(attemptPath + ".rem"), removeRecords).xattr());
    }

    /**
     * Defers the commit of this attempt by serialising its state and storing it on the overall
     * transaction context.
     *
     * <p>Fails with a {@code FAIL_OTHER} operation failure (caused by a
     * {@code FeatureNotAvailableException}) when the attempt is in query mode.
     *
     * @return a Mono that completes once the serialised context has been stored
     */
    @Stability.Volatile
    public Mono<Void> defer() {
        return Mono.fromRunnable(() -> {
            if (this.queryModeUnlocked()) {
                TransactionOperationFailed unsupported = this.operationFailed(TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause((Throwable)new FeatureNotAvailableException("Deferred transactions are not supported when the transaction includes a query")).build());
                throw unsupported;
            }
            String serializedState = this.dehydrate(true).toString();
            this.LOGGER.info(this.attemptId, "deferring commit, serialized is %d chars", serializedState.length());
            this.overall.serialized(TransactionSerializedContext.createFrom(serializedState));
        }).then();
    }

    /**
     * Returns the serialised context held by the overall transaction context, if one has been stored.
     *
     * @return the stored serialised context, or empty
     */
    @Stability.Volatile
    Optional<TransactionSerializedContext> serialized() {
        Optional<TransactionSerializedContext> stored = this.overall.serialized();
        return stored;
    }

    /**
     * Creates a cleanup request for this attempt, or returns {@code null} when none is needed.
     *
     * <p>No request is created when regular cleanup is disabled in the config, when the attempt is in
     * query mode, when the transaction has been deferred (a serialised context is present), when no ATR id
     * or ATR collection is set, or when the attempt state is {@code NOT_STARTED}, {@code COMPLETED} or
     * {@code ROLLED_BACK}.
     *
     * @return the cleanup request, or {@code null} if cleanup is not required
     */
    @Nullable
    private CleanupRequest createCleanupRequestIfNeeded() {
        if (!this.config.runRegularAttemptsCleanupThread()) {
            this.LOGGER.trace(this.attemptId(), "skipping addition of cleanup request on failure as regular cleanup disabled");
            return null;
        }
        if (this.queryModeUnlocked()) {
            this.LOGGER.info(this.attemptId(), "Skipping cleanup request as in query mode");
            return null;
        }
        if (this.serialized().isPresent()) {
            this.LOGGER.info(this.attemptId(), "Skipping cleanup request as deferred transaction");
            return null;
        }
        if (!this.atrId().isPresent() || !this.atrCollection().isPresent()) {
            this.LOGGER.trace(this.attemptId(), "Skipping cleanup request as no ATR entry to remove (due to no mutations)");
            return null;
        }
        switch (this.state()) {
            case NOT_STARTED:
            case COMPLETED:
            case ROLLED_BACK: {
                this.LOGGER.trace(this.attemptId(), "Skipping addition of cleanup request in state %s", new Object[]{this.state()});
                return null;
            }
            default: {
                this.LOGGER.trace(this.attemptId(), "Adding cleanup request for %s/%s", this.atrCollection().get().name(), this.atrId().get());
                return this.createCleanupRequest();
            }
        }
    }

    /**
     * Builds the cleanup request for this attempt from its ATR location, current state and the staged
     * mutations grouped by type (replaces, removes, inserts - in that argument order).
     *
     * <p>Asserts that the attempt state is neither {@code NOT_STARTED} nor {@code COMPLETED}.
     *
     * @return the cleanup request
     */
    private CleanupRequest createCleanupRequest() {
        assert (this.state != AttemptStates.NOT_STARTED);
        assert (this.state != AttemptStates.COMPLETED);

        long nowNanos = System.nanoTime();
        long elapsedNanos = nowNanos - this.overall.startTimeClient().toNanos();
        long transactionElapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);

        List<StagedMutation> replaces = this.stagedMutationsLocked.stream()
                .filter(staged -> staged.type == StagedMutationType.REPLACE)
                .collect(Collectors.toList());
        List<StagedMutation> removes = this.stagedMutationsLocked.stream()
                .filter(staged -> staged.type == StagedMutationType.REMOVE)
                .collect(Collectors.toList());
        List<StagedMutation> inserts = this.stagedMutationsLocked.stream()
                .filter(staged -> staged.type == StagedMutationType.INSERT)
                .collect(Collectors.toList());

        return new CleanupRequest(
                this.attemptId,
                this.atrId().get(),
                this.atrCollection().get(),
                this.state,
                this.toDocRecords(replaces),
                this.toDocRecords(removes),
                this.toDocRecords(inserts),
                Duration.ZERO,
                Optional.empty(),
                transactionElapsedTimeMillis,
                Optional.of(this.config.durabilityLevel()));
    }

    /**
     * Commits this attempt. A {@code user.commit} span is created under the attempt span at subscription
     * time and handed to {@code commitInternal}.
     *
     * @return the Mono returned by {@code commitInternal}
     */
    public Mono<Void> commit() {
        return Mono.defer(() -> this.commitInternal(
                SpanWrapperUtil.createOp(this, this.tracer(), null, null, "user.commit", this.attemptSpan)));
    }

    /**
     * Commits the attempt implicitly, unless a commit is not applicable.
     *
     * <p>Nothing is done when commit is no longer allowed (state bit
     * {@code TRANSACTION_STATE_BIT_COMMIT_NOT_ALLOWED}), when running in single-query transaction mode,
     * or when the transaction has been deferred (a serialised context is present).
     *
     * <p>The {@code commit.implicit} span is created only once a commit is actually going to run.
     * Previously it was created up front and never finished on the three early-return paths, leaving an
     * unfinished span behind.
     *
     * @param singleQueryTransactionMode whether the attempt runs as a single-query transaction
     * @return a Mono that completes when the implicit commit has finished or was skipped
     */
    Mono<Void> implicitCommit(boolean singleQueryTransactionMode) {
        return Mono.defer(() -> {
            if (this.hasStateBit(TRANSACTION_STATE_BIT_COMMIT_NOT_ALLOWED)) {
                return Mono.just((Object)this);
            }
            if (singleQueryTransactionMode) {
                return Mono.just((Object)this);
            }
            if (this.serialized().isPresent()) {
                return Mono.just((Object)this);
            }
            // Only open the span when a commit will really be attempted; commitInternal owns it from here.
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "commit.implicit", this.attemptSpan);
            this.LOGGER.info(this.attemptId(), "doing implicit commit");
            return this.commitInternal(span);
        }).then();
    }

    /**
     * Runs the commit through {@code createMonoBridge}: asserts the lock is not held, then hands
     * {@code commitInternalLocked} to {@code waitForAllOpsThenDoUnderLock}.
     *
     * @param span span covering the commit
     * @return the bridged Mono
     */
    Mono<Void> commitInternal(SpanWrapper span) {
        return this.createMonoBridge("commit", Mono.defer(() -> {
            this.assertNotLocked("commit");
            Supplier<Mono<Void>> commitUnderLock = () -> this.commitInternalLocked(span);
            return this.waitForAllOpsThenDoUnderLock("commit", span, commitUnderLock);
        }));
    }

    /**
     * Performs the commit while the lock is held.
     *
     * <p>Steps: assert the lock; fail early when {@code canPerformCommit} reports a failure; set the
     * commit-not-allowed and app-rollback-not-allowed state bits; delegate to the query commit in query
     * mode; otherwise run the pre-commit expiry check and either skip (no ATR collection or id, i.e. no
     * mutations) or run {@code commitActualLocked}. The work is subscribed on
     * {@code SchedulerUtil.scheduler}.
     *
     * @param span span covering the commit
     * @return a Mono that completes when the commit has finished
     */
    private Mono<Void> commitInternalLocked(SpanWrapper span) {
        return Mono.defer(() -> {
            this.assertLocked("commitInternal");

            TransactionOperationFailed notPermitted = this.canPerformCommit("commit");
            if (notPermitted != null) {
                this.logger().info(this.attemptId, "commit raising %s", DebugUtil.dbg(notPermitted));
                return Mono.error((Throwable)notPermitted);
            }

            this.setStateBits("commit", TRANSACTION_STATE_BIT_COMMIT_NOT_ALLOWED | TRANSACTION_STATE_BIT_APP_ROLLBACK_NOT_ALLOWED, 0);
            if (this.queryModeLocked()) {
                return this.commitWithQueryLocked(span);
            }

            this.LOGGER.info(this.attemptId, "commit %s", this);
            this.checkExpiryPreCommitAndSetExpiryOvertimeMode(HOOK_BEFORE_COMMIT, Optional.empty());

            if (this.atrCollection.isPresent() && this.atrId.isPresent()) {
                return this.commitActualLocked(span);
            }
            return Mono.create(sink -> {
                this.LOGGER.info(this.attemptId, "calling commit on attempt that's got no mutations, skipping");
                sink.success();
            });
        }).subscribeOn(SchedulerUtil.scheduler);
    }

    /**
     * Runs the commit proper: marks the ATR entry as committed, commits the staged documents and then
     * completes the ATR entry.
     *
     * <p>The ATR update writes, under {@code attempts.<attemptId>}: the state ({@code st}), the commit
     * start timestamp ({@code tsc}, via the CAS macro), the staged document lists, and an insert of
     * {@code p}. Every one of these is written as an extended attribute. The {@code tsc} spec previously
     * lacked {@code xattr()}, unlike its sibling fields and every other macro-valued spec in this class.
     *
     * @param span span covering the commit; finished when the chain terminates
     * @return a Mono that completes when the overall commit has finished
     */
    private Mono<Void> commitActualLocked(SpanWrapper span) {
        return Mono.defer(() -> {
            String prefix = "attempts." + this.attemptId;
            ArrayList<MutateInSpec> specs = new ArrayList<MutateInSpec>();
            specs.add((MutateInSpec)MutateInSpec.upsert((String)(prefix + "." + "st"), (Object)AttemptStates.COMMITTED.name()).xattr());
            // Macro-valued fields live in xattrs, like every other field of the ATR entry.
            specs.add((MutateInSpec)MutateInSpec.upsert((String)(prefix + "." + "tsc"), (Object)MutateInMacro.CAS).xattr());
            specs.addAll(this.addDocsToBuilder());
            specs.add((MutateInSpec)MutateInSpec.insert((String)(prefix + "." + "p"), (Object)0).xattr());
            AtomicReference<Long> overallStartTime = new AtomicReference<Long>(0L);
            return this.atrCommitLocked(specs, overallStartTime, span).then(this.commitDocsLocked(span)).then(this.atrCompleteLocked(prefix, overallStartTime, span)).doOnSuccess(ignore -> this.LOGGER.info(this.attemptId, "overall commit completed")).doFinally(v -> span.finish()).then();
        });
    }

    /**
     * Commits the attempt by issuing a {@code COMMIT} statement through {@code queryWrapperLocked}; on a
     * result the attempt state is set to {@code COMPLETED}.
     *
     * <p>Error mapping: an error already classified as {@code TRANSACTION_OPERATION_FAILED} is passed
     * through unchanged; {@code FAIL_EXPIRY} becomes a no-rollback failure raising
     * {@code TRANSACTION_COMMIT_AMBIGUOUS}; anything else becomes a no-rollback failure of its own class.
     *
     * @param span span covering the commit
     * @return a Mono that completes when the query commit has finished
     */
    private Mono<Void> commitWithQueryLocked(SpanWrapper span) {
        return Mono.defer(() -> {
            int statementIndex = this.queryStatementIdx.getAndIncrement();
            return this.queryWrapperLocked(statementIndex, null, "COMMIT", QueryOptions.queryOptions(), HOOK_QUERY_COMMIT, false, false, null, null, span, false, null, true)
                    .doOnNext(ignored -> this.setStateLocked(AttemptStates.COMPLETED))
                    .onErrorResume(failure -> {
                        ErrorClasses classification = ErrorClasses.classify(failure);
                        if (classification == ErrorClasses.TRANSACTION_OPERATION_FAILED) {
                            return Mono.error((Throwable)failure);
                        }
                        TransactionOperationFailed mapped;
                        if (classification == ErrorClasses.FAIL_EXPIRY) {
                            mapped = this.operationFailed(TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY).cause((Throwable)failure).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).doNotRollbackAttempt().build());
                        } else {
                            mapped = this.operationFailed(TransactionOperationFailedBuilder.createError(this, classification).cause((Throwable)failure).doNotRollbackAttempt().build());
                        }
                        return Mono.error((Throwable)mapped);
                    })
                    .then();
        });
    }

    /**
     * Switches the attempt into expiry-overtime mode if it has expired client-side.
     *
     * <p>Once {@code expiryOvertimeMode} is set, later calls only log that expiry is being ignored
     * and do not re-check expiry.
     *
     * @param stage name of the stage being checked, used in the lock assertion and log messages
     * @param id    optional document id forwarded to {@code hasExpiredClientSide}
     */
    private void checkExpiryDuringCommitOrRollbackLocked(String stage, Optional<String> id) {
        this.assertLocked("checkExpiryDuringCommitOrRollbackLocked in stage " + stage);
        if (this.expiryOvertimeMode) {
            this.LOGGER.info(this.attemptId, "ignoring expiry in stage %s, as in expiry-overtime mode", stage);
            return;
        }
        if (!this.hasExpiredClientSide(stage, id)) {
            return;
        }
        this.LOGGER.info(this.attemptId, "has expired in stage %s, entering expiry-overtime mode (one attempt to complete)", stage);
        this.expiryOvertimeMode = true;
    }

    /**
     * Removes this attempt's entry from the ATR document after the documents have been committed.
     *
     * <p>Steps, in order: a client-side expiry check (skipped when already in expiry-overtime mode),
     * {@code beforeAtrComplete}, an xattr {@code remove} of {@code prefix} on the ATR document, and
     * {@code afterAtrComplete}. On a value the attempt state is set to {@code COMPLETED}.
     *
     * <p>Errors at this stage are treated as tidy-up failures: anything not classified as
     * {@code FAIL_HARD} is logged and swallowed so the Mono completes successfully. A
     * {@code FAIL_HARD} error is raised as {@code TRANSACTION_FAILED_POST_COMMIT} with rollback
     * disabled, and now carries the triggering error as its cause.
     *
     * @param prefix           xattr path of this attempt's ATR entry
     * @param overallStartTime start timestamp (from {@code System.nanoTime()}) used for the
     *                         overall-commit duration in the log line
     * @param pspan            parent span for the {@code atr.complete} span
     * @return a Mono that completes empty on success or on an ignored tidy-up error
     */
    private Mono<Void> atrCompleteLocked(String prefix, AtomicReference<Long> overallStartTime, SpanWrapper pspan) {
        return Mono.defer(() -> {
            this.assertLocked("atrComplete");
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), this.atrCollection.orElse(null), this.atrId.orElse(null), "atr.complete", pspan);
            this.LOGGER.info(this.attemptId, "about to remove ATR entry %s", this.getAtrDebug(this.atrCollection, this.atrId));
            return Mono.defer(() -> {
                if (!this.expiryOvertimeMode && this.hasExpiredClientSide(HOOK_ATR_COMPLETE, Optional.empty())) {
                    String msg = "has expired in stage atrComplete, but transaction has successfully completed so returning success";
                    this.LOGGER.info(this.attemptId, msg);
                    // Surfaces as an error here; the onErrorResume below decides whether it is swallowed.
                    return Mono.error((Throwable)new AttemptExpired(this, msg));
                }
                return Mono.empty();
            })
                    .then(this.beforeAtrComplete(this))
                    .then(this.atrCollection.get().mutateIn(this.atrId.get(), Collections.singletonList(MutateInSpec.remove((String)prefix).xattr()), (MutateInOptions)this.wrap(span, this.atrCollection.get().core()).clientContext(OptionsWrapperUtil.createClientContext(HOOK_ATR_COMPLETE))))
                    .publishOn(SchedulerUtil.scheduler)
                    .flatMap(v -> this.afterAtrComplete(this))
                    .doOnNext(v -> {
                        this.setStateLocked(AttemptStates.COMPLETED);
                        long now = System.nanoTime();
                        long elapsed = span.finish();
                        this.LOGGER.info(this.attemptId, "removed ATR %s in %dus, overall commit completed in %dus", this.getAtrDebug(this.atrCollection, this.atrId), elapsed, TimeUnit.NANOSECONDS.toMicros(now - (Long)overallStartTime.get()));
                    })
                    .then()
                    .onErrorResume(err -> {
                        ErrorClasses ec = ErrorClasses.classify(err);
                        this.LOGGER.info(this.attemptId, "error '%s' ec=%s while removing ATR %s", new Object[]{err, ec, this.getAtrDebug(this.atrCollection, this.atrId)});
                        if (ec == ErrorClasses.FAIL_HARD) {
                            // Keep the original error attached so the raised failure can be diagnosed.
                            return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT).doNotRollbackAttempt().build()));
                        }
                        this.LOGGER.info(this.attemptId, "ignoring error during transaction tidyup, regarding as success");
                        return Mono.empty();
                    })
                    .doOnError(err -> AttemptContextReactive.failSpan(span, err))
                    .doFinally(v -> span.finish());
        });
    }

    /**
     * Converts an error seen during expiry-overtime mode into a {@code FAIL_EXPIRY} failure.
     *
     * <p>The returned failure has rollback disabled, raises {@code toRaise}, and wraps {@code err}
     * in an {@code AttemptExpired}. A warning is logged if the attempt is not actually in
     * expiry-overtime mode; the error is still converted.
     *
     * @param updateAppState forwarded as the first argument of {@code operationFailed}
     * @param stage          stage name, used for logging only
     * @param err            the error being converted
     * @param toRaise        final error to request on the built failure
     * @return a Mono that always terminates with the built failure
     */
    private <T> Mono<T> mapErrorInOvertimeToExpired(boolean updateAppState, String stage, Throwable err, TransactionOperationFailed.FinalErrorToRaise toRaise) {
        this.LOGGER.info(this.attemptId, "in expiry-overtime mode so changing error '%s' to raise %s in stage '%s'; no rollback will be tried", new Object[]{err, toRaise, stage});
        boolean inOvertime = this.expiryOvertimeMode;
        if (!inOvertime) {
            this.LOGGER.warn(this.attemptId, "not in expiry-overtime mode handling error '%s' in stage %s, possibly a bug", err, stage);
        }
        TransactionOperationFailedBuilder expiryFailure = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY)
                .doNotRollbackAttempt()
                .raiseException(toRaise)
                .cause(new AttemptExpired(this, err));
        return Mono.error((Throwable)this.operationFailed(updateAppState, expiryFailure.build()));
    }

    /**
     * Removes a document whose removal was staged in this attempt, as part of commit.
     *
     * <p>Steps, in order: an expiry check via {@code checkExpiryDuringCommitOrRollbackLocked},
     * {@code beforeDocRemoved}, the KV {@code remove}, and {@code afterDocRemovedPreRetry}. Any
     * mutation token on the result is added to {@code finalMutationTokens}.
     * {@code afterDocRemovedPostRetry} runs after error handling has completed successfully.
     *
     * <p>Error handling:
     * <ul>
     *   <li>in expiry-overtime mode the error is mapped by {@code mapErrorInOvertimeToExpired};</li>
     *   <li>{@code FAIL_AMBIGUOUS} waits {@code DEFAULT_DELAY_RETRYING_OPERATION} and retries with
     *       {@code ambiguityResolutionMode=true};</li>
     *   <li>anything else fails with {@code TRANSACTION_FAILED_POST_COMMIT} and no rollback.</li>
     * </ul>
     *
     * @param pspan                   parent span for the {@code commit.remove} span
     * @param doc                     the document to remove
     * @param ambiguityResolutionMode whether this call is a retry after an ambiguous result; only
     *                                logged by this method
     * @return a Mono that completes empty once the document has been removed
     */
    private Mono<Void> removeDocLocked(SpanWrapper pspan, TransactionGetResult doc, boolean ambiguityResolutionMode) {
        return Mono.defer(() -> {
            this.assertLocked("removeDoc");
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "commit.remove", pspan);
            return Mono.fromRunnable(() -> {
                this.LOGGER.info(this.attemptId, "about to remove doc %s, ambiguityResolutionMode=%s", DebugUtil.docId(doc), ambiguityResolutionMode);
                this.checkExpiryDuringCommitOrRollbackLocked(HOOK_REMOVE_DOC, Optional.of(doc.id()));
            })
                    .then(this.beforeDocRemoved(this, doc.id()))
                    // Parent the KV remove under this operation's own span, matching commitDocLocked.
                    .then(doc.collection().remove(doc.id(), this.wrap(RemoveOptions.removeOptions(), span, doc.collection().core())))
                    .flatMap(mutationResult -> this.afterDocRemovedPreRetry(this, doc.id()).thenReturn(mutationResult))
                    .doOnNext(mutationResult -> {
                        this.LOGGER.info(this.attemptId, "commit - removed doc %s, mt = %s", DebugUtil.docId(doc), mutationResult.mutationToken());
                        mutationResult.mutationToken().ifPresent(mt -> this.finalMutationTokens.add((MutationToken)mt));
                    })
                    .then()
                    .onErrorResume(err -> {
                        ErrorClasses ec = ErrorClasses.classify(err);
                        TransactionOperationFailedBuilder e = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err).doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT);
                        // The attempt id goes first, as in every other logger call in this class.
                        this.LOGGER.info(this.attemptId, "got error while removing doc %s in %dus: %s", DebugUtil.docId(doc), span.elapsed(), AttemptContextReactive.dbg(err));
                        if (this.expiryOvertimeMode) {
                            return this.mapErrorInOvertimeToExpired(true, HOOK_REMOVE_DOC, (Throwable)err, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT);
                        }
                        if (ec == ErrorClasses.FAIL_AMBIGUOUS) {
                            return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION, (Scheduler)SchedulerUtil.scheduler).then(this.removeDocLocked(span, doc, true));
                        }
                        // FAIL_DOC_NOT_FOUND, FAIL_HARD and all remaining classes fail identically.
                        return Mono.error((Throwable)this.operationFailed(e.build()));
                    })
                    .then(this.afterDocRemovedPostRetry(this, doc.id()))
                    .then()
                    .doOnError(err -> AttemptContextReactive.failSpan(span, err))
                    .doFinally(v -> span.finish());
        });
    }

    /**
     * Commits every staged mutation, one at a time and in list order.
     *
     * <p>Each entry of {@code stagedMutationsLocked} is passed to {@code commitDocWrapperLocked}.
     * When all have completed the span is finished, the total is logged and
     * {@code afterDocsCommitted} runs.
     *
     * @param pspan parent span for the {@code commit.docs} span
     * @return a Mono that completes empty once all staged mutations are committed
     */
    private Mono<Void> commitDocsLocked(SpanWrapper pspan) {
        return Mono.defer(() -> {
            this.assertLocked("commitDocs");
            SpanWrapper docsSpan = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "commit.docs", pspan);
            return Flux.fromIterable(this.stagedMutationsLocked)
                    .publishOn(SchedulerUtil.scheduler)
                    // concatMap keeps the commits sequential.
                    .concatMap(mutation -> this.commitDocWrapperLocked(docsSpan, (StagedMutation)mutation, mutation.doc))
                    .then(Mono.defer(() -> {
                        long took = docsSpan.finish();
                        this.LOGGER.info(this.attemptId, "commit - all %d docs committed in %dus", this.stagedMutationsLocked.size(), took);
                        return this.afterDocsCommitted(this);
                    }))
                    .then()
                    .doOnError(failure -> AttemptContextReactive.failSpan(docsSpan, failure))
                    .doFinally(signal -> docsSpan.finish());
        });
    }

    /**
     * Builds the warning text for a document that was modified between staging and committing.
     *
     * @param collection collection holding the document
     * @param id         document id
     * @param dbg        debug description of the last known document state
     * @return the complete warning message
     */
    private static String msgDocChangedUnexpectedly(ReactiveCollection collection, String id, String dbg) {
        StringBuilder message = new StringBuilder("Tried committing document ");
        message.append(DebugUtil.docId(collection, id));
        message.append(", but found that it has been modified by another party in-between staging and committing.  The application must ensure that non-transactional writes cannot happen at the same time as transactional writes on a document.  The change will be committed with CAS=0, which will overwrite the other change.  This document may need manual review to verify that no changes have been lost.  Last document state=");
        message.append(dbg);
        return message.toString();
    }

    /**
     * Builds the warning text for a document that was removed before it could be committed.
     *
     * @param collection collection holding the document
     * @param id         document id
     * @param dbg        debug description of the last known document state
     * @return the complete warning message
     */
    private static String msgDocRemovedUnexpectedly(ReactiveCollection collection, String id, String dbg) {
        StringBuilder message = new StringBuilder("Tried committing document ");
        message.append(DebugUtil.docId(collection, id));
        message.append(", but found that it has been removed by another party in-between replacing and committing.  The application must ensure that non-transactional writes cannot happen at the same time as transactional writes on a document.  The document will be left removed, and the transaction's changes will not be written to this document.  Last document state=");
        message.append(dbg);
        return message.toString();
    }

    /**
     * Routes one staged mutation to the matching commit routine.
     *
     * <p>{@code REMOVE} mutations go to {@code removeDocLocked}. Everything else goes to
     * {@code commitDocLocked}, with insert mode enabled only for {@code INSERT} mutations.
     *
     * @param pspan  parent span passed to the chosen routine
     * @param staged the staged mutation
     * @param doc    the document the mutation applies to
     * @return the Mono produced by the chosen routine
     */
    private Mono<Void> commitDocWrapperLocked(SpanWrapper pspan, StagedMutation staged, TransactionGetResult doc) {
        return Mono.defer(() -> {
            if (staged.type != StagedMutationType.REMOVE) {
                boolean asInsert = staged.type == StagedMutationType.INSERT;
                return this.commitDocLocked(pspan, staged, doc, false, asInsert, false);
            }
            return this.removeDocLocked(pspan, doc, false);
        });
    }

    /**
     * Renders a mutation result as a short debug string.
     *
     * @param result the result to describe, may be {@code null}
     * @return {@code "<unavailable>"} for {@code null}; otherwise {@code cas=<cas>}, followed by
     *         {@code ,seqno=<n>,vbucket=<n>} when a mutation token is present
     */
    private String dbg(@Nullable MutationResult result) {
        if (result == null) {
            return "<unavailable>";
        }
        String casPart = "cas=" + result.cas();
        String tokenPart = result.mutationToken()
                .map(token -> ",seqno=" + token.sequenceNumber() + ",vbucket=" + token.partitionID())
                .orElse("");
        return casPart + tokenPart;
    }

    /**
     * Writes the staged content of one document as part of commit.
     *
     * <p>Steps, in order: an expiry check via {@code checkExpiryDuringCommitOrRollbackLocked},
     * {@code beforeDocCommitted}, the write itself, {@code afterDocCommittedBeforeSavingCAS}, then
     * the new CAS is stored on {@code doc}, any mutation token is added to
     * {@code finalMutationTokens}, and {@code afterDocCommitted} runs.
     *
     * <p>The write is a plain {@code insert} in insert mode. Otherwise it is a {@code mutateIn}
     * that upserts then removes the {@code txn} xattr and replaces the body, using CAS 0 in
     * CAS-zero mode and {@code doc.cas()} otherwise.
     *
     * <p>Error handling (all terminal failures raise {@code TRANSACTION_FAILED_POST_COMMIT} with no
     * rollback):
     * <ul>
     *   <li>in expiry-overtime mode the error is mapped by {@code mapErrorInOvertimeToExpired};</li>
     *   <li>{@code FAIL_AMBIGUOUS}: immediate retry with {@code ambiguityResolutionMode=true};</li>
     *   <li>{@code FAIL_CAS_MISMATCH}: fails in ambiguity-resolution mode, otherwise warns and
     *       retries in CAS-zero mode after {@code DEFAULT_DELAY_RETRYING_OPERATION};</li>
     *   <li>{@code FAIL_DOC_NOT_FOUND}: warns and retries in insert mode after the delay;</li>
     *   <li>{@code FAIL_DOC_ALREADY_EXISTS}: fails in ambiguity-resolution mode, otherwise warns
     *       and retries as a CAS-zero replace after the delay;</li>
     *   <li>anything else fails.</li>
     * </ul>
     *
     * @param pspan                   parent span for the {@code commit.doc} span
     * @param staged                  the staged mutation carrying the content to write
     * @param doc                     the target document; its CAS is updated on success
     * @param casZeroMode             when true the replace is sent with CAS 0
     * @param insertMode              when true the content is written with {@code insert}
     * @param ambiguityResolutionMode whether this call is a retry after an ambiguous result
     * @return a Mono that completes empty once the document has been committed
     */
    private Mono<Void> commitDocLocked(SpanWrapper pspan, StagedMutation staged, TransactionGetResult doc, boolean casZeroMode, boolean insertMode, boolean ambiguityResolutionMode) {
        return Mono.defer(() -> {
            this.assertLocked("commitDoc");
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "commit.doc", pspan);
            return Mono.fromRunnable(() -> {
                this.LOGGER.info(this.attemptId, "commit - committing doc %s, casZeroMode=%s, insertMode=%s, ambiguity-resolution=%s", DebugUtil.docId(doc), casZeroMode, insertMode, ambiguityResolutionMode);
                this.checkExpiryDuringCommitOrRollbackLocked(HOOK_COMMIT_DOC, Optional.of(doc.id()));
            })
                    .then(this.beforeDocCommitted(this, doc.id()))
                    .then(Mono.defer(() -> {
                        if (insertMode) {
                            return doc.collection().insert(doc.id(), (Object)staged.content, OptionsWrapperUtil.wrap(InsertOptions.insertOptions(), span, this.config, doc.collection().core()).transcoder((Transcoder)RawJsonTranscoder.INSTANCE));
                        }
                        return doc.collection().mutateIn(doc.id(), Arrays.asList(MutateInSpec.upsert((String)"txn", null).xattr(), MutateInSpec.remove((String)"txn").xattr(), MutateInSpec.replace((String)"", (Object)staged.content)), this.wrap(((MutateInOptions)MutateInOptions.mutateInOptions().clientContext(OptionsWrapperUtil.createClientContext("commitDoc"))).cas(casZeroMode ? 0L : doc.cas()), span, doc.collection().core()));
                    }))
                    .publishOn(SchedulerUtil.scheduler)
                    .flatMap(v -> this.afterDocCommittedBeforeSavingCAS(this, doc.id()).thenReturn(v))
                    .flatMap(updatedDoc -> {
                        this.LOGGER.info(this.attemptId, "commit - committed doc %s got %s", DebugUtil.docId(doc), this.dbg((MutationResult)updatedDoc));
                        doc.cas(updatedDoc.cas());
                        updatedDoc.mutationToken().ifPresent(mt -> this.finalMutationTokens.add((MutationToken)mt));
                        return this.afterDocCommitted(this, doc.id());
                    })
                    .onErrorResume(err -> {
                        ErrorClasses ec = ErrorClasses.classify(err);
                        TransactionOperationFailedBuilder e = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err).doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT);
                        this.LOGGER.info(this.attemptId, "error while committing doc %s in %dus: %s", DebugUtil.docId(doc), span.elapsed(), AttemptContextReactive.dbg(err));
                        if (this.expiryOvertimeMode) {
                            return this.mapErrorInOvertimeToExpired(true, HOOK_COMMIT_DOC, (Throwable)err, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT).thenReturn((Object)0);
                        }
                        if (ec == ErrorClasses.FAIL_AMBIGUOUS) {
                            this.LOGGER.warn(this.attemptId, "%s while committing doc %s: as op is ambiguously successful, retrying op in ambiguity-resolution mode", DebugUtil.dbg(err), DebugUtil.docId(doc));
                            return this.commitDocLocked(span, staged, doc, casZeroMode, insertMode, true).thenReturn((Object)0);
                        }
                        if (ec == ErrorClasses.FAIL_CAS_MISMATCH) {
                            if (ambiguityResolutionMode) {
                                return Mono.error((Throwable)this.operationFailed(e.build()));
                            }
                            String msg = AttemptContextReactive.msgDocChangedUnexpectedly(doc.collection(), doc.id(), this.dbg((MutationResult)staged.mr));
                            this.LOGGER.warn(this.attemptId, msg);
                            this.LOGGER.eventBus().publish((Event)new IllegalDocumentState(Event.Severity.WARN, msg, doc.id()));
                            // Retry as a CAS-zero replace.
                            return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION, (Scheduler)SchedulerUtil.scheduler).then(this.commitDocLocked(span, staged, doc, true, false, ambiguityResolutionMode).thenReturn((Object)0));
                        }
                        if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND) {
                            String msg = AttemptContextReactive.msgDocRemovedUnexpectedly(doc.collection(), doc.id(), this.dbg((MutationResult)staged.mr));
                            this.LOGGER.warn(this.attemptId, msg);
                            this.LOGGER.eventBus().publish((Event)new IllegalDocumentState(Event.Severity.WARN, msg, doc.id()));
                            // Retry in insert mode.
                            return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION, (Scheduler)SchedulerUtil.scheduler).then(this.commitDocLocked(span, staged, doc, false, true, ambiguityResolutionMode).thenReturn((Object)0));
                        }
                        if (ec == ErrorClasses.FAIL_DOC_ALREADY_EXISTS) {
                            if (ambiguityResolutionMode) {
                                return Mono.error((Throwable)this.operationFailed(e.build()));
                            }
                            // The document exists and is about to be overwritten with CAS=0, so report it
                            // as changed; the "removed ... will be left removed" text does not describe this case.
                            String msg = AttemptContextReactive.msgDocChangedUnexpectedly(doc.collection(), doc.id(), this.dbg((MutationResult)staged.mr));
                            this.LOGGER.warn(this.attemptId, msg);
                            this.LOGGER.eventBus().publish((Event)new IllegalDocumentState(Event.Severity.WARN, msg, doc.id()));
                            return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION, (Scheduler)SchedulerUtil.scheduler).then(this.commitDocLocked(span, staged, doc, true, false, ambiguityResolutionMode).thenReturn((Object)0));
                        }
                        // FAIL_HARD and every remaining class fail the same way.
                        return Mono.error((Throwable)this.operationFailed(e.build()));
                    })
                    .then()
                    .doOnError(err -> AttemptContextReactive.failSpan(span, err))
                    .doFinally(v -> span.finish());
        });
    }

    /**
     * Resolves an ambiguous ATR commit by reading this attempt's state back from the ATR document.
     *
     * <p>The {@code attempts.<attemptId>.st} xattr is fetched with {@code lookupIn} and converted
     * with {@code AttemptStates.convert}:
     * <ul>
     *   <li>{@code COMMITTED}: completes empty;</li>
     *   <li>{@code ABORTED}: fails with {@code FAIL_OTHER} and {@code retryTransaction()};</li>
     *   <li>any other state: fails with {@code FAIL_OTHER}, no rollback, and an
     *       {@code IllegalStateException} cause naming the unexpected status.</li>
     * </ul>
     *
     * <p>Errors that are {@code RetryAtrCommit} or classified as
     * {@code TRANSACTION_OPERATION_FAILED} are propagated unchanged. {@code FAIL_TRANSIENT} and
     * {@code FAIL_OTHER} are turned into {@code RetryOperation}. Every other class becomes a
     * failure raising {@code TRANSACTION_COMMIT_AMBIGUOUS}.
     *
     * @param overallStartTime set to {@code System.nanoTime()} each time the lookup is attempted
     * @param pspan            parent span for the {@code atr.commit_ambiguity_resolution} span
     * @return a Mono that completes empty when the ATR entry is found in the COMMITTED state
     */
    private Mono<Void> atrCommitAmbiguityResolutionLocked(AtomicReference<Long> overallStartTime, SpanWrapper pspan) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), this.atrCollection.orElse(null), this.atrId.orElse(null), "atr.commit_ambiguity_resolution", pspan);
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "about to fetch status of ATR %s to resolve ambiguity, expiryOvertimeMode=%s", this.getAtrDebug(this.atrCollection, this.atrId), this.expiryOvertimeMode);
                overallStartTime.set(System.nanoTime());
                return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_ATR_COMMIT_AMBIGUITY_RESOLUTION, Optional.empty());
            }).then(this.beforeAtrCommitAmbiguityResolution(this)).then(this.atrCollection.get().lookupIn(this.atrId.get(), Collections.singletonList(LookupInSpec.get((String)("attempts." + this.attemptId + "." + "st")).xattr()), (LookupInOptions)LookupInOptions.lookupInOptions().serializer((JsonSerializer)SerializationUtil.DEFAULT_JSON_SERIALIZER).parentSpan(span.span()))).flatMap(result -> {
                String status = (String)result.contentAs(0, String.class);
                this.LOGGER.info(this.attemptId, "got status of ATR %s: '%s'", this.getAtrDebug(this.atrCollection, this.atrId), status);
                AttemptStates state = AttemptStates.convert(status);
                switch (state) {
                    case COMMITTED: {
                        // The earlier commit write took effect.
                        return Mono.empty();
                    }
                    case ABORTED: {
                        return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).retryTransaction().build()));
                    }
                }
                // Any state other than COMMITTED or ABORTED falls through to here.
                return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).doNotRollbackAttempt().cause(new IllegalStateException("This transaction has been changed by another actor to be in unexpected state " + status)).build()));
            }).then().onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().cause((Throwable)err);
                // Pass through errors that are already in their final form.
                // NOTE(review): presumably this covers the failures built in the flatMap above - confirm against ErrorClasses.classify.
                if (err instanceof RetryAtrCommit || ec == ErrorClasses.TRANSACTION_OPERATION_FAILED) {
                    return Mono.error((Throwable)err);
                }
                this.LOGGER.info(this.attemptId, "error while resolving ATR %s ambiguity in %dus: %s", this.getAtrDebug(this.atrCollection, this.atrId), span.elapsed(), AttemptContextReactive.dbg(err));
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).cause(new AttemptExpired(this, (Throwable)err)).build()));
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    // doNotRollbackAttempt() was already applied when builder was created; it is repeated here.
                    return Mono.error((Throwable)this.operationFailed(builder.doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).build()));
                }
                if (ec == ErrorClasses.FAIL_TRANSIENT || ec == ErrorClasses.FAIL_OTHER) {
                    // NOTE(review): presumably RetryOperation is what the retryWhen below re-subscribes on - confirm against RETRY_OPERATION_UNTIL_EXPIRY_WITH_FIXED_RETRY.
                    return Mono.error((Throwable)new RetryOperation());
                }
                if (ec == ErrorClasses.FAIL_PATH_NOT_FOUND) {
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().cause(new ActiveTransactionRecordEntryNotFound(this.atrId.get(), this.attemptId)).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).build()));
                }
                if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND) {
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().cause(new ActiveTransactionRecordNotFound(this.atrId.get(), this.attemptId)).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).build()));
                }
                // Every remaining error class is reported as an ambiguous commit.
                return Mono.error((Throwable)this.operationFailed(builder.raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).build()));
            }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY_WITH_FIXED_RETRY).publishOn(SchedulerUtil.scheduler).doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * Applies {@code specs} to the ATR document to mark this attempt as Committed.
     *
     * <p>Steps, in order: {@code errorIfExpiredAndNotInExpiryOvertimeMode},
     * {@code beforeAtrCommit}, the {@code mutateIn} on the ATR document, and {@code afterAtrCommit}.
     * On a value the attempt state is set to {@code COMMITTED}.
     *
     * <p>A local {@code ambiguityResolutionMode} flag is set once a {@code FAIL_AMBIGUOUS} error is
     * seen. While it is set, failures raise {@code TRANSACTION_COMMIT_AMBIGUOUS} with no rollback,
     * and {@code FAIL_TRANSIENT} becomes {@code RetryOperation}. A {@code FAIL_PATH_ALREADY_EXISTS}
     * error delegates to {@code atrCommitAmbiguityResolutionLocked} regardless of the flag.
     *
     * @param specs            sub-document mutations to apply to the ATR document
     * @param overallStartTime set to {@code System.nanoTime()} each time the write is attempted
     * @param pspan            parent span for the {@code atr.commit} span
     * @return a Mono that completes empty once the ATR entry is Committed
     */
    private Mono<Void> atrCommitLocked(List<MutateInSpec> specs, AtomicReference<Long> overallStartTime, SpanWrapper pspan) {
        return Mono.defer(() -> {
            this.assertLocked("atrCommit");
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), this.atrCollection.orElse(null), this.atrId.orElse(null), "atr.commit", pspan);
            // Created once per invocation, outside the inner defer, so it is shared by the lambdas below.
            AtomicBoolean ambiguityResolutionMode = new AtomicBoolean(false);
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "about to set ATR %s to Committed, expiryOvertimeMode=%s, ambiguityResolutionMode=%s", this.getAtrDebug(this.atrCollection, this.atrId), this.expiryOvertimeMode, ambiguityResolutionMode);
                overallStartTime.set(System.nanoTime());
                return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_ATR_COMMIT, Optional.empty());
            }).then(this.beforeAtrCommit(this)).then(this.atrCollection.get().mutateIn(this.atrId.get(), specs, (MutateInOptions)this.wrap(span, this.atrCollection.get().core()).clientContext(OptionsWrapperUtil.createClientContext("atrCommit")))).publishOn(SchedulerUtil.scheduler).flatMap(v -> this.afterAtrCommit(this)).doOnNext(v -> {
                this.setStateLocked(AttemptStates.COMMITTED);
                this.LOGGER.info(this.attemptId, "set ATR %s to Committed in %dus", this.getAtrDebug(this.atrCollection, this.atrId), TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - (Long)overallStartTime.get()));
            }).then().onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                this.LOGGER.info(this.attemptId, "error while setting ATR %s to Committed in %dus: %s", this.getAtrDebug(this.atrCollection, this.atrId), span.elapsed(), AttemptContextReactive.dbg(err));
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    // Expiry after an ambiguous write is reported as ambiguous; otherwise as expired.
                    TransactionOperationFailed.FinalErrorToRaise toRaise = ambiguityResolutionMode.get() ? TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS : TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED;
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().raiseException(toRaise).cause(new AttemptExpired(this, (Throwable)err)).build()));
                }
                if (ec == ErrorClasses.FAIL_AMBIGUOUS) {
                    // Record the ambiguity before signalling a retry.
                    ambiguityResolutionMode.set(true);
                    return Mono.error((Throwable)new RetryOperation());
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    if (ambiguityResolutionMode.get()) {
                        return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).cause((Throwable)err).build()));
                    }
                    return Mono.error((Throwable)this.operationFailed(builder.doNotRollbackAttempt().build()));
                }
                if (ec == ErrorClasses.FAIL_TRANSIENT) {
                    if (ambiguityResolutionMode.get()) {
                        // NOTE(review): thrown here but returned via Mono.error in the FAIL_AMBIGUOUS branch above - consider unifying.
                        throw new RetryOperation();
                    }
                    return Mono.error((Throwable)this.operationFailed(builder.retryTransaction().build()));
                }
                if (ec == ErrorClasses.FAIL_PATH_ALREADY_EXISTS) {
                    // NOTE(review): passes pspan rather than this operation's span - confirm intended span parenting.
                    return this.atrCommitAmbiguityResolutionLocked(overallStartTime, pspan).onErrorResume(e -> {
                        if (e instanceof RetryAtrCommit) {
                            // Leave ambiguity-resolution mode before retrying the commit write.
                            ambiguityResolutionMode.set(false);
                            throw new RetryOperation();
                        }
                        return Mono.error((Throwable)e);
                    });
                }
                // Remaining classes: three specific ones swap in a dedicated cause and disable rollback.
                Throwable cause = err;
                boolean rollback = true;
                switch (ec) {
                    case FAIL_PATH_NOT_FOUND: {
                        cause = new ActiveTransactionRecordEntryNotFound(this.atrId.get(), this.attemptId);
                        rollback = false;
                        break;
                    }
                    case FAIL_DOC_NOT_FOUND: {
                        cause = new ActiveTransactionRecordNotFound(this.atrId.get(), this.attemptId);
                        rollback = false;
                        break;
                    }
                    case FAIL_ATR_FULL: {
                        cause = new ActiveTransactionRecordFull(this, (Throwable)err);
                        rollback = false;
                    }
                }
                if (ambiguityResolutionMode.get()) {
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).cause(cause).build()));
                }
                return Mono.error((Throwable)this.operationFailed(builder.cause(cause).rollbackAttempt(rollback).build()));
            }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY).publishOn(SchedulerUtil.scheduler).doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * Returns a Mono that, when subscribed, logs the transition and sets {@code expiryOvertimeMode}.
     *
     * @param stage stage name, used for logging only
     * @return a Mono that completes empty after the flag has been set
     */
    private <T> Mono<T> setExpiryOvertimeMode(String stage) {
        Runnable enterOvertime = () -> {
            this.LOGGER.info(this.attemptId, "moving to expiry-overtime-mode in stage %s", stage);
            this.expiryOvertimeMode = true;
        };
        return Mono.fromRunnable(enterOvertime);
    }

    /**
     * Sets {@code expiryOvertimeMode} immediately and returns a Mono failing with an expiry error.
     *
     * <p>Unlike {@code setExpiryOvertimeMode}, the flag is set when this method is called, not on
     * subscription.
     *
     * @param err   the original error, wrapped in an {@code AttemptExpired} as the cause
     * @param stage stage name, used for logging only
     * @param ec    error class used to build the failure
     * @return a Mono terminating with a failure that raises {@code TRANSACTION_EXPIRED}
     */
    private <T> Mono<T> setExpiryOvertimeModeAndFail(Throwable err, String stage, ErrorClasses ec) {
        this.LOGGER.info(this.attemptId, "moving to expiry-overtime-mode in stage %s, and raising error", stage);
        this.expiryOvertimeMode = true;
        TransactionOperationFailedBuilder expired = TransactionOperationFailedBuilder.createError(this, ec)
                .raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED)
                .cause(new AttemptExpired(this, err));
        return Mono.error((Throwable)this.operationFailed(expired.build()));
    }

    /**
     * Rolls back this attempt at the application's request.
     *
     * <p>Creates a {@code user.rollback} span under the attempt span and runs
     * {@code rollbackInternalLocked(true, ...)} via {@code waitForAllOpsThenDoUnderLock}.
     *
     * @return the Mono returned by {@code createMonoBridge} for this rollback
     */
    public Mono<Void> rollback() {
        return this.createMonoBridge("rollback", Mono.defer(() -> {
            SpanWrapper rollbackSpan = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "user.rollback", this.attemptSpan);
            return this.waitForAllOpsThenDoUnderLock(
                    "app-rollback",
                    rollbackSpan,
                    () -> this.rollbackInternalLocked(true, rollbackSpan));
        }));
    }

    /**
     * Rolls back this attempt automatically, as opposed to at the application's request.
     *
     * <p>Creates a {@code rollback.auto} span under the attempt span and runs
     * {@code rollbackInternalLocked(false, ...)} via {@code waitForAllOpsThenDoUnderLock}.
     *
     * @return the Mono returned by {@code createMonoBridge} for this rollback
     */
    Mono<Void> rollbackAuto() {
        return this.createMonoBridge("rollbackAuto", Mono.defer(() -> {
            SpanWrapper rollbackSpan = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "rollback.auto", this.attemptSpan);
            return this.waitForAllOpsThenDoUnderLock(
                    "auto-rollback",
                    rollbackSpan,
                    () -> this.rollbackInternalLocked(false, rollbackSpan));
        }));
    }

    /**
     * Shared rollback entry point for application and automatic rollback.
     *
     * <p>If {@code canPerformRollback} returns a failure it is raised at once. Otherwise the
     * commit-not-allowed and app-rollback-not-allowed state bits are set. The rollback is then
     * skipped when the attempt is {@code NOT_STARTED} and not in query mode, or sent to the query
     * or KV rollback path.
     *
     * @param isAppRollback true for an application-requested rollback, false for an automatic one
     * @param span          span passed on to the KV rollback path
     * @return a Mono, subscribed on {@code SchedulerUtil.scheduler}, that completes when rollback
     *         is done
     */
    private Mono<Void> rollbackInternalLocked(boolean isAppRollback, SpanWrapper span) {
        return Mono.defer(() -> {
            TransactionOperationFailed blocker = this.canPerformRollback("rollbackInternal", isAppRollback);
            if (blocker != null) {
                this.logger().info(this.attemptId, "rollback raising %s", DebugUtil.dbg(blocker));
                return Mono.error((Throwable)blocker);
            }
            String origin = isAppRollback ? "app" : "auto";
            int bitsToSet = TRANSACTION_STATE_BIT_COMMIT_NOT_ALLOWED | TRANSACTION_STATE_BIT_APP_ROLLBACK_NOT_ALLOWED;
            this.setStateBits("rollback-" + origin, bitsToSet, 0);
            boolean nothingStarted = this.state == AttemptStates.NOT_STARTED && !this.queryModeUnlocked();
            if (nothingStarted) {
                this.LOGGER.info(this.attemptId, "told to auto-rollback but in NOT_STARTED state, so nothing to do - skipping rollback");
                return Mono.empty();
            }
            return this.queryModeLocked()
                    ? this.rollbackQueryLocked(isAppRollback)
                    : this.rollbackWithKVLocked(isAppRollback, span);
        }).subscribeOn(SchedulerUtil.scheduler);
    }

    /**
     * KV rollback path: enters expiry-overtime mode if needed, then rolls back if an ATR is set.
     *
     * <p>When either {@code atrCollection} or {@code atrId} is absent there is nothing to roll
     * back, and the returned Mono only logs and completes.
     *
     * @param isAppRollback true for an application-requested rollback
     * @param span          span passed to {@code rollbackWithKVActual}
     * @return a Mono that completes when rollback is done or skipped
     */
    private Mono<Void> rollbackWithKVLocked(boolean isAppRollback, SpanWrapper span) {
        return Mono.defer(() -> {
            this.assertLocked("rollbackWithKV");
            this.LOGGER.info(this.attemptId, "rollback %s expiryOvertimeMode=%s isAppRollback=%s", this, this.expiryOvertimeMode, isAppRollback);
            boolean newlyExpired = !this.expiryOvertimeMode && this.hasExpiredClientSide(HOOK_ROLLBACK, Optional.empty());
            if (newlyExpired) {
                this.LOGGER.info(this.attemptId, "has expired before rollback, entering expiry-overtime mode");
                this.expiryOvertimeMode = true;
            }
            boolean hasAtr = this.atrCollection.isPresent() && this.atrId.isPresent();
            if (hasAtr) {
                return this.rollbackWithKVActual(isAppRollback, span);
            }
            return Mono.create(sink -> {
                this.LOGGER.info(this.attemptId, "Calling rollback when it's had no mutations, so nothing to do");
                sink.success();
            });
        });
    }

    /**
     * Runs the three KV rollback stages in sequence: ATR abort, document rollback, ATR completion.
     *
     * <p>An {@code ActiveTransactionRecordNotFound} error from any stage is logged and treated as
     * a successful rollback; all other errors are propagated.
     *
     * @param isAppRollback true for an application-requested rollback
     * @param span          span passed to each stage
     * @return a Mono that completes when rollback has finished
     */
    private Mono<Void> rollbackWithKVActual(boolean isAppRollback, SpanWrapper span) {
        String atrEntryPath = "attempts." + this.attemptId;
        return this.atrAbortLocked(atrEntryPath, span, isAppRollback, false)
                .then(this.rollbackDocsLocked(isAppRollback, span))
                .then(this.atrRollbackCompleteLocked(isAppRollback, atrEntryPath, span))
                .onErrorResume(failure -> {
                    if (!(failure instanceof ActiveTransactionRecordNotFound)) {
                        return Mono.error((Throwable)failure);
                    }
                    this.LOGGER.info(this.attemptId, "ActiveTransactionRecordNotFound indicates that nothing needs to be done for this rollback: treating as successful rollback");
                    return Mono.empty();
                });
    }

    /**
     * Query-mode rollback: issues the {@code ROLLBACK} statement through {@code queryWrapperLocked}.
     *
     * <p>On success the attempt state is set to {@code ROLLED_BACK}. A
     * {@code TransactionOperationFailed} is propagated unchanged and an
     * {@code AttemptNotFoundOnQuery} is swallowed. Any other error is wrapped as {@code FAIL_OTHER}
     * with rollback disabled.
     *
     * @param appRollback forwarded as the last argument of {@code queryWrapperLocked}
     * @return a Mono that completes empty once the statement has been handled
     */
    private Mono<Void> rollbackQueryLocked(boolean appRollback) {
        return Mono.defer(() -> {
            this.assertLocked("rollbackQuery");
            SpanWrapper querySpan = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "rollback_query", this.attemptSpan);
            int idx = this.queryStatementIdx.getAndIncrement();
            return this.queryWrapperLocked(idx, null, "ROLLBACK", QueryOptions.queryOptions(), HOOK_QUERY_ROLLBACK, false, false, null, null, querySpan, false, null, appRollback)
                    .then(Mono.fromRunnable(() -> this.setStateLocked(AttemptStates.ROLLED_BACK)))
                    .onErrorResume(failure -> {
                        boolean alreadyFinal = failure instanceof TransactionOperationFailed;
                        if (alreadyFinal) {
                            return Mono.error((Throwable)failure);
                        }
                        if (failure instanceof AttemptNotFoundOnQuery) {
                            return Mono.empty();
                        }
                        TransactionOperationFailedBuilder wrapped = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER)
                                .cause((Throwable)failure)
                                .doNotRollbackAttempt();
                        return Mono.error((Throwable)this.operationFailed(wrapped.build()));
                    })
                    .doFinally(signal -> querySpan.finish())
                    .then();
        });
    }

    /**
     * Marks the rollback as complete in the ATR by removing the xattr path {@code prefix} from the
     * ATR document, then sets the attempt state to {@code ROLLED_BACK}.
     *
     * <p>The lock must already be held (checked with {@code assertLocked}). Failures are classified
     * with {@code ErrorClasses.classify} and either mapped to a {@code TransactionOperationFailed},
     * swallowed, or turned into a {@code RetryOperation} that feeds
     * {@code RETRY_OPERATION_UNTIL_EXPIRY}.
     *
     * @param isAppRollback passed to {@code operationFailed(boolean, ...)} and
     *                      {@code mapErrorInOvertimeToExpired}
     * @param prefix        xattr path removed from the ATR document
     * @param pspan         parent span for the "atr.rollback_complete" span created here
     * @return a Mono that completes when the ATR entry has been removed (or was already gone)
     */
    private Mono<Void> atrRollbackCompleteLocked(boolean isAppRollback, String prefix, SpanWrapper pspan) {
        return Mono.defer(() -> {
            this.assertLocked("atrRollbackComplete");
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), this.atrCollection.orElse(null), this.atrId.orElse(null), "atr.rollback_complete", pspan);
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "marking ATR %s as rollback complete", this.getAtrDebug(this.atrCollection, this.atrId));
                return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_ATR_ROLLBACK_COMPLETE, Optional.empty());
            }).then(this.beforeAtrRolledBack(this)).then(this.atrCollection.get().mutateIn(this.atrId.get(), Collections.singletonList(MutateInSpec.remove((String)prefix).xattr()), (MutateInOptions)this.wrap(span, this.atrCollection.get().core()).clientContext(OptionsWrapperUtil.createClientContext(HOOK_ATR_ROLLBACK_COMPLETE)))).publishOn(SchedulerUtil.scheduler).flatMap(v -> this.afterAtrRolledBack(this)).doOnNext(v -> {
                this.setStateLocked(AttemptStates.ROLLED_BACK);
                // NOTE(review): span.finish() is also invoked by the trailing doFinally — confirm a second call is harmless.
                long elapsed = span.finish();
                this.LOGGER.info(this.attemptId, "rollback - atr rolled back in %dus", elapsed);
            }).onErrorResume(err -> {
                this.LOGGER.info(this.attemptId, "error while marking ATR %s as rollback complete in %dus: %s", this.getAtrDebug(this.atrCollection, this.atrId), span.elapsed(), AttemptContextReactive.dbg(err));
                ErrorClasses ec = ErrorClasses.classify(err);
                // NOTE(review): unlike the sibling rollback handlers, this builder is not given .cause(err).
                TransactionOperationFailedBuilder error = TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt();
                if (this.expiryOvertimeMode) {
                    // Already in expiry-overtime mode: any failure is reported as an expiry.
                    return this.mapErrorInOvertimeToExpired(isAppRollback, HOOK_ATR_ROLLBACK_COMPLETE, (Throwable)err, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED);
                }
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    return Mono.error((Throwable)this.operationFailed(isAppRollback, TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY).doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED).build()));
                }
                if (ec == ErrorClasses.FAIL_PATH_NOT_FOUND || ec == ErrorClasses.FAIL_DOC_NOT_FOUND) {
                    // The entry (or the whole ATR document) is already gone: nothing left to remove.
                    return Mono.empty();
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    return Mono.error((Throwable)this.operationFailed(isAppRollback, error.build()));
                }
                // Every other error class is signalled as RetryOperation for the retryWhen below.
                return Mono.error((Throwable)new RetryOperation());
            }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY).publishOn(SchedulerUtil.scheduler).then().doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * Rolls back every staged mutation of this attempt, one document at a time and in order.
     *
     * <p>Staged inserts go through {@code rollbackStagedInsertLocked}; every other staged mutation
     * type goes through {@code rollbackStagedReplaceOrRemoveLocked}.
     *
     * @param isAppRollback forwarded to the per-document rollback methods
     * @param pspan         parent span for the "rollback.docs" span created here
     * @return a Mono that completes once all staged documents have been rolled back, or errors with
     *         the first per-document failure
     */
    private Mono<Void> rollbackDocsLocked(boolean isAppRollback, SpanWrapper pspan) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "rollback.docs", pspan);
            return Flux.fromIterable(this.stagedMutationsLocked).publishOn(SchedulerUtil.scheduler).concatMap(staged -> {
                TransactionGetResult doc = staged.doc;
                switch (staged.type) {
                    case INSERT: {
                        return this.rollbackStagedInsertLocked(isAppRollback, span, doc);
                    }
                }
                return this.rollbackStagedReplaceOrRemoveLocked(isAppRollback, span, doc);
            })
                // The per-document publishers are Void-typed and never emit onNext, so the completion
                // log must hang off doOnSuccess (after then()) rather than doOnNext to ever fire.
                .then()
                .doOnSuccess(v -> this.LOGGER.info(this.attemptId, "rollback - docs rolled back"))
                .doOnError(err -> AttemptContextReactive.failSpan(span, err))
                .doFinally(v -> span.finish());
        });
    }

    /**
     * Rolls back a staged replace or remove by removing the {@code "txn"} xattr from the document,
     * using the document's CAS.
     *
     * <p>Failures are classified with {@code ErrorClasses.classify}; depending on the class they are
     * swallowed, raised as a {@code TransactionOperationFailed}, or turned into a
     * {@code RetryOperation} that feeds {@code RETRY_OPERATION_UNTIL_EXPIRY}.
     *
     * @param isAppRollback passed to {@code operationFailed(boolean, ...)} and
     *                      {@code mapErrorInOvertimeToExpired}
     * @param pspan         parent span for the "rollback.doc" span created here
     * @param doc           the document whose staged mutation is removed
     * @return a Mono that completes when the staged mutation has been removed (or was already gone)
     */
    private Mono<Void> rollbackStagedReplaceOrRemoveLocked(boolean isAppRollback, SpanWrapper pspan, TransactionGetResult doc) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "rollback.doc", pspan);
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "rolling back doc %s with cas %d by removing staged mutation", DebugUtil.docId(doc), doc.cas());
                return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_ROLLBACK_DOC, Optional.of(doc.id()));
            }).then(this.beforeDocRolledBack(this, doc.id())).then(doc.collection().mutateIn(doc.id(), Arrays.asList(MutateInSpec.remove((String)"txn").xattr()), this.wrap(((MutateInOptions)MutateInOptions.mutateInOptions().clientContext(OptionsWrapperUtil.createClientContext("rollbackDoc"))).cas(doc.cas()), span, doc.collection().core()))).publishOn(SchedulerUtil.scheduler).flatMap(updatedDoc -> this.afterRollbackReplaceOrRemove(this, doc.id()).thenReturn(updatedDoc)).doOnNext(updatedDoc -> this.LOGGER.info(this.attemptId, "rolled back doc %s, got cas %d and mt %s", DebugUtil.docId(doc), updatedDoc.cas(), updatedDoc.mutationToken())).then().onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                // The builder is flagged do-not-rollback up front and carries the original error as cause.
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().cause((Throwable)err);
                this.logger().info(this.attemptId, "got error while rolling back doc %s in %dus: %s", DebugUtil.docId(doc), span.elapsed(), AttemptContextReactive.dbg(err));
                if (this.expiryOvertimeMode) {
                    // Already in expiry-overtime mode: any failure is reported as an expiry.
                    return this.mapErrorInOvertimeToExpired(isAppRollback, HOOK_ROLLBACK_DOC, (Throwable)err, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED);
                }
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    // First expiry: switch to expiry-overtime mode, then signal a retry.
                    return this.setExpiryOvertimeMode(HOOK_ROLLBACK_DOC).then(Mono.error((Throwable)new RetryOperation()));
                }
                if (ec == ErrorClasses.FAIL_PATH_NOT_FOUND) {
                    this.LOGGER.info(this.attemptId, "got PATH_NOT_FOUND while cleaning up staged doc %s, it must have already been rolled back, continuing", DebugUtil.docId(doc));
                    return Mono.empty();
                }
                // FAIL_DOC_NOT_FOUND and FAIL_CAS_MISMATCH are handled identically: fail without retrying here.
                if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND) {
                    return Mono.error((Throwable)this.operationFailed(isAppRollback, builder.build()));
                }
                if (ec == ErrorClasses.FAIL_CAS_MISMATCH) {
                    return Mono.error((Throwable)this.operationFailed(isAppRollback, builder.build()));
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    // doNotRollbackAttempt() was already applied when the builder was created above.
                    return Mono.error((Throwable)this.operationFailed(isAppRollback, builder.doNotRollbackAttempt().build()));
                }
                // Every other error class is signalled as RetryOperation for the retryWhen below.
                return Mono.error((Throwable)new RetryOperation());
            }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY).publishOn(SchedulerUtil.scheduler).doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * Rolls back a staged insert by removing the {@code "txn"} xattr from the document, with
     * {@code accessDeleted(true)} and the document's CAS.
     *
     * <p>Failures are classified with {@code ErrorClasses.classify}; depending on the class they are
     * swallowed, raised as a {@code TransactionOperationFailed}, or turned into a
     * {@code RetryOperation} that feeds {@code RETRY_OPERATION_UNTIL_EXPIRY}.
     *
     * @param isAppRollback passed to {@code operationFailed(boolean, ...)} and
     *                      {@code mapErrorInOvertimeToExpired}
     * @param pspan         parent span for the "rollback.insert" span created here
     * @param doc           the staged-insert document to clean up
     * @return a publisher that completes when the staged insert has been removed (or was already gone)
     */
    private Publisher<Void> rollbackStagedInsertLocked(boolean isAppRollback, SpanWrapper pspan, TransactionGetResult doc) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "rollback.insert", pspan);
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "rolling back staged insert %s with cas %d", DebugUtil.docId(doc), doc.cas());
                return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_DELETE_INSERTED, Optional.of(doc.id()));
            }).then(this.beforeRollbackDeleteInserted(this, doc.id())).then(Mono.defer(() -> doc.collection().mutateIn(doc.id(), Collections.singletonList(MutateInSpec.remove((String)"txn").xattr()), this.wrap(MutateInOptions.mutateInOptions(), span, doc.collection().core()).accessDeleted(true).cas(doc.cas())))).publishOn(SchedulerUtil.scheduler).flatMap(updatedDoc -> this.afterRollbackDeleteInserted(this, doc.id()).thenReturn(updatedDoc)).doOnNext(updatedDoc -> this.LOGGER.info(this.attemptId, "deleted inserted doc %s, mt %s", DebugUtil.docId(doc), updatedDoc.mutationToken())).then().onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                this.LOGGER.info(this.attemptId, "error while rolling back inserted doc %s in %dus: %s", DebugUtil.docId(doc), span.elapsed(), AttemptContextReactive.dbg(err));
                // NOTE(review): the expiry check above uses HOOK_DELETE_INSERTED, while the two branches
                // below use HOOK_REMOVE_DOC and HOOK_REMOVE — confirm the mismatch is intentional.
                if (this.expiryOvertimeMode) {
                    return this.mapErrorInOvertimeToExpired(isAppRollback, HOOK_REMOVE_DOC, (Throwable)err, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED);
                }
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    // First expiry: switch to expiry-overtime mode, then signal a retry.
                    return this.setExpiryOvertimeMode(HOOK_REMOVE).then(Mono.error((Throwable)new RetryOperation()));
                }
                if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND || ec == ErrorClasses.FAIL_PATH_NOT_FOUND) {
                    this.LOGGER.info(this.attemptId, "got %s while removing staged insert doc %s, it must have already been rolled back, continuing", new Object[]{ec, DebugUtil.docId(doc)});
                    return Mono.empty();
                }
                // FAIL_HARD and FAIL_CAS_MISMATCH are handled identically: fail, flagged do-not-rollback.
                if (ec == ErrorClasses.FAIL_HARD) {
                    return Mono.error((Throwable)this.operationFailed(isAppRollback, builder.doNotRollbackAttempt().build()));
                }
                if (ec == ErrorClasses.FAIL_CAS_MISMATCH) {
                    return Mono.error((Throwable)this.operationFailed(isAppRollback, builder.doNotRollbackAttempt().build()));
                }
                // Every other error class is signalled as RetryOperation for the retryWhen below.
                return Mono.error((Throwable)new RetryOperation());
            }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY).publishOn(SchedulerUtil.scheduler).doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * Removes a staged insert created earlier in this attempt: strips the {@code "txn"} xattr from
     * the document (with {@code accessDeleted(true)} and the document's CAS), stores the new CAS on
     * {@code doc}, and finally drops the entry from the staged mutations under the lock.
     *
     * <p>Must be called without the lock held (checked with {@code assertNotLocked}). Any failure is
     * raised as a {@code TransactionOperationFailed}; unless it already was one, it is built with
     * {@code retryTransaction()} and, for {@code FAIL_HARD}, {@code doNotRollbackAttempt()}.
     *
     * @param doc   the staged-insert document to clean up; its CAS is updated on success
     * @param pspan parent span for the "staging.remove_staged_insert" span created here
     * @return a Mono that completes once the staged insert and its bookkeeping entry are removed
     */
    private Mono<Void> removeStagedInsert(TransactionGetResult doc, SpanWrapper pspan) {
        return Mono.defer(() -> {
            this.assertNotLocked("removeStagedInsert");
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "staging.remove_staged_insert", pspan);
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "removing staged insert %s with cas %d", DebugUtil.docId(doc), doc.cas());
                if (this.hasExpiredClientSide(HOOK_REMOVE_STAGED_INSERT, Optional.of(doc.id()))) {
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED).doNotRollbackAttempt().cause(new AttemptExpired(this, "Attempt has expired in stage " + HOOK_REMOVE_STAGED_INSERT)).build()));
                }
                return Mono.empty();
            // NOTE(review): the KV operation is wrapped with pspan rather than the span created above,
            // unlike the sibling rollback methods — left unchanged, confirm whether that is intended.
            }).then(this.beforeRemoveStagedInsert(this, doc.id())).then(Mono.defer(() -> doc.collection().mutateIn(doc.id(), Collections.singletonList(MutateInSpec.remove((String)"txn").xattr()), this.wrap(MutateInOptions.mutateInOptions(), pspan, doc.collection().core()).accessDeleted(true).cas(doc.cas())))).publishOn(SchedulerUtil.scheduler).flatMap(updatedDoc -> this.afterRemoveStagedInsert(this, doc.id()).thenReturn(updatedDoc)).onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).retryTransaction().cause((Throwable)err);
                this.LOGGER.info(this.attemptId, "error while removing staged insert doc %s in %dus: %s", DebugUtil.docId(doc), span.elapsed(), AttemptContextReactive.dbg(err));
                if (ec == ErrorClasses.TRANSACTION_OPERATION_FAILED) {
                    // Already in its final form (e.g. the expiry error raised above): propagate unchanged.
                    return Mono.error((Throwable)err);
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    return Mono.error((Throwable)this.operationFailed(builder.doNotRollbackAttempt().build()));
                }
                return Mono.error((Throwable)this.operationFailed(builder.build()));
            }).doOnError(err -> {
                // The success path below is the only other place the span is finished, so without this
                // every failed removal left the span open. Placed before the success flatMap so a
                // failure after the span has already been finished does not finish it a second time.
                AttemptContextReactive.failSpan(span, err);
                span.finish();
            }).flatMap(v -> {
                doc.cas(v.cas());
                long elapsed = span.finish();
                this.LOGGER.info(this.attemptId, "removed staged insert from doc %s in %dus", DebugUtil.docId(doc), elapsed);
                return this.doUnderLock("removeStagedInsert " + DebugUtil.docId(doc), null, () -> Mono.fromRunnable(() -> this.removeStagedMutationLocked(doc)));
            }).then();
        });
    }

    /**
     * Marks this attempt as aborted in the ATR: upserts the status field ({@code prefix + ".st"}) to
     * {@code ABORTED}, upserts {@code prefix + ".tsrs"} with the CAS macro, adds the specs returned
     * by {@code addDocsToBuilder()}, and then sets the attempt state to {@code ABORTED}.
     *
     * <p>The lock must already be held (checked with {@code assertLocked}). Failures are classified
     * with {@code ErrorClasses.classify} and either raised as a {@code TransactionOperationFailed}
     * or turned into a {@code RetryOperation} that feeds {@code RETRY_OPERATION_UNTIL_EXPIRY}.
     *
     * @param prefix                  path prefix of this attempt's entry inside the ATR document
     * @param pspan                   parent span for the "atr.abort" span created here
     * @param isAppRollback           passed to {@code operationFailed(boolean, ...)} and
     *                                {@code mapErrorInOvertimeToExpired}
     * @param ambiguityResolutionMode only logged by this method
     * @return a Mono that completes once the ATR entry has been marked aborted
     */
    private Mono<Void> atrAbortLocked(String prefix, SpanWrapper pspan, boolean isAppRollback, boolean ambiguityResolutionMode) {
        return Mono.defer(() -> {
            this.assertLocked("atrAbort");
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), this.atrCollection.orElse(null), this.atrId.orElse(null), "atr.abort", pspan);
            ArrayList<Object> spec = new ArrayList<Object>();
            spec.add(MutateInSpec.upsert((String)(prefix + "." + "st"), (Object)AttemptStates.ABORTED.name()).xattr());
            // NOTE(review): this spec is built without .xattr(), unlike the status upsert above —
            // presumably the macro value implies it; confirm against the SDK.
            spec.add(MutateInSpec.upsert((String)(prefix + "." + "tsrs"), (Object)MutateInMacro.CAS));
            spec.addAll(this.addDocsToBuilder());
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "aborting ATR %s isAppRollback=%s ambiguityResolutionMode=%s", this.getAtrDebug(this.atrCollection, this.atrId), isAppRollback, ambiguityResolutionMode);
                return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_ATR_ABORT, Optional.empty());
            }).then(this.beforeAtrAborted(this)).then(this.atrCollection.get().mutateIn(this.atrId.get(), spec, (MutateInOptions)this.wrap(span, this.atrCollection.get().core()).clientContext(OptionsWrapperUtil.createClientContext("atrAbort")))).publishOn(SchedulerUtil.scheduler).then(this.afterAtrAborted(this)).doOnNext(v -> {
                this.setStateLocked(AttemptStates.ABORTED);
                this.LOGGER.info(this.attemptId, "aborted ATR %s", this.getAtrDebug(this.atrCollection, this.atrId));
            }).then().onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                // The builder is flagged do-not-rollback up front; some branches below replace the cause.
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err).doNotRollbackAttempt();
                this.LOGGER.info(this.attemptId, "error %s while aborting ATR %s", DebugUtil.dbg(err), this.getAtrDebug(this.atrCollection, this.atrId));
                if (this.expiryOvertimeMode) {
                    // Already in expiry-overtime mode: any failure is reported as an expiry.
                    return this.mapErrorInOvertimeToExpired(isAppRollback, HOOK_ATR_ABORT, (Throwable)err, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED);
                }
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    // First expiry: switch to expiry-overtime mode, then signal a retry.
                    return this.setExpiryOvertimeMode(HOOK_ATR_ABORT).then(Mono.error((Throwable)new RetryOperation()));
                }
                if (ec == ErrorClasses.FAIL_PATH_NOT_FOUND) {
                    return Mono.error((Throwable)this.operationFailed(isAppRollback, builder.cause(new ActiveTransactionRecordEntryNotFound(this.atrId.get(), this.attemptId)).build()));
                }
                if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND) {
                    return Mono.error((Throwable)this.operationFailed(isAppRollback, builder.cause(new ActiveTransactionRecordNotFound(this.atrId.get(), this.attemptId)).build()));
                }
                if (ec == ErrorClasses.FAIL_ATR_FULL) {
                    return Mono.error((Throwable)this.operationFailed(isAppRollback, builder.cause(new ActiveTransactionRecordFull(this)).build()));
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    // doNotRollbackAttempt() was already applied when the builder was created above.
                    return Mono.error((Throwable)this.operationFailed(isAppRollback, builder.doNotRollbackAttempt().build()));
                }
                // Every other error class is signalled as RetryOperation for the retryWhen below.
                return Mono.error((Throwable)new RetryOperation());
            }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY).publishOn(SchedulerUtil.scheduler).doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * Sanity check that the mutex is currently held. Skipped entirely when thread-safety is disabled.
     *
     * @param dbg name of the calling operation, included in the exception message
     * @throws IllegalStateException if thread-safety is enabled and the mutex is not locked
     */
    private void assertLocked(String dbg) {
        if (!this.threadSafetyEnabled) {
            return;
        }
        if (this.mutex.isLocked()) {
            return;
        }
        throw new IllegalStateException("Internal bug hit: mutex must be locked in " + dbg + " but isn't");
    }

    /**
     * Sanity check that the mutex is currently free. Only enforced when thread-safety is enabled and
     * the mutex reports {@code debugAsSingleThreaded()}.
     *
     * @param dbg name of the calling operation, included in the exception message
     * @throws IllegalStateException if the check is active and the mutex is locked
     */
    private void assertNotLocked(String dbg) {
        if (!this.threadSafetyEnabled) {
            return;
        }
        if (!this.mutex.debugAsSingleThreaded()) {
            return;
        }
        if (!this.mutex.isLocked()) {
            return;
        }
        throw new IllegalStateException("Internal bug hit: mutex must be unlocked in " + dbg + " but isn't");
    }

    /**
     * Sanity check that the attempt has not entered query mode.
     *
     * @param dbg name of the calling operation, included in the exception message
     * @throws IllegalStateException if {@code queryModeLocked()} reports query mode
     */
    private void assertNotQueryMode(String dbg) {
        boolean inQueryMode = this.queryModeLocked();
        if (!inQueryMode) {
            return;
        }
        throw new IllegalStateException("Internal bug hit: must not be in queryMode in " + dbg);
    }

    /**
     * Convenience overload of {@code canPerformOperation(String, boolean)} with the pending-state
     * check enabled.
     *
     * @param dbg name of the operation, used for logging
     * @return the error to raise, or {@code null} if the operation may proceed
     */
    @Nullable
    TransactionOperationFailed canPerformOperation(String dbg) {
        final boolean canPerformPendingCheck = true;
        return this.canPerformOperation(dbg, canPerformPendingCheck);
    }

    /**
     * Decides from the current attempt state whether a further operation is allowed.
     *
     * <ul>
     *   <li>{@code NOT_STARTED}/{@code PENDING}: rejected only when {@code canPerformPendingCheck}
     *       is set and the commit-not-allowed state bit is on.</li>
     *   <li>{@code COMPLETED}/{@code COMMITTED}: rejected with {@code TransactionAlreadyCommitted}.</li>
     *   <li>{@code ROLLED_BACK}/{@code ABORTED}: rejected with {@code TransactionAlreadyAborted}.</li>
     *   <li>Any other state: allowed.</li>
     * </ul>
     *
     * @param dbg                    name of the operation, used for logging
     * @param canPerformPendingCheck whether to consult the commit-not-allowed bit in the pending states
     * @return the error to raise, or {@code null} if the operation may proceed
     */
    @Nullable
    TransactionOperationFailed canPerformOperation(String dbg, boolean canPerformPendingCheck) {
        switch (this.state) {
            case NOT_STARTED:
            case PENDING: {
                if (canPerformPendingCheck && this.hasStateBit(TRANSACTION_STATE_BIT_COMMIT_NOT_ALLOWED)) {
                    this.logger().info(this.attemptId, "failing operation %s as not allowed to commit (probably as previous operations have failed)", dbg);
                    return TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause(new PreviousOperationFailed()).build();
                }
                return null;
            }
            case COMPLETED:
            case COMMITTED: {
                return TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause(new TransactionAlreadyCommitted()).doNotRollbackAttempt().build();
            }
            case ROLLED_BACK:
            case ABORTED: {
                return TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause(new TransactionAlreadyAborted()).doNotRollbackAttempt().build();
            }
            default: {
                return null;
            }
        }
    }

    /**
     * Decides whether a rollback may be performed.
     *
     * <p>An application-initiated rollback is rejected with {@code RollbackNotPermitted} when the
     * app-rollback-not-allowed state bit is set. Otherwise the decision is delegated to
     * {@code canPerformOperation(dbg, false)}.
     *
     * @param dbg         name of the operation, used for logging
     * @param appRollback whether the rollback was requested by the application
     * @return the error to raise, or {@code null} if the rollback may proceed
     */
    @Nullable
    TransactionOperationFailed canPerformRollback(String dbg, boolean appRollback) {
        boolean appRollbackBlocked = appRollback && this.hasStateBit(TRANSACTION_STATE_BIT_APP_ROLLBACK_NOT_ALLOWED);
        if (!appRollbackBlocked) {
            return this.canPerformOperation(dbg, false);
        }
        this.LOGGER.info(this.attemptId, "state bits indicate app-rollback is not allowed");
        return TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause(new RollbackNotPermitted()).doNotRollbackAttempt().build();
    }

    /**
     * Decides whether a commit may be performed.
     *
     * <p>Rejected with {@code CommitNotPermitted} when the commit-not-allowed state bit is set.
     * Otherwise the decision is delegated to {@code canPerformOperation(dbg)}.
     *
     * @param dbg name of the operation, used for logging
     * @return the error to raise, or {@code null} if the commit may proceed
     */
    @Nullable
    TransactionOperationFailed canPerformCommit(String dbg) {
        boolean commitBlocked = this.hasStateBit(TRANSACTION_STATE_BIT_COMMIT_NOT_ALLOWED);
        if (!commitBlocked) {
            return this.canPerformOperation(dbg);
        }
        this.LOGGER.info(this.attemptId, "state bits indicate commit is not allowed");
        return TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause(new CommitNotPermitted()).doNotRollbackAttempt().build();
    }

    /**
     * Tests whether any bit of {@code stateBit} is currently set in the attempt's state bits.
     *
     * @param stateBit bit mask to test
     * @return {@code true} if the masked value is non-zero
     */
    private boolean hasStateBit(int stateBit) {
        int masked = this.stateBits.get() & stateBit;
        return masked != 0;
    }

    /**
     * Atomically ORs {@code newBehaviourFlags} into the state bits and, when
     * {@code newFinalErrorToRaise} is greater than the final-error value currently stored, replaces
     * the stored final-error value with it. Afterwards logs which flags changed.
     *
     * @param dbg                  name of the caller, included in the log line
     * @param newBehaviourFlags    behaviour flag bits to switch on
     * @param newFinalErrorToRaise ordinal of the {@code FinalErrorToRaise} to record if it is higher
     *                             than the stored one
     */
    private void setStateBits(String dbg, int newBehaviourFlags, int newFinalErrorToRaise) {
        int before;
        int after;
        // Compare-and-set loop: recompute from a fresh read until the swap succeeds.
        do {
            before = this.stateBits.get();
            after = before | newBehaviourFlags;
            int storedFinalError = (before & STATE_BITS_MASK_FINAL_ERROR) >> STATE_BITS_POSITION_FINAL_ERROR;
            if (newFinalErrorToRaise > storedFinalError) {
                after = after & STATE_BITS_MASK_BITS | newFinalErrorToRaise << STATE_BITS_POSITION_FINAL_ERROR;
            }
        } while (!this.stateBits.compareAndSet(before, after));

        TransactionOperationFailed.FinalErrorToRaise[] finalErrors = TransactionOperationFailed.FinalErrorToRaise.values();
        TransactionOperationFailed.FinalErrorToRaise raiseBefore = finalErrors[(before & STATE_BITS_MASK_FINAL_ERROR) >> STATE_BITS_POSITION_FINAL_ERROR];
        TransactionOperationFailed.FinalErrorToRaise raiseAfter = finalErrors[(after & STATE_BITS_MASK_FINAL_ERROR) >> STATE_BITS_POSITION_FINAL_ERROR];

        // Flag masks and the label logged when the corresponding flag flipped, in log order.
        int[] masks = {
            TRANSACTION_STATE_BIT_SHOULD_NOT_ROLLBACK,
            TRANSACTION_STATE_BIT_SHOULD_NOT_RETRY,
            TRANSACTION_STATE_BIT_COMMIT_NOT_ALLOWED,
            TRANSACTION_STATE_BIT_APP_ROLLBACK_NOT_ALLOWED
        };
        String[] labels = {
            " shouldNotRollback to ",
            " shouldNotRetry to ",
            " shouldNotCommit to ",
            " appRollbackNotAllowed to "
        };

        StringBuilder message = new StringBuilder("changed state bits in ");
        message.append(dbg);
        message.append(", changed");
        for (int i = 0; i < masks.length; i++) {
            boolean wasSet = (before & masks[i]) != 0;
            boolean isSet = (after & masks[i]) != 0;
            if (wasSet != isSet) {
                message.append(labels[i]).append(isSet);
            }
        }
        if (raiseBefore != raiseAfter) {
            message.append(" toRaise from ").append(raiseBefore).append(" to ").append(raiseAfter);
        }
        this.LOGGER.info(this.attemptId, message.toString());
    }

    /**
     * Optionally records {@code err} in the attempt's state bits.
     *
     * @param updateInternalState when {@code true}, delegates to {@code operationFailed(err)};
     *                            when {@code false}, the error is returned untouched
     * @param err                 the failure to (optionally) record
     * @return {@code err}
     */
    TransactionOperationFailed operationFailed(boolean updateInternalState, TransactionOperationFailed err) {
        return updateInternalState ? this.operationFailed(err) : err;
    }

    /**
     * Records a failed operation in the attempt's state bits: always forbids commit, forbids
     * rollback when the error does not request auto-rollback, forbids retry when the error does not
     * request a transaction retry, and passes the ordinal of the error's {@code toRaise()} on to
     * {@code setStateBits}.
     *
     * @param err the failure to record
     * @return {@code err}, for call chaining
     */
    TransactionOperationFailed operationFailed(TransactionOperationFailed err) {
        int flags = TRANSACTION_STATE_BIT_COMMIT_NOT_ALLOWED;
        flags |= err.autoRollbackAttempt() ? 0 : TRANSACTION_STATE_BIT_SHOULD_NOT_ROLLBACK;
        flags |= err.retryTransaction() ? 0 : TRANSACTION_STATE_BIT_SHOULD_NOT_RETRY;
        int finalErrorOrdinal = err.toRaise().ordinal();
        this.setStateBits("operationFailed", flags, finalErrorOrdinal);
        return err;
    }

    /**
     * @return the current state of this attempt
     */
    AttemptStates state() {
        final AttemptStates current = this.state;
        return current;
    }

    /**
     * Reports whether a query target has been recorded for this attempt. The lock must be held
     * (checked with {@code assertLocked}).
     *
     * @return {@code true} if {@code queryTarget} is set
     */
    boolean queryModeLocked() {
        this.assertLocked("queryMode");
        boolean hasQueryTarget = this.queryTarget != null;
        return hasQueryTarget;
    }

    /**
     * Same test as {@code queryModeLocked()} but without asserting that the lock is held.
     *
     * @return {@code true} if {@code queryTarget} is set
     */
    boolean queryModeUnlocked() {
        boolean hasQueryTarget = this.queryTarget != null;
        return hasQueryTarget;
    }

    /**
     * Runs a query statement inside this attempt using default {@code TransactionQueryOptions}.
     *
     * @param statement the statement to execute
     * @return the result of {@code query(statement, options)} with default options
     */
    @Stability.Uncommitted
    public Mono<ReactiveQueryResult> query(String statement) {
        TransactionQueryOptions defaultOptions = TransactionQueryOptions.queryOptions();
        return this.query(statement, defaultOptions);
    }

    /**
     * Runs a statement through {@code doQueryOperation} and {@code queryWrapperLocked} with no
     * scope, the existing-error check enabled, and {@code tximplicit} off.
     *
     * @param statement the statement to execute
     * @param options   query options; {@code options.builder()} is evaluated when the operation runs
     * @return a Mono of the query result
     */
    Mono<QueryResult> queryBlocking(String statement, TransactionQueryOptions options) {
        return this.doQueryOperation("query blocking", (sidx, lockToken) -> {
            return this.queryWrapperLocked(
                (int)sidx,
                null,
                statement,
                options.builder(),
                HOOK_QUERY,
                false,
                true,
                null,
                null,
                null,
                false,
                (AtomicReference<ReactiveLock.Waiter>)lockToken,
                true);
        });
    }

    /**
     * Runs a statement through {@code doQueryOperation} and {@code queryWrapperLocked} with an
     * explicit scope and {@code tximplicit} setting; the existing-error check is enabled.
     *
     * @param scope      scope passed on to {@code queryWrapperLocked}
     * @param statement  the statement to execute
     * @param options    query options; {@code options.builder()} is evaluated when the operation runs
     * @param tximplicit passed on to {@code queryWrapperLocked}
     * @return a Mono of the query result
     */
    Mono<QueryResult> queryBlocking(Scope scope, String statement, TransactionQueryOptions options, boolean tximplicit) {
        return this.doQueryOperation("query blocking (2)", (sidx, lockToken) -> {
            return this.queryWrapperLocked(
                (int)sidx,
                scope,
                statement,
                options.builder(),
                HOOK_QUERY,
                false,
                true,
                null,
                null,
                null,
                tximplicit,
                (AtomicReference<ReactiveLock.Waiter>)lockToken,
                true);
        });
    }

    /**
     * Renders the optional metrics and profile of a query's metadata for logging.
     *
     * @param md the query metadata
     * @return the metrics (if present) followed by {@code ", profile=<profile>"} (if present);
     *         empty when neither is present
     */
    private String debugMetrics(QueryMetaData md) {
        final StringBuilder out = new StringBuilder();
        md.metrics().ifPresent(metrics -> {
            out.append(metrics);
        });
        md.profile().ifPresent(profile -> out.append(", profile=").append(profile));
        return out.toString();
    }

    /**
     * Computes the timeout applied to query requests: the attempt's remaining expiry time, plus the
     * configured key-value timeout (falling back to the environment's durable KV timeout), plus one
     * second.
     *
     * @return the query timeout
     */
    private Duration queryTimeout() {
        Duration remainingExpiry = Duration.ofMillis(this.expiryRemainingMillis());
        Duration kvTimeout = this.config.keyValueTimeout().orElse(this.cluster().environment().timeoutConfig().kvDurableTimeout());
        return remainingExpiry.plus(kvTimeout).plusSeconds(1L);
    }

    /**
     * Builds and dispatches a streaming (reactive) query request for this attempt, bracketed by the
     * {@code beforeQuery} and {@code afterQuery} hooks.
     *
     * @param sidx       statement index, used for logging
     * @param scope      optional scope; when present a query context is derived from its bucket and
     *                   scope names
     * @param statement  the statement to execute
     * @param options    options whose {@code builder()} supplies the {@code QueryOptions}
     * @param pspan      optional parent span; the attempt span is used when {@code null}
     * @param tximplicit when set, {@code applyQueryOptions} is applied to the options, and the flag
     *                   is passed to {@code targetedQueryRequest}
     * @return a Mono of the reactive query result
     */
    private Mono<ReactiveQueryResult> queryInternalReactive(int sidx, @Nullable ReactiveScope scope, String statement, TransactionQueryOptions options, @Nullable SpanWrapper pspan, boolean tximplicit) {
        return this.beforeQuery(this, statement).then(Mono.defer(() -> {
            QueryOptions queryOptions = options.builder();
            if (tximplicit) {
                AttemptContextReactive.applyQueryOptions(this.config, queryOptions, this.expiryRemainingMillis());
            }
            queryOptions.metrics(true);
            QueryOptions.Built built = ((QueryOptions)queryOptions.parentSpan(pspan == null ? this.attemptSpan.span() : pspan.span())).build();
            // Fall back to the environment's serializer when the options do not carry one.
            JsonSerializer serializer = built.serializer() == null ? this.cluster().environment().jsonSerializer() : built.serializer();
            String queryContext = scope == null ? null : "`default`:`" + scope.bucketName() + "`.`" + scope.name() + "`";
            // The request is targeted at this.queryTarget, which may still be null at this point.
            QueryRequest request = QueryAccessor.targetedQueryRequest(statement, built, queryContext, this.queryTarget, this.queryTimeout(), this.cluster(), tximplicit);
            return SDKAccessUtil.queryAccessor(this.cluster()).queryReactive(request, built, serializer).doOnNext(v -> {
                // First result with no target yet: remember the node this request was last dispatched to.
                if (this.queryTarget == null) {
                    this.queryTarget = request.context().lastDispatchedToNode();
                    this.logger().info(this.attemptId, "q%d got query node to use for future queries %s", sidx, RedactableArgument.redactMeta((Object)this.queryTarget));
                }
            }).doFinally(ignore -> request.context().logicallyComplete());
        })).flatMap(result -> this.afterQuery(this, statement).thenReturn(result));
    }

    /**
     * Builds and dispatches a buffered query request ({@code queryAsync}) for this attempt,
     * bracketed by the {@code beforeQuery} and {@code afterQuery} hooks.
     *
     * <p>Must run without the lock held (checked with {@code assertNotLocked}). The supplied
     * {@code options} instance is modified in place (metrics and parent span are set on it).
     *
     * @param sidx       statement index, used for logging
     * @param scope      optional scope; when present a query context is derived from its bucket and
     *                   scope names
     * @param statement  the statement to execute
     * @param options    query options; mutated by this method
     * @param pspan      optional parent span; the attempt span is used when {@code null}
     * @param tximplicit when set, {@code applyQueryOptions} is applied to the options, and the flag
     *                   is passed to {@code targetedQueryRequest}
     * @return a Mono of the query result
     */
    private Mono<QueryResult> queryInternal(int sidx, @Nullable Scope scope, String statement, QueryOptions options, @Nullable SpanWrapper pspan, boolean tximplicit) {
        return this.beforeQuery(this, statement).then(Mono.defer(() -> {
            QueryOptions.Built built;
            this.assertNotLocked("queryInternal");
            options.metrics(true);
            options.parentSpan(pspan == null ? this.attemptSpan.span() : pspan.span());
            if (tximplicit) {
                AttemptContextReactive.applyQueryOptions(this.config, options, this.expiryRemainingMillis());
            }
            // Builds the options, then falls back to the environment's serializer when they carry none.
            JsonSerializer serializer = (built = options.build()).serializer() == null ? this.cluster().environment().jsonSerializer() : built.serializer();
            String queryContext = scope == null ? null : "`default`:`" + scope.bucketName() + "`.`" + scope.name() + "`";
            // The request is targeted at this.queryTarget, which may still be null at this point.
            QueryRequest request = QueryAccessor.targetedQueryRequest(statement, built, queryContext, this.queryTarget, this.queryTimeout(), this.cluster(), tximplicit);
            return Mono.fromFuture((CompletableFuture)SDKAccessUtil.queryAccessor(this.cluster()).queryAsync(request, built, serializer)).doOnNext(v -> {
                // First result with no target yet: remember the node this request was last dispatched to.
                if (this.queryTarget == null) {
                    this.queryTarget = request.context().lastDispatchedToNode();
                    this.logger().info(this.attemptId, "q%d got query node id %s", sidx, RedactableArgument.redactMeta((Object)this.queryTarget));
                }
            }).doFinally(ignore -> request.context().logicallyComplete());
        })).flatMap(result -> this.afterQuery(this, statement).thenReturn(result));
    }

    /**
     * Central wrapper for running a statement as part of this attempt: optionally begins the query
     * transaction first, runs the pre-query checks, executes the statement, and converts failures.
     *
     * <p>The lock must be held on entry (checked with {@code assertLocked}). The supplied
     * {@code options} instance is modified in place ({@code txdata} and {@code txid} raw
     * parameters).
     *
     * @param sidx                statement index, used for logging
     * @param scope               optional scope forwarded to {@code queryInternal}
     * @param statement           the statement to execute
     * @param options             query options; mutated by this method
     * @param hookPoint           forwarded to {@code queryInternalPreLocked}
     * @param isBeginWork         when set, skips both the begin-work step and the {@code txid}
     *                            parameter
     * @param existingErrorCheck  forwarded to {@code queryInternalPreLocked}
     * @param txdata              optional value for the {@code txdata} raw parameter
     * @param params              only logged by this method
     * @param pspan               optional parent span; the attempt span is used when {@code null}
     * @param tximplicit          when set, skips both the begin-work step and the {@code txid}
     *                            parameter
     * @param lockToken           lock token holder forwarded to {@code beginWorkIfNeeded}
     * @param updateInternalState forwarded to {@code convertQueryError} and
     *                            {@code operationFailed(boolean, ...)}
     * @return a Mono of the query result, or an error
     */
    private Mono<QueryResult> queryWrapperLocked(int sidx, @Nullable Scope scope, String statement, QueryOptions options, String hookPoint, boolean isBeginWork, boolean existingErrorCheck, @Nullable JsonObject txdata, @Nullable JsonArray params, @Nullable SpanWrapper pspan, boolean tximplicit, AtomicReference<ReactiveLock.Waiter> lockToken, boolean updateInternalState) {
        return Mono.defer(() -> {
            this.assertLocked("queryWrapper q" + sidx);
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "query.wrapper", pspan != null ? pspan : this.attemptSpan).attribute("db.statement", statement).attribute("db.couchbase.transactions.tximplicit", tximplicit);
            this.logger().debug(this.attemptId, "q%d: '%s' params=%s txdata=%s tximplicit=%s", sidx, RedactableArgument.redactUser((Object)statement), RedactableArgument.redactUser((Object)params), RedactableArgument.redactUser((Object)txdata), tximplicit);
            Mono<Void> beginWorkIfNeeded = Mono.empty();
            // Begin-work is only considered for explicit statements, outside query mode, that are not the begin-work statement itself.
            if (!(tximplicit || this.queryModeLocked() || isBeginWork)) {
                beginWorkIfNeeded = this.beginWorkIfNeeded(sidx, statement, lockToken, span);
            }
            if (txdata != null) {
                options.raw("txdata", (Object)txdata);
            }
            return beginWorkIfNeeded.then(this.queryInternalPreLocked(sidx, statement, hookPoint, existingErrorCheck)).then(Mono.defer(() -> {
                // Explicit statements other than begin-work carry this attempt's id as the txid raw parameter.
                if (!tximplicit && !isBeginWork) {
                    options.raw("txid", (Object)this.attemptId);
                }
                return this.queryInternal(sidx, scope, statement, options, span, tximplicit);
            })).onErrorResume(err -> {
                RuntimeException converted = this.convertQueryError(sidx, (Throwable)err, updateInternalState);
                long elapsed = span.finish();
                // NOTE(review): dbg(converted) runs before the null check below — confirm dbg tolerates null.
                this.logger().warn(this.attemptId, "q%d got error %s after %dus, converted from %s", sidx, AttemptContextReactive.dbg(converted), elapsed, AttemptContextReactive.dbg(err));
                if (converted != null) {
                    return Mono.error((Throwable)converted);
                }
                // No conversion available: propagate the original error.
                return Mono.error((Throwable)err);
            }).flatMap(result -> {
                long elapsed = span.finish();
                this.logger().info(this.attemptId, "q%d returned with metrics %s after %dus", sidx, this.debugMetrics(result.metaData()), elapsed);
                // A result whose status is FATAL is turned into a FAIL_OTHER failure.
                if (result.metaData().status() == QueryStatus.FATAL) {
                    TransactionOperationFailed err = this.operationFailed(updateInternalState, TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).build());
                    return Mono.error((Throwable)err);
                }
                return Mono.just((Object)result);
            });
        });
    }

    /**
     * Releases the lock, waits for all KV operations and re-acquires it (via
     * {@code waitForAllKVOpsThenLock}), then starts the query transaction with
     * {@code queryBeginWorkLocked} unless the attempt has entered query mode in the meantime.
     *
     * @param sidx      statement index, used for logging
     * @param statement the statement about to run, passed to the {@code beforeUnlockQuery} hook
     * @param lockToken holder of the current lock token; updated with the newly acquired token
     * @param span      span passed on to {@code queryBeginWorkLocked}
     * @return a Mono that completes once begin-work has run or was found to be unnecessary
     */
    private Mono<Void> beginWorkIfNeeded(int sidx, String statement, AtomicReference<ReactiveLock.Waiter> lockToken, SpanWrapper span) {
        return this.beforeUnlockQuery(this, statement).then(this.unlock(lockToken.get(), "before BEGIN WORK q" + sidx)).then(this.waitForAllKVOpsThenLock("queryWrapper q" + sidx)).flatMap(newLockToken -> {
            // Publish the new token so the caller holds the right one from here on.
            lockToken.set((ReactiveLock.Waiter)newLockToken);
            // Re-check after re-acquiring the lock. This local is only used for the log line;
            // the decision below calls queryModeLocked() again.
            boolean stillNeedsBeginWork = !this.queryModeLocked();
            this.LOGGER.info(this.attemptId, "q%d after reacquiring lock stillNeedsBeginWork=%s", sidx, stillNeedsBeginWork);
            if (!this.queryModeLocked()) {
                return this.queryBeginWorkLocked(span);
            }
            return Mono.empty();
        });
    }

    /**
     * Runs a user query reactively and returns the streaming result.
     *
     * <p>Steps, as assembled below: create a {@code "user.query"} span; if the query is not
     * {@code tximplicit} and query mode is not active, run {@code beginWorkIfNeeded}; run
     * {@code queryInternalPreLocked}; set the raw {@code "txid"} option (non-implicit only) and call
     * {@code queryInternalReactive}; then consume the row stream and the metadata, failing the operation
     * if the status is {@code FATAL}. Errors are passed through {@code convertQueryError}; when that
     * returns {@code null} the original error is re-emitted.
     *
     * <p>NOTE(review): {@code options} is dereferenced for non-implicit queries, while
     * {@code query(ReactiveScope, String)} passes {@code null} options - confirm a non-null value is
     * substituted before this point.
     *
     * @param sidx       index of the query statement, used in log messages
     * @param scope      optional scope forwarded to {@code queryInternalReactive}
     * @param statement  the statement to execute
     * @param options    query options; mutated with the raw {@code "txid"} parameter when not implicit
     * @param tximplicit whether this is a single-query (implicit) transaction
     * @param lockToken  holder of the current lock waiter, handed to {@code beginWorkIfNeeded}
     * @return a Mono emitting the reactive query result
     */
    private Mono<ReactiveQueryResult> queryWrapperReactiveLocked(int sidx, @Nullable ReactiveScope scope, String statement, TransactionQueryOptions options, boolean tximplicit, AtomicReference<ReactiveLock.Waiter> lockToken) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "user.query", this.attemptSpan).attribute("db.statement", statement).attribute("db.couchbase.transactions.tximplicit", tximplicit);
            // Reference point for the elapsed-time values logged below.
            long start = System.nanoTime();
            this.logger().debug(this.attemptId, "q%d: '%s' tximplicit=%s", sidx, RedactableArgument.redactUser((Object)statement), tximplicit);
            Mono<Void> beginWorkIfNeeded = Mono.empty();
            if (!tximplicit && !this.queryModeLocked()) {
                beginWorkIfNeeded = this.beginWorkIfNeeded(sidx, statement, lockToken, span);
            }
            return beginWorkIfNeeded.then(this.queryInternalPreLocked(sidx, statement, HOOK_QUERY, true)).then(Mono.defer(() -> {
                if (!tximplicit) {
                    options.raw("txid", this.attemptId);
                }
                return this.queryInternalReactive(sidx, scope, statement, options, span, tximplicit);
            })).flatMap(result -> {
                this.LOGGER.info(this.attemptId, "q%d async returned initial result after %dus", sidx, TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - start));
                // Errors raised while streaming rows are converted here, then pass through the outer handler below as well.
                return result.rowsAsObject().onErrorResume(err -> {
                    RuntimeException converted = this.convertQueryError(sidx, (Throwable)err, true);
                    long elapsed = span.finish();
                    this.logger().warn(this.attemptId, "q%d got error on rows stream %s after %dus, converted from %s", sidx, AttemptContextReactive.dbg(converted), elapsed, AttemptContextReactive.dbg(err));
                    if (converted != null) {
                        return Mono.error((Throwable)converted);
                    }
                    return Mono.error((Throwable)err);
                }).flatMap(ignore -> result.metaData().flatMap(metaData -> {
                    this.LOGGER.info(this.attemptId, "q%d async received query status of %s and finished streaming after %dus", sidx, metaData.status(), TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - start));
                    if (metaData.status() == QueryStatus.FATAL) {
                        TransactionOperationFailed e = this.operationFailed(TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).build());
                        return Mono.error((Throwable)e);
                    }
                    return Mono.empty();
                })).then().thenReturn(result);
            }).onErrorResume(err -> {
                RuntimeException converted = this.convertQueryError(sidx, (Throwable)err, true);
                long elapsed = span.finish();
                this.logger().warn(this.attemptId, "q%d got error on main mono %s after %dus, converted from %s", sidx, AttemptContextReactive.dbg(converted), elapsed, AttemptContextReactive.dbg(err));
                if (converted != null) {
                    return Mono.error((Throwable)converted);
                }
                return Mono.error((Throwable)err);
                // NOTE(review): span.finish() is called in the error handlers above and again here - confirm SpanWrapper.finish() tolerates repeated calls.
            }).doFinally(ignore -> span.finish());
        });
    }

    /**
     * Selects the one error from a query error context that should drive error conversion.
     *
     * <p>Precedence: the first error whose context contains a {@code "cause"} entry; failing that, the
     * first error whose code lies in the inclusive range 17000..18000; failing that, the first error.
     *
     * @param ctx the query error context; the final fallback reads element 0 of {@code ctx.errors()}
     * @return the chosen error
     */
    private ErrorCodeAndMessage chooseQueryError(QueryErrorContext ctx) {
        ErrorCodeAndMessage firstInRange = null;
        for (ErrorCodeAndMessage candidate : ctx.errors()) {
            // A "cause"-bearing error wins outright, wherever it sits in the list.
            if (candidate.context().containsKey("cause")) {
                return candidate;
            }
            boolean inRange = candidate.code() >= 17000 && candidate.code() <= 18000;
            if (firstInRange == null && inRange) {
                firstInRange = candidate;
            }
        }
        if (firstInRange != null) {
            return firstInRange;
        }
        return (ErrorCodeAndMessage)ctx.errors().get(0);
    }

    /**
     * Translates a failure returned for a query into the exception the transaction should surface.
     *
     * <p>Mapping, in order:
     * <ul>
     *   <li>a {@code TimeoutException} becomes {@code AttemptExpired};</li>
     *   <li>a {@code CouchbaseException} whose context is a {@code QueryErrorContext} with at least one
     *       error is mapped from the code of the error picked by {@code chooseQueryError};</li>
     *   <li>if no code matched and the picked error carries a {@code "cause"} map, its {@code raise},
     *       {@code retry} and {@code rollback} entries shape the resulting
     *       {@code TransactionOperationFailed}.</li>
     * </ul>
     *
     * <p>The {@code "cause"} payload is read defensively: a missing or non-string {@code raise} falls back
     * to {@code TRANSACTION_FAILED} (a {@code switch} on a null String would throw
     * {@code NullPointerException}), and non-boolean {@code retry}/{@code rollback} values are ignored.
     *
     * @param sidx                index of the query statement, used in the log message
     * @param err                 the error to convert
     * @param updateInternalState forwarded to {@code operationFailed}
     * @return the converted exception, or {@code null} when no conversion applies and the caller should
     *         propagate {@code err} unchanged
     */
    private RuntimeException convertQueryError(int sidx, Throwable err, boolean updateInternalState) {
        if (err instanceof TimeoutException) {
            return new AttemptExpired(this, err);
        }
        if (!(err instanceof CouchbaseException)) {
            return null;
        }
        CouchbaseException ce = (CouchbaseException)err;
        if (!(ce.context() instanceof QueryErrorContext)) {
            return null;
        }
        QueryErrorContext ctx = (QueryErrorContext)ce.context();
        if (ctx.errors().size() < 1) {
            return null;
        }
        ErrorCodeAndMessage chosenError = this.chooseQueryError(ctx);
        int code = chosenError.code();
        switch (code) {
            case 1065: {
                return this.operationFailed(updateInternalState, TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause((Throwable)new FeatureNotAvailableException("Unknown query parameter: note that query support in transactions is available from Couchbase Server 7.0 onwards", err)).build());
            }
            case 17004: {
                return new AttemptNotFoundOnQuery();
            }
            case 1080:
            case 17010: {
                return this.operationFailed(updateInternalState, TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY).cause(new AttemptExpired(this, err)).doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED).build());
            }
            case 17012: {
                return new DocumentExistsException((ErrorContext)ctx);
            }
            case 17014: {
                return new DocumentNotFoundException((ErrorContext)ctx);
            }
            case 17015: {
                return new CasMismatchException((ErrorContext)ctx);
            }
            default: {
                break;
            }
        }
        Object rawCause = chosenError.context().get("cause");
        if (!(rawCause instanceof Map)) {
            // No usable "cause" object: leave the error unconverted.
            return null;
        }
        Map<?, ?> cause = (Map<?, ?>)rawCause;
        Object raiseRaw = cause.get("raise");
        String raise = raiseRaw instanceof String ? (String)raiseRaw : null;
        this.logger().info(this.attemptId, "q%d query code=%d cause=%s raise=%s", sidx, code, RedactableArgument.redactUser((Object)cause), raise);
        TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause(err);
        // Null-safe dispatch: an absent or unrecognised "raise" maps to TRANSACTION_FAILED.
        if ("failed_post_commit".equals(raise)) {
            builder.raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT);
        } else if ("commit_ambiguous".equals(raise)) {
            builder.raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS);
        } else if ("expired".equals(raise)) {
            builder.raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED);
        } else {
            builder.raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED);
        }
        if (Boolean.TRUE.equals(cause.get("retry"))) {
            builder.retryTransaction();
        }
        if (Boolean.FALSE.equals(cause.get("rollback"))) {
            builder.doNotRollbackAttempt();
        }
        return this.operationFailed(updateInternalState, builder.build());
    }

    /**
     * Copies transaction-level settings from the merged config onto a set of query options.
     *
     * <p>Sets the scan consistency when the config provides one (adding a raw {@code scan_consistency}
     * parameter for {@code NOT_BOUNDED}), then the raw {@code durability_level}, {@code txtimeout},
     * optional {@code atrcollection}, and {@code numatrs} parameters.
     *
     * @param config    source of the transaction settings
     * @param options   the options to mutate
     * @param txtimeout timeout in milliseconds; sent with an {@code "ms"} suffix
     * @return the same {@code options} instance
     * @throws IllegalArgumentException if the durability level is not one of the handled values
     */
    private static QueryOptions applyQueryOptions(MergedTransactionConfig config, QueryOptions options, long txtimeout) {
        QueryScanConsistency consistency = config.scanConsistency().orElse(null);
        if (consistency != null) {
            options.scanConsistency(consistency);
            if (consistency == QueryScanConsistency.NOT_BOUNDED) {
                options.raw("scan_consistency", (Object)QueryScanConsistency.NOT_BOUNDED.toString());
            }
        }

        TransactionDurabilityLevel level = config.transactionDurabilityLevel();
        String levelName;
        switch (level) {
            case NONE:
                levelName = "none";
                break;
            case MAJORITY:
                levelName = "majority";
                break;
            case MAJORITY_AND_PERSIST_TO_ACTIVE:
                levelName = "majorityAndPersistActive";
                break;
            case PERSIST_TO_MAJORITY:
                levelName = "persistToMajority";
                break;
            default:
                throw new IllegalArgumentException("Unknown durability level " + (Object)((Object)level));
        }

        options.raw("durability_level", (Object)levelName);
        options.raw("txtimeout", (Object)(txtimeout + "ms"));
        config.metadataCollection().ifPresent(metadataCollection -> {
            String keyspace = String.format("`%s`.`%s`.`%s`", metadataCollection.bucketName(), metadataCollection.scopeName(), metadataCollection.name());
            options.raw("atrcollection", (Object)keyspace);
        });
        options.raw("numatrs", (Object)config.numAtrs());
        return options;
    }

    /**
     * Sends the {@code BEGIN WORK} statement for this attempt.
     *
     * <p>Asserts the lock is held, builds the tx data and query options, takes the next statement index
     * and runs the statement through {@code queryWrapperLocked}. When a result arrives, the staged
     * mutations list is cleared and the {@code txid} of each returned row is logged.
     *
     * @param pspan parent span for the {@code "query.begin_work"} span created here
     * @return a Mono that completes when the statement has finished
     */
    private Mono<Void> queryBeginWorkLocked(SpanWrapper pspan) {
        return Mono.defer(() -> {
            this.assertLocked("queryBeginWork");
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "query.begin_work", pspan);
            JsonObject txdata = this.makeQueryTxDataLocked();
            QueryOptions options = QueryOptions.queryOptions();
            AttemptContextReactive.applyQueryOptions(this.config, options, this.expiryRemainingMillis());
            String statement = "BEGIN WORK";
            int statementIdx = this.queryStatementIdx.getAndIncrement();
            return this.queryWrapperLocked(statementIdx, null, statement, options, HOOK_QUERY_BEGIN_WORK, true, true, txdata, null, span, false, null, true).doOnNext(v -> {
                this.assertLocked("beginWork");
                // Staged mutations tracked on this side are dropped once BEGIN WORK has returned a result.
                this.stagedMutationsLocked.clear();
                List rows = v.rowsAsObject();
                for (JsonObject row : rows) {
                    String txid = row.getString("txid");
                    this.logger().info(this.attemptId, "BEGIN WORK got txid %s", txid);
                }
                // NOTE(review): this finishes this.span(), not the local `span` created above (which was passed to queryWrapperLocked) - confirm which span is intended.
            }).doFinally(ignore -> this.span().finish()).then();
        });
    }

    /**
     * Returns the cluster, obtained through the parent's cleanup component and its cluster data.
     *
     * @return the {@link Cluster} reached via {@code parent.cleanup().clusterData()}
     */
    private Cluster cluster() {
        return this.parent.cleanup().clusterData().cluster();
    }

    /**
     * Runs a query with the given options and no scope.
     *
     * <p>Delegates to the three-argument overload with a {@code null} scope.
     *
     * @param statement the statement to execute
     * @param options   options for the query
     * @return a Mono emitting the reactive query result
     */
    @Stability.Uncommitted
    public Mono<ReactiveQueryResult> query(String statement, TransactionQueryOptions options) {
        return this.query(null, statement, options);
    }

    /**
     * Runs a query against a scope without explicit options.
     *
     * <p>Delegates to the four-argument overload with {@code null} options and {@code tximplicit=false}.
     *
     * @param scope     the scope to run the query against
     * @param statement the statement to execute
     * @return a Mono emitting the reactive query result
     */
    @Stability.Uncommitted
    public Mono<ReactiveQueryResult> query(ReactiveScope scope, String statement) {
        return this.query(scope, statement, null, false);
    }

    /**
     * Runs a query against a scope with the given options.
     *
     * <p>Delegates to the four-argument overload with {@code tximplicit=false}.
     *
     * @param scope     the scope to run the query against
     * @param statement the statement to execute
     * @param options   options for the query
     * @return a Mono emitting the reactive query result
     */
    @Stability.Uncommitted
    public Mono<ReactiveQueryResult> query(ReactiveScope scope, String statement, TransactionQueryOptions options) {
        return this.query(scope, statement, options, false);
    }

    /**
     * Internal entry point shared by the public {@code query} overloads.
     *
     * <p>Hands a callback to {@code doQueryOperation} (labelled {@code "query non-blocking"}); the callback
     * receives a statement index and a lock token and runs {@code queryWrapperReactiveLocked} with them.
     *
     * @param scope      the scope to run the query against; may be {@code null}
     * @param statement  the statement to execute
     * @param options    options for the query
     * @param tximplicit whether this is a single-query (implicit) transaction
     * @return a Mono emitting the reactive query result
     */
    @Stability.Internal
    Mono<ReactiveQueryResult> query(ReactiveScope scope, String statement, TransactionQueryOptions options, boolean tximplicit) {
        return this.doQueryOperation("query non-blocking", (sidx, lockToken) -> this.queryWrapperReactiveLocked((int)sidx, scope, statement, options, tximplicit, (AtomicReference<ReactiveLock.Waiter>)lockToken));
    }

    /**
     * Pre-flight checks performed under the lock before a query is sent.
     *
     * <p>Optionally fails fast with the error returned by {@code canPerformOperation}, then fails with an
     * expiry error if the attempt has expired client-side or the remaining time is below
     * {@code EXPIRY_THRESHOLD}.
     *
     * @param sidx               index of the query statement, used in the debug string
     * @param statement          the statement, passed to the client-side expiry check
     * @param hookPoint          identifies the stage for the expiry check and the log message
     * @param existingErrorCheck whether to consult {@code canPerformOperation} first
     * @return an empty Mono when the query may proceed, otherwise a Mono in error
     */
    Mono<Void> queryInternalPreLocked(int sidx, String statement, String hookPoint, boolean existingErrorCheck) {
        return Mono.defer(() -> {
            this.assertLocked("queryInternalPre");
            if (existingErrorCheck) {
                TransactionOperationFailed existing = this.canPerformOperation("queryInternalPre " + sidx);
                if (existing != null) {
                    return Mono.error((Throwable)existing);
                }
            }
            long remainingMillis = this.expiryRemainingMillis();
            boolean belowThreshold = remainingMillis < (long)this.EXPIRY_THRESHOLD;
            // The client-side expiry check is always invoked, regardless of the threshold result.
            boolean expiredClientSide = this.hasExpiredClientSide(hookPoint, Optional.of(statement));
            if (!expiredClientSide && !belowThreshold) {
                return Mono.empty();
            }
            this.logger().info(this.attemptId, "transaction has expired in stage '%s' remaining=%d threshold=%d", hookPoint, remainingMillis, this.EXPIRY_THRESHOLD);
            return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED).doNotRollbackAttempt().build()));
        });
    }

    /**
     * Maps staged mutations to {@code DocRecord}s.
     *
     * <p>Each record is built from the bucket name, scope name and collection name of the mutation's
     * document collection, plus the document id.
     *
     * @param mutations the staged mutations to convert
     * @return one {@code DocRecord} per mutation, in encounter order
     */
    private List<DocRecord> toDocRecords(List<StagedMutation> mutations) {
        return mutations.stream().map(m -> new DocRecord(m.doc.collection().bucketName(), m.doc.collection().scopeName(), m.doc.collection().name(), m.doc.id())).collect(Collectors.toList());
    }

    /**
     * Records this attempt on the overall transaction and, if one is produced, queues a cleanup request.
     *
     * @param cleanup receiver of the cleanup request, when {@code createCleanupRequestIfNeeded} returns one
     * @param err     the error that ended the attempt, or {@code null}
     * @return a Mono that performs the work on subscription and then completes
     */
    @Stability.Internal
    private Mono<Void> addAttemptAndCleanup(TransactionsCleanup cleanup, @Nullable Throwable err) {
        return Mono.fromRunnable(() -> {
            TransactionAttempt attempt = this.createFromContext(Optional.ofNullable(err));
            CleanupRequest pendingCleanup = this.createCleanupRequestIfNeeded();
            this.overall.addAttempt(attempt);
            if (pendingCleanup == null) {
                return;
            }
            cleanup.add(pendingCleanup);
        });
    }

    /**
     * Builds a {@code TransactionAttempt} snapshot describing this attempt.
     *
     * <p>In query mode the staged insert/replace/remove id lists and the mutation tokens are reported as
     * empty; otherwise they are derived from the staged mutations and {@code finalMutationTokens()}.
     * The query-mode flag is read once so that every field of the snapshot reflects the same mode.
     *
     * @param terminatedByException the exception that ended the attempt, if any; must not be {@code null}
     * @return the populated attempt record
     * @throws NullPointerException if {@code terminatedByException} is {@code null}
     */
    TransactionAttempt createFromContext(Optional<Throwable> terminatedByException) {
        Objects.requireNonNull(terminatedByException);
        // Single read: the original re-read this unlocked flag for every field.
        boolean queryMode = this.queryModeUnlocked();
        List<String> insertedIds = queryMode ? Collections.<String>emptyList() : this.stagedDocIdsOfType(StagedMutationType.INSERT);
        List<String> replacedIds = queryMode ? Collections.<String>emptyList() : this.stagedDocIdsOfType(StagedMutationType.REPLACE);
        List<String> removedIds = queryMode ? Collections.<String>emptyList() : this.stagedDocIdsOfType(StagedMutationType.REMOVE);
        return new TransactionAttempt(this.atrCollection(), this.atrId(), this.state(), this.attemptId(), insertedIds, replacedIds, removedIds, terminatedByException, Duration.of(System.nanoTime(), ChronoUnit.NANOS).minus(this.startTimeClient()), queryMode ? Collections.EMPTY_LIST : this.finalMutationTokens(), queryMode);
    }

    /** Returns the ids of the documents whose staged mutation is of the given type. */
    private List<String> stagedDocIdsOfType(StagedMutationType wanted) {
        return this.stagedMutationsLocked.stream().filter(m -> m.type == wanted).map(m -> m.doc.id()).collect(Collectors.toList());
    }

    /**
     * Runs the end-of-lambda sequence for this attempt.
     *
     * <p>Decodes the state bits, rolls back automatically when required (a rollback failure is logged,
     * sets the should-not-retry bit and is otherwise swallowed), records the attempt and any cleanup
     * request, finishes the attempt span, and finally defers to {@code retryIfRequired}.
     *
     * @param cleanup                    receiver for a cleanup request, if one is created
     * @param err                        the error raised by the lambda, or {@code null}
     * @param singleQueryTransactionMode when {@code true}, automatic rollback is never attempted
     * @return a Mono that completes, or errors as decided by {@code retryIfRequired}
     */
    @Stability.Internal
    Mono<Void> lambdaEnd(TransactionsCleanup cleanup, @Nullable Throwable err, boolean singleQueryTransactionMode) {
        return Mono.defer(() -> {
            int bits = this.stateBits.get();
            int finalErrorIdx = (bits & STATE_BITS_MASK_FINAL_ERROR) >> STATE_BITS_POSITION_FINAL_ERROR;
            TransactionOperationFailed.FinalErrorToRaise finalError = TransactionOperationFailed.FinalErrorToRaise.values()[finalErrorIdx];
            boolean shouldNotRollback = (bits & TRANSACTION_STATE_BIT_SHOULD_NOT_ROLLBACK) != 0;
            boolean succeeded = finalError == TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_SUCCESS;
            boolean rollbackNeeded = !(succeeded || shouldNotRollback || singleQueryTransactionMode);
            this.LOGGER.info(this.attemptId, "reached end of lambda in %dus, shouldNotRollback=%s finalError=%s rollbackNeeded=%s, err (only cause of this will be used)=%s tximplicit=%s", new Object[]{TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - this.startTimeClient.toNanos()), shouldNotRollback, finalError, rollbackNeeded, err, singleQueryTransactionMode});
            return Mono.defer(() -> {
                if (!rollbackNeeded) {
                    return Mono.empty();
                }
                return this.rollbackAuto().onErrorResume(er -> {
                    this.overall.LOGGER.info(this.attemptId, "rollback failed with %s. Original error will be raised as cause, and retry should be disabled", DebugUtil.dbg(er));
                    this.setStateBits("lambdaEnd", TRANSACTION_STATE_BIT_SHOULD_NOT_RETRY, 0);
                    return Mono.empty();
                });
            })
                    .then(this.addAttemptAndCleanup(cleanup, err))
                    .doOnTerminate(this.attemptSpan::finish)
                    .then(Mono.defer(() -> this.retryIfRequired(err)));
        });
    }

    /**
     * Decides how the attempt ends after any rollback: retry, propagate the error, or complete.
     *
     * <p>A retry is wanted when the final error is not {@code TRANSACTION_SUCCESS} and the
     * should-not-retry bit is clear. If a retry is wanted but the attempt has expired client-side, an
     * expiry failure is raised instead.
     *
     * @param err the error raised by the lambda, or {@code null}
     * @return a Mono failing with {@code RetryTransaction}, with an expiry failure, or with {@code err};
     *         empty when there is nothing to raise
     */
    private Mono<Void> retryIfRequired(Throwable err) {
        int bits = this.stateBits.get();
        int finalErrorIdx = (bits & STATE_BITS_MASK_FINAL_ERROR) >> STATE_BITS_POSITION_FINAL_ERROR;
        TransactionOperationFailed.FinalErrorToRaise finalError = TransactionOperationFailed.FinalErrorToRaise.values()[finalErrorIdx];
        boolean shouldNotRetry = (bits & TRANSACTION_STATE_BIT_SHOULD_NOT_RETRY) != 0;
        boolean retryNeeded = finalError != TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_SUCCESS && !shouldNotRetry;

        // The expiry check is only consulted when a retry would otherwise happen.
        if (retryNeeded && this.hasExpiredClientSide(HOOK_BEFORE_RETRY, Optional.empty())) {
            return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY).doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED).build()));
        }
        this.LOGGER.info(this.attemptId, "reached end of lambda post-rollback (if needed), shouldNotRetry=%s finalError=%s retryNeeded=%s", new Object[]{shouldNotRetry, finalError, retryNeeded});
        if (retryNeeded) {
            return Mono.error((Throwable)new RetryTransaction());
        }
        if (err == null) {
            return Mono.empty();
        }
        return Mono.error((Throwable)err);
    }

    /**
     * Produces the final outcome of the whole transaction.
     *
     * <p>Builds a {@code TransactionResult}, decodes the final error from the state bits and either emits
     * the result ({@code TRANSACTION_SUCCESS} and {@code TRANSACTION_FAILED_POST_COMMIT}) or fails with
     * {@code TransactionExpired}, {@code TransactionCommitAmbiguous} or {@code TransactionFailed}. The
     * cause attached to the raised exception is the cause of {@code err} when {@code err} is a
     * {@code TransactionOperationFailed}; any other non-null {@code err} is only logged.
     *
     * @param err the error that ended the transaction, or {@code null}
     * @return a Mono emitting the result, or failing with the final exception
     */
    @Stability.Internal
    Mono<TransactionResult> transactionEnd(@Nullable Throwable err) {
        return Mono.defer(() -> {
            TransactionResult result = new TransactionResult(this.overall.attempts(), this.overall.LOGGER, Duration.of(System.nanoTime() - this.overall.startTimeClient().toNanos(), ChronoUnit.NANOS), this.overall.transactionId(), this.overall.serialized());
            int bits = this.stateBits.get();
            int finalErrorIdx = (bits & STATE_BITS_MASK_FINAL_ERROR) >> STATE_BITS_POSITION_FINAL_ERROR;
            TransactionOperationFailed.FinalErrorToRaise finalError = TransactionOperationFailed.FinalErrorToRaise.values()[finalErrorIdx];
            this.LOGGER.info(this.attemptId, "reached end of transaction, toRaise=%s, err=%s", new Object[]{finalError, err});

            Throwable cause = null;
            if (err instanceof TransactionOperationFailed) {
                cause = ((TransactionOperationFailed)err).getCause();
            } else if (err != null) {
                this.logger().info(this.attemptId, "Non-TransactionOperationFailed '" + DebugUtil.dbg(err) + "' received, this is a bug");
            }

            TransactionFailed toRaise;
            if (finalError == TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_SUCCESS
                    || finalError == TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT) {
                // Neither of these outcomes raises an exception.
                toRaise = null;
            } else if (finalError == TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED) {
                String msg = "Transaction has expired configured timeout of " + this.overall.expirationTime().toMillis() + "msecs.  The transaction is not committed.";
                toRaise = new TransactionExpired(cause, result, msg);
            } else if (finalError == TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS) {
                String msg = "It is ambiguous whether the transaction committed";
                toRaise = new TransactionCommitAmbiguous(cause, result, msg);
            } else {
                toRaise = new TransactionFailed(cause, result);
            }

            if (toRaise == null) {
                return Mono.just((Object)result);
            }
            this.LOGGER.info(this.attemptId, "raising final error %s based on state bits %d masked %d", toRaise, bits, finalErrorIdx);
            return Mono.error((Throwable)toRaise);
        });
    }

    /**
     * Ensures an exception is represented as a {@code TransactionOperationFailed}.
     *
     * <p>An existing {@code TransactionOperationFailed} is returned untouched. Anything else is wrapped
     * using the error class from {@code ErrorClasses.classify}, flagged for retry when it is a
     * {@code RetryTransaction}, logged, and passed through {@code operationFailed}.
     *
     * @param e the exception to convert
     * @return {@code e} itself, or the value returned by {@code operationFailed} for the wrapper
     */
    @Stability.Internal
    TransactionOperationFailed convertToOperationFailedIfNeeded(Throwable e) {
        if (!(e instanceof TransactionOperationFailed)) {
            TransactionOperationFailedBuilder wrapper = TransactionOperationFailedBuilder.createError(this, ErrorClasses.classify(e)).cause(e);
            if (e instanceof RetryTransaction) {
                wrapper.retryTransaction();
            }
            TransactionOperationFailed wrapped = wrapper.build();
            this.logger().info(this.attemptId(), "Caught exception from application's lambda %s, converted it to %s", DebugUtil.dbg(e), DebugUtil.dbg(wrapped));
            return this.operationFailed(wrapped);
        }
        return (TransactionOperationFailed)e;
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before the ATR commit is written - call site not visible here, confirm.
     */
    protected Mono<Integer> beforeAtrCommit(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before resolving an ambiguous ATR commit - call site not visible here, confirm.
     */
    protected Mono<Integer> beforeAtrCommitAmbiguityResolution(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked after the ATR commit is written - call site not visible here, confirm.
     */
    protected Mono<Integer> afterAtrCommit(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before a document is committed, with {@code id} being its document id - confirm.
     */
    protected Mono<Integer> beforeDocCommitted(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before a document is rolled back, with {@code id} being its document id - confirm.
     */
    protected Mono<Integer> beforeDocRolledBack(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked after a document commit but before its CAS is saved - confirm.
     */
    protected Mono<Integer> afterDocCommittedBeforeSavingCAS(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked after a document is committed, with {@code id} being its document id - confirm.
     */
    protected Mono<Integer> afterDocCommitted(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked once all documents have been committed - call site not visible here, confirm.
     */
    protected Mono<Integer> afterDocsCommitted(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before a document is removed, with {@code id} being its document id - confirm.
     */
    protected Mono<Integer> beforeDocRemoved(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked after a document removal, ahead of any retry handling - confirm.
     */
    protected Mono<Integer> afterDocRemovedPreRetry(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked after a document removal, following any retry handling - confirm.
     */
    protected Mono<Integer> afterDocRemovedPostRetry(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked once all documents have been removed - call site not visible here, confirm.
     */
    protected Mono<Integer> afterDocsRemoved(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before the ATR entry is set to pending - call site not visible here, confirm.
     */
    protected Mono<Integer> beforeAtrPending(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked after the ATR entry is set to pending - call site not visible here, confirm.
     */
    protected Mono<Integer> afterAtrPending(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked after the ATR entry is marked complete - call site not visible here, confirm.
     */
    protected Mono<Integer> afterAtrComplete(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before the ATR entry is marked complete - call site not visible here, confirm.
     */
    protected Mono<Integer> beforeAtrComplete(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before the ATR entry is marked rolled back - call site not visible here, confirm.
     */
    protected Mono<Integer> beforeAtrRolledBack(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked after a get completes, with {@code id} being the document id - confirm.
     */
    protected Mono<Integer> afterGetComplete(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before rollback deletes a staged insert - call site not visible here, confirm.
     */
    protected Mono<Integer> beforeRollbackDeleteInserted(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked after a replace has been staged - call site not visible here, confirm.
     */
    protected Mono<Integer> afterStagedReplaceComplete(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked after a remove has been staged - call site not visible here, confirm.
     */
    protected Mono<Integer> afterStagedRemoveComplete(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before an insert is staged - call site not visible here, confirm.
     */
    protected Mono<Integer> beforeStagedInsert(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before a remove is staged - call site not visible here, confirm.
     */
    protected Mono<Integer> beforeStagedRemove(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before a replace is staged - call site not visible here, confirm.
     */
    protected Mono<Integer> beforeStagedReplace(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked after an insert has been staged - call site not visible here, confirm.
     */
    protected Mono<Integer> afterStagedInsertComplete(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before the ATR entry is marked aborted - call site not visible here, confirm.
     */
    protected Mono<Integer> beforeAtrAborted(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked after the ATR entry is marked aborted - call site not visible here, confirm.
     */
    protected Mono<Integer> afterAtrAborted(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked after the ATR entry is marked rolled back - call site not visible here, confirm.
     */
    protected Mono<Integer> afterAtrRolledBack(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked after rollback of a staged replace or remove - call site not visible here, confirm.
     */
    protected Mono<Integer> afterRollbackReplaceOrRemove(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked after rollback deletes a staged insert - call site not visible here, confirm.
     */
    protected Mono<Integer> afterRollbackDeleteInserted(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before checking the ATR entry of a transaction blocking a document - confirm.
     */
    protected Mono<Integer> beforeCheckATREntryForBlockingDoc(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before a document is fetched, with {@code id} being its document id - confirm.
     */
    protected Mono<Integer> beforeDocGet(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before fetching a document that already exists during a staged insert - confirm.
     */
    protected Mono<Integer> beforeGetDocInExistsDuringStagedInsert(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before a staged insert is removed - call site not visible here, confirm.
     */
    protected Mono<Integer> beforeRemoveStagedInsert(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked after a staged insert is removed - call site not visible here, confirm.
     */
    protected Mono<Integer> afterRemoveStagedInsert(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default always returns {@code false}.
     * NOTE(review): presumably lets a subclass force the client-side expiry check to report expiry at a given
     * {@code place} - the caller is not visible here, confirm.
     */
    protected Boolean hasExpiredClientSideHook(AttemptContextReactive self, String place, Optional<String> docId) {
        return false;
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before a query statement is sent - call site not visible here, confirm.
     */
    protected Mono<Integer> beforeQuery(AttemptContextReactive self, String statement) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked after a query statement has returned - call site not visible here, confirm.
     */
    protected Mono<Integer> afterQuery(AttemptContextReactive self, String statement) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before the removal of a staged insert is overwritten - confirm.
     */
    protected Mono<Integer> beforeOverwritingStagedInsertRemoval(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before the lock is released during a get - call site not visible here, confirm.
     */
    protected Mono<Integer> beforeUnlockGet(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before the lock is released during an insert - call site not visible here, confirm.
     */
    protected Mono<Integer> beforeUnlockInsert(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before the lock is released during a replace - call site not visible here, confirm.
     */
    protected Mono<Integer> beforeUnlockReplace(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * NOTE(review): presumably invoked before the lock is released during a remove - call site not visible here, confirm.
     */
    protected Mono<Integer> beforeUnlockRemove(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    /**
     * Overridable hook; the default does nothing and emits {@code 0}.
     * Invoked by {@code beginWorkIfNeeded} with the query statement, before it releases the lock ahead of BEGIN WORK.
     */
    protected Mono<Integer> beforeUnlockQuery(AttemptContextReactive self, String statement) {
        return Mono.just((Object)0);
    }

    /**
     * Returns the logger used by this attempt.
     *
     * @return the {@code LOGGER} field
     */
    public TransactionLogger logger() {
        return this.LOGGER;
    }

    /**
     * Returns a short debug description: up to the first five characters of the attempt id, the state,
     * the ATR debug string and the string forms of the staged mutations.
     *
     * <p>The id prefix length is clamped to the id's length so that this method cannot throw
     * {@code StringIndexOutOfBoundsException} for an id shorter than five characters.
     *
     * @return the debug string
     */
    @Override
    public String toString() {
        int prefixLength = Math.min(5, this.attemptId.length());
        StringBuilder sb = new StringBuilder("AttemptContextReactive{");
        sb.append("id=").append(this.attemptId.substring(0, prefixLength));
        sb.append(",state=").append((Object)this.state);
        sb.append(",atr=").append(ATRUtil.getAtrDebug(this.atrCollection, this.atrId));
        sb.append(",staged=").append(this.stagedMutationsLocked.stream().map(StagedMutation::toString).collect(Collectors.toList()));
        sb.append('}');
        return sb.toString();
    }

    // Synthetic lambda body surfaced by the decompiler (named after doUnderLock): invokes the supplier
    // and returns the Mono it produces.
    private static /* synthetic */ Mono lambda$doUnderLock$110(Supplier whileLocked) {
        return (Mono)whileLocked.get();
    }

    // Synthetic lambda body surfaced by the decompiler (named after waitForAllOpsThenDoUnderLock).
    // While kvOps still reports waiting operations, the lock is released and waitForAllOpsThenDoUnderLock
    // is entered again. Otherwise doUnderLock runs and the lock is released afterwards; on error the lock
    // is released first and the same error is then re-emitted.
    private /* synthetic */ Mono lambda$waitForAllOpsThenDoUnderLock$44(String dbg, SpanWrapper span, Supplier doUnderLock, ReactiveLock.Waiter lockToken) {
        if (this.kvOps.waitingCount() > 0) {
            return this.unlock(lockToken, dbg + " still waiting for ops").then(this.waitForAllOpsThenDoUnderLock(dbg + " still waiting for ops", span, doUnderLock));
        }
        return ((Mono)doUnderLock.get()).then(this.unlock(lockToken, "after doUnderLock")).onErrorResume(err -> this.unlock(lockToken, "onError doUnderLock").then(Mono.error((Throwable)err)));
    }
}

